Skip to content

Commit

Permalink
[chore] Remove the need of calling startWorkerPool, simplify flushing
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Jan 6, 2025
1 parent 306c939 commit ec1732c
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 55 deletions.
78 changes: 33 additions & 45 deletions exporter/internal/queue/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ type Batcher interface {
}

type BaseBatcher struct {
batchCfg exporterbatcher.Config
queue Queue[internal.Request]
batchCfg exporterbatcher.Config
queue Queue[internal.Request]
// TODO: Remove when the -1 hack for testing is removed.
maxWorkers int
workerPool chan bool
exportFunc func(ctx context.Context, req internal.Request) error
Expand All @@ -38,60 +39,47 @@ func NewBatcher(batchCfg exporterbatcher.Config,
maxWorkers int,
) (Batcher, error) {
if !batchCfg.Enabled {
return &DisabledBatcher{
BaseBatcher{
batchCfg: batchCfg,
queue: queue,
maxWorkers: maxWorkers,
exportFunc: exportFunc,
stopWG: sync.WaitGroup{},
},
}, nil
return &DisabledBatcher{BaseBatcher: newBaseBatcher(batchCfg, queue, exportFunc, maxWorkers)}, nil
}

return &DefaultBatcher{
BaseBatcher: BaseBatcher{
batchCfg: batchCfg,
queue: queue,
maxWorkers: maxWorkers,
exportFunc: exportFunc,
stopWG: sync.WaitGroup{},
},
}, nil
return &DefaultBatcher{BaseBatcher: newBaseBatcher(batchCfg, queue, exportFunc, maxWorkers)}, nil
}

func (qb *BaseBatcher) startWorkerPool() {
if qb.maxWorkers == 0 {
return
func newBaseBatcher(batchCfg exporterbatcher.Config,
queue Queue[internal.Request],
exportFunc func(ctx context.Context, req internal.Request) error,
maxWorkers int,
) BaseBatcher {
var workerPool chan bool
if maxWorkers > 0 {
workerPool = make(chan bool, maxWorkers)
for i := 0; i < maxWorkers; i++ {
workerPool <- true
}
}
qb.workerPool = make(chan bool, qb.maxWorkers)
for i := 0; i < qb.maxWorkers; i++ {
qb.workerPool <- true
return BaseBatcher{
batchCfg: batchCfg,
queue: queue,
maxWorkers: maxWorkers,
workerPool: workerPool,
exportFunc: exportFunc,
stopWG: sync.WaitGroup{},
}
}

// flush exports the incoming batch synchronously.
// flush starts a goroutine that calls exportFunc. It blocks until a worker is available if necessary.
func (qb *BaseBatcher) flush(batchToFlush batch) {
err := qb.exportFunc(batchToFlush.ctx, batchToFlush.req)
for _, idx := range batchToFlush.idxList {
qb.queue.OnProcessingFinished(idx, err)
}
}

// flushAsync starts a goroutine that calls flushIfNecessary. It blocks until a worker is available.
func (qb *BaseBatcher) flushAsync(batchToFlush batch) {
qb.stopWG.Add(1)
if qb.maxWorkers == 0 {
go func() {
defer qb.stopWG.Done()
qb.flush(batchToFlush)
}()
return
if qb.workerPool != nil {
<-qb.workerPool
}
<-qb.workerPool
go func() {
defer qb.stopWG.Done()
qb.flush(batchToFlush)
qb.workerPool <- true
err := qb.exportFunc(batchToFlush.ctx, batchToFlush.req)
for _, idx := range batchToFlush.idxList {
qb.queue.OnProcessingFinished(idx, err)
}
if qb.workerPool != nil {
qb.workerPool <- true
}
}()
}
11 changes: 5 additions & 6 deletions exporter/internal/queue/default_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
qb.currentBatch = nil
qb.currentBatchMu.Unlock()
for i := 0; i < len(reqList); i++ {
qb.flushAsync(batch{
qb.flush(batch{
req: reqList[i],
ctx: ctx,
idxList: []uint64{idx},
Expand Down Expand Up @@ -108,8 +108,8 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
qb.currentBatch = nil
qb.currentBatchMu.Unlock()

// flushAsync() blocks until successfully started a goroutine for flushing.
qb.flushAsync(batchToFlush)
// flush() blocks until successfully started a goroutine for flushing.
qb.flush(batchToFlush)
qb.resetTimer()
} else {
qb.currentBatchMu.Unlock()
Expand Down Expand Up @@ -142,7 +142,6 @@ func (qb *DefaultBatcher) Start(_ context.Context, _ component.Host) error {
return nil
}

qb.startWorkerPool()
qb.shutdownCh = make(chan bool, 1)

if qb.batchCfg.FlushTimeout == 0 {
Expand All @@ -168,8 +167,8 @@ func (qb *DefaultBatcher) flushCurrentBatchIfNecessary() {
qb.currentBatch = nil
qb.currentBatchMu.Unlock()

// flushAsync() blocks until successfully started a goroutine for flushing.
qb.flushAsync(batchToFlush)
// flush() blocks until successfully started a goroutine for flushing.
qb.flush(batchToFlush)
qb.resetTimer()
}

Expand Down
6 changes: 2 additions & 4 deletions exporter/internal/queue/disabled_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@ func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error {
return nil
}

qb.startWorkerPool()

// This goroutine reads and then flushes.
// 1. Reading from the queue is blocked until the queue is non-empty or until the queue is stopped.
// 2. flushAsync() blocks until there are idle workers in the worker pool.
// 2. flush() blocks until there are idle workers in the worker pool.
qb.stopWG.Add(1)
go func() {
defer qb.stopWG.Done()
Expand All @@ -35,7 +33,7 @@ func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error {
if !ok {
return
}
qb.flushAsync(batch{
qb.flush(batch{
req: req,
ctx: context.Background(),
idxList: []uint64{idx},
Expand Down

0 comments on commit ec1732c

Please sign in to comment.