Skip to content

Commit

Permalink
[chore] Use generics for exporterhelper Sender
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Jan 5, 2025
1 parent e5e12bf commit dfdcde3
Show file tree
Hide file tree
Showing 12 changed files with 38 additions and 38 deletions.
16 changes: 8 additions & 8 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var usePullingBasedExporterQueueBatcher = featuregate.GlobalRegistry().MustRegis
featuregate.WithRegisterDescription("if set to true, turns on the pulling-based exporter queue bathcer"),
)

type ObsrepSenderFactory = func(obsrep *ObsReport) RequestSender
type ObsrepSenderFactory = func(obsrep *ObsReport) Sender[internal.Request]

// Option apply changes to BaseExporter.
type Option func(*BaseExporter) error
Expand All @@ -53,10 +53,10 @@ type BaseExporter struct {
// Chain of senders that the exporter helper applies before passing the data to the actual exporter.
// The data is handled by each sender in the respective order starting from the queueSender.
// Most of the senders are optional, and initialized with a no-op path-through sender.
BatchSender RequestSender
QueueSender RequestSender
ObsrepSender RequestSender
RetrySender RequestSender
BatchSender Sender[internal.Request]
QueueSender Sender[internal.Request]
ObsrepSender Sender[internal.Request]
RetrySender Sender[internal.Request]
TimeoutSender *TimeoutSender // TimeoutSender is always initialized.

ConsumerOptions []consumer.Option
Expand All @@ -73,10 +73,10 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
}

be := &BaseExporter{
BatchSender: &BaseRequestSender{},
QueueSender: &BaseRequestSender{},
BatchSender: &BaseSender[internal.Request]{},
QueueSender: &BaseSender[internal.Request]{},
ObsrepSender: osf(obsReport),
RetrySender: &BaseRequestSender{},
RetrySender: &BaseSender[internal.Request]{},
TimeoutSender: &TimeoutSender{cfg: NewDefaultTimeoutConfig()},

Set: set,
Expand Down
4 changes: 2 additions & 2 deletions exporter/exporterhelper/internal/base_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ var (
}()
)

func newNoopObsrepSender(*ObsReport) RequestSender {
return &BaseRequestSender{}
func newNoopObsrepSender(*ObsReport) Sender[internal.Request] {
return &BaseSender[internal.Request]{}
}

func TestBaseExporter(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/internal/batch_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
// - cfg.FlushTimeout is elapsed since the timestamp when the previous batch was sent out.
// - concurrencyLimit is reached.
type BatchSender struct {
BaseRequestSender
BaseSender[internal.Request]
cfg exporterbatcher.Config

// concurrencyLimit is the maximum number of goroutines that can be blocked by the batcher.
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/internal/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (qCfg *QueueConfig) Validate() error {
}

type QueueSender struct {
BaseRequestSender
BaseSender[internal.Request]
queue exporterqueue.Queue[internal.Request]
numConsumers int
traceAttribute attribute.KeyValue
Expand Down
18 changes: 9 additions & 9 deletions exporter/exporterhelper/internal/request_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,30 @@
package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal"

import (
"context" // RequestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs).
"context" // Sender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs).

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/internal"
)

type RequestSender interface {
type Sender[K any] interface {
component.Component
Send(context.Context, internal.Request) error
SetNextSender(nextSender RequestSender)
Send(context.Context, K) error
SetNextSender(nextSender Sender[K])
}

type BaseRequestSender struct {
type BaseSender[K any] struct {
component.StartFunc
component.ShutdownFunc
NextSender RequestSender
NextSender Sender[K]
}

var _ RequestSender = (*BaseRequestSender)(nil)
var _ Sender[internal.Request] = (*BaseSender[internal.Request])(nil)

func (b *BaseRequestSender) Send(ctx context.Context, req internal.Request) error {
func (b *BaseSender[K]) Send(ctx context.Context, req K) error {
return b.NextSender.Send(ctx, req)
}

func (b *BaseRequestSender) SetNextSender(nextSender RequestSender) {
func (b *BaseSender[K]) SetNextSender(nextSender Sender[K]) {
b.NextSender = nextSender
}
4 changes: 2 additions & 2 deletions exporter/exporterhelper/internal/retry_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func NewThrottleRetry(err error, delay time.Duration) error {
}

type retrySender struct {
BaseRequestSender
BaseSender[internal.Request]
traceAttribute attribute.KeyValue
cfg configretry.BackOffConfig
stopCh chan struct{}
Expand All @@ -65,7 +65,7 @@ func (rs *retrySender) Shutdown(context.Context) error {
return nil
}

// send implements the requestSender interface
// Send implements the requestSender interface
func (rs *retrySender) Send(ctx context.Context, req internal.Request) error {
// Do not use NewExponentialBackOff since it calls Reset and the code here must
// call Reset after changing the InitialInterval (this saves an unnecessary call to Now).
Expand Down
4 changes: 2 additions & 2 deletions exporter/exporterhelper/internal/retry_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,13 +477,13 @@ func newMockRequest(cnt int, consumeError error) *mockRequest {
}

type observabilityConsumerSender struct {
BaseRequestSender
BaseSender[internal.Request]
waitGroup *sync.WaitGroup
sentItemsCount *atomic.Int64
droppedItemsCount *atomic.Int64
}

func newObservabilityConsumerSender(*ObsReport) RequestSender {
func newObservabilityConsumerSender(*ObsReport) Sender[internal.Request] {
return &observabilityConsumerSender{
waitGroup: new(sync.WaitGroup),
droppedItemsCount: &atomic.Int64{},
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/internal/timeout_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewDefaultTimeoutConfig() TimeoutConfig {

// TimeoutSender is a requestSender that adds a `timeout` to every request that passes this sender.
type TimeoutSender struct {
BaseRequestSender
BaseSender[internal.Request]
cfg TimeoutConfig
}

Expand Down
6 changes: 3 additions & 3 deletions exporter/exporterhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func requestFromLogs(pusher consumer.ConsumeLogsFunc) RequestFromLogsFunc {
}
}

// NewLogsRequest creates new logs exporter based on custom LogsConverter and RequestSender.
// NewLogsRequest creates new logs exporter based on custom LogsConverter and Sender.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func NewLogsRequest(
Expand Down Expand Up @@ -148,11 +148,11 @@ func NewLogsRequest(
}

type logsExporterWithObservability struct {
internal.BaseRequestSender
internal.BaseSender[Request]
obsrep *internal.ObsReport
}

func newLogsWithObservability(obsrep *internal.ObsReport) internal.RequestSender {
func newLogsWithObservability(obsrep *internal.ObsReport) internal.Sender[Request] {
return &logsExporterWithObservability{obsrep: obsrep}
}

Expand Down
6 changes: 3 additions & 3 deletions exporter/exporterhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func requestFromMetrics(pusher consumer.ConsumeMetricsFunc) RequestFromMetricsFu
}
}

// NewMetricsRequest creates a new metrics exporter based on a custom MetricsConverter and RequestSender.
// NewMetricsRequest creates a new metrics exporter based on a custom MetricsConverter and Sender.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func NewMetricsRequest(
Expand Down Expand Up @@ -148,11 +148,11 @@ func NewMetricsRequest(
}

type metricsSenderWithObservability struct {
internal.BaseRequestSender
internal.BaseSender[Request]
obsrep *internal.ObsReport
}

func newMetricsSenderWithObservability(obsrep *internal.ObsReport) internal.RequestSender {
func newMetricsSenderWithObservability(obsrep *internal.ObsReport) internal.Sender[Request] {
return &metricsSenderWithObservability{obsrep: obsrep}
}

Expand Down
6 changes: 3 additions & 3 deletions exporter/exporterhelper/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func requestFromTraces(pusher consumer.ConsumeTracesFunc) RequestFromTracesFunc
}
}

// NewTracesRequest creates a new traces exporter based on a custom TracesConverter and RequestSender.
// NewTracesRequest creates a new traces exporter based on a custom TracesConverter and Sender.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func NewTracesRequest(
Expand Down Expand Up @@ -148,11 +148,11 @@ func NewTracesRequest(
}

type tracesWithObservability struct {
internal.BaseRequestSender
internal.BaseSender[Request]
obsrep *internal.ObsReport
}

func newTracesWithObservability(obsrep *internal.ObsReport) internal.RequestSender {
func newTracesWithObservability(obsrep *internal.ObsReport) internal.Sender[Request] {
return &tracesWithObservability{obsrep: obsrep}
}

Expand Down
6 changes: 3 additions & 3 deletions exporter/exporterhelper/xexporterhelper/profiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func requestFromProfiles(pusher xconsumer.ConsumeProfilesFunc) RequestFromProfil
}
}

// NewProfilesRequestExporter creates a new profiles exporter based on a custom ProfilesConverter and RequestSender.
// NewProfilesRequestExporter creates a new profiles exporter based on a custom ProfilesConverter and Sender.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func NewProfilesRequestExporter(
Expand Down Expand Up @@ -146,11 +146,11 @@ func NewProfilesRequestExporter(
}

type profilesExporterWithObservability struct {
internal.BaseRequestSender
internal.BaseSender[exporterhelper.Request]
obsrep *internal.ObsReport
}

func newProfilesExporterWithObservability(obsrep *internal.ObsReport) internal.RequestSender {
func newProfilesExporterWithObservability(obsrep *internal.ObsReport) internal.Sender[exporterhelper.Request] {
return &profilesExporterWithObservability{obsrep: obsrep}
}

Expand Down

0 comments on commit dfdcde3

Please sign in to comment.