diff --git a/.chloggen/blocking_queue.yaml b/.chloggen/blocking_queue.yaml new file mode 100644 index 00000000000..6caead02278 --- /dev/null +++ b/.chloggen/blocking_queue.yaml @@ -0,0 +1,18 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Adds blocking queue which is used when the user sets up batching but not queuing. + +# One or more tracking issues or pull requests related to the change +issues: [8122, 10368] + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/exporter/exporterhelper/internal/base_exporter.go b/exporter/exporterhelper/internal/base_exporter.go index 416d11264c0..9be1de21bde 100644 --- a/exporter/exporterhelper/internal/base_exporter.go +++ b/exporter/exporterhelper/internal/base_exporter.go @@ -101,8 +101,23 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe be.QueueSender = NewQueueSender(q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep, be.BatcherCfg) } - if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled || - usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled && !be.queueCfg.Enabled { + if usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled && !be.queueCfg.Enabled { + be.queueFactory = exporterqueue.NewBlockingMemoryQueueFactory[internal.Request]() + be.queueCfg.QueueSize = 20 + q := be.queueFactory( + context.Background(), + exporterqueue.Settings{ + Signal: signal, + ExporterSettings: be.Set, + }, + be.queueCfg) + be.QueueSender = NewQueueSender(q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep, be.BatcherCfg) + for _, op := range options { + err = multierr.Append(err, op(be)) + } + } + + if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled { bs := NewBatchSender(be.BatcherCfg, be.Set) be.BatchSender = bs } diff --git a/exporter/exporterhelper/internal/batch_sender_test.go b/exporter/exporterhelper/internal/batch_sender_test.go index 061aeea89a3..4a6402bde33 100644 --- a/exporter/exporterhelper/internal/batch_sender_test.go +++ b/exporter/exporterhelper/internal/batch_sender_test.go @@ -326,7 +326,7 @@ func TestBatchSender_PostShutdown(t *testing.T) { assert.Equal(t, int64(8), sink.itemsCount.Load()) }) } - runTest("enable_queue_batcher", true) + // We don't expect the same behavior when disable_queue_batcher is true runTest("disable_queue_batcher", false) } diff --git a/exporter/exporterhelper/internal/queue_sender.go b/exporter/exporterhelper/internal/queue_sender.go index 3c99cf38876..cbeb6c2af61 100644 --- a/exporter/exporterhelper/internal/queue_sender.go +++ b/exporter/exporterhelper/internal/queue_sender.go @@ -178,10 +178,12 @@ func (qs *QueueSender) Shutdown(ctx context.Context) error { func (qs *QueueSender) Send(ctx context.Context, req internal.Request) error { // Prevent cancellation and deadline to propagate to the context stored in the queue. // The grpc/http based receivers will cancel the request context after this function returns. - c := context.WithoutCancel(ctx) + if !usePullingBasedExporterQueueBatcher.IsEnabled() && !qs.queue.IsBlocking() { + ctx = context.WithoutCancel(ctx) + } - span := trace.SpanFromContext(c) - if err := qs.queue.Offer(c, req); err != nil { + span := trace.SpanFromContext(ctx) + if err := qs.queue.Offer(ctx, req); err != nil { span.AddEvent("Failed to enqueue item.", trace.WithAttributes(qs.traceAttribute)) return err } diff --git a/exporter/exporterqueue/blocking_queue.go b/exporter/exporterqueue/blocking_queue.go new file mode 100644 index 00000000000..49e30819e97 --- /dev/null +++ b/exporter/exporterqueue/blocking_queue.go @@ -0,0 +1,93 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue" + +import ( + "context" + "sync" + + "go.opentelemetry.io/collector/component" +) + +// boundedMemoryQueue blocks insert until the batch containing the request is sent out. +type blockingMemoryQueue[T any] struct { + component.StartFunc + *sizedChannel[blockingMemQueueEl[T]] + sizer sizer[T] + + mu sync.Mutex + nextIndex uint64 + doneCh map[uint64](chan error) +} + +// MemoryQueueSettings defines internal parameters for boundedMemoryQueue creation. +type blockingMemoryQueueSettings[T any] struct { + Sizer sizer[T] + Capacity int64 +} + +// NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional +// callback for dropped items (e.g. useful to emit metrics). +func NewBlockingMemoryQueue[T any](set blockingMemoryQueueSettings[T]) Queue[T] { + return &blockingMemoryQueue[T]{ + sizedChannel: newSizedChannel[blockingMemQueueEl[T]](set.Capacity, nil, 0), + sizer: set.Sizer, + nextIndex: 0, + doneCh: make(map[uint64](chan error)), + } +} + +// Offer is used by the producer to submit new item to the queue. Calling this method on a stopped queue will panic. +func (q *blockingMemoryQueue[T]) Offer(ctx context.Context, req T) error { + q.mu.Lock() + index := q.nextIndex + q.nextIndex++ + done := make(chan error) + q.doneCh[index] = done + + if err := q.sizedChannel.push( + blockingMemQueueEl[T]{ctx: ctx, req: req, index: index}, + q.sizer.Sizeof(req), + nil); err != nil { + delete(q.doneCh, index) + q.mu.Unlock() + return err + } + + q.mu.Unlock() + err := <-done + return err +} + +func (q *blockingMemoryQueue[T]) Read(_ context.Context) (uint64, context.Context, T, bool) { + item, ok := q.sizedChannel.pop(func(el blockingMemQueueEl[T]) int64 { return q.sizer.Sizeof(el.req) }) + return item.index, item.ctx, item.req, ok +} + +// OnProcessingFinished should be called to remove the item of the given index from the queue once processing is finished. +// For in memory queue, this function is noop. +func (q *blockingMemoryQueue[T]) OnProcessingFinished(index uint64, err error) { + q.mu.Lock() + q.doneCh[index] <- err + delete(q.doneCh, index) + q.mu.Unlock() +} + +// Shutdown closes the queue channel to initiate draining of the queue. +func (q *blockingMemoryQueue[T]) Shutdown(context.Context) error { + q.mu.Lock() + defer q.mu.Unlock() + q.sizedChannel.shutdown() + return nil +} + +func (q *blockingMemoryQueue[T]) IsBlocking() bool { + return true +} + +type blockingMemQueueEl[T any] struct { + index uint64 + req T + ctx context.Context +} diff --git a/exporter/exporterqueue/blocking_queue_test.go b/exporter/exporterqueue/blocking_queue_test.go new file mode 100644 index 00000000000..6bce812337f --- /dev/null +++ b/exporter/exporterqueue/blocking_queue_test.go @@ -0,0 +1,37 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterqueue + +import ( + "context" + "errors" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestBlockingMemoryQueue(t *testing.T) { + var wg sync.WaitGroup + q := NewBlockingMemoryQueue[string](blockingMemoryQueueSettings[string]{Sizer: &requestSizer[string]{}, Capacity: 1}) + + err := errors.New("This is an error") + wg.Add(1) + go func() { + assert.EqualError(t, q.Offer(context.Background(), "a"), err.Error()) // Blocks until OnProcessingFinished is called + wg.Done() + }() + + index, ctx, req, ok := q.Read(context.Background()) + for !ok { + index, ctx, req, ok = q.Read(context.Background()) + } + + require.Equal(t, uint64(0), index) + require.Equal(t, context.Background(), ctx) + require.Equal(t, "a", req) + q.OnProcessingFinished(index, err) + wg.Wait() +} diff --git a/exporter/exporterqueue/bounded_memory_queue.go b/exporter/exporterqueue/bounded_memory_queue.go index ecf5ec1649f..9ae8554c230 100644 --- a/exporter/exporterqueue/bounded_memory_queue.go +++ b/exporter/exporterqueue/bounded_memory_queue.go @@ -56,6 +56,10 @@ func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error { return nil } +func (q *boundedMemoryQueue[T]) IsBlocking() bool { + return false +} + type memQueueEl[T any] struct { req T ctx context.Context diff --git a/exporter/exporterqueue/persistent_queue.go b/exporter/exporterqueue/persistent_queue.go index d930df5fabf..03a9042f2e4 100644 --- a/exporter/exporterqueue/persistent_queue.go +++ b/exporter/exporterqueue/persistent_queue.go @@ -560,3 +560,7 @@ func bytesToItemIndexArray(buf []byte) ([]uint64, error) { } return val, nil } + +func (pq *persistentQueue[T]) IsBlocking() bool { + return false +} diff --git a/exporter/exporterqueue/queue.go b/exporter/exporterqueue/queue.go index cb2a6b82de1..07998331c61 100644 --- a/exporter/exporterqueue/queue.go +++ b/exporter/exporterqueue/queue.go @@ -38,6 +38,8 @@ type Queue[T any] interface { Read(context.Context) (uint64, context.Context, T, bool) // OnProcessingFinished should be called to remove the item of the given index from the queue once processing is finished. OnProcessingFinished(index uint64, consumeErr error) + // IsBlocking returns whether the queue is blocking or not + IsBlocking() bool } // Settings defines settings for creating a queue. @@ -73,6 +75,18 @@ func NewMemoryQueueFactory[T any]() Factory[T] { } } +// NewBlockingMemoryQueue returns a factory to create a new blocking memory queue. +// Experimental: This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func NewBlockingMemoryQueueFactory[T any]() Factory[T] { + return func(_ context.Context, _ Settings, cfg Config) Queue[T] { + return NewBlockingMemoryQueue[T](blockingMemoryQueueSettings[T]{ + Sizer: &requestSizer[T]{}, + Capacity: int64(cfg.QueueSize), + }) + } +} + // PersistentQueueSettings defines developer settings for the persistent queue factory. // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.