diff --git a/exporter/internal/queue/batcher.go b/exporter/internal/queue/batcher.go index 2250b27d1b1e..d6cb2e233c2b 100644 --- a/exporter/internal/queue/batcher.go +++ b/exporter/internal/queue/batcher.go @@ -12,9 +12,9 @@ import ( "go.opentelemetry.io/collector/exporter/internal" ) -type batch struct { +type batch[K any] struct { ctx context.Context - req internal.Request + req K idxList []uint64 } @@ -23,12 +23,13 @@ type Batcher interface { component.Component } -type BaseBatcher struct { +type BaseBatcher[K any] struct { batchCfg exporterbatcher.Config - queue Queue[internal.Request] + queue Queue[K] + merger Merger[K] maxWorkers int workerPool chan bool - exportFunc func(ctx context.Context, req internal.Request) error + exportFunc func(ctx context.Context, req K) error stopWG sync.WaitGroup } @@ -38,10 +39,11 @@ func NewBatcher(batchCfg exporterbatcher.Config, maxWorkers int, ) (Batcher, error) { if !batchCfg.Enabled { - return &DisabledBatcher{ - BaseBatcher{ + return &DisabledBatcher[internal.Request]{ + BaseBatcher[internal.Request]{ batchCfg: batchCfg, queue: queue, + merger: requestMerger{}, maxWorkers: maxWorkers, exportFunc: exportFunc, stopWG: sync.WaitGroup{}, @@ -49,8 +51,8 @@ func NewBatcher(batchCfg exporterbatcher.Config, }, nil } - return &DefaultBatcher{ - BaseBatcher: BaseBatcher{ + return &DefaultBatcher[internal.Request]{ + BaseBatcher: BaseBatcher[internal.Request]{ batchCfg: batchCfg, queue: queue, maxWorkers: maxWorkers, @@ -60,7 +62,7 @@ func NewBatcher(batchCfg exporterbatcher.Config, }, nil } -func (qb *BaseBatcher) startWorkerPool() { +func (qb *BaseBatcher[K]) startWorkerPool() { if qb.maxWorkers == 0 { return } @@ -71,7 +73,7 @@ func (qb *BaseBatcher) startWorkerPool() { } // flush exports the incoming batch synchronously. -func (qb *BaseBatcher) flush(batchToFlush batch) { +func (qb *BaseBatcher[K]) flush(batchToFlush batch[K]) { err := qb.exportFunc(batchToFlush.ctx, batchToFlush.req) for _, idx := range batchToFlush.idxList { qb.queue.OnProcessingFinished(idx, err) @@ -79,7 +81,7 @@ func (qb *BaseBatcher) flush(batchToFlush batch) { } // flushAsync starts a goroutine that calls flushIfNecessary. It blocks until a worker is available. -func (qb *BaseBatcher) flushAsync(batchToFlush batch) { +func (qb *BaseBatcher[K]) flushAsync(batchToFlush batch[K]) { qb.stopWG.Add(1) if qb.maxWorkers == 0 { go func() { @@ -95,3 +97,13 @@ func (qb *BaseBatcher) flushAsync(batchToFlush batch) { qb.workerPool <- true }() } + +type Merger[K any] interface { + MergeSplit(src, dst K, cfg exporterbatcher.MaxSizeConfig) ([]K, error) +} + +type requestMerger struct{} + +func (requestMerger) MergeSplit(src, dst internal.Request, cfg exporterbatcher.MaxSizeConfig) ([]internal.Request, error) { + return src.MergeSplit(context.Background(), cfg, dst) +} diff --git a/exporter/internal/queue/default_batcher.go b/exporter/internal/queue/default_batcher.go index 7e4e9273ed93..76e0c50e372b 100644 --- a/exporter/internal/queue/default_batcher.go +++ b/exporter/internal/queue/default_batcher.go @@ -14,22 +14,22 @@ import ( ) // DefaultBatcher continuously reads from the queue and flushes asynchronously if size limit is met or on timeout. -type DefaultBatcher struct { - BaseBatcher +type DefaultBatcher[K internal.Request] struct { + BaseBatcher[K] currentBatchMu sync.Mutex - currentBatch *batch + currentBatch *batch[K] timer *time.Timer shutdownCh chan bool } -func (qb *DefaultBatcher) resetTimer() { +func (qb *DefaultBatcher[K]) resetTimer() { if qb.batchCfg.FlushTimeout != 0 { qb.timer.Reset(qb.batchCfg.FlushTimeout) } } // startReadingFlushingGoroutine starts a goroutine that reads and then flushes. -func (qb *DefaultBatcher) startReadingFlushingGoroutine() { +func (qb *DefaultBatcher[K]) startReadingFlushingGoroutine() { qb.stopWG.Add(1) go func() { defer qb.stopWG.Done() @@ -44,13 +44,13 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() { qb.currentBatchMu.Lock() if qb.batchCfg.MaxSizeItems > 0 { - var reqList []internal.Request + var reqList []K var mergeSplitErr error - if qb.currentBatch == nil || qb.currentBatch.req == nil { + if qb.currentBatch == nil { qb.resetTimer() - reqList, mergeSplitErr = req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, nil) + reqList, mergeSplitErr = qb.merger.MergeSplit(req, nil, qb.batchCfg.MaxSizeConfig) } else { - reqList, mergeSplitErr = qb.currentBatch.req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, req) + reqList, mergeSplitErr = qb.merger.MergeSplit(qb.currentBatch.req, req, qb.batchCfg.MaxSizeConfig) } if mergeSplitErr != nil || reqList == nil { @@ -64,7 +64,7 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() { qb.currentBatch = nil qb.currentBatchMu.Unlock() for i := 0; i < len(reqList); i++ { - qb.flushAsync(batch{ + qb.flushAsync(batch[K]{ req: reqList[i], ctx: ctx, idxList: []uint64{idx}, @@ -73,7 +73,7 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() { } qb.resetTimer() } else { - qb.currentBatch = &batch{ + qb.currentBatch = &batch[K]{ req: reqList[0], ctx: ctx, idxList: []uint64{idx}, @@ -81,22 +81,22 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() { qb.currentBatchMu.Unlock() } } else { - if qb.currentBatch == nil || qb.currentBatch.req == nil { + if qb.currentBatch == nil { qb.resetTimer() - qb.currentBatch = &batch{ + qb.currentBatch = &batch[K]{ req: req, ctx: ctx, idxList: []uint64{idx}, } } else { // TODO: consolidate implementation for the cases where MaxSizeConfig is specified and the case where it is not specified - mergedReq, mergeErr := qb.currentBatch.req.MergeSplit(qb.currentBatch.ctx, qb.batchCfg.MaxSizeConfig, req) + mergedReq, mergeErr := qb.merger.MergeSplit(qb.currentBatch.req, req, qb.batchCfg.MaxSizeConfig) if mergeErr != nil { qb.queue.OnProcessingFinished(idx, mergeErr) qb.currentBatchMu.Unlock() continue } - qb.currentBatch = &batch{ + qb.currentBatch = &batch[K]{ req: mergedReq[0], ctx: qb.currentBatch.ctx, idxList: append(qb.currentBatch.idxList, idx), @@ -120,7 +120,7 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() { } // startTimeBasedFlushingGoroutine starts a goroutine that flushes on timeout. -func (qb *DefaultBatcher) startTimeBasedFlushingGoroutine() { +func (qb *DefaultBatcher[K]) startTimeBasedFlushingGoroutine() { qb.stopWG.Add(1) go func() { defer qb.stopWG.Done() @@ -136,7 +136,7 @@ func (qb *DefaultBatcher) startTimeBasedFlushingGoroutine() { } // Start starts the goroutine that reads from the queue and flushes asynchronously. -func (qb *DefaultBatcher) Start(_ context.Context, _ component.Host) error { +func (qb *DefaultBatcher[K]) Start(_ context.Context, _ component.Host) error { // maxWorker being -1 means batcher is disabled. This is for testing queue sender metrics. if qb.maxWorkers == -1 { return nil @@ -158,9 +158,9 @@ func (qb *DefaultBatcher) Start(_ context.Context, _ component.Host) error { } // flushCurrentBatchIfNecessary sends out the current request batch if it is not nil -func (qb *DefaultBatcher) flushCurrentBatchIfNecessary() { +func (qb *DefaultBatcher[K]) flushCurrentBatchIfNecessary() { qb.currentBatchMu.Lock() - if qb.currentBatch == nil || qb.currentBatch.req == nil { + if qb.currentBatch == nil { qb.currentBatchMu.Unlock() return } @@ -174,7 +174,7 @@ func (qb *DefaultBatcher) flushCurrentBatchIfNecessary() { } // Shutdown ensures that queue and all Batcher are stopped. -func (qb *DefaultBatcher) Shutdown(_ context.Context) error { +func (qb *DefaultBatcher[K]) Shutdown(_ context.Context) error { qb.flushCurrentBatchIfNecessary() qb.stopWG.Wait() return nil diff --git a/exporter/internal/queue/disabled_batcher.go b/exporter/internal/queue/disabled_batcher.go index 250b38e7640d..13b07a826666 100644 --- a/exporter/internal/queue/disabled_batcher.go +++ b/exporter/internal/queue/disabled_batcher.go @@ -11,12 +11,12 @@ import ( // DisabledBatcher is a special-case of Batcher that has no size limit for sending. Any items read from the queue will // be sent out (asynchronously) immediately regardless of the size. -type DisabledBatcher struct { - BaseBatcher +type DisabledBatcher[K any] struct { + BaseBatcher[K] } // Start starts the goroutine that reads from the queue and flushes asynchronously. -func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error { +func (qb *DisabledBatcher[K]) Start(_ context.Context, _ component.Host) error { // maxWorker being -1 means batcher is disabled. This is for testing queue sender metrics. if qb.maxWorkers == -1 { return nil @@ -35,7 +35,7 @@ func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error { if !ok { return } - qb.flushAsync(batch{ + qb.flushAsync(batch[K]{ req: req, ctx: context.Background(), idxList: []uint64{idx}, @@ -46,7 +46,7 @@ func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error { } // Shutdown ensures that queue and all Batcher are stopped. -func (qb *DisabledBatcher) Shutdown(_ context.Context) error { +func (qb *DisabledBatcher[K]) Shutdown(_ context.Context) error { qb.stopWG.Wait() return nil }