Skip to content

Commit

Permalink
[kafka] limit when messaging.kafka.message.key attribute is set to …
Browse files Browse the repository at this point in the history
…basic types (#3379)
  • Loading branch information
lachmatt authored Apr 22, 2024
1 parent 5227a33 commit bac0813
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

using System.Diagnostics;
using System.Globalization;
using System.Text;
using OpenTelemetry.AutoInstrumentation.DuckTyping;
using OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.DuckTypes;
Expand Down Expand Up @@ -116,6 +117,16 @@ public static void SetDeliveryResults(Activity activity, IDeliveryResult deliver
deliveryResult.Offset.Value);
}

internal static string? ExtractMessageKeyValue(object key)
{
return key switch
{
string s => s,
int or uint or long or ulong or float or double or decimal => Convert.ToString(key, CultureInfo.InvariantCulture),
_ => null
};
}

private static void SetCommonAttributes(
Activity activity,
string operationName,
Expand All @@ -138,7 +149,11 @@ private static void SetCommonAttributes(

if (key is not null)
{
activity.SetTag(MessagingAttributes.Keys.Kafka.MessageKey, key);
var keyValue = ExtractMessageKeyValue(key);
if (keyValue is not null)
{
activity.SetTag(MessagingAttributes.Keys.Kafka.MessageKey, keyValue);
}
}

if (partition is not null)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System.Globalization;
using FluentAssertions;
using OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka;
using Xunit;

namespace OpenTelemetry.AutoInstrumentation.Tests;

public class KafkaInstrumentationTests
{
[Theory]
[InlineData("abc")]
[InlineData(int.MaxValue)]
[InlineData(uint.MaxValue)]
[InlineData(long.MaxValue)]
[InlineData(ulong.MaxValue)]
[InlineData(float.MaxValue)]
[InlineData(double.MaxValue)]
public void MessageKeyValueIsExtractedForBasicType(object value)
{
KafkaInstrumentation.ExtractMessageKeyValue(value).Should().Be(Convert.ToString(value, CultureInfo.InvariantCulture));
}

[Fact]
public void MessageKeyValueIsExtractedForDecimal()
{
decimal input = decimal.MaxValue;
KafkaInstrumentation.ExtractMessageKeyValue(input).Should().Be(Convert.ToString(input, CultureInfo.InvariantCulture));
}

[Fact]
public void MessageKeyValueIsNotExtractedForUnrecognizedType()
{
var value = new byte[] { 1, 2, 3 };
KafkaInstrumentation.ExtractMessageKeyValue(value).Should().BeNull();
}
}

0 comments on commit bac0813

Please sign in to comment.