diff --git a/exporter/exporterhelper/internal/base_exporter.go b/exporter/exporterhelper/internal/base_exporter.go index a76a725981f..0a27a6e0de6 100644 --- a/exporter/exporterhelper/internal/base_exporter.go +++ b/exporter/exporterhelper/internal/base_exporter.go @@ -108,8 +108,23 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe } } - if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled || - usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled && !be.queueCfg.Enabled { + if usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled && !be.queueCfg.Enabled { + be.queueFactory = exporterqueue.NewBlockingMemoryQueue[internal.Request]() + be.queueCfg.QueueSize = 20 + q := be.queueFactory( + context.Background(), + exporterqueue.Settings{ + Signal: be.Signal, + ExporterSettings: be.Set, + }, + be.queueCfg) + be.QueueSender = NewQueueSender(q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep, be.BatcherCfg) + for _, op := range options { + err = multierr.Append(err, op(be)) + } + } + + if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled { bs := NewBatchSender(be.BatcherCfg, be.Set) be.BatchSender = bs } diff --git a/exporter/exporterhelper/internal/batch_sender_test.go b/exporter/exporterhelper/internal/batch_sender_test.go index 061aeea89a3..4a6402bde33 100644 --- a/exporter/exporterhelper/internal/batch_sender_test.go +++ b/exporter/exporterhelper/internal/batch_sender_test.go @@ -326,7 +326,7 @@ func TestBatchSender_PostShutdown(t *testing.T) { assert.Equal(t, int64(8), sink.itemsCount.Load()) }) } - runTest("enable_queue_batcher", true) + // We don't expect the same behavior when disable_queue_batcher is true runTest("disable_queue_batcher", false) } diff --git a/exporter/exporterhelper/internal/queue_sender.go b/exporter/exporterhelper/internal/queue_sender.go index 509c747115b..1a569512b38 100644 --- a/exporter/exporterhelper/internal/queue_sender.go +++ b/exporter/exporterhelper/internal/queue_sender.go @@ -178,10 +178,12 @@ func (qs *QueueSender) Shutdown(ctx context.Context) error { func (qs *QueueSender) Send(ctx context.Context, req internal.Request) error { // Prevent cancellation and deadline to propagate to the context stored in the queue. // The grpc/http based receivers will cancel the request context after this function returns. - c := context.WithoutCancel(ctx) + if !usePullingBasedExporterQueueBatcher.IsEnabled() && !qs.queue.IsBlocking() { + ctx = context.WithoutCancel(ctx) + } - span := trace.SpanFromContext(c) - if err := qs.queue.Offer(c, req); err != nil { + span := trace.SpanFromContext(ctx) + if err := qs.queue.Offer(ctx, req); err != nil { span.AddEvent("Failed to enqueue item.", trace.WithAttributes(qs.traceAttribute)) return err } diff --git a/exporter/exporterqueue/queue.go b/exporter/exporterqueue/queue.go index 724cc23e0ae..984b27d4403 100644 --- a/exporter/exporterqueue/queue.go +++ b/exporter/exporterqueue/queue.go @@ -56,6 +56,18 @@ func NewMemoryQueueFactory[T any]() Factory[T] { } } +// NewBlockingMemoryQueue returns a factory to create a new blocking memory queue. +// Experimental: This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func NewBlockingMemoryQueue[T any]() Factory[T] { + return func(_ context.Context, _ Settings, cfg Config) Queue[T] { + return queue.NewBlockingMemoryQueue[T](queue.BlockingMemoryQueueSettings[T]{ + Sizer: &queue.RequestSizer[T]{}, + Capacity: int64(cfg.QueueSize), + }) + } +} + // PersistentQueueSettings defines developer settings for the persistent queue factory. // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. diff --git a/exporter/internal/queue/blocking_queue.go b/exporter/internal/queue/blocking_queue.go new file mode 100644 index 00000000000..e5a04405ba8 --- /dev/null +++ b/exporter/internal/queue/blocking_queue.go @@ -0,0 +1,93 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package queue // import "go.opentelemetry.io/collector/exporter/internal/queue" + +import ( + "context" + "sync" + + "go.opentelemetry.io/collector/component" +) + +// boundedMemoryQueue blocks insert until the batch containing the request is sent out. +type blockingMemoryQueue[T any] struct { + component.StartFunc + *sizedChannel[blockingMemQueueEl[T]] + sizer Sizer[T] + + mu sync.Mutex + nextIndex uint64 + doneCh map[uint64](chan error) +} + +// MemoryQueueSettings defines internal parameters for boundedMemoryQueue creation. +type BlockingMemoryQueueSettings[T any] struct { + Sizer Sizer[T] + Capacity int64 +} + +// NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional +// callback for dropped items (e.g. useful to emit metrics). +func NewBlockingMemoryQueue[T any](set BlockingMemoryQueueSettings[T]) Queue[T] { + return &blockingMemoryQueue[T]{ + sizedChannel: newSizedChannel[blockingMemQueueEl[T]](set.Capacity, nil, 0), + sizer: set.Sizer, + nextIndex: 0, + doneCh: make(map[uint64](chan error)), + } +} + +// Offer is used by the producer to submit new item to the queue. Calling this method on a stopped queue will panic. +func (q *blockingMemoryQueue[T]) Offer(ctx context.Context, req T) error { + q.mu.Lock() + index := q.nextIndex + q.nextIndex++ + done := make(chan error) + q.doneCh[index] = done + + if err := q.sizedChannel.push( + blockingMemQueueEl[T]{ctx: ctx, req: req, index: index}, + q.sizer.Sizeof(req), + nil); err != nil { + delete(q.doneCh, index) + q.mu.Unlock() + return err + } + + q.mu.Unlock() + err := <-done + return err +} + +func (q *blockingMemoryQueue[T]) Read(_ context.Context) (uint64, context.Context, T, bool) { + item, ok := q.sizedChannel.pop(func(el blockingMemQueueEl[T]) int64 { return q.sizer.Sizeof(el.req) }) + return item.index, item.ctx, item.req, ok +} + +// OnProcessingFinished should be called to remove the item of the given index from the queue once processing is finished. +// For in memory queue, this function is noop. +func (q *blockingMemoryQueue[T]) OnProcessingFinished(index uint64, err error) { + q.mu.Lock() + q.doneCh[index] <- err + delete(q.doneCh, index) + q.mu.Unlock() +} + +// Shutdown closes the queue channel to initiate draining of the queue. +func (q *blockingMemoryQueue[T]) Shutdown(context.Context) error { + q.mu.Lock() + defer q.mu.Unlock() + q.sizedChannel.shutdown() + return nil +} + +func (q *blockingMemoryQueue[T]) IsBlocking() bool { + return true +} + +type blockingMemQueueEl[T any] struct { + index uint64 + req T + ctx context.Context +} diff --git a/exporter/internal/queue/blocking_queue_test.go b/exporter/internal/queue/blocking_queue_test.go new file mode 100644 index 00000000000..648b395302b --- /dev/null +++ b/exporter/internal/queue/blocking_queue_test.go @@ -0,0 +1,12 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package queue + +import ( + "testing" +) + +func TestBlockingMemoryQueue(t *testing.T) { + +} diff --git a/exporter/internal/queue/bounded_memory_queue.go b/exporter/internal/queue/bounded_memory_queue.go index 015c94473df..9864fa01fa8 100644 --- a/exporter/internal/queue/bounded_memory_queue.go +++ b/exporter/internal/queue/bounded_memory_queue.go @@ -56,6 +56,10 @@ func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error { return nil } +func (q *boundedMemoryQueue[T]) IsBlocking() bool { + return false +} + type memQueueEl[T any] struct { req T ctx context.Context diff --git a/exporter/internal/queue/default_batcher.go b/exporter/internal/queue/default_batcher.go index 3023fa4df46..258f397cc48 100644 --- a/exporter/internal/queue/default_batcher.go +++ b/exporter/internal/queue/default_batcher.go @@ -106,7 +106,6 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() { batchToFlush := *qb.currentBatch qb.currentBatch = nil qb.currentBatchMu.Unlock() - // flushAsync() blocks until successfully started a goroutine for flushing. qb.flushAsync(batchToFlush) qb.resetTimer() diff --git a/exporter/internal/queue/persistent_queue.go b/exporter/internal/queue/persistent_queue.go index 038cb09cc39..df7c614b8c2 100644 --- a/exporter/internal/queue/persistent_queue.go +++ b/exporter/internal/queue/persistent_queue.go @@ -560,3 +560,7 @@ func bytesToItemIndexArray(buf []byte) ([]uint64, error) { } return val, nil } + +func (pq *persistentQueue[T]) IsBlocking() bool { + return false +} diff --git a/exporter/internal/queue/queue.go b/exporter/internal/queue/queue.go index 77cac737f7e..f258debdbd6 100644 --- a/exporter/internal/queue/queue.go +++ b/exporter/internal/queue/queue.go @@ -34,6 +34,8 @@ type Queue[T any] interface { Read(context.Context) (uint64, context.Context, T, bool) // OnProcessingFinished should be called to remove the item of the given index from the queue once processing is finished. OnProcessingFinished(index uint64, consumeErr error) + // Returns a boolean to tell whether the queue is blocking + IsBlocking() bool } // Sizer is an interface that returns the size of the given element.