diff --git a/receiver/kafkareceiver/config.go b/receiver/kafkareceiver/config.go index 5b25c7c7c5f3..c781fe0c2cd0 100644 --- a/receiver/kafkareceiver/config.go +++ b/receiver/kafkareceiver/config.go @@ -7,6 +7,7 @@ import ( "time" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configretry" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka" @@ -85,6 +86,9 @@ type Config struct { DefaultFetchSize int32 `mapstructure:"default_fetch_size"` // The maximum bytes per fetch from Kafka (default "0", no limit) MaxFetchSize int32 `mapstructure:"max_fetch_size"` + + // In case of some errors returned by the next consumer, the receiver will wait before consuming the next message + ErrorBackOff configretry.BackOffConfig `mapstructure:"error_backoff"` } const ( diff --git a/receiver/kafkareceiver/config_test.go b/receiver/kafkareceiver/config_test.go index b5e7faa1dc93..cee1cce72207 100644 --- a/receiver/kafkareceiver/config_test.go +++ b/receiver/kafkareceiver/config_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/confmap/confmaptest" @@ -65,6 +66,9 @@ func TestLoadConfig(t *testing.T) { MinFetchSize: 1, DefaultFetchSize: 1048576, MaxFetchSize: 0, + ErrorBackOff: configretry.BackOffConfig{ + Enabled: false, + }, }, }, { @@ -101,6 +105,13 @@ func TestLoadConfig(t *testing.T) { MinFetchSize: 1, DefaultFetchSize: 1048576, MaxFetchSize: 0, + ErrorBackOff: configretry.BackOffConfig{ + Enabled: true, + InitialInterval: 1 * time.Second, + MaxInterval: 10 * time.Second, + MaxElapsedTime: 1 * time.Minute, + Multiplier: 1.5, + }, }, }, } diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go index 1ec2d5aca6e9..ead8402c5578 100644 --- a/receiver/kafkareceiver/kafka_receiver.go +++ b/receiver/kafkareceiver/kafka_receiver.go @@ -9,9 +9,12 @@ import ( "fmt" "strconv" "sync" + "time" "github.com/IBM/sarama" + "github.com/cenkalti/backoff/v4" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" @@ -35,6 +38,8 @@ const ( var errInvalidInitialOffset = errors.New("invalid initial offset") +var errMemoryLimiterDataRefused = errors.New("data refused due to high memory usage") + // kafkaTracesConsumer uses sarama to consume and handle messages from kafka. type kafkaTracesConsumer struct { config Config @@ -205,6 +210,7 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro messageMarking: c.messageMarking, headerExtractor: &nopHeaderExtractor{}, telemetryBuilder: c.telemetryBuilder, + backOff: newExponentialBackOff(c.config.ErrorBackOff), } if c.headerExtraction { consumerGroup.headerExtractor = &headerExtractor{ @@ -218,6 +224,20 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro return nil } +func newExponentialBackOff(config configretry.BackOffConfig) *backoff.ExponentialBackOff { + if !config.Enabled { + return nil + } + backOff := backoff.NewExponentialBackOff() + backOff.InitialInterval = config.InitialInterval + backOff.RandomizationFactor = config.RandomizationFactor + backOff.Multiplier = config.Multiplier + backOff.MaxInterval = config.MaxInterval + backOff.MaxElapsedTime = config.MaxElapsedTime + backOff.Reset() + return backOff +} + func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) { defer c.consumeLoopWG.Done() for { @@ -481,6 +501,7 @@ type tracesConsumerGroupHandler struct { autocommitEnabled bool messageMarking MessageMarking headerExtractor HeaderExtractor + backOff *backoff.ExponentialBackOff } type metricsConsumerGroupHandler struct { @@ -582,8 +603,18 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe if c.messageMarking.After && c.messageMarking.OnError { session.MarkMessage(message, "") } + if errorRequiresBackoff(err) && c.backOff != nil { + select { + case <-session.Context().Done(): + return nil + case <-time.After(c.backOff.NextBackOff()): + } + } return err } + if c.backOff != nil { + c.backOff.Reset() + } if c.messageMarking.After { session.MarkMessage(message, "") } @@ -600,6 +631,10 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe } } +func errorRequiresBackoff(err error) bool { + return err.Error() == errMemoryLimiterDataRefused.Error() +} + func (c *metricsConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error { c.readyCloser.Do(func() { close(c.ready) diff --git a/receiver/kafkareceiver/kafka_receiver_test.go b/receiver/kafkareceiver/kafka_receiver_test.go index e353974acd91..a76b7592f845 100644 --- a/receiver/kafkareceiver/kafka_receiver_test.go +++ b/receiver/kafkareceiver/kafka_receiver_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/IBM/sarama" + "github.com/cenkalti/backoff/v4" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" @@ -340,35 +341,63 @@ func TestTracesConsumerGroupHandler_error_nextConsumer(t *testing.T) { consumerError := errors.New("failed to consume") obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) require.NoError(t, err) - c := tracesConsumerGroupHandler{ - unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding), - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewErr(consumerError), - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, - telemetryBuilder: nopTelemetryBuilder(t), - } - wg := sync.WaitGroup{} - wg.Add(1) - groupClaim := &testConsumerGroupClaim{ - messageChan: make(chan *sarama.ConsumerMessage), + tests := []struct { + name string + err error + expectedBackoff time.Duration + }{ + { + name: "memory limiter data refused error", + err: errMemoryLimiterDataRefused, + expectedBackoff: backoff.DefaultInitialInterval, + }, + { + name: "consumer error that does not require backoff", + err: consumerError, + expectedBackoff: 0, + }, } - go func() { - e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim) - assert.EqualError(t, e, consumerError.Error()) - wg.Done() - }() - td := ptrace.NewTraces() - td.ResourceSpans().AppendEmpty() - unmarshaler := &ptrace.ProtoMarshaler{} - bts, err := unmarshaler.MarshalTraces(td) - require.NoError(t, err) - groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts} - close(groupClaim.messageChan) - wg.Wait() + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + backOff := backoff.NewExponentialBackOff() + backOff.RandomizationFactor = 0 + c := tracesConsumerGroupHandler{ + unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding), + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: consumertest.NewErr(tt.err), + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, + telemetryBuilder: nopTelemetryBuilder(t), + backOff: backOff, + } + + wg := sync.WaitGroup{} + wg.Add(1) + groupClaim := &testConsumerGroupClaim{ + messageChan: make(chan *sarama.ConsumerMessage), + } + go func() { + start := time.Now() + e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim) + end := time.Now() + assert.EqualError(t, e, tt.err.Error()) + assert.WithinDuration(t, start.Add(tt.expectedBackoff), end, 100*time.Millisecond) + wg.Done() + }() + + td := ptrace.NewTraces() + td.ResourceSpans().AppendEmpty() + unmarshaler := &ptrace.ProtoMarshaler{} + bts, err := unmarshaler.MarshalTraces(td) + require.NoError(t, err) + groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts} + close(groupClaim.messageChan) + wg.Wait() + }) + } } func TestTracesReceiver_encoding_extension(t *testing.T) { diff --git a/receiver/kafkareceiver/testdata/config.yaml b/receiver/kafkareceiver/testdata/config.yaml index a0a744764602..835b50193198 100644 --- a/receiver/kafkareceiver/testdata/config.yaml +++ b/receiver/kafkareceiver/testdata/config.yaml @@ -35,3 +35,9 @@ kafka/logs: retry: max: 10 backoff: 5s + error_backoff: + enabled: true + initial_interval: 1s + max_interval: 10s + max_elapsed_time: 1m + multiplier: 1.5 \ No newline at end of file