[processor/lsminterval] Optimize memory usage for each aggregation interval #211

Open
lahsivjar opened this issue Oct 24, 2024 · 15 comments

@lahsivjar
Contributor

lahsivjar commented Oct 24, 2024

As per the current logic, the processor uses the aggregation interval and processing time as the key in the LSM database. On harvest, each key is decoded and loaded into memory, so all of the data for a specific aggregation interval ends up in memory at once -- this is also required to apply overflows during merge, since ALL of the data must be available at the same time.

That said, we could reduce the memory usage by relaxing the overflow-during-merge requirement and partitioning the metrics. We do something similar in apm-aggregation (ref). The side effect of adopting this approach is that if the partition count is greater than 1 (i.e. partitioning is actually in use) then the overflow becomes non-deterministic and there will be more than one aggregated metric with the same dimensions. The duplicate metrics could be avoided by offsetting the timestamps of the aggregated metrics based on the partition count.
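
For illustration, a rough sketch of what partitioned keys plus timestamp offsetting could look like (all names are hypothetical, not the actual lsmintervalprocessor code):

```go
package sketch

import (
	"encoding/binary"
	"hash/fnv"
	"time"
)

// partitionedKey spreads a single (interval, processingTime) bucket across N
// partitions so that harvest can decode and merge one partition at a time
// instead of loading the whole interval into memory.
func partitionedKey(interval time.Duration, processingTime time.Time, dims []byte, partitions uint16) []byte {
	if partitions == 0 {
		partitions = 1
	}
	h := fnv.New32a()
	h.Write(dims) // hash of the metric's identifying dimensions
	p := uint16(h.Sum32()) % partitions

	key := make([]byte, 0, 18)
	key = binary.BigEndian.AppendUint64(key, uint64(interval))
	key = binary.BigEndian.AppendUint64(key, uint64(processingTime.Unix()))
	key = binary.BigEndian.AppendUint16(key, p)
	return key
}

// offsetTimestamp nudges an aggregated metric's timestamp by its partition
// index so that metrics with the same dimensions coming from different
// partitions do not collide on identical timestamps.
func offsetTimestamp(ts time.Time, partition uint16) time.Time {
	return ts.Add(time.Duration(partition) * time.Millisecond)
}
```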

@lahsivjar
Contributor Author

lahsivjar commented Dec 27, 2024

Compared to apm-aggregation, the merge operations using the OpenTelemetry data model are far more expensive. Based on some benchmarking, the biggest culprit is the merging performed for exponential histograms (we use a copy of the contrib implementation). Here are some benchmarking results:

[benchmark screenshots]

Playing around with GOGC and GOMEMLIMIT does produce some positive results, but overall the performance is still marred by the huge number of allocations required for the merge operations. Note that merge is the biggest culprit but not the only one - I am running some experiments to dig deeper.

@lahsivjar
Contributor Author

lahsivjar commented Dec 27, 2024

Diving into the source of the high allocations, I ran an experiment replacing the exponential histogram with an explicit-bucket histogram. Even though the number of buckets for the histogram was kept at the default value (which is far fewer than the exponential histogram default), the observed GC was much lower and the throughput was stable and in a good range:

[screenshot]

(The above was run with the Go memory limit set to 4GB and the normal GC disabled, i.e. GOGC=off GOMEMLIMIT=4GiB, to keep GC factors to a minimum for now; however, I don't think the results would be much different with the default GC configuration.)

In another experiment, reducing the max buckets for the exponential histogram from the default of 160 to 20 didn't result in much improvement. Looking at the code for merging histograms (which is taken from the upstream deltatocumulative processor), the reason is quite evident: the merge creates new slices for every merge operation on an exponential histogram datapoint (ref). There are some low-hanging fruits for reducing allocations in the lsminterval processor as a whole, but since the exponential histogram's footprint is much larger than anything else, I don't expect them to have any substantial impact.

Looking at ways to circumvent this, there are a few options I can think of:

  1. Use explicit-bucket histograms as the default - we could use the same buckets as the HDR histogram implementation in apm-aggregation; however, this needs to be benchmarked too since HDR histograms have a high number of buckets (see the sketch after this list for why these merges are cheap).
  2. Convert delta exponential histograms to cumulative before merging. Since the memory requirement for delta-to-cumulative will not be amplified by the merge operations performed in the LSM database, we should see a net positive in terms of allocations. Benchmarking required.
  3. Optimize the merge for exponential histograms. This is a bit tricky because options like pooling are not available to us due to the use of the pdata model. We could look into having our own data model for merging and converting between pdata and the custom model as/when required.
  4. Reduce the number of merge operations. Every merge operation allocates a slice per exponential histogram datapoint, so if we can reduce the number of merges (by doing pre-merges or batching in the hot path) we might be able to get to a net positive.
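
For context on option 1, merging two explicit-bucket data points is essentially an in-place element-wise sum. A minimal sketch using pdata, assuming both points are delta temporality and share identical explicit bounds:

```go
package sketch

import "go.opentelemetry.io/collector/pdata/pmetric"

// mergeHistogramDP folds "from" into "to" in place. It assumes both data
// points are delta temporality and share identical explicit bounds, so no
// new bucket slice needs to be allocated.
func mergeHistogramDP(to, from pmetric.HistogramDataPoint) {
	to.SetCount(to.Count() + from.Count())
	if from.HasSum() {
		to.SetSum(to.Sum() + from.Sum())
	}
	toBuckets, fromBuckets := to.BucketCounts(), from.BucketCounts()
	for i := 0; i < fromBuckets.Len() && i < toBuckets.Len(); i++ {
		toBuckets.SetAt(i, toBuckets.At(i)+fromBuckets.At(i))
	}
}
```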

I am working on investigating the above options next.

@felixbarny
Member

the merge creates new slices for every merge operation on an exponential histogram datapoint

Sounds like this is where we could optimize. If the bucket range of the target histogram encompasses the range of the other histogram (in other words, if it is wide enough), there's no need to create a new slice. We could then just increment the corresponding bucket counts based on the ones from the other histogram.

Maybe we could also look into pre-allocating the target histogram with the max bucket size and de-serialize the histograms directly into the pre-allocated target, without needing to allocate intermediate histograms. That would probably require changes to pdata but I think we shouldn't shy away from that.
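
A rough sketch of the first idea (folding b into a without allocating when b's range is already covered), assuming both data points share the same scale; negative buckets and the reallocation fallback are omitted:

```go
package sketch

import "go.opentelemetry.io/collector/pdata/pmetric"

// addInPlace folds the positive buckets of "from" into "to" without
// allocating, but only when from's bucket range is fully covered by to's.
// It assumes both data points already use the same scale; when "from" does
// not fit, the caller still has to widen or reallocate (not shown here).
func addInPlace(to, from pmetric.ExponentialHistogramDataPoint) bool {
	tb, fb := to.Positive(), from.Positive()
	lo := int(fb.Offset()) - int(tb.Offset())
	hi := lo + fb.BucketCounts().Len()
	if lo < 0 || hi > tb.BucketCounts().Len() {
		return false // from does not fit into to's existing bucket range
	}
	for i := 0; i < fb.BucketCounts().Len(); i++ {
		tb.BucketCounts().SetAt(lo+i, tb.BucketCounts().At(lo+i)+fb.BucketCounts().At(i))
	}
	to.SetCount(to.Count() + from.Count())
	return true
}
```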

@lahsivjar
Contributor Author

Sounds like this is where we could optimize. If the bucket range of the target histogram encompasses the range of the other histogram (in other words, if it is wide enough), there's no need to create a new slice

Yeah, this is what I have been working on. One of the issues here is that the current code (which was adapted from otel-contrib) does not mutate the other histogram (the one being merged into the source histogram). For our use case, however, we don't have any such constraint, so we have some room to optimize. For exponential histograms, though, we can't always reuse one or the other slice because negative offsets might need to be shifted when merging.

Maybe we could also look into pre-allocating the target histogram with the max bucket size and de-serialize the histograms directly into the pre-allocated target, without needing to allocate intermediate histograms. That would probably require changes to pdata but I think we shouldn't shy away from that.

Not sure if I understood this correctly but the intermediate histograms are not required during deserialization - a new slice is created during merging histograms.

@felixbarny
Member

Not sure if I understood this correctly but the intermediate histograms are not required during deserialization - a new slice is created during merging histograms.

I was assuming that most of the time, before merging two histograms, they need to be deserialized from a pebble layer file. But maybe most of the merges actually happen for pmetric histogram instances that are on the heap (like instances that the signaltometricsconnector has already allocated)?

Given that allocating the slices for the counts seems to be the most expensive part, I was thinking of multiple additive optimization steps when merging histograms a and b:

  • Don't allocate a new slice if b "fits into" a, merge b into a and return a
  • Ensure a is "wide enough" so that b is guaranteed to fit into a
    • Assuming a is re-used as the same target in consecutive merge operations, widen a on the first merge operation and pre-allocate all 160 buckets
  • When deserializing histograms from a pebble file, combine deserialization with merging
    • Instead of deserializing and then merging short-lived histogram instances, use a single pre-allocated histogram instance as a container in the deserialization process.
    • This is potentially a lot more involved and the first two steps may yield enough speedup already. That's especially true if most of the bottleneck is due to merging histograms that didn't need to be deserialized.
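
As an illustration of the "widen a on the first merge" bullet above, widening can be a single up-front allocation that makes every later merge an in-place count increment (hypothetical helper, not the contrib code):

```go
package sketch

import "go.opentelemetry.io/collector/pdata/pmetric"

// widen grows the positive buckets of "to" once so that later merges can
// increment counts in place. The caller picks newOffset/newLen to cover the
// widest expected range (e.g. all 160 buckets) and must ensure the existing
// range fits inside it.
func widen(to pmetric.ExponentialHistogramDataPoint, newOffset int32, newLen int) {
	b := to.Positive()
	shift := int(b.Offset()) - int(newOffset) // existing counts move right by this much
	old := b.BucketCounts().AsRaw()

	grown := make([]uint64, newLen) // the one allocation paid up front
	copy(grown[shift:], old)

	b.BucketCounts().FromRaw(grown)
	b.SetOffset(newOffset)
}
```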

@lahsivjar
Contributor Author

I was assuming that most of the time, before merging two histograms, they need to be deserialized from a pebble layer file.

Yeah, correct!

But maybe most of the merges actually happen for pmetric histogram instances that are on the heap (like instances that the signaltometricsconnector has already allocated)?

Hmm, not quite sure how we could share allocated slices between the signaltometrics connector and the LSM interval processor. Also, pooling doesn't seem to be an option with pdata models. Maybe we could pitch pool methods to be added to the proto definitions for pmetric?

Don't allocate a new slice if b "fits into" a, merge b into a and return a
Ensure a is "wide enough" so that b is guaranteed to fit into a
Assuming a is re-used as the same target in consecutive merge operations, widen a on the first merge operation and pre-allocate all 160 buckets

Some bits of this I have added in this PR; however, it is difficult to pre-allocate things with the restrictions of the API exposed by pdata. I have opened an issue in otel-collector to add reslice methods to the API, which will help us optimize some bits and pieces.

When deserializing histograms from a pebble file, combine deserialization with merging
Instead of deserializing and then merging short-lived histogram instances, use a single pre-allocated histogram instance as a container in the deserialization process.
This is potentially a lot more involved and the first two steps may yield enough speedup already. That's especially true if most of the bottleneck is due to merging histograms that didn't need to be deserialized.

Good idea but this is also restricted by the pdata model I think.

With the PR I shared above, I might have found another bottleneck but I am still looking into it and it might turn out to be due to high GC as a result of so many allocations.

@felixbarny
Member

Hmm, not quite sure how we could share allocated slices between the signaltometrics connector and the LSM interval processor.

Isn't that what's happening by default? The LSM interval processor will receive the pmetrics including slices that are allocated by the signal2metrics connector and pushed to the downstream consumer (Metrics#ConsumeMetrics).

Does the LSM interval processor immediately serialize each individual metric by writing it into Pebble, or is there a period of time where actual pmetric instances are held in memory and merged? That may be important to reduce overhead of serializing and de-serializing, which adds to allocation/GC pressure.

@lahsivjar
Contributor Author

With the PR I shared above, I might have found another bottleneck but I am still looking into it and it might turn out to be due to high GC as a result of so many allocations.

After the optimizations to exponential histograms and tweaking the pebble configuration, the allocations are somewhat under control. The next major bottleneck seems to be pebble flushes, which appear to lock the database. I have a few ideas to tackle this, and fortunately all of them should further reduce allocations too. The idea basically boils down to reducing the merge operations performed by pebble, but there are a few ways to go about this. The simplest is to simply batch payloads before they are processed by lsmintervalprocessor. I will start working on this next.

@felixbarny
Member

The simplest is to simply batch payloads before they are processed by lsmintervalprocessor. I will start working on this next.

Great idea 👍
Pairing the lsmintervalprocessor with a batch processor should also alleviate this concern, without having to add any code to the processor itself:

Does the LSM interval processor immediately serialize each individual metric by writing it into Pebble, or is there a period of time where actual pmetric instances are held in memory and merged? That may be important to reduce overhead of serializing and de-serializing, which adds to allocation/GC pressure.

The only thing we need to ensure we're doing is to merge the histograms from a batch before handing them over to pebble.
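
A rough sketch of that pre-merge step, where identityOf and mergeDP are hypothetical stand-ins for the processor's real hashing and merge logic:

```go
package sketch

import "go.opentelemetry.io/collector/pdata/pmetric"

// preMerge collapses a batch before it reaches pebble: data points that
// share an identity are merged in memory, so pebble sees one merge operand
// per series instead of one per incoming data point.
func preMerge(
	dps []pmetric.ExponentialHistogramDataPoint,
	identityOf func(pmetric.ExponentialHistogramDataPoint) string,
	mergeDP func(to, from pmetric.ExponentialHistogramDataPoint),
) []pmetric.ExponentialHistogramDataPoint {
	seen := make(map[string]pmetric.ExponentialHistogramDataPoint, len(dps))
	out := dps[:0]
	for _, dp := range dps {
		id := identityOf(dp)
		if target, ok := seen[id]; ok {
			mergeDP(target, dp) // fold into the data point we already kept
			continue
		}
		seen[id] = dp
		out = append(out, dp)
	}
	return out
}
```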

@raultorrecilla

moving this to it 106

@raultorrecilla

moving this to it 106 and adding due date (HOtel tech preview)

@lahsivjar
Contributor Author

The simplest is to simply batch payloads before they are processed by lsmintervalprocessor. I will start working on this next.

The results look positive.

[screenshot]

The above is from 2 separate runs with different batch sizes (the collector was stopped where you see the gap between the two clusters of data). The results show no pauses and a fairly consistent throughput. The memory consumed is also under control, along with the GC.


[screenshot]

Diving into the traces with the batch processor, we reach the same conclusion, i.e. the lsm interval processor is properly aggregating data without much issue (the green blocks in the traces are basically calls to batchProcessor#startLoop, which wraps the Consume* methods for the lsm interval processor and other downstream processors). The batchProcessor#startLoop goroutines are getting preempted as they are long-running goroutines - which I think is a good indication.

Start Stack Trace for goroutine running `batchProcessor#startLoop`
runtime.asyncPreempt:47
--
go.opentelemetry.io/collector/pdata/pcommon.UInt64Slice.Len:49
github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/data/expo.Absolute.idx:71
github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/data/expo.Absolute.Abs:53
github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/data.ExpHistogram.Add:104
github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/merger.mergeDeltaExponentialHistogramDP:132
github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/merger.mergeDelta[...]:92
github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/merger.mergeDataPoints[...]:54
github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/merger.(*Value).mergeMetric:713
github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/merger.(*Value).MergeMetric:239
github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor.(*Processor).ConsumeMetrics.func1.1.1:275
go.opentelemetry.io/collector/pdata/pmetric.MetricSlice.RemoveIf:111
github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor.(*Processor).ConsumeMetrics.func1.1:261
go.opentelemetry.io/collector/pdata/pmetric.ScopeMetricsSlice.RemoveIf:111
github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor.(*Processor).ConsumeMetrics.func1:260
go.opentelemetry.io/collector/pdata/pmetric.ResourceMetricsSlice.RemoveIf:111
github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor.(*Processor).ConsumeMetrics:259
go.opentelemetry.io/collector/processor/batchprocessor.(*batchMetrics).export:486
go.opentelemetry.io/collector/processor/batchprocessor.(*shard[...]).sendItems:261
go.opentelemetry.io/collector/processor/batchprocessor.(*shard[...]).processItem:233
go.opentelemetry.io/collector/processor/batchprocessor.(*shard[...]).startLoop:218
End Stack Trace for goroutine running `batchProcessor#startLoop`
runtime.asyncPreempt:47
--
github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/data/expo.Absolute.Abs:52
github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/data.ExpHistogram.Add:104
github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/merger.mergeDeltaExponentialHistogramDP:132
github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/merger.mergeDelta[...]:92
github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/merger.mergeDataPoints[...]:54
github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/merger.(*Value).mergeMetric:713
github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/merger.(*Value).MergeMetric:239
github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor.(*Processor).ConsumeMetrics.func1.1.1:275
go.opentelemetry.io/collector/pdata/pmetric.MetricSlice.RemoveIf:111
github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor.(*Processor).ConsumeMetrics.func1.1:261
go.opentelemetry.io/collector/pdata/pmetric.ScopeMetricsSlice.RemoveIf:111
github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor.(*Processor).ConsumeMetrics.func1:260
go.opentelemetry.io/collector/pdata/pmetric.ResourceMetricsSlice.RemoveIf:111
github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor.(*Processor).ConsumeMetrics:259
go.opentelemetry.io/collector/processor/batchprocessor.(*batchMetrics).export:486
go.opentelemetry.io/collector/processor/batchprocessor.(*shard[...]).sendItems:261
go.opentelemetry.io/collector/processor/batchprocessor.(*shard[...]).processItem:233
go.opentelemetry.io/collector/processor/batchprocessor.(*shard[...]).startLoop:218

Looking further into more traces, I found one which commits to pebble on the hot path (the code path is basically: if the batch reaches a threshold, then commit in memory: ref).

End Stack Trace for `batchProcessor#startLoop` due to commit to pebble
sync.(*WaitGroup).Wait:118
--
github.com/cockroachdb/pebble.(*commitPipeline).publish:490
github.com/cockroachdb/pebble.(*commitPipeline).Commit:334
github.com/cockroachdb/pebble.(*DB).applyInternal:854
github.com/cockroachdb/pebble.(*DB).Apply:780
github.com/cockroachdb/pebble.(*Batch).Commit:1329
github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor.(*Processor).ConsumeMetrics:321
go.opentelemetry.io/collector/processor/batchprocessor.(*batchMetrics).export:486
go.opentelemetry.io/collector/processor/batchprocessor.(*shard[...]).sendItems:261
go.opentelemetry.io/collector/processor/batchprocessor.(*shard[...]).processItem:233
go.opentelemetry.io/collector/processor/batchprocessor.(*shard[...]).startLoop:218

This gives us another opportunity for optimization, as I think we should be able to do the commit outside the lock, which should avoid contention and hopefully won't pause consumption in the lsm interval processor as a whole.
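
A sketch of what committing outside the lock could look like with pebble; the names and the threshold handling are illustrative, not the processor's actual code:

```go
package sketch

import (
	"sync"

	"github.com/cockroachdb/pebble"
)

// batcher only holds its mutex while swapping the active batch; the
// potentially slow Commit happens outside the lock so that concurrent
// ConsumeMetrics calls can keep appending to the fresh batch.
type batcher struct {
	mu     sync.Mutex
	db     *pebble.DB
	active *pebble.Batch
}

func (b *batcher) maybeCommit(thresholdBytes int) error {
	b.mu.Lock()
	if b.active == nil || b.active.Len() < thresholdBytes {
		b.mu.Unlock()
		return nil
	}
	full := b.active           // detach the full batch...
	b.active = b.db.NewBatch() // ...and start a fresh one while still locked
	b.mu.Unlock()

	// Commit outside the lock; contention is limited to the cheap swap above.
	defer full.Close()
	return full.Commit(pebble.NoSync)
}
```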

@felixbarny
Member

Great progress! Do you know where this roughly puts us in comparison with the APM Server aggregations? Or is it too early to say?

@lahsivjar
Contributor Author

Do you know where this roughly puts us in comparison with the APM Server aggregations? Or is it too early to say?

I haven't been comparing it with APM Server so far. Doing a quick average computation based on the APM Server numbers I got early in the process, we get very optimistic numbers (around 70-80% of APM Server's throughput), but we can't be sure until we do a side-by-side comparison, so don't quote me on this yet.

@lahsivjar
Contributor Author

Here is the comparison of performance between the OTel Collector and APM-Server, which includes the recent fixes in the lsm interval processor (including the overhead due to the recently introduced overflow handling) plus the addition of a batch processor before the lsm interval processor, configured with max_batch_size: 16384 and timeout: 10s.

[Graph: blue bars represent APM Server and green ones represent the OTel Collector]

Both APM-Server and the OTel Collector were running on my local PC (no VMs/Docker) with GOMAXPROCS=4 GOMEMLIMIT=4GiB and were configured to be completely in-memory. The graph only measures traces/sec ingested by ES even though we are ingesting other signal types. That said, both APM-Server and the OTel Collector are ingesting the same data, emitted from this loadgenerator.

The OTel Collector performed at about 60% of APM-Server's throughput; however, it required around 3 times more memory (APM-Server peaked just below 900MB while the collector was around 3GB). One point to note here is that the OTel Collector produces exponential histograms whereas APM-Server uses HDR histograms.

I also compared the OTel Collector with the lsm interval processor configured with disk storage and a stricter GOMEMLIMIT (2.5GB). It results in similar throughput and the memory stays below 2.8GB in all cases (no OOMs) - to be honest, I was expecting memory usage to be a lot lower, but it is probably due to the pdata models. Here is the graph for this:

[screenshot]

There are 3 runs in the above graph with stricter GOMEMLIMIT values, but all results are similar. I should probably run the collector with in-memory storage for an hour (our biggest aggregation interval) to get a better estimate, but so far it seems that even doing in-memory aggregations should be okay for now, since the bottlenecks are probably not here.


I think now we should be unblocked as we are in a good enough state. We could gradually introduce more optimizations and improvements.
