Skip to content

Commit

Permalink
implement exponential backoff in case of memory limiter error when consuming traces
Browse files Browse the repository at this point in the history
  • Loading branch information
yiquanzhou committed Jan 3, 2025
1 parent f21f718 commit 78a77fe
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 26 deletions.
4 changes: 4 additions & 0 deletions receiver/kafkareceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configretry"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
Expand Down Expand Up @@ -85,6 +86,9 @@ type Config struct {
DefaultFetchSize int32 `mapstructure:"default_fetch_size"`
// The maximum bytes per fetch from Kafka (default "0", no limit)
MaxFetchSize int32 `mapstructure:"max_fetch_size"`

// In case of some errors returned by the next consumer, the receiver will wait before consuming the next message
ErrorBackOff configretry.BackOffConfig `mapstructure:"error_backoff"`
}

const (
Expand Down
11 changes: 11 additions & 0 deletions receiver/kafkareceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/confmap/confmaptest"

Expand Down Expand Up @@ -65,6 +66,9 @@ func TestLoadConfig(t *testing.T) {
MinFetchSize: 1,
DefaultFetchSize: 1048576,
MaxFetchSize: 0,
ErrorBackOff: configretry.BackOffConfig{
Enabled: false,
},
},
},
{
Expand Down Expand Up @@ -101,6 +105,13 @@ func TestLoadConfig(t *testing.T) {
MinFetchSize: 1,
DefaultFetchSize: 1048576,
MaxFetchSize: 0,
ErrorBackOff: configretry.BackOffConfig{
Enabled: true,
InitialInterval: 1 * time.Second,
MaxInterval: 10 * time.Second,
MaxElapsedTime: 1 * time.Minute,
Multiplier: 1.5,
},
},
},
}
Expand Down
35 changes: 35 additions & 0 deletions receiver/kafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ import (
"fmt"
"strconv"
"sync"
"time"

"github.com/IBM/sarama"
"github.com/cenkalti/backoff/v4"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
Expand All @@ -35,6 +38,8 @@ const (

var errInvalidInitialOffset = errors.New("invalid initial offset")

var errMemoryLimiterDataRefused = errors.New("data refused due to high memory usage")

// kafkaTracesConsumer uses sarama to consume and handle messages from kafka.
type kafkaTracesConsumer struct {
config Config
Expand Down Expand Up @@ -205,6 +210,7 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro
messageMarking: c.messageMarking,
headerExtractor: &nopHeaderExtractor{},
telemetryBuilder: c.telemetryBuilder,
backOff: newExponentialBackOff(c.config.ErrorBackOff),
}
if c.headerExtraction {
consumerGroup.headerExtractor = &headerExtractor{
Expand All @@ -218,6 +224,20 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro
return nil
}

// newExponentialBackOff translates the receiver's error-backoff settings into
// a backoff policy for the consumer loop. It returns nil when backoff is
// disabled, which callers treat as "no waiting between retries".
func newExponentialBackOff(config configretry.BackOffConfig) *backoff.ExponentialBackOff {
	if !config.Enabled {
		return nil
	}
	// Start from the library constructor so internal fields (e.g. the clock)
	// are initialized, then overlay the user-supplied tuning knobs.
	policy := backoff.NewExponentialBackOff()
	policy.InitialInterval = config.InitialInterval
	policy.MaxInterval = config.MaxInterval
	policy.MaxElapsedTime = config.MaxElapsedTime
	policy.Multiplier = config.Multiplier
	policy.RandomizationFactor = config.RandomizationFactor
	// Reset so the first NextBackOff call starts from InitialInterval and the
	// elapsed-time clock starts now, reflecting the overridden settings.
	policy.Reset()
	return policy
}

func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) {
defer c.consumeLoopWG.Done()
for {
Expand Down Expand Up @@ -481,6 +501,7 @@ type tracesConsumerGroupHandler struct {
autocommitEnabled bool
messageMarking MessageMarking
headerExtractor HeaderExtractor
backOff *backoff.ExponentialBackOff
}

type metricsConsumerGroupHandler struct {
Expand Down Expand Up @@ -582,8 +603,18 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
if c.messageMarking.After && c.messageMarking.OnError {
session.MarkMessage(message, "")
}
if errorRequiresBackoff(err) && c.backOff != nil {
select {
case <-session.Context().Done():
return nil
case <-time.After(c.backOff.NextBackOff()):
}
}
return err
}
if c.backOff != nil {
c.backOff.Reset()
}
if c.messageMarking.After {
session.MarkMessage(message, "")
}
Expand All @@ -600,6 +631,10 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
}
}

// errorRequiresBackoff reports whether err is the memory limiter's
// "data refused" error — the only consumer error for which the receiver
// pauses (with exponential backoff) before consuming the next message.
//
// errors.Is is tried first so wrapped sentinels are recognized; the
// message-text comparison is kept as a fallback because the memory
// limiter processor declares its own error value and only the message
// survives the consumer pipeline boundary.
func errorRequiresBackoff(err error) bool {
	if err == nil {
		// Defensive: callers only invoke this on a non-nil error, but a nil
		// err must not panic in err.Error().
		return false
	}
	return errors.Is(err, errMemoryLimiterDataRefused) ||
		err.Error() == errMemoryLimiterDataRefused.Error()
}

func (c *metricsConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
c.readyCloser.Do(func() {
close(c.ready)
Expand Down
81 changes: 55 additions & 26 deletions receiver/kafkareceiver/kafka_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/IBM/sarama"
"github.com/cenkalti/backoff/v4"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -340,35 +341,63 @@ func TestTracesConsumerGroupHandler_error_nextConsumer(t *testing.T) {
consumerError := errors.New("failed to consume")
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()})
require.NoError(t, err)
c := tracesConsumerGroupHandler{
unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewErr(consumerError),
obsrecv: obsrecv,
headerExtractor: &nopHeaderExtractor{},
telemetryBuilder: nopTelemetryBuilder(t),
}

wg := sync.WaitGroup{}
wg.Add(1)
groupClaim := &testConsumerGroupClaim{
messageChan: make(chan *sarama.ConsumerMessage),
tests := []struct {
name string
err error
expectedBackoff time.Duration
}{
{
name: "memory limiter data refused error",
err: errMemoryLimiterDataRefused,
expectedBackoff: backoff.DefaultInitialInterval,
},
{
name: "consumer error that does not require backoff",
err: consumerError,
expectedBackoff: 0,
},
}
go func() {
e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim)
assert.EqualError(t, e, consumerError.Error())
wg.Done()
}()

td := ptrace.NewTraces()
td.ResourceSpans().AppendEmpty()
unmarshaler := &ptrace.ProtoMarshaler{}
bts, err := unmarshaler.MarshalTraces(td)
require.NoError(t, err)
groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts}
close(groupClaim.messageChan)
wg.Wait()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
backOff := backoff.NewExponentialBackOff()
backOff.RandomizationFactor = 0
c := tracesConsumerGroupHandler{
unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewErr(tt.err),
obsrecv: obsrecv,
headerExtractor: &nopHeaderExtractor{},
telemetryBuilder: nopTelemetryBuilder(t),
backOff: backOff,
}

wg := sync.WaitGroup{}
wg.Add(1)
groupClaim := &testConsumerGroupClaim{
messageChan: make(chan *sarama.ConsumerMessage),
}
go func() {
start := time.Now()
e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim)
end := time.Now()
assert.EqualError(t, e, tt.err.Error())
assert.WithinDuration(t, start.Add(tt.expectedBackoff), end, 100*time.Millisecond)
wg.Done()
}()

td := ptrace.NewTraces()
td.ResourceSpans().AppendEmpty()
unmarshaler := &ptrace.ProtoMarshaler{}
bts, err := unmarshaler.MarshalTraces(td)
require.NoError(t, err)
groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts}
close(groupClaim.messageChan)
wg.Wait()
})
}
}

func TestTracesReceiver_encoding_extension(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions receiver/kafkareceiver/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,9 @@ kafka/logs:
retry:
max: 10
backoff: 5s
error_backoff:
enabled: true
initial_interval: 1s
max_interval: 10s
max_elapsed_time: 1m
multiplier: 1.5

0 comments on commit 78a77fe

Please sign in to comment.