From 709b3e586f7744eb48ecaffd88db08b3676bb132 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Mon, 3 Jun 2024 13:43:27 +0200 Subject: [PATCH 1/3] [PoC] Change Processor.OnEmit to return record passed to next registered processor --- sdk/log/batch.go | 6 +-- sdk/log/batch_test.go | 48 ++++++++++++++-------- sdk/log/bench_test.go | 88 +++++++++++++++++++++++++++++----------- sdk/log/logger.go | 5 ++- sdk/log/processor.go | 4 +- sdk/log/provider_test.go | 6 +-- sdk/log/simple.go | 4 +- sdk/log/simple_test.go | 7 ++-- 8 files changed, 114 insertions(+), 54 deletions(-) diff --git a/sdk/log/batch.go b/sdk/log/batch.go index 8e43b0e8f75..09d5b5ffcc1 100644 --- a/sdk/log/batch.go +++ b/sdk/log/batch.go @@ -176,9 +176,9 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) { } // OnEmit batches provided log record. -func (b *BatchProcessor) OnEmit(_ context.Context, r Record) error { +func (b *BatchProcessor) OnEmit(_ context.Context, r Record) (Record, error) { if b.stopped.Load() || b.q == nil { - return nil + return r, nil } if n := b.q.Enqueue(r); n >= b.batchSize { select { @@ -189,7 +189,7 @@ func (b *BatchProcessor) OnEmit(_ context.Context, r Record) error { // records. } } - return nil + return r, nil } // Enabled returns if b is enabled. diff --git a/sdk/log/batch_test.go b/sdk/log/batch_test.go index 70b12ab04fa..919bff0820c 100644 --- a/sdk/log/batch_test.go +++ b/sdk/log/batch_test.go @@ -46,7 +46,10 @@ func TestEmptyBatchConfig(t *testing.T) { var bp BatchProcessor ctx := context.Background() var record Record - assert.NoError(t, bp.OnEmit(ctx, record), "OnEmit") + record.SetBody(log.StringValue("message")) + r, err := bp.OnEmit(ctx, record) + assert.Equal(t, record, r, "OnEmit record") + assert.NoError(t, err, "OnEmit err") assert.False(t, bp.Enabled(ctx, record), "Enabled") assert.NoError(t, bp.ForceFlush(ctx), "ForceFlush") assert.NoError(t, bp.Shutdown(ctx), "Shutdown") @@ -198,7 +201,8 @@ func TestBatchProcessor(t *testing.T) { WithExportTimeout(time.Hour), ) for _, r := range make([]Record, size) { - assert.NoError(t, b.OnEmit(ctx, r)) + _, err := b.OnEmit(ctx, r) + assert.NoError(t, err) } var got []Record assert.Eventually(t, func() bool { @@ -221,7 +225,8 @@ func TestBatchProcessor(t *testing.T) { WithExportTimeout(time.Hour), ) for _, r := range make([]Record, 10*batch) { - assert.NoError(t, b.OnEmit(ctx, r)) + _, err := b.OnEmit(ctx, r) + assert.NoError(t, err) } assert.Eventually(t, func() bool { return e.ExportN() > 1 @@ -244,7 +249,8 @@ func TestBatchProcessor(t *testing.T) { WithExportTimeout(time.Hour), ) for _, r := range make([]Record, 2*batch) { - assert.NoError(t, b.OnEmit(ctx, r)) + _, err := b.OnEmit(ctx, r) + assert.NoError(t, err) } var n int @@ -255,7 +261,7 @@ func TestBatchProcessor(t *testing.T) { var err error require.Eventually(t, func() bool { - err = b.OnEmit(ctx, Record{}) + _, err = b.OnEmit(ctx, Record{}) return true }, time.Second, time.Microsecond, "OnEmit blocked") assert.NoError(t, err) @@ -303,7 +309,8 @@ func TestBatchProcessor(t *testing.T) { assert.NoError(t, b.Shutdown(ctx)) want := e.ExportN() - assert.NoError(t, b.OnEmit(ctx, Record{})) + _, err := b.OnEmit(ctx, Record{}) + assert.NoError(t, err) assert.Equal(t, want, e.ExportN(), "Export called after shutdown") }) @@ -311,7 +318,8 @@ func TestBatchProcessor(t *testing.T) { e := newTestExporter(nil) b := NewBatchProcessor(e) - assert.NoError(t, b.OnEmit(ctx, Record{})) + _, err := b.OnEmit(ctx, Record{}) + assert.NoError(t, err) assert.NoError(t, b.Shutdown(ctx)) assert.NoError(t, b.ForceFlush(ctx)) @@ -346,7 +354,8 @@ func TestBatchProcessor(t *testing.T) { var r Record r.SetBody(log.BoolValue(true)) - require.NoError(t, b.OnEmit(ctx, r)) + _, err := b.OnEmit(ctx, r) + require.NoError(t, err) assert.ErrorIs(t, b.ForceFlush(ctx), assert.AnError, "exporter error not returned") assert.Equal(t, 1, e.ForceFlushN(), "exporter ForceFlush calls") @@ -381,7 +390,8 @@ func TestBatchProcessor(t *testing.T) { // Enqueue 10 x "batch size" amount of records. for i := 0; i < 10*batch; i++ { - require.NoError(t, b.OnEmit(ctx, Record{})) + _, err := b.OnEmit(ctx, Record{}) + require.NoError(t, err) } assert.Eventually(t, func() bool { return e.ExportN() > 0 && len(b.exporter.input) == cap(b.exporter.input) @@ -425,7 +435,7 @@ func TestBatchProcessor(t *testing.T) { var r Record r.SetBody(log.BoolValue(true)) - _ = b.OnEmit(ctx, r) + _, _ = b.OnEmit(ctx, r) t.Cleanup(func() { _ = b.Shutdown(ctx) }) t.Cleanup(func() { close(e.ExportTrigger) }) @@ -455,21 +465,26 @@ func TestBatchProcessor(t *testing.T) { ) var r Record // First record will be blocked by testExporter.Export - assert.NoError(t, b.OnEmit(ctx, r), "exported record") + + _, err := b.OnEmit(ctx, r) + assert.NoError(t, err, "exported record") require.Eventually(t, func() bool { return e.ExportN() > 0 }, 2*time.Second, time.Microsecond, "blocked export not attempted") // Second record will be written to export queue - assert.NoError(t, b.OnEmit(ctx, r), "export queue record") + _, err = b.OnEmit(ctx, r) + assert.NoError(t, err, "export queue record") require.Eventually(t, func() bool { return len(b.exporter.input) == cap(b.exporter.input) }, 2*time.Second, time.Microsecond, "blocked queue read not attempted") // Third record will be written to BatchProcessor.q - assert.NoError(t, b.OnEmit(ctx, r), "first queued") + _, err = b.OnEmit(ctx, r) + assert.NoError(t, err, "first queued") // The previous record will be dropped, as the new one will be written to BatchProcessor.q - assert.NoError(t, b.OnEmit(ctx, r), "second queued") + _, err = b.OnEmit(ctx, r) + assert.NoError(t, err, "second queued") wantMsg := `"level"=1 "msg"="dropped log records" "dropped"=1` assert.Eventually(t, func() bool { @@ -497,7 +512,8 @@ func TestBatchProcessor(t *testing.T) { case <-ctx.Done(): return default: - assert.NoError(t, b.OnEmit(ctx, Record{})) + _, err := b.OnEmit(ctx, Record{}) + assert.NoError(t, err) // Ignore partial flush errors. _ = b.ForceFlush(ctx) } @@ -663,7 +679,7 @@ func BenchmarkBatchProcessorOnEmit(b *testing.B) { b.RunParallel(func(pb *testing.PB) { var err error for pb.Next() { - err = bp.OnEmit(ctx, r) + r, err = bp.OnEmit(ctx, r) } _ = err }) diff --git a/sdk/log/bench_test.go b/sdk/log/bench_test.go index ff5d6fe2bfa..0d471ee89cc 100644 --- a/sdk/log/bench_test.go +++ b/sdk/log/bench_test.go @@ -16,47 +16,67 @@ import ( func BenchmarkProcessor(b *testing.B) { for _, tc := range []struct { name string - f func() Processor + f func() []Processor }{ { name: "Simple", - f: func() Processor { - return NewSimpleProcessor(noopExporter{}) + f: func() []Processor { + return []Processor{ + NewSimpleProcessor(noopExporter{}), + } }, }, { name: "Batch", - f: func() Processor { - return NewBatchProcessor(noopExporter{}) + f: func() []Processor { + return []Processor{ + NewBatchProcessor(noopExporter{}), + } }, }, { name: "ModifyTimestampSimple", - f: func() Processor { - return timestampDecorator{NewSimpleProcessor(noopExporter{})} + f: func() []Processor { + return []Processor{ + timestampSetter{}, + NewSimpleProcessor(noopExporter{}), + } }, }, { name: "ModifyTimestampBatch", - f: func() Processor { - return timestampDecorator{NewBatchProcessor(noopExporter{})} + f: func() []Processor { + return []Processor{ + timestampSetter{}, + NewBatchProcessor(noopExporter{}), + } }, }, { name: "ModifyAttributesSimple", - f: func() Processor { - return attrDecorator{NewSimpleProcessor(noopExporter{})} + f: func() []Processor { + return []Processor{ + attrSetter{}, + NewSimpleProcessor(noopExporter{}), + } }, }, { name: "ModifyAttributesBatch", - f: func() Processor { - return attrDecorator{NewBatchProcessor(noopExporter{})} + f: func() []Processor { + return []Processor{ + attrSetter{}, + NewBatchProcessor(noopExporter{}), + } }, }, } { b.Run(tc.name, func(b *testing.B) { - provider := NewLoggerProvider(WithProcessor(tc.f())) + var opts []LoggerProviderOption + for _, p := range tc.f() { + opts = append(opts, WithProcessor(p)) + } + provider := NewLoggerProvider(opts...) b.Cleanup(func() { assert.NoError(b, provider.Shutdown(context.Background())) }) logger := provider.Logger(b.Name()) @@ -79,22 +99,42 @@ func BenchmarkProcessor(b *testing.B) { } } -type timestampDecorator struct { - Processor +type timestampSetter struct { } -func (e timestampDecorator) OnEmit(ctx context.Context, r Record) error { - r = r.Clone() +func (e timestampSetter) OnEmit(ctx context.Context, r Record) (Record, error) { r.SetObservedTimestamp(time.Date(1988, time.November, 17, 0, 0, 0, 0, time.UTC)) - return e.Processor.OnEmit(ctx, r) + return r, nil +} + +func (e timestampSetter) Enabled(context.Context, Record) bool { + return true +} + +func (e timestampSetter) Shutdown(ctx context.Context) error { + return nil +} + +func (e timestampSetter) ForceFlush(ctx context.Context) error { + return nil } -type attrDecorator struct { - Processor +type attrSetter struct { } -func (e attrDecorator) OnEmit(ctx context.Context, r Record) error { - r = r.Clone() +func (e attrSetter) OnEmit(ctx context.Context, r Record) (Record, error) { r.SetAttributes(log.String("replace", "me")) - return e.Processor.OnEmit(ctx, r) + return r, nil +} + +func (e attrSetter) Enabled(context.Context, Record) bool { + return true +} + +func (e attrSetter) Shutdown(ctx context.Context) error { + return nil +} + +func (e attrSetter) ForceFlush(ctx context.Context) error { + return nil } diff --git a/sdk/log/logger.go b/sdk/log/logger.go index 245867f3fd6..6a01080c633 100644 --- a/sdk/log/logger.go +++ b/sdk/log/logger.go @@ -34,9 +34,10 @@ func newLogger(p *LoggerProvider, scope instrumentation.Scope) *logger { } func (l *logger) Emit(ctx context.Context, r log.Record) { - newRecord := l.newRecord(ctx, r) + record := l.newRecord(ctx, r) for _, p := range l.provider.processors { - if err := p.OnEmit(ctx, newRecord); err != nil { + var err error + if record, err = p.OnEmit(ctx, record); err != nil { otel.Handle(err) } } diff --git a/sdk/log/processor.go b/sdk/log/processor.go index f95ea949027..55cf4fa9bb2 100644 --- a/sdk/log/processor.go +++ b/sdk/log/processor.go @@ -28,7 +28,9 @@ type Processor interface { // // Before modifying a Record, the implementation must use Record.Clone // to create a copy that shares no state with the original. - OnEmit(ctx context.Context, record Record) error + // + // The returned record is passed to the next registered processor. + OnEmit(ctx context.Context, record Record) (Record, error) // Enabled returns whether the Processor will process for the given context // and record. // diff --git a/sdk/log/provider_test.go b/sdk/log/provider_test.go index bfa8afcda1d..d0c6408c89c 100644 --- a/sdk/log/provider_test.go +++ b/sdk/log/provider_test.go @@ -36,13 +36,13 @@ func newProcessor(name string) *processor { return &processor{Name: name, enabled: true} } -func (p *processor) OnEmit(ctx context.Context, r Record) error { +func (p *processor) OnEmit(ctx context.Context, r Record) (Record, error) { if p.Err != nil { - return p.Err + return r, p.Err } p.records = append(p.records, r) - return nil + return r, nil } func (p *processor) Enabled(context.Context, Record) bool { diff --git a/sdk/log/simple.go b/sdk/log/simple.go index c7aa14b8706..6d9fedd1572 100644 --- a/sdk/log/simple.go +++ b/sdk/log/simple.go @@ -31,8 +31,8 @@ func NewSimpleProcessor(exporter Exporter, _ ...SimpleProcessorOption) *SimplePr } // OnEmit batches provided log record. -func (s *SimpleProcessor) OnEmit(ctx context.Context, r Record) error { - return s.exporter.Export(ctx, []Record{r}) +func (s *SimpleProcessor) OnEmit(ctx context.Context, r Record) (Record, error) { + return r, s.exporter.Export(ctx, []Record{r}) } // Enabled returns true. diff --git a/sdk/log/simple_test.go b/sdk/log/simple_test.go index 805130465b0..aa4c29a493d 100644 --- a/sdk/log/simple_test.go +++ b/sdk/log/simple_test.go @@ -44,10 +44,11 @@ func TestSimpleProcessorOnEmit(t *testing.T) { var r log.Record r.SetSeverityText("test") - _ = s.OnEmit(context.Background(), r) + nextR, _ := s.OnEmit(context.Background(), r) require.True(t, e.exportCalled, "exporter Export not called") assert.Equal(t, []log.Record{r}, e.records) + assert.Equal(t, r, nextR) } func TestSimpleProcessorEnabled(t *testing.T) { @@ -83,7 +84,7 @@ func TestSimpleProcessorConcurrentSafe(t *testing.T) { go func() { defer wg.Done() - _ = s.OnEmit(ctx, r) + _, _ = s.OnEmit(ctx, r) _ = s.Enabled(ctx, r) _ = s.Shutdown(ctx) _ = s.ForceFlush(ctx) @@ -105,7 +106,7 @@ func BenchmarkSimpleProcessorOnEmit(b *testing.B) { var out error for pb.Next() { - out = s.OnEmit(ctx, r) + r, out = s.OnEmit(ctx, r) } _ = out From 5b5730509b5892f93338654acf7ec2448b8433a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Mon, 3 Jun 2024 13:47:58 +0200 Subject: [PATCH 2/3] Add Clone --- sdk/log/bench_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/log/bench_test.go b/sdk/log/bench_test.go index 0d471ee89cc..9982cdcd672 100644 --- a/sdk/log/bench_test.go +++ b/sdk/log/bench_test.go @@ -103,6 +103,7 @@ type timestampSetter struct { } func (e timestampSetter) OnEmit(ctx context.Context, r Record) (Record, error) { + r = r.Clone() r.SetObservedTimestamp(time.Date(1988, time.November, 17, 0, 0, 0, 0, time.UTC)) return r, nil } @@ -123,6 +124,7 @@ type attrSetter struct { } func (e attrSetter) OnEmit(ctx context.Context, r Record) (Record, error) { + r = r.Clone() r.SetAttributes(log.String("replace", "me")) return r, nil } From 66cf41bc27dbdeb9965c25e045a82ab8e47bb5ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Mon, 3 Jun 2024 13:50:05 +0200 Subject: [PATCH 3/3] gofumpt --- sdk/log/bench_test.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sdk/log/bench_test.go b/sdk/log/bench_test.go index 9982cdcd672..07f900cbc31 100644 --- a/sdk/log/bench_test.go +++ b/sdk/log/bench_test.go @@ -99,8 +99,7 @@ func BenchmarkProcessor(b *testing.B) { } } -type timestampSetter struct { -} +type timestampSetter struct{} func (e timestampSetter) OnEmit(ctx context.Context, r Record) (Record, error) { r = r.Clone() @@ -120,8 +119,7 @@ func (e timestampSetter) ForceFlush(ctx context.Context) error { return nil } -type attrSetter struct { -} +type attrSetter struct{} func (e attrSetter) OnEmit(ctx context.Context, r Record) (Record, error) { r = r.Clone()