Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make batcher generic which will allow to make exporterhelper generic #12019

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 24 additions & 12 deletions exporter/internal/queue/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (
"go.opentelemetry.io/collector/exporter/internal"
)

type batch struct {
type batch[K any] struct {
ctx context.Context
req internal.Request
req K
idxList []uint64
}

Expand All @@ -23,12 +23,13 @@ type Batcher interface {
component.Component
}

type BaseBatcher struct {
type BaseBatcher[K any] struct {
batchCfg exporterbatcher.Config
queue Queue[internal.Request]
queue Queue[K]
merger Merger[K]
maxWorkers int
workerPool chan bool
exportFunc func(ctx context.Context, req internal.Request) error
exportFunc func(ctx context.Context, req K) error
stopWG sync.WaitGroup
}

Expand All @@ -38,19 +39,20 @@ func NewBatcher(batchCfg exporterbatcher.Config,
maxWorkers int,
) (Batcher, error) {
if !batchCfg.Enabled {
return &DisabledBatcher{
BaseBatcher{
return &DisabledBatcher[internal.Request]{
BaseBatcher[internal.Request]{
batchCfg: batchCfg,
queue: queue,
merger: requestMerger{},
maxWorkers: maxWorkers,
exportFunc: exportFunc,
stopWG: sync.WaitGroup{},
},
}, nil
}

return &DefaultBatcher{
BaseBatcher: BaseBatcher{
return &DefaultBatcher[internal.Request]{
BaseBatcher: BaseBatcher[internal.Request]{
batchCfg: batchCfg,
queue: queue,
maxWorkers: maxWorkers,
Expand All @@ -60,7 +62,7 @@ func NewBatcher(batchCfg exporterbatcher.Config,
}, nil
}

func (qb *BaseBatcher) startWorkerPool() {
func (qb *BaseBatcher[K]) startWorkerPool() {
if qb.maxWorkers == 0 {
return
}
Expand All @@ -71,15 +73,15 @@ func (qb *BaseBatcher) startWorkerPool() {
}

// flush exports the incoming batch synchronously.
func (qb *BaseBatcher) flush(batchToFlush batch) {
func (qb *BaseBatcher[K]) flush(batchToFlush batch[K]) {
err := qb.exportFunc(batchToFlush.ctx, batchToFlush.req)
for _, idx := range batchToFlush.idxList {
qb.queue.OnProcessingFinished(idx, err)
}
}

// flushAsync starts a goroutine that calls flushIfNecessary. It blocks until a worker is available.
func (qb *BaseBatcher) flushAsync(batchToFlush batch) {
func (qb *BaseBatcher[K]) flushAsync(batchToFlush batch[K]) {
qb.stopWG.Add(1)
if qb.maxWorkers == 0 {
go func() {
Expand All @@ -95,3 +97,13 @@ func (qb *BaseBatcher) flushAsync(batchToFlush batch) {
qb.workerPool <- true
}()
}

type Merger[K any] interface {
MergeSplit(src, dst K, cfg exporterbatcher.MaxSizeConfig) ([]K, error)
}

type requestMerger struct{}

func (requestMerger) MergeSplit(src, dst internal.Request, cfg exporterbatcher.MaxSizeConfig) ([]internal.Request, error) {
return src.MergeSplit(context.Background(), cfg, dst)
}
40 changes: 20 additions & 20 deletions exporter/internal/queue/default_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,22 @@
)

// DefaultBatcher continuously reads from the queue and flushes asynchronously if size limit is met or on timeout.
type DefaultBatcher struct {
BaseBatcher
type DefaultBatcher[K internal.Request] struct {
BaseBatcher[K]
currentBatchMu sync.Mutex
currentBatch *batch
currentBatch *batch[K]
timer *time.Timer
shutdownCh chan bool
}

func (qb *DefaultBatcher) resetTimer() {
func (qb *DefaultBatcher[K]) resetTimer() {
if qb.batchCfg.FlushTimeout != 0 {
qb.timer.Reset(qb.batchCfg.FlushTimeout)
}
}

// startReadingFlushingGoroutine starts a goroutine that reads and then flushes.
func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
func (qb *DefaultBatcher[K]) startReadingFlushingGoroutine() {
qb.stopWG.Add(1)
go func() {
defer qb.stopWG.Done()
Expand All @@ -44,13 +44,13 @@
qb.currentBatchMu.Lock()

if qb.batchCfg.MaxSizeItems > 0 {
var reqList []internal.Request
var reqList []K
var mergeSplitErr error
if qb.currentBatch == nil || qb.currentBatch.req == nil {
if qb.currentBatch == nil {
qb.resetTimer()
reqList, mergeSplitErr = req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, nil)
reqList, mergeSplitErr = qb.merger.MergeSplit(req, nil, qb.batchCfg.MaxSizeConfig)

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / unittest-matrix (ubuntu-latest, ~1.22)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / Integration test

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / test-coverage

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / unittest-matrix (ubuntu-latest, ~1.23)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (pkg)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (pkg)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / windows-unittest (windows-2025)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / windows-unittest (windows-2022)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

cannot use nil as K value in argument to qb.merger.MergeSplit

Check failure on line 51 in exporter/internal/queue/default_batcher.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

cannot use nil as K value in argument to qb.merger.MergeSplit
} else {
reqList, mergeSplitErr = qb.currentBatch.req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, req)
reqList, mergeSplitErr = qb.merger.MergeSplit(qb.currentBatch.req, req, qb.batchCfg.MaxSizeConfig)
}

if mergeSplitErr != nil || reqList == nil {
Expand All @@ -64,7 +64,7 @@
qb.currentBatch = nil
qb.currentBatchMu.Unlock()
for i := 0; i < len(reqList); i++ {
qb.flushAsync(batch{
qb.flushAsync(batch[K]{
req: reqList[i],
ctx: ctx,
idxList: []uint64{idx},
Expand All @@ -73,30 +73,30 @@
}
qb.resetTimer()
} else {
qb.currentBatch = &batch{
qb.currentBatch = &batch[K]{
req: reqList[0],
ctx: ctx,
idxList: []uint64{idx},
}
qb.currentBatchMu.Unlock()
}
} else {
if qb.currentBatch == nil || qb.currentBatch.req == nil {
if qb.currentBatch == nil {
qb.resetTimer()
qb.currentBatch = &batch{
qb.currentBatch = &batch[K]{
req: req,
ctx: ctx,
idxList: []uint64{idx},
}
} else {
// TODO: consolidate implementation for the cases where MaxSizeConfig is specified and the case where it is not specified
mergedReq, mergeErr := qb.currentBatch.req.MergeSplit(qb.currentBatch.ctx, qb.batchCfg.MaxSizeConfig, req)
mergedReq, mergeErr := qb.merger.MergeSplit(qb.currentBatch.req, req, qb.batchCfg.MaxSizeConfig)
if mergeErr != nil {
qb.queue.OnProcessingFinished(idx, mergeErr)
qb.currentBatchMu.Unlock()
continue
}
qb.currentBatch = &batch{
qb.currentBatch = &batch[K]{
req: mergedReq[0],
ctx: qb.currentBatch.ctx,
idxList: append(qb.currentBatch.idxList, idx),
Expand All @@ -120,7 +120,7 @@
}

// startTimeBasedFlushingGoroutine starts a goroutine that flushes on timeout.
func (qb *DefaultBatcher) startTimeBasedFlushingGoroutine() {
func (qb *DefaultBatcher[K]) startTimeBasedFlushingGoroutine() {
qb.stopWG.Add(1)
go func() {
defer qb.stopWG.Done()
Expand All @@ -136,7 +136,7 @@
}

// Start starts the goroutine that reads from the queue and flushes asynchronously.
func (qb *DefaultBatcher) Start(_ context.Context, _ component.Host) error {
func (qb *DefaultBatcher[K]) Start(_ context.Context, _ component.Host) error {
// maxWorker being -1 means batcher is disabled. This is for testing queue sender metrics.
if qb.maxWorkers == -1 {
return nil
Expand All @@ -158,9 +158,9 @@
}

// flushCurrentBatchIfNecessary sends out the current request batch if it is not nil
func (qb *DefaultBatcher) flushCurrentBatchIfNecessary() {
func (qb *DefaultBatcher[K]) flushCurrentBatchIfNecessary() {
qb.currentBatchMu.Lock()
if qb.currentBatch == nil || qb.currentBatch.req == nil {
if qb.currentBatch == nil {
qb.currentBatchMu.Unlock()
return
}
Expand All @@ -174,7 +174,7 @@
}

// Shutdown ensures that queue and all Batcher are stopped.
func (qb *DefaultBatcher) Shutdown(_ context.Context) error {
func (qb *DefaultBatcher[K]) Shutdown(_ context.Context) error {
qb.flushCurrentBatchIfNecessary()
qb.stopWG.Wait()
return nil
Expand Down
10 changes: 5 additions & 5 deletions exporter/internal/queue/disabled_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import (

// DisabledBatcher is a special-case of Batcher that has no size limit for sending. Any items read from the queue will
// be sent out (asynchronously) immediately regardless of the size.
type DisabledBatcher struct {
BaseBatcher
type DisabledBatcher[K any] struct {
BaseBatcher[K]
}

// Start starts the goroutine that reads from the queue and flushes asynchronously.
func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error {
func (qb *DisabledBatcher[K]) Start(_ context.Context, _ component.Host) error {
// maxWorker being -1 means batcher is disabled. This is for testing queue sender metrics.
if qb.maxWorkers == -1 {
return nil
Expand All @@ -35,7 +35,7 @@ func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error {
if !ok {
return
}
qb.flushAsync(batch{
qb.flushAsync(batch[K]{
req: req,
ctx: context.Background(),
idxList: []uint64{idx},
Expand All @@ -46,7 +46,7 @@ func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error {
}

// Shutdown ensures that queue and all Batcher are stopped.
func (qb *DisabledBatcher) Shutdown(_ context.Context) error {
func (qb *DisabledBatcher[K]) Shutdown(_ context.Context) error {
qb.stopWG.Wait()
return nil
}
Loading