Skip to content

Commit

Permalink
Make batcher generic, which will allow making exporterhelper generic
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Jan 6, 2025
1 parent 4387045 commit 0ecff06
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 37 deletions.
36 changes: 24 additions & 12 deletions exporter/internal/queue/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (
"go.opentelemetry.io/collector/exporter/internal"
)

type batch struct {
type batch[K any] struct {
ctx context.Context
req internal.Request
req K
idxList []uint64
}

Expand All @@ -23,12 +23,13 @@ type Batcher interface {
component.Component
}

type BaseBatcher struct {
type BaseBatcher[K any] struct {
batchCfg exporterbatcher.Config
queue Queue[internal.Request]
queue Queue[K]
merger Merger[K]
maxWorkers int
workerPool chan bool
exportFunc func(ctx context.Context, req internal.Request) error
exportFunc func(ctx context.Context, req K) error
stopWG sync.WaitGroup
}

Expand All @@ -38,19 +39,20 @@ func NewBatcher(batchCfg exporterbatcher.Config,
maxWorkers int,
) (Batcher, error) {
if !batchCfg.Enabled {
return &DisabledBatcher{
BaseBatcher{
return &DisabledBatcher[internal.Request]{
BaseBatcher[internal.Request]{
batchCfg: batchCfg,
queue: queue,
merger: requestMerger{},
maxWorkers: maxWorkers,
exportFunc: exportFunc,
stopWG: sync.WaitGroup{},
},
}, nil
}

return &DefaultBatcher{
BaseBatcher: BaseBatcher{
return &DefaultBatcher[internal.Request]{
BaseBatcher: BaseBatcher[internal.Request]{
batchCfg: batchCfg,
queue: queue,
maxWorkers: maxWorkers,
Expand All @@ -60,7 +62,7 @@ func NewBatcher(batchCfg exporterbatcher.Config,
}, nil
}

func (qb *BaseBatcher) startWorkerPool() {
func (qb *BaseBatcher[K]) startWorkerPool() {
if qb.maxWorkers == 0 {
return
}
Expand All @@ -71,15 +73,15 @@ func (qb *BaseBatcher) startWorkerPool() {
}

// flush exports the incoming batch synchronously.
func (qb *BaseBatcher) flush(batchToFlush batch) {
func (qb *BaseBatcher[K]) flush(batchToFlush batch[K]) {
	// Export once for the whole batch; the single result applies to every
	// queue item that was folded into it.
	exportErr := qb.exportFunc(batchToFlush.ctx, batchToFlush.req)

	// Report the export outcome to the queue for each contributing index.
	for i := range batchToFlush.idxList {
		qb.queue.OnProcessingFinished(batchToFlush.idxList[i], exportErr)
	}
}

// flushAsync starts a goroutine that calls flushIfNecessary. It blocks until a worker is available.
func (qb *BaseBatcher) flushAsync(batchToFlush batch) {
func (qb *BaseBatcher[K]) flushAsync(batchToFlush batch[K]) {
qb.stopWG.Add(1)
if qb.maxWorkers == 0 {
go func() {
Expand All @@ -95,3 +97,13 @@ func (qb *BaseBatcher) flushAsync(batchToFlush batch) {
qb.workerPool <- true
}()
}

// Merger abstracts the merge/split operation over requests of type K so that
// the batcher does not need to call methods on the request value itself.
//
// NOTE(review): in requestMerger, src is the request whose MergeSplit method is
// invoked and dst is the request passed to it as the argument; confirm that
// this src/dst naming is the intended one.
type Merger[K any] interface {
// MergeSplit processes src and dst under the limits in cfg and returns the
// resulting requests, or an error if the operation failed.
MergeSplit(src, dst K, cfg exporterbatcher.MaxSizeConfig) ([]K, error)
}

// requestMerger implements the MergeSplit operation for internal.Request by
// delegating to the request's own MergeSplit method.
type requestMerger struct{}

// MergeSplit calls src.MergeSplit with cfg and dst and returns its result
// unchanged. src must be non-nil, since the call is dispatched on it.
func (requestMerger) MergeSplit(src, dst internal.Request, cfg exporterbatcher.MaxSizeConfig) ([]internal.Request, error) {
	// This method receives no context, so a background context is supplied to
	// the underlying request.
	ctx := context.Background()
	return src.MergeSplit(ctx, cfg, dst)
}
40 changes: 20 additions & 20 deletions exporter/internal/queue/default_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,22 @@ import (
)

// DefaultBatcher continuously reads from the queue and flushes asynchronously if size limit is met or on timeout.
type DefaultBatcher struct {
BaseBatcher
type DefaultBatcher[K internal.Request] struct {
BaseBatcher[K]
currentBatchMu sync.Mutex
currentBatch *batch
currentBatch *batch[K]
timer *time.Timer
shutdownCh chan bool
}

func (qb *DefaultBatcher) resetTimer() {
func (qb *DefaultBatcher[K]) resetTimer() {
	// A zero FlushTimeout leaves the timer untouched.
	// NOTE(review): presumably zero means time-based flushing is disabled and
	// the timer may not be initialized; confirm against Start.
	if qb.batchCfg.FlushTimeout == 0 {
		return
	}
	// Re-arm the timer so it fires FlushTimeout from now.
	qb.timer.Reset(qb.batchCfg.FlushTimeout)
}

// startReadingFlushingGoroutine starts a goroutine that reads and then flushes.
func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
func (qb *DefaultBatcher[K]) startReadingFlushingGoroutine() {
qb.stopWG.Add(1)
go func() {
defer qb.stopWG.Done()
Expand All @@ -44,13 +44,13 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
qb.currentBatchMu.Lock()

if qb.batchCfg.MaxSizeItems > 0 {
var reqList []internal.Request
var reqList []K
var mergeSplitErr error
if qb.currentBatch == nil || qb.currentBatch.req == nil {
if qb.currentBatch == nil {
qb.resetTimer()
reqList, mergeSplitErr = req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, nil)
reqList, mergeSplitErr = qb.merger.MergeSplit(req, nil, qb.batchCfg.MaxSizeConfig)

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / unittest-matrix (ubuntu-latest, ~1.22)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / Integration test

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / test-coverage

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / unittest-matrix (ubuntu-latest, ~1.23)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (pkg)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (pkg)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / windows-unittest (windows-2025)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / windows-unittest (windows-2022)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

cannot use nil as K value in argument to qb.merger.MergeSplit
} else {
reqList, mergeSplitErr = qb.currentBatch.req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, req)
reqList, mergeSplitErr = qb.merger.MergeSplit(qb.currentBatch.req, req, qb.batchCfg.MaxSizeConfig)
}

if mergeSplitErr != nil || reqList == nil {
Expand All @@ -64,7 +64,7 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
qb.currentBatch = nil
qb.currentBatchMu.Unlock()
for i := 0; i < len(reqList); i++ {
qb.flushAsync(batch{
qb.flushAsync(batch[K]{
req: reqList[i],
ctx: ctx,
idxList: []uint64{idx},
Expand All @@ -73,30 +73,30 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
}
qb.resetTimer()
} else {
qb.currentBatch = &batch{
qb.currentBatch = &batch[K]{
req: reqList[0],
ctx: ctx,
idxList: []uint64{idx},
}
qb.currentBatchMu.Unlock()
}
} else {
if qb.currentBatch == nil || qb.currentBatch.req == nil {
if qb.currentBatch == nil {
qb.resetTimer()
qb.currentBatch = &batch{
qb.currentBatch = &batch[K]{
req: req,
ctx: ctx,
idxList: []uint64{idx},
}
} else {
// TODO: consolidate implementation for the cases where MaxSizeConfig is specified and the case where it is not specified
mergedReq, mergeErr := qb.currentBatch.req.MergeSplit(qb.currentBatch.ctx, qb.batchCfg.MaxSizeConfig, req)
mergedReq, mergeErr := qb.merger.MergeSplit(qb.currentBatch.req, req, qb.batchCfg.MaxSizeConfig)
if mergeErr != nil {
qb.queue.OnProcessingFinished(idx, mergeErr)
qb.currentBatchMu.Unlock()
continue
}
qb.currentBatch = &batch{
qb.currentBatch = &batch[K]{
req: mergedReq[0],
ctx: qb.currentBatch.ctx,
idxList: append(qb.currentBatch.idxList, idx),
Expand All @@ -120,7 +120,7 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
}

// startTimeBasedFlushingGoroutine starts a goroutine that flushes on timeout.
func (qb *DefaultBatcher) startTimeBasedFlushingGoroutine() {
func (qb *DefaultBatcher[K]) startTimeBasedFlushingGoroutine() {
qb.stopWG.Add(1)
go func() {
defer qb.stopWG.Done()
Expand All @@ -136,7 +136,7 @@ func (qb *DefaultBatcher) startTimeBasedFlushingGoroutine() {
}

// Start starts the goroutine that reads from the queue and flushes asynchronously.
func (qb *DefaultBatcher) Start(_ context.Context, _ component.Host) error {
func (qb *DefaultBatcher[K]) Start(_ context.Context, _ component.Host) error {
// maxWorker being -1 means batcher is disabled. This is for testing queue sender metrics.
if qb.maxWorkers == -1 {
return nil
Expand All @@ -158,9 +158,9 @@ func (qb *DefaultBatcher) Start(_ context.Context, _ component.Host) error {
}

// flushCurrentBatchIfNecessary sends out the current request batch if it is not nil
func (qb *DefaultBatcher) flushCurrentBatchIfNecessary() {
func (qb *DefaultBatcher[K]) flushCurrentBatchIfNecessary() {
qb.currentBatchMu.Lock()
if qb.currentBatch == nil || qb.currentBatch.req == nil {
if qb.currentBatch == nil {
qb.currentBatchMu.Unlock()
return
}
Expand All @@ -174,7 +174,7 @@ func (qb *DefaultBatcher) flushCurrentBatchIfNecessary() {
}

// Shutdown ensures that queue and all Batcher are stopped.
func (qb *DefaultBatcher) Shutdown(_ context.Context) error {
func (qb *DefaultBatcher[K]) Shutdown(_ context.Context) error {
qb.flushCurrentBatchIfNecessary()
qb.stopWG.Wait()
return nil
Expand Down
10 changes: 5 additions & 5 deletions exporter/internal/queue/disabled_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import (

// DisabledBatcher is a special-case of Batcher that has no size limit for sending. Any items read from the queue will
// be sent out (asynchronously) immediately regardless of the size.
type DisabledBatcher struct {
BaseBatcher
// K is the request type; it is forwarded unchanged to the embedded BaseBatcher.
type DisabledBatcher[K any] struct {
// BaseBatcher is embedded, so its fields and methods are promoted onto
// DisabledBatcher.
BaseBatcher[K]
}

// Start starts the goroutine that reads from the queue and flushes asynchronously.
func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error {
func (qb *DisabledBatcher[K]) Start(_ context.Context, _ component.Host) error {
// maxWorker being -1 means batcher is disabled. This is for testing queue sender metrics.
if qb.maxWorkers == -1 {
return nil
Expand All @@ -35,7 +35,7 @@ func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error {
if !ok {
return
}
qb.flushAsync(batch{
qb.flushAsync(batch[K]{
req: req,
ctx: context.Background(),
idxList: []uint64{idx},
Expand All @@ -46,7 +46,7 @@ func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error {
}

// Shutdown ensures that queue and all Batcher are stopped.
func (qb *DisabledBatcher) Shutdown(_ context.Context) error {
func (qb *DisabledBatcher[K]) Shutdown(_ context.Context) error {
// Block until the stopWG counter drops to zero. The context argument is
// ignored, so this wait is not bounded by a caller deadline.
qb.stopWG.Wait()
// Always reports success.
return nil
}

0 comments on commit 0ecff06

Please sign in to comment.