From 1c3b656a372794f15491593bcadc62dc2e3da945 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Tue, 7 Jan 2025 09:07:46 -0300 Subject: [PATCH 1/8] Add support for setting a document id --- exporter/elasticsearchexporter/README.md | 5 + exporter/elasticsearchexporter/bulkindexer.go | 15 ++- .../elasticsearchexporter/bulkindexer_test.go | 8 +- exporter/elasticsearchexporter/config.go | 3 + exporter/elasticsearchexporter/config_test.go | 9 ++ exporter/elasticsearchexporter/exporter.go | 26 ++++- .../elasticsearchexporter/exporter_test.go | 100 ++++++++++++++++++ exporter/elasticsearchexporter/factory.go | 3 + 8 files changed, 157 insertions(+), 12 deletions(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 13ecfa53507d..b418e37e561c 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -145,6 +145,11 @@ This can be customised through the following settings: - `prefix_separator`(default=`-`): Set a separator between logstash_prefix and date. - `date_format`(default=`%Y.%m.%d`): Time format (based on strftime) to generate the second part of the Index name. +- `logs_dynamic_id` (optional): Dynamically determines the document ID to be used in Elasticsearch based on resource, scope, or log record attributes. + - `enabled`(default=false): Enable/Disable dynamic ID for log records. If `elasticsearch.document_id` exists in attributes (precedence: log record attribute > scope attribute > resource attribute), it will be used as the document ID. Otherwise, the document ID will be generated by Elasticsearch. + +TODO(mauri870): Add metrics and traces dynamic ID support. + ### Elasticsearch document mapping The Elasticsearch exporter supports several document schemas and preprocessing diff --git a/exporter/elasticsearchexporter/bulkindexer.go b/exporter/elasticsearchexporter/bulkindexer.go index 2200216be4ef..bfd3b9c25134 100644 --- a/exporter/elasticsearchexporter/bulkindexer.go +++ b/exporter/elasticsearchexporter/bulkindexer.go @@ -31,7 +31,7 @@ type bulkIndexer interface { type bulkIndexerSession interface { // Add adds a document to the bulk indexing session. - Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string) error + Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string, docID *string) error // End must be called on the session object once it is no longer // needed, in order to release any associated resources. @@ -126,8 +126,12 @@ type syncBulkIndexerSession struct { } // Add adds an item to the sync bulk indexer session. -func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string) error { - err := s.bi.Add(docappender.BulkIndexerItem{Index: index, Body: document, DynamicTemplates: dynamicTemplates}) +func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string, docID *string) error { + doc := docappender.BulkIndexerItem{Index: index, Body: document, DynamicTemplates: dynamicTemplates} + if docID != nil { + doc.DocumentID = *docID + } + err := s.bi.Add(doc) if err != nil { return err } @@ -248,12 +252,15 @@ func (a *asyncBulkIndexer) Close(ctx context.Context) error { // Add adds an item to the async bulk indexer session. // // Adding an item after a call to Close() will panic. -func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string) error { +func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string, docID *string) error { item := docappender.BulkIndexerItem{ Index: index, Body: document, DynamicTemplates: dynamicTemplates, } + if docID != nil { + item.DocumentID = *docID + } select { case <-ctx.Done(): return ctx.Err() diff --git a/exporter/elasticsearchexporter/bulkindexer_test.go b/exporter/elasticsearchexporter/bulkindexer_test.go index 2b3d86a30128..850fffe48cd7 100644 --- a/exporter/elasticsearchexporter/bulkindexer_test.go +++ b/exporter/elasticsearchexporter/bulkindexer_test.go @@ -102,7 +102,7 @@ func TestAsyncBulkIndexer_flush(t *testing.T) { session, err := bulkIndexer.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil)) + assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil, nil)) // should flush time.Sleep(100 * time.Millisecond) assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load()) @@ -229,7 +229,7 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) { session, err := bulkIndexer.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil)) + assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil, nil)) // should flush time.Sleep(100 * time.Millisecond) assert.Equal(t, int64(0), bulkIndexer.stats.docsIndexed.Load()) @@ -312,7 +312,7 @@ func runBulkIndexerOnce(t *testing.T, config *Config, client *elasticsearch.Clie session, err := bulkIndexer.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil)) + assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil, nil)) assert.NoError(t, bulkIndexer.Close(context.Background())) return bulkIndexer @@ -338,7 +338,7 @@ func TestSyncBulkIndexer_flushBytes(t *testing.T) { session, err := bi.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil)) + assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil, nil)) assert.Equal(t, int64(1), reqCnt.Load()) // flush due to flush::bytes assert.NoError(t, bi.Close(context.Background())) } diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go index 0835396d928f..198b03ac57a8 100644 --- a/exporter/elasticsearchexporter/config.go +++ b/exporter/elasticsearchexporter/config.go @@ -61,6 +61,9 @@ type Config struct { // fall back to pure TracesIndex, if 'elasticsearch.index.prefix' or 'elasticsearch.index.suffix' are not found in resource or attribute (prio: resource > attribute) TracesDynamicIndex DynamicIndexSetting `mapstructure:"traces_dynamic_index"` + // LogsDynamicID is used to configure the document id for logs. + LogsDynamicID DynamicIndexSetting `mapstructure:"logs_dynamic_id"` + // Pipeline configures the ingest node pipeline name that should be used to process the // events. // diff --git a/exporter/elasticsearchexporter/config_test.go b/exporter/elasticsearchexporter/config_test.go index b83beb3e91ba..310a5501887d 100644 --- a/exporter/elasticsearchexporter/config_test.go +++ b/exporter/elasticsearchexporter/config_test.go @@ -74,6 +74,9 @@ func TestConfig(t *testing.T) { TracesDynamicIndex: DynamicIndexSetting{ Enabled: false, }, + LogsDynamicID: DynamicIndexSetting{ + Enabled: false, + }, Pipeline: "mypipeline", ClientConfig: withDefaultHTTPClientConfig(func(cfg *confighttp.ClientConfig) { cfg.Timeout = 2 * time.Minute @@ -146,6 +149,9 @@ func TestConfig(t *testing.T) { TracesDynamicIndex: DynamicIndexSetting{ Enabled: false, }, + LogsDynamicID: DynamicIndexSetting{ + Enabled: false, + }, Pipeline: "mypipeline", ClientConfig: withDefaultHTTPClientConfig(func(cfg *confighttp.ClientConfig) { cfg.Timeout = 2 * time.Minute @@ -218,6 +224,9 @@ func TestConfig(t *testing.T) { TracesDynamicIndex: DynamicIndexSetting{ Enabled: false, }, + LogsDynamicID: DynamicIndexSetting{ + Enabled: false, + }, Pipeline: "mypipeline", ClientConfig: withDefaultHTTPClientConfig(func(cfg *confighttp.ClientConfig) { cfg.Timeout = 2 * time.Minute diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index ebd3800858a2..47d1efe33b38 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -23,6 +23,11 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/objmodel" ) +const ( + // documentIDAttributeName is the attribute name used to specify the document ID. + documentIDAttributeName = "elasticsearch.document_id" +) + type elasticsearchExporter struct { component.TelemetrySettings userAgent string @@ -177,7 +182,9 @@ func (e *elasticsearchExporter) pushLogRecord( if err != nil { return fmt.Errorf("failed to encode log event: %w", err) } - return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil) + + docID := e.getDocumentIDAttribute(record.Attributes(), scope.Attributes(), resource.Attributes()) + return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil, docID) } func (e *elasticsearchExporter) pushMetricsData( @@ -300,7 +307,8 @@ func (e *elasticsearchExporter) pushMetricsData( errs = append(errs, err) continue } - if err := session.Add(ctx, fIndex, bytes.NewReader(docBytes), doc.DynamicTemplates()); err != nil { + + if err := session.Add(ctx, fIndex, bytes.NewReader(docBytes), doc.DynamicTemplates(), nil); err != nil { if cerr := ctx.Err(); cerr != nil { return cerr } @@ -415,7 +423,7 @@ func (e *elasticsearchExporter) pushTraceRecord( if err != nil { return fmt.Errorf("failed to encode trace record: %w", err) } - return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil) + return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil, nil) } func (e *elasticsearchExporter) pushSpanEvent( @@ -449,5 +457,15 @@ func (e *elasticsearchExporter) pushSpanEvent( if err != nil { return err } - return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(docBytes), nil) + return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(docBytes), nil, nil) +} + +func (e *elasticsearchExporter) getDocumentIDAttribute(attributeMaps ...pcommon.Map) *string { + if e.config.LogsDynamicID.Enabled { + docID, ok := getFromAttributes(documentIDAttributeName, "", attributeMaps...) + if docID != "" && ok { + return &docID + } + } + return nil } diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 5554c089e02b..6bb2cf839dc2 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -734,6 +734,95 @@ func TestExporterLogs(t *testing.T) { assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `scope.attributes`).Raw) assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `resource.attributes`).Raw) }) + + t.Run("publish logs with dynamic id", func(t *testing.T) { + exampleDocID := "abc123" + tableTests := []struct { + name string + expectedDocID *string // nil means the _id will not be set + recordAttrs map[string]any + scopeAttrs map[string]any + resourceAttrs map[string]any + }{ + { + name: "missing document id attribute should not set _id", + expectedDocID: nil, + }, + { + name: "record attributes", + expectedDocID: &exampleDocID, + recordAttrs: map[string]any{ + documentIDAttributeName: exampleDocID, + }, + }, + { + name: "scope attributes", + expectedDocID: &exampleDocID, + scopeAttrs: map[string]any{ + documentIDAttributeName: exampleDocID, + }, + }, + { + name: "resource attributes", + expectedDocID: &exampleDocID, + resourceAttrs: map[string]any{ + documentIDAttributeName: exampleDocID, + }, + }, + { + name: "record attributes takes precedence over others", + expectedDocID: &exampleDocID, + recordAttrs: map[string]any{ + documentIDAttributeName: exampleDocID, + }, + scopeAttrs: map[string]any{ + documentIDAttributeName: "id1", + }, + resourceAttrs: map[string]any{ + documentIDAttributeName: "id2", + }, + }, + { + name: "scope attributes takes precedence over resource attributes", + expectedDocID: &exampleDocID, + scopeAttrs: map[string]any{ + documentIDAttributeName: exampleDocID, + }, + resourceAttrs: map[string]any{ + documentIDAttributeName: "id1", + }, + }, + } + + for _, tt := range tableTests { + t.Run(tt.name, func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + + if tt.expectedDocID == nil { + assert.NotContains(t, string(docs[0].Action), "_id", "expected _id to not be set") + } else { + assert.Equal(t, *tt.expectedDocID, actionJSONToID(t, docs[0].Action), "expected _id to be set") + } + return itemsAllOK(docs) + }) + + exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.LogsDynamicID.Enabled = true + }) + logs := newLogsWithAttributes( + tt.recordAttrs, + tt.scopeAttrs, + tt.resourceAttrs, + ) + logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr("hello world") + mustSendLogs(t, exporter, logs) + + rec.WaitItems(1) + }) + } + }) } func TestExporterMetrics(t *testing.T) { @@ -1909,3 +1998,14 @@ func actionJSONToIndex(t *testing.T, actionJSON json.RawMessage) string { require.NoError(t, err) return action.Create.Index } + +func actionJSONToID(t *testing.T, actionJSON json.RawMessage) string { + action := struct { + Create struct { + ID string `json:"_id"` + } `json:"create"` + }{} + err := json.Unmarshal(actionJSON, &action) + require.NoError(t, err) + return action.Create.ID +} diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index 4783d430196a..5ce0116d14f2 100644 --- a/exporter/elasticsearchexporter/factory.go +++ b/exporter/elasticsearchexporter/factory.go @@ -63,6 +63,9 @@ func createDefaultConfig() component.Config { TracesDynamicIndex: DynamicIndexSetting{ Enabled: false, }, + LogsDynamicID: DynamicIndexSetting{ + Enabled: false, + }, Retry: RetrySettings{ Enabled: true, MaxRetries: 0, // default is set in exporter code From 7128be6341fbe629794650d4f7c570bbf801b05d Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Tue, 7 Jan 2025 09:52:10 -0300 Subject: [PATCH 2/8] run tests in parallel, test with sync and async bulk indexer --- .../elasticsearchexporter/exporter_test.go | 64 ++++++++++++------- 1 file changed, 40 insertions(+), 24 deletions(-) diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 6bb2cf839dc2..9612a279c965 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -736,6 +736,7 @@ func TestExporterLogs(t *testing.T) { }) t.Run("publish logs with dynamic id", func(t *testing.T) { + t.Parallel() exampleDocID := "abc123" tableTests := []struct { name string @@ -794,33 +795,48 @@ func TestExporterLogs(t *testing.T) { }, } + cfgs := map[string]func(*Config){ + "sync": func(cfg *Config) { + batcherEnabled := false + cfg.Batcher.Enabled = &batcherEnabled + }, + "async": func(cfg *Config) { + batcherEnabled := true + cfg.Batcher.Enabled = &batcherEnabled + cfg.Batcher.FlushTimeout = 10 * time.Millisecond + }, + } for _, tt := range tableTests { - t.Run(tt.name, func(t *testing.T) { - rec := newBulkRecorder() - server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { - rec.Record(docs) - - if tt.expectedDocID == nil { - assert.NotContains(t, string(docs[0].Action), "_id", "expected _id to not be set") - } else { - assert.Equal(t, *tt.expectedDocID, actionJSONToID(t, docs[0].Action), "expected _id to be set") - } - return itemsAllOK(docs) - }) + for cfgName, cfgFn := range cfgs { + t.Run(tt.name+"/"+cfgName, func(t *testing.T) { + t.Parallel() + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + + if tt.expectedDocID == nil { + assert.NotContains(t, string(docs[0].Action), "_id", "expected _id to not be set") + } else { + assert.Equal(t, *tt.expectedDocID, actionJSONToID(t, docs[0].Action), "expected _id to be set") + } + return itemsAllOK(docs) + }) - exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { - cfg.LogsDynamicID.Enabled = true + exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.LogsDynamicID.Enabled = true + cfgFn(cfg) + }) + logs := newLogsWithAttributes( + tt.recordAttrs, + tt.scopeAttrs, + tt.resourceAttrs, + ) + logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr("hello world") + mustSendLogs(t, exporter, logs) + + rec.WaitItems(1) }) - logs := newLogsWithAttributes( - tt.recordAttrs, - tt.scopeAttrs, - tt.resourceAttrs, - ) - logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr("hello world") - mustSendLogs(t, exporter, logs) - - rec.WaitItems(1) - }) + } } }) } From c277b557f9a5154da048639cef1af3663812af81 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Tue, 7 Jan 2025 10:41:27 -0300 Subject: [PATCH 3/8] add changelog entry --- ...elasticsearchexporter_logs_dynamic_id.yaml | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 .chloggen/elasticsearchexporter_logs_dynamic_id.yaml diff --git a/.chloggen/elasticsearchexporter_logs_dynamic_id.yaml b/.chloggen/elasticsearchexporter_logs_dynamic_id.yaml new file mode 100644 index 000000000000..a846bbcc1445 --- /dev/null +++ b/.chloggen/elasticsearchexporter_logs_dynamic_id.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Support for complex attributes for log records in OTel mode + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36882] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] From 89f19d399fdd7cc1b641d17e3eb126b253a5d9d0 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Tue, 7 Jan 2025 10:44:50 -0300 Subject: [PATCH 4/8] remove todo for metrics and traces --- exporter/elasticsearchexporter/README.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index b418e37e561c..986171f40d27 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -148,8 +148,6 @@ This can be customised through the following settings: - `logs_dynamic_id` (optional): Dynamically determines the document ID to be used in Elasticsearch based on resource, scope, or log record attributes. - `enabled`(default=false): Enable/Disable dynamic ID for log records. If `elasticsearch.document_id` exists in attributes (precedence: log record attribute > scope attribute > resource attribute), it will be used as the document ID. Otherwise, the document ID will be generated by Elasticsearch. -TODO(mauri870): Add metrics and traces dynamic ID support. - ### Elasticsearch document mapping The Elasticsearch exporter supports several document schemas and preprocessing From dc2505745485323b3f9a9b3021ed415f4d267aea Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Wed, 8 Jan 2025 09:09:44 -0300 Subject: [PATCH 5/8] only look into record attributes --- exporter/elasticsearchexporter/README.md | 4 +- exporter/elasticsearchexporter/exporter.go | 6 +-- .../elasticsearchexporter/exporter_test.go | 43 +------------------ 3 files changed, 7 insertions(+), 46 deletions(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 986171f40d27..fb60c308ffba 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -145,8 +145,8 @@ This can be customised through the following settings: - `prefix_separator`(default=`-`): Set a separator between logstash_prefix and date. - `date_format`(default=`%Y.%m.%d`): Time format (based on strftime) to generate the second part of the Index name. -- `logs_dynamic_id` (optional): Dynamically determines the document ID to be used in Elasticsearch based on resource, scope, or log record attributes. - - `enabled`(default=false): Enable/Disable dynamic ID for log records. If `elasticsearch.document_id` exists in attributes (precedence: log record attribute > scope attribute > resource attribute), it will be used as the document ID. Otherwise, the document ID will be generated by Elasticsearch. +- `logs_dynamic_id` (optional): Dynamically determines the document ID to be used in Elasticsearch based on a log record attribute. + - `enabled`(default=false): Enable/Disable dynamic ID for log records. If `elasticsearch.document_id` exists in the log record attributes, it will be used as the document ID. Otherwise, the document ID will be generated by Elasticsearch. ### Elasticsearch document mapping diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index 47d1efe33b38..6ca7c465b880 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -183,7 +183,7 @@ func (e *elasticsearchExporter) pushLogRecord( return fmt.Errorf("failed to encode log event: %w", err) } - docID := e.getDocumentIDAttribute(record.Attributes(), scope.Attributes(), resource.Attributes()) + docID := e.getDocumentIDAttribute(record.Attributes()) return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil, docID) } @@ -460,9 +460,9 @@ func (e *elasticsearchExporter) pushSpanEvent( return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(docBytes), nil, nil) } -func (e *elasticsearchExporter) getDocumentIDAttribute(attributeMaps ...pcommon.Map) *string { +func (e *elasticsearchExporter) getDocumentIDAttribute(m pcommon.Map) *string { if e.config.LogsDynamicID.Enabled { - docID, ok := getFromAttributes(documentIDAttributeName, "", attributeMaps...) + docID, ok := getFromAttributes(documentIDAttributeName, "", m) if docID != "" && ok { return &docID } diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 9612a279c965..86e62a6364b7 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -742,8 +742,6 @@ func TestExporterLogs(t *testing.T) { name string expectedDocID *string // nil means the _id will not be set recordAttrs map[string]any - scopeAttrs map[string]any - resourceAttrs map[string]any }{ { name: "missing document id attribute should not set _id", @@ -756,43 +754,6 @@ func TestExporterLogs(t *testing.T) { documentIDAttributeName: exampleDocID, }, }, - { - name: "scope attributes", - expectedDocID: &exampleDocID, - scopeAttrs: map[string]any{ - documentIDAttributeName: exampleDocID, - }, - }, - { - name: "resource attributes", - expectedDocID: &exampleDocID, - resourceAttrs: map[string]any{ - documentIDAttributeName: exampleDocID, - }, - }, - { - name: "record attributes takes precedence over others", - expectedDocID: &exampleDocID, - recordAttrs: map[string]any{ - documentIDAttributeName: exampleDocID, - }, - scopeAttrs: map[string]any{ - documentIDAttributeName: "id1", - }, - resourceAttrs: map[string]any{ - documentIDAttributeName: "id2", - }, - }, - { - name: "scope attributes takes precedence over resource attributes", - expectedDocID: &exampleDocID, - scopeAttrs: map[string]any{ - documentIDAttributeName: exampleDocID, - }, - resourceAttrs: map[string]any{ - documentIDAttributeName: "id1", - }, - }, } cfgs := map[string]func(*Config){ @@ -828,8 +789,8 @@ func TestExporterLogs(t *testing.T) { }) logs := newLogsWithAttributes( tt.recordAttrs, - tt.scopeAttrs, - tt.resourceAttrs, + map[string]any{}, + map[string]any{}, ) logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr("hello world") mustSendLogs(t, exporter, logs) From 7f2557c15c13a5e5144b1bb09e54952e18f911d0 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Thu, 9 Jan 2025 08:35:39 -0300 Subject: [PATCH 6/8] fixes from code review --- ...elasticsearchexporter_logs_dynamic_id.yaml | 2 +- exporter/elasticsearchexporter/bulkindexer.go | 14 ++++++------- .../elasticsearchexporter/bulkindexer_test.go | 8 +++---- exporter/elasticsearchexporter/exporter.go | 14 ++++++------- .../elasticsearchexporter/exporter_test.go | 21 ++++++++++++------- 5 files changed, 33 insertions(+), 26 deletions(-) diff --git a/.chloggen/elasticsearchexporter_logs_dynamic_id.yaml b/.chloggen/elasticsearchexporter_logs_dynamic_id.yaml index a846bbcc1445..9acda46d18f8 100644 --- a/.chloggen/elasticsearchexporter_logs_dynamic_id.yaml +++ b/.chloggen/elasticsearchexporter_logs_dynamic_id.yaml @@ -7,7 +7,7 @@ change_type: enhancement component: elasticsearchexporter # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). -note: Support for complex attributes for log records in OTel mode +note: Support for dynamically setting the document ID of log records. # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. issues: [36882] diff --git a/exporter/elasticsearchexporter/bulkindexer.go b/exporter/elasticsearchexporter/bulkindexer.go index bfd3b9c25134..49953e06ddff 100644 --- a/exporter/elasticsearchexporter/bulkindexer.go +++ b/exporter/elasticsearchexporter/bulkindexer.go @@ -31,7 +31,7 @@ type bulkIndexer interface { type bulkIndexerSession interface { // Add adds a document to the bulk indexing session. - Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string, docID *string) error + Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string) error // End must be called on the session object once it is no longer // needed, in order to release any associated resources. @@ -126,10 +126,10 @@ type syncBulkIndexerSession struct { } // Add adds an item to the sync bulk indexer session. -func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string, docID *string) error { +func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string) error { doc := docappender.BulkIndexerItem{Index: index, Body: document, DynamicTemplates: dynamicTemplates} - if docID != nil { - doc.DocumentID = *docID + if docID != "" { + doc.DocumentID = docID } err := s.bi.Add(doc) if err != nil { @@ -252,14 +252,14 @@ func (a *asyncBulkIndexer) Close(ctx context.Context) error { // Add adds an item to the async bulk indexer session. // // Adding an item after a call to Close() will panic. -func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string, docID *string) error { +func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string) error { item := docappender.BulkIndexerItem{ Index: index, Body: document, DynamicTemplates: dynamicTemplates, } - if docID != nil { - item.DocumentID = *docID + if docID != "" { + item.DocumentID = docID } select { case <-ctx.Done(): diff --git a/exporter/elasticsearchexporter/bulkindexer_test.go b/exporter/elasticsearchexporter/bulkindexer_test.go index 850fffe48cd7..9f2139e83710 100644 --- a/exporter/elasticsearchexporter/bulkindexer_test.go +++ b/exporter/elasticsearchexporter/bulkindexer_test.go @@ -102,7 +102,7 @@ func TestAsyncBulkIndexer_flush(t *testing.T) { session, err := bulkIndexer.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil, nil)) + assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil)) // should flush time.Sleep(100 * time.Millisecond) assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load()) @@ -229,7 +229,7 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) { session, err := bulkIndexer.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil, nil)) + assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil)) // should flush time.Sleep(100 * time.Millisecond) assert.Equal(t, int64(0), bulkIndexer.stats.docsIndexed.Load()) @@ -312,7 +312,7 @@ func runBulkIndexerOnce(t *testing.T, config *Config, client *elasticsearch.Clie session, err := bulkIndexer.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil, nil)) + assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil)) assert.NoError(t, bulkIndexer.Close(context.Background())) return bulkIndexer @@ -338,7 +338,7 @@ func TestSyncBulkIndexer_flushBytes(t *testing.T) { session, err := bi.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil, nil)) + assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil)) assert.Equal(t, int64(1), reqCnt.Load()) // flush due to flush::bytes assert.NoError(t, bi.Close(context.Background())) } diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index 6ca7c465b880..eb59cafbea79 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -184,7 +184,7 @@ func (e *elasticsearchExporter) pushLogRecord( } docID := e.getDocumentIDAttribute(record.Attributes()) - return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil, docID) + return bulkIndexerSession.Add(ctx, fIndex, docID, bytes.NewReader(document), nil) } func (e *elasticsearchExporter) pushMetricsData( @@ -308,7 +308,7 @@ func (e *elasticsearchExporter) pushMetricsData( continue } - if err := session.Add(ctx, fIndex, bytes.NewReader(docBytes), doc.DynamicTemplates(), nil); err != nil { + if err := session.Add(ctx, fIndex, "", bytes.NewReader(docBytes), doc.DynamicTemplates()); err != nil { if cerr := ctx.Err(); cerr != nil { return cerr } @@ -423,7 +423,7 @@ func (e *elasticsearchExporter) pushTraceRecord( if err != nil { return fmt.Errorf("failed to encode trace record: %w", err) } - return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil, nil) + return bulkIndexerSession.Add(ctx, fIndex, "", bytes.NewReader(document), nil) } func (e *elasticsearchExporter) pushSpanEvent( @@ -457,15 +457,15 @@ func (e *elasticsearchExporter) pushSpanEvent( if err != nil { return err } - return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(docBytes), nil, nil) + return bulkIndexerSession.Add(ctx, fIndex, "", bytes.NewReader(docBytes), nil) } -func (e *elasticsearchExporter) getDocumentIDAttribute(m pcommon.Map) *string { +func (e *elasticsearchExporter) getDocumentIDAttribute(m pcommon.Map) string { if e.config.LogsDynamicID.Enabled { docID, ok := getFromAttributes(documentIDAttributeName, "", m) if docID != "" && ok { - return &docID + return docID } } - return nil + return "" } diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 86e62a6364b7..7f4f4f2a659b 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -740,16 +740,23 @@ func TestExporterLogs(t *testing.T) { exampleDocID := "abc123" tableTests := []struct { name string - expectedDocID *string // nil means the _id will not be set + expectedDocID string // "" means the _id will not be set recordAttrs map[string]any }{ { name: "missing document id attribute should not set _id", - expectedDocID: nil, + expectedDocID: "", + }, + { + name: "empty document id attribute should not set _id", + expectedDocID: "", + recordAttrs: map[string]any{ + documentIDAttributeName: "", + }, }, { name: "record attributes", - expectedDocID: &exampleDocID, + expectedDocID: exampleDocID, recordAttrs: map[string]any{ documentIDAttributeName: exampleDocID, }, @@ -757,11 +764,11 @@ func TestExporterLogs(t *testing.T) { } cfgs := map[string]func(*Config){ - "sync": func(cfg *Config) { + "async": func(cfg *Config) { batcherEnabled := false cfg.Batcher.Enabled = &batcherEnabled }, - "async": func(cfg *Config) { + "sync": func(cfg *Config) { batcherEnabled := true cfg.Batcher.Enabled = &batcherEnabled cfg.Batcher.FlushTimeout = 10 * time.Millisecond @@ -775,10 +782,10 @@ func TestExporterLogs(t *testing.T) { server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { rec.Record(docs) - if tt.expectedDocID == nil { + if tt.expectedDocID == "" { assert.NotContains(t, string(docs[0].Action), "_id", "expected _id to not be set") } else { - assert.Equal(t, *tt.expectedDocID, actionJSONToID(t, docs[0].Action), "expected _id to be set") + assert.Equal(t, tt.expectedDocID, actionJSONToID(t, docs[0].Action), "expected _id to be set") } return itemsAllOK(docs) }) From 108079071e6cef13b4b4d3eb9d526852d017c2b9 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Thu, 9 Jan 2025 08:38:13 -0300 Subject: [PATCH 7/8] clarify about empty string value --- exporter/elasticsearchexporter/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index fb60c308ffba..b77844edeb0a 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -146,7 +146,7 @@ This can be customised through the following settings: - `date_format`(default=`%Y.%m.%d`): Time format (based on strftime) to generate the second part of the Index name. - `logs_dynamic_id` (optional): Dynamically determines the document ID to be used in Elasticsearch based on a log record attribute. - - `enabled`(default=false): Enable/Disable dynamic ID for log records. If `elasticsearch.document_id` exists in the log record attributes, it will be used as the document ID. Otherwise, the document ID will be generated by Elasticsearch. + - `enabled`(default=false): Enable/Disable dynamic ID for log records. If `elasticsearch.document_id` exists and is != "" in the log record attributes, it will be used as the document ID. Otherwise, the document ID will be generated by Elasticsearch. ### Elasticsearch document mapping From 9e0ad2f4c090d5b971ecb4d069f01da14ffd99a0 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Thu, 9 Jan 2025 15:17:09 -0300 Subject: [PATCH 8/8] remove id attribute from the final document --- exporter/elasticsearchexporter/exporter.go | 5 +++-- exporter/elasticsearchexporter/exporter_test.go | 4 ++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index eb59cafbea79..172f088cf650 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -178,12 +178,12 @@ func (e *elasticsearchExporter) pushLogRecord( fIndex = formattedIndex } + docID := e.extractDocumentIDAttribute(record.Attributes()) document, err := e.model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL) if err != nil { return fmt.Errorf("failed to encode log event: %w", err) } - docID := e.getDocumentIDAttribute(record.Attributes()) return bulkIndexerSession.Add(ctx, fIndex, docID, bytes.NewReader(document), nil) } @@ -460,9 +460,10 @@ func (e *elasticsearchExporter) pushSpanEvent( return bulkIndexerSession.Add(ctx, fIndex, "", bytes.NewReader(docBytes), nil) } -func (e *elasticsearchExporter) getDocumentIDAttribute(m pcommon.Map) string { +func (e *elasticsearchExporter) extractDocumentIDAttribute(m pcommon.Map) string { if e.config.LogsDynamicID.Enabled { docID, ok := getFromAttributes(documentIDAttributeName, "", m) + m.Remove(documentIDAttributeName) if docID != "" && ok { return docID } diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 7f4f4f2a659b..a0a838785aea 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -787,10 +787,14 @@ func TestExporterLogs(t *testing.T) { } else { assert.Equal(t, tt.expectedDocID, actionJSONToID(t, docs[0].Action), "expected _id to be set") } + + // Ensure the document id attribute is removed from the final document. + assert.NotContains(t, docs[0].Document, documentIDAttributeName, "expected document id attribute to be removed") return itemsAllOK(docs) }) exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.Mapping.Mode = "otel" cfg.LogsDynamicID.Enabled = true cfgFn(cfg) })