Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[elasticsearchexporter]: Add dynamic document id support for logs #37065

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
27 changes: 27 additions & 0 deletions .chloggen/elasticsearchexporter_logs_dynamic_id.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add dynamic document ID support for logs, using the `elasticsearch.document_id` attribute as the document ID when `logs_dynamic_id` is enabled
mauri870 marked this conversation as resolved.
Show resolved Hide resolved

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36882]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
3 changes: 3 additions & 0 deletions exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ This can be customised through the following settings:
- `prefix_separator`(default=`-`): Set a separator between logstash_prefix and date.
- `date_format`(default=`%Y.%m.%d`): Time format (based on strftime) to generate the second part of the Index name.

- `logs_dynamic_id` (optional): Dynamically determines the document ID to be used in Elasticsearch based on resource, scope, or log record attributes.
- `enabled`(default=false): Enable/Disable dynamic ID for log records. If `elasticsearch.document_id` exists in attributes (precedence: log record attribute > scope attribute > resource attribute), it will be used as the document ID. Otherwise, the document ID will be generated by Elasticsearch.
mauri870 marked this conversation as resolved.
Show resolved Hide resolved

### Elasticsearch document mapping

The Elasticsearch exporter supports several document schemas and preprocessing
Expand Down
15 changes: 11 additions & 4 deletions exporter/elasticsearchexporter/bulkindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type bulkIndexer interface {

type bulkIndexerSession interface {
// Add adds a document to the bulk indexing session.
Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string) error
Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string, docID *string) error

// End must be called on the session object once it is no longer
// needed, in order to release any associated resources.
Expand Down Expand Up @@ -126,8 +126,12 @@ type syncBulkIndexerSession struct {
}

// Add adds an item to the sync bulk indexer session.
func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string) error {
err := s.bi.Add(docappender.BulkIndexerItem{Index: index, Body: document, DynamicTemplates: dynamicTemplates})
func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string, docID *string) error {
doc := docappender.BulkIndexerItem{Index: index, Body: document, DynamicTemplates: dynamicTemplates}
if docID != nil {
doc.DocumentID = *docID
}
err := s.bi.Add(doc)
if err != nil {
return err
}
Expand Down Expand Up @@ -248,12 +252,15 @@ func (a *asyncBulkIndexer) Close(ctx context.Context) error {
// Add adds an item to the async bulk indexer session.
//
// Adding an item after a call to Close() will panic.
func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string) error {
func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string, docID *string) error {
item := docappender.BulkIndexerItem{
Index: index,
Body: document,
DynamicTemplates: dynamicTemplates,
}
if docID != nil {
item.DocumentID = *docID
}
select {
case <-ctx.Done():
return ctx.Err()
Expand Down
8 changes: 4 additions & 4 deletions exporter/elasticsearchexporter/bulkindexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func TestAsyncBulkIndexer_flush(t *testing.T) {
session, err := bulkIndexer.StartSession(context.Background())
require.NoError(t, err)

assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil, nil))
// should flush
time.Sleep(100 * time.Millisecond)
assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load())
Expand Down Expand Up @@ -229,7 +229,7 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) {
session, err := bulkIndexer.StartSession(context.Background())
require.NoError(t, err)

assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil, nil))
// should flush
time.Sleep(100 * time.Millisecond)
assert.Equal(t, int64(0), bulkIndexer.stats.docsIndexed.Load())
Expand Down Expand Up @@ -312,7 +312,7 @@ func runBulkIndexerOnce(t *testing.T, config *Config, client *elasticsearch.Clie
session, err := bulkIndexer.StartSession(context.Background())
require.NoError(t, err)

assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil, nil))
assert.NoError(t, bulkIndexer.Close(context.Background()))

return bulkIndexer
Expand All @@ -338,7 +338,7 @@ func TestSyncBulkIndexer_flushBytes(t *testing.T) {
session, err := bi.StartSession(context.Background())
require.NoError(t, err)

assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil, nil))
assert.Equal(t, int64(1), reqCnt.Load()) // flush due to flush::bytes
assert.NoError(t, bi.Close(context.Background()))
}
3 changes: 3 additions & 0 deletions exporter/elasticsearchexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ type Config struct {
// fall back to pure TracesIndex, if 'elasticsearch.index.prefix' or 'elasticsearch.index.suffix' are not found in resource or attribute (prio: resource > attribute)
TracesDynamicIndex DynamicIndexSetting `mapstructure:"traces_dynamic_index"`

// LogsDynamicID configures whether the `elasticsearch.document_id` log record attribute is used as the document ID in Elasticsearch.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// LogsDynamicID is used to configure the document id for logs.
// LogsDynamicID configures whether log record attribute `elasticsearch.document_id` is set as the document ID in ES.

LogsDynamicID DynamicIndexSetting `mapstructure:"logs_dynamic_id"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
LogsDynamicID DynamicIndexSetting `mapstructure:"logs_dynamic_id"`
LogsDynamicID bool `mapstructure:"logs_dynamic_id"`

nit: This has nothing to do with Index, which makes the use of DynamicIndexSetting slightly odd.

Also, personally I find this 1 level of nesting not very ergonomic. Unless we plan to add features to dynamic ID in the future e.g. logs_dynamic_id::attribute_name to make the attribute name configurable, I feel like adding a layer just to have a ::enabled bool not very useful. You can tell I'm already not very happy with *_dynamic_index::enabled. But I'd be interested to know other codeowners' opinion on this.


// Pipeline configures the ingest node pipeline name that should be used to process the
// events.
//
Expand Down
9 changes: 9 additions & 0 deletions exporter/elasticsearchexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ func TestConfig(t *testing.T) {
TracesDynamicIndex: DynamicIndexSetting{
Enabled: false,
},
LogsDynamicID: DynamicIndexSetting{
Enabled: false,
},
Pipeline: "mypipeline",
ClientConfig: withDefaultHTTPClientConfig(func(cfg *confighttp.ClientConfig) {
cfg.Timeout = 2 * time.Minute
Expand Down Expand Up @@ -146,6 +149,9 @@ func TestConfig(t *testing.T) {
TracesDynamicIndex: DynamicIndexSetting{
Enabled: false,
},
LogsDynamicID: DynamicIndexSetting{
Enabled: false,
},
Pipeline: "mypipeline",
ClientConfig: withDefaultHTTPClientConfig(func(cfg *confighttp.ClientConfig) {
cfg.Timeout = 2 * time.Minute
Expand Down Expand Up @@ -218,6 +224,9 @@ func TestConfig(t *testing.T) {
TracesDynamicIndex: DynamicIndexSetting{
Enabled: false,
},
LogsDynamicID: DynamicIndexSetting{
Enabled: false,
},
Pipeline: "mypipeline",
ClientConfig: withDefaultHTTPClientConfig(func(cfg *confighttp.ClientConfig) {
cfg.Timeout = 2 * time.Minute
Expand Down
26 changes: 22 additions & 4 deletions exporter/elasticsearchexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/objmodel"
)

const (
	// documentIDAttributeName is the attribute name used to specify the document ID.
	// When the logs_dynamic_id config option is enabled, the value of this attribute
	// (looked up on the log record, scope, and resource attributes, in that order of
	// precedence) is used as the Elasticsearch document _id for the log record.
	documentIDAttributeName = "elasticsearch.document_id"
)

type elasticsearchExporter struct {
component.TelemetrySettings
userAgent string
Expand Down Expand Up @@ -177,7 +182,9 @@ func (e *elasticsearchExporter) pushLogRecord(
if err != nil {
return fmt.Errorf("failed to encode log event: %w", err)
}
return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil)

docID := e.getDocumentIDAttribute(record.Attributes(), scope.Attributes(), resource.Attributes())
return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil, docID)
}

func (e *elasticsearchExporter) pushMetricsData(
Expand Down Expand Up @@ -300,7 +307,8 @@ func (e *elasticsearchExporter) pushMetricsData(
errs = append(errs, err)
continue
}
if err := session.Add(ctx, fIndex, bytes.NewReader(docBytes), doc.DynamicTemplates()); err != nil {

if err := session.Add(ctx, fIndex, bytes.NewReader(docBytes), doc.DynamicTemplates(), nil); err != nil {
if cerr := ctx.Err(); cerr != nil {
return cerr
}
Expand Down Expand Up @@ -415,7 +423,7 @@ func (e *elasticsearchExporter) pushTraceRecord(
if err != nil {
return fmt.Errorf("failed to encode trace record: %w", err)
}
return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil)
return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil, nil)
}

func (e *elasticsearchExporter) pushSpanEvent(
Expand Down Expand Up @@ -449,5 +457,15 @@ func (e *elasticsearchExporter) pushSpanEvent(
if err != nil {
return err
}
return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(docBytes), nil)
return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(docBytes), nil, nil)
}

// getDocumentIDAttribute resolves the Elasticsearch document ID for a log
// record from the given attribute maps, consulted in the order provided
// (callers pass record, scope, then resource attributes, giving record
// attributes the highest precedence). It returns nil — meaning "let
// Elasticsearch generate the ID" — when the feature is disabled, when the
// attribute is absent, or when its value is empty.
func (e *elasticsearchExporter) getDocumentIDAttribute(attributeMaps ...pcommon.Map) *string {
	if !e.config.LogsDynamicID.Enabled {
		return nil
	}
	id, found := getFromAttributes(documentIDAttributeName, "", attributeMaps...)
	if !found || id == "" {
		return nil
	}
	return &id
}
116 changes: 116 additions & 0 deletions exporter/elasticsearchexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,111 @@ func TestExporterLogs(t *testing.T) {
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `scope.attributes`).Raw)
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `resource.attributes`).Raw)
})

t.Run("publish logs with dynamic id", func(t *testing.T) {
t.Parallel()
exampleDocID := "abc123"
tableTests := []struct {
name string
expectedDocID *string // nil means the _id will not be set
recordAttrs map[string]any
scopeAttrs map[string]any
resourceAttrs map[string]any
}{
{
name: "missing document id attribute should not set _id",
expectedDocID: nil,
},
{
name: "record attributes",
expectedDocID: &exampleDocID,
recordAttrs: map[string]any{
documentIDAttributeName: exampleDocID,
},
},
{
name: "scope attributes",
expectedDocID: &exampleDocID,
scopeAttrs: map[string]any{
documentIDAttributeName: exampleDocID,
},
},
{
name: "resource attributes",
expectedDocID: &exampleDocID,
resourceAttrs: map[string]any{
documentIDAttributeName: exampleDocID,
},
},
{
name: "record attributes takes precedence over others",
expectedDocID: &exampleDocID,
recordAttrs: map[string]any{
documentIDAttributeName: exampleDocID,
},
scopeAttrs: map[string]any{
documentIDAttributeName: "id1",
},
resourceAttrs: map[string]any{
documentIDAttributeName: "id2",
},
},
{
name: "scope attributes takes precedence over resource attributes",
expectedDocID: &exampleDocID,
scopeAttrs: map[string]any{
documentIDAttributeName: exampleDocID,
},
resourceAttrs: map[string]any{
documentIDAttributeName: "id1",
},
},
}

cfgs := map[string]func(*Config){
"sync": func(cfg *Config) {
batcherEnabled := false
cfg.Batcher.Enabled = &batcherEnabled
},
"async": func(cfg *Config) {
batcherEnabled := true
cfg.Batcher.Enabled = &batcherEnabled
cfg.Batcher.FlushTimeout = 10 * time.Millisecond
},
}
for _, tt := range tableTests {
for cfgName, cfgFn := range cfgs {
t.Run(tt.name+"/"+cfgName, func(t *testing.T) {
t.Parallel()
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)

if tt.expectedDocID == nil {
assert.NotContains(t, string(docs[0].Action), "_id", "expected _id to not be set")
} else {
assert.Equal(t, *tt.expectedDocID, actionJSONToID(t, docs[0].Action), "expected _id to be set")
}
return itemsAllOK(docs)
})

exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) {
cfg.LogsDynamicID.Enabled = true
cfgFn(cfg)
})
logs := newLogsWithAttributes(
tt.recordAttrs,
tt.scopeAttrs,
tt.resourceAttrs,
)
logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr("hello world")
mustSendLogs(t, exporter, logs)

rec.WaitItems(1)
})
}
}
})
}

func TestExporterMetrics(t *testing.T) {
Expand Down Expand Up @@ -1909,3 +2014,14 @@ func actionJSONToIndex(t *testing.T, actionJSON json.RawMessage) string {
require.NoError(t, err)
return action.Create.Index
}

// actionJSONToID extracts the "_id" field from the "create" action metadata
// of a bulk request action line, failing the test if the JSON cannot be
// parsed. An empty string is returned when no _id was set on the action.
func actionJSONToID(t *testing.T, actionJSON json.RawMessage) string {
	var action struct {
		Create struct {
			ID string `json:"_id"`
		} `json:"create"`
	}
	require.NoError(t, json.Unmarshal(actionJSON, &action))
	return action.Create.ID
}
3 changes: 3 additions & 0 deletions exporter/elasticsearchexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ func createDefaultConfig() component.Config {
TracesDynamicIndex: DynamicIndexSetting{
Enabled: false,
},
LogsDynamicID: DynamicIndexSetting{
Enabled: false,
},
Retry: RetrySettings{
Enabled: true,
MaxRetries: 0, // default is set in exporter code
Expand Down
Loading