diff --git a/exporter/internal/queue/batcher.go b/exporter/internal/queue/batcher.go
index df45a7b97c9..c3ee20c40b9 100644
--- a/exporter/internal/queue/batcher.go
+++ b/exporter/internal/queue/batcher.go
@@ -13,9 +13,9 @@ import (
 	"go.opentelemetry.io/collector/exporter/internal"
 )
 
-type batch struct {
+type batch[K any] struct {
 	ctx     context.Context
-	req     internal.Request
+	req     K
 	idxList []uint64
 }
 
@@ -24,13 +24,14 @@ type Batcher interface {
 	component.Component
 }
 
-type BaseBatcher struct {
+type BaseBatcher[K any] struct {
 	batchCfg   exporterbatcher.Config
-	queue      exporterqueue.Queue[internal.Request]
+	queue      exporterqueue.Queue[K]
+	merger     Merger[K]
 	// TODO: Remove when the -1 hack for testing is removed.
 	maxWorkers int
 	workerPool chan bool
-	exportFunc func(ctx context.Context, req internal.Request) error
+	exportFunc func(ctx context.Context, req K) error
 	stopWG     sync.WaitGroup
 }
 
@@ -40,16 +41,16 @@ func NewBatcher(batchCfg exporterbatcher.Config,
 	maxWorkers int,
 ) (Batcher, error) {
 	if !batchCfg.Enabled {
-		return &DisabledBatcher{BaseBatcher: newBaseBatcher(batchCfg, queue, exportFunc, maxWorkers)}, nil
+		return &DisabledBatcher[internal.Request]{BaseBatcher: newBaseBatcher(batchCfg, queue, exportFunc, maxWorkers)}, nil
 	}
-	return &DefaultBatcher{BaseBatcher: newBaseBatcher(batchCfg, queue, exportFunc, maxWorkers)}, nil
+	return &DefaultBatcher[internal.Request]{BaseBatcher: newBaseBatcher(batchCfg, queue, exportFunc, maxWorkers)}, nil
 }
 
 func newBaseBatcher(batchCfg exporterbatcher.Config,
 	queue exporterqueue.Queue[internal.Request],
 	exportFunc func(ctx context.Context, req internal.Request) error,
 	maxWorkers int,
-) BaseBatcher {
+) BaseBatcher[internal.Request] {
 	var workerPool chan bool
 	if maxWorkers > 0 {
 		workerPool = make(chan bool, maxWorkers)
@@ -57,9 +58,10 @@ func newBaseBatcher(batchCfg exporterbatcher.Config,
 			workerPool <- true
 		}
 	}
-	return BaseBatcher{
+	return BaseBatcher[internal.Request]{
 		batchCfg:   batchCfg,
 		queue:      queue,
+		merger:     requestMerger{},
 		maxWorkers: maxWorkers,
 		workerPool: workerPool,
 		exportFunc: exportFunc,
@@ -68,19 +70,21 @@ func newBaseBatcher(batchCfg exporterbatcher.Config,
 }
 
-// flush starts a goroutine that calls exportFunc. It blocks until a worker is available if necessary.
-func (qb *BaseBatcher) flush(batchToFlush batch) {
-	qb.stopWG.Add(1)
-	if qb.workerPool != nil {
-		<-qb.workerPool
+// flush exports the batch synchronously and reports the result to the queue for every index in the batch.
+func (qb *BaseBatcher[K]) flush(batchToFlush batch[K]) {
+	err := qb.exportFunc(batchToFlush.ctx, batchToFlush.req)
+	for _, idx := range batchToFlush.idxList {
+		qb.queue.OnProcessingFinished(idx, err)
 	}
-	go func() {
-		defer qb.stopWG.Done()
-		err := qb.exportFunc(batchToFlush.ctx, batchToFlush.req)
-		for _, idx := range batchToFlush.idxList {
-			qb.queue.OnProcessingFinished(idx, err)
-		}
-		if qb.workerPool != nil {
-			qb.workerPool <- true
-		}
-	}()
+}
+
+// Merger merges two requests of type K and splits the result according to the configured size limits.
+type Merger[K any] interface {
+	MergeSplit(src, dst K, cfg exporterbatcher.MaxSizeConfig) ([]K, error)
+}
+
+// requestMerger adapts internal.Request's own MergeSplit method to the Merger interface.
+type requestMerger struct{}
+
+func (requestMerger) MergeSplit(src, dst internal.Request, cfg exporterbatcher.MaxSizeConfig) ([]internal.Request, error) {
+	return src.MergeSplit(context.Background(), cfg, dst)
 }
diff --git a/exporter/internal/queue/default_batcher.go b/exporter/internal/queue/default_batcher.go
index dced12e9e2b..123e7b962b3 100644
--- a/exporter/internal/queue/default_batcher.go
+++ b/exporter/internal/queue/default_batcher.go
@@ -14,22 +14,22 @@ import (
 )
 
 // DefaultBatcher continuously reads from the queue and flushes asynchronously if size limit is met or on timeout.
-type DefaultBatcher struct {
-	BaseBatcher
+type DefaultBatcher[K internal.Request] struct {
+	BaseBatcher[K]
 	currentBatchMu sync.Mutex
-	currentBatch   *batch
+	currentBatch   *batch[K]
 	timer          *time.Timer
 	shutdownCh     chan bool
 }
 
-func (qb *DefaultBatcher) resetTimer() {
+func (qb *DefaultBatcher[K]) resetTimer() {
 	if qb.batchCfg.FlushTimeout != 0 {
 		qb.timer.Reset(qb.batchCfg.FlushTimeout)
 	}
 }
 
 // startReadingFlushingGoroutine starts a goroutine that reads and then flushes.
-func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
+func (qb *DefaultBatcher[K]) startReadingFlushingGoroutine() {
 	qb.stopWG.Add(1)
 	go func() {
 		defer qb.stopWG.Done()
@@ -44,13 +44,14 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
 			qb.currentBatchMu.Lock()
 			if qb.batchCfg.MaxSizeItems > 0 {
-				var reqList []internal.Request
+				var reqList []K
 				var mergeSplitErr error
-				if qb.currentBatch == nil || qb.currentBatch.req == nil {
+				if qb.currentBatch == nil {
 					qb.resetTimer()
-					reqList, mergeSplitErr = req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, nil)
+					var nilReq K // zero value of K; untyped nil cannot be passed for a type parameter
+					reqList, mergeSplitErr = qb.merger.MergeSplit(req, nilReq, qb.batchCfg.MaxSizeConfig)
 				} else {
-					reqList, mergeSplitErr = qb.currentBatch.req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, req)
+					reqList, mergeSplitErr = qb.merger.MergeSplit(qb.currentBatch.req, req, qb.batchCfg.MaxSizeConfig)
 				}
 
 				if mergeSplitErr != nil || reqList == nil {
@@ -64,7 +64,7 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
 				qb.currentBatch = nil
 				qb.currentBatchMu.Unlock()
 				for i := 0; i < len(reqList); i++ {
-					qb.flush(batch{
+					qb.flush(batch[K]{
 						req:     reqList[i],
 						ctx:     ctx,
 						idxList: []uint64{idx},
@@ -73,7 +73,7 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
 				}
 				qb.resetTimer()
 			} else {
-				qb.currentBatch = &batch{
+				qb.currentBatch = &batch[K]{
 					req:     reqList[0],
 					ctx:     ctx,
 					idxList: []uint64{idx},
@@ -81,22 +81,22 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
 				}
 				qb.currentBatchMu.Unlock()
 			}
 		} else {
-			if qb.currentBatch == nil || qb.currentBatch.req == nil {
+			if qb.currentBatch == nil {
 				qb.resetTimer()
-				qb.currentBatch = &batch{
+				qb.currentBatch = &batch[K]{
 					req:     req,
 					ctx:     ctx,
 					idxList: []uint64{idx},
 				}
 			} else {
 				// TODO: consolidate implementation for the cases where MaxSizeConfig is specified and the case where it is not specified
-				mergedReq, mergeErr := qb.currentBatch.req.MergeSplit(qb.currentBatch.ctx, qb.batchCfg.MaxSizeConfig, req)
+				mergedReq, mergeErr := qb.merger.MergeSplit(qb.currentBatch.req, req, qb.batchCfg.MaxSizeConfig)
 				if mergeErr != nil {
 					qb.queue.OnProcessingFinished(idx, mergeErr)
 					qb.currentBatchMu.Unlock()
 					continue
 				}
-				qb.currentBatch = &batch{
+				qb.currentBatch = &batch[K]{
 					req:     mergedReq[0],
 					ctx:     qb.currentBatch.ctx,
 					idxList: append(qb.currentBatch.idxList, idx),
@@ -120,7 +120,7 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
 }
 
 // startTimeBasedFlushingGoroutine starts a goroutine that flushes on timeout.
-func (qb *DefaultBatcher) startTimeBasedFlushingGoroutine() {
+func (qb *DefaultBatcher[K]) startTimeBasedFlushingGoroutine() {
 	qb.stopWG.Add(1)
 	go func() {
 		defer qb.stopWG.Done()
@@ -136,7 +136,7 @@ func (qb *DefaultBatcher) startTimeBasedFlushingGoroutine() {
 }
 
 // Start starts the goroutine that reads from the queue and flushes asynchronously.
-func (qb *DefaultBatcher) Start(_ context.Context, _ component.Host) error {
+func (qb *DefaultBatcher[K]) Start(_ context.Context, _ component.Host) error {
 	// maxWorker being -1 means batcher is disabled. This is for testing queue sender metrics.
 	if qb.maxWorkers == -1 {
 		return nil
@@ -157,9 +157,9 @@ func (qb *DefaultBatcher) Start(_ context.Context, _ component.Host) error {
 }
 
 // flushCurrentBatchIfNecessary sends out the current request batch if it is not nil
-func (qb *DefaultBatcher) flushCurrentBatchIfNecessary() {
+func (qb *DefaultBatcher[K]) flushCurrentBatchIfNecessary() {
 	qb.currentBatchMu.Lock()
-	if qb.currentBatch == nil || qb.currentBatch.req == nil {
+	if qb.currentBatch == nil {
 		qb.currentBatchMu.Unlock()
 		return
 	}
@@ -173,7 +173,7 @@ func (qb *DefaultBatcher) flushCurrentBatchIfNecessary() {
 }
 
 // Shutdown ensures that queue and all Batcher are stopped.
-func (qb *DefaultBatcher) Shutdown(_ context.Context) error {
+func (qb *DefaultBatcher[K]) Shutdown(_ context.Context) error {
 	qb.flushCurrentBatchIfNecessary()
 	qb.stopWG.Wait()
 	return nil
diff --git a/exporter/internal/queue/disabled_batcher.go b/exporter/internal/queue/disabled_batcher.go
index be89f58e011..63deb934bb0 100644
--- a/exporter/internal/queue/disabled_batcher.go
+++ b/exporter/internal/queue/disabled_batcher.go
@@ -11,12 +11,12 @@ import (
 
 // DisabledBatcher is a special-case of Batcher that has no size limit for sending. Any items read from the queue will
 // be sent out (asynchronously) immediately regardless of the size.
-type DisabledBatcher struct {
-	BaseBatcher
+type DisabledBatcher[K any] struct {
+	BaseBatcher[K]
 }
 
 // Start starts the goroutine that reads from the queue and flushes asynchronously.
-func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error {
+func (qb *DisabledBatcher[K]) Start(_ context.Context, _ component.Host) error {
 	// maxWorker being -1 means batcher is disabled. This is for testing queue sender metrics.
 	if qb.maxWorkers == -1 {
 		return nil
@@ -33,7 +33,7 @@ func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error {
 			if !ok {
 				return
 			}
-			qb.flush(batch{
+			qb.flush(batch[K]{
 				req:     req,
 				ctx:     context.Background(),
 				idxList: []uint64{idx},
@@ -44,7 +44,7 @@ func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error {
 }
 
 // Shutdown ensures that queue and all Batcher are stopped.
-func (qb *DisabledBatcher) Shutdown(_ context.Context) error {
+func (qb *DisabledBatcher[K]) Shutdown(_ context.Context) error {
 	qb.stopWG.Wait()
 	return nil
 }
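---

The Merger[K] indirection is what lets batch, BaseBatcher, DefaultBatcher, and DisabledBatcher drop their hard dependency on internal.Request: the batchers only need a way to merge two values of K and split the result, and requestMerger plugs the existing internal.Request.MergeSplit method into that contract. Because Merger carries no context parameter, requestMerger passes context.Background() to the underlying MergeSplit; the batch's own ctx is still used for the export itself. A minimal standalone sketch of the same contract, using a hypothetical wordsRequest type and a local MaxSizeConfig mirror instead of the collector's packages:

    package main

    import "fmt"

    // MaxSizeConfig stands in for exporterbatcher.MaxSizeConfig in this sketch.
    type MaxSizeConfig struct {
        MaxSizeItems int
    }

    // Merger matches the interface introduced in batcher.go.
    type Merger[K any] interface {
        MergeSplit(src, dst K, cfg MaxSizeConfig) ([]K, error)
    }

    // wordsRequest is a hypothetical request type: a batch of words.
    type wordsRequest struct{ words []string }

    // wordsMerger fulfills for wordsRequest the same contract requestMerger
    // fulfills for internal.Request: merge dst into src, then split the result
    // into requests of at most MaxSizeItems items each.
    type wordsMerger struct{}

    func (wordsMerger) MergeSplit(src, dst wordsRequest, cfg MaxSizeConfig) ([]wordsRequest, error) {
        merged := append(append([]string{}, src.words...), dst.words...)
        if cfg.MaxSizeItems <= 0 {
            return []wordsRequest{{words: merged}}, nil
        }
        var out []wordsRequest
        for len(merged) > cfg.MaxSizeItems {
            out = append(out, wordsRequest{words: merged[:cfg.MaxSizeItems]})
            merged = merged[cfg.MaxSizeItems:]
        }
        return append(out, wordsRequest{words: merged}), nil
    }

    func main() {
        var m Merger[wordsRequest] = wordsMerger{}
        // A pending batch {a b c} absorbs an incoming {d e}, capped at 2 items per request.
        chunks, _ := m.MergeSplit(
            wordsRequest{words: []string{"a", "b", "c"}},
            wordsRequest{words: []string{"d", "e"}},
            MaxSizeConfig{MaxSizeItems: 2},
        )
        fmt.Println(chunks) // [{[a b]} {[c d]} {[e]}]
    }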
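A side effect of going generic is that the `qb.currentBatch.req == nil` guards had to go: `req` now has type K, and Go does not allow comparing or passing untyped nil for a type parameter, which is also why the max-size path in default_batcher.go materializes a `var nilReq K` zero value before calling MergeSplit. Emptiness is instead tracked entirely through the `*batch[K]` pointer. A small sketch of the idiom, with illustrative names:

    package main

    import "fmt"

    type box[K any] struct{ v K }

    // isEmpty tracks emptiness through the pointer, as default_batcher.go now
    // does with qb.currentBatch: a `b.v == nil` check would not compile,
    // because K may be instantiated with a type that has no nil value.
    func isEmpty[K any](b *box[K]) bool {
        return b == nil
    }

    func main() {
        var empty *box[string]
        fmt.Println(isEmpty(empty))               // true
        fmt.Println(isEmpty(&box[string]{v: ""})) // false
    }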
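With the worker-pool goroutine gone from BaseBatcher.flush, a flush is now just "export, then ack": the export result is reported to the queue once per index that contributed to the batch, which is how a batch merged from several queue items releases all of its queue slots. A stripped-down stand-in (miniBatcher and onDone are illustrative substitutes for BaseBatcher and queue.OnProcessingFinished, not collector APIs):

    package main

    import (
        "context"
        "fmt"
    )

    // batch mirrors the generic batch struct from batcher.go.
    type batch[K any] struct {
        ctx     context.Context
        req     K
        idxList []uint64
    }

    // miniBatcher keeps only the export function and an ack callback in place
    // of queue.OnProcessingFinished.
    type miniBatcher[K any] struct {
        exportFunc func(ctx context.Context, req K) error
        onDone     func(idx uint64, err error)
    }

    // flush follows the synchronous shape of BaseBatcher.flush: export the
    // batch, then report the result for every queue index it contains.
    func (qb *miniBatcher[K]) flush(b batch[K]) {
        err := qb.exportFunc(b.ctx, b.req)
        for _, idx := range b.idxList {
            qb.onDone(idx, err)
        }
    }

    func main() {
        qb := &miniBatcher[string]{
            exportFunc: func(_ context.Context, req string) error {
                fmt.Println("exporting:", req)
                return nil
            },
            onDone: func(idx uint64, err error) {
                fmt.Printf("index %d finished, err=%v\n", idx, err)
            },
        }
        // A batch merged from two queue items acks both indices after one export.
        qb.flush(batch[string]{ctx: context.Background(), req: "hello world", idxList: []uint64{7, 8}})
    }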