Skip to content

Commit

Permalink
Merge pull request #2204 from newrelic/sqs-distributed-trace
Browse files Browse the repository at this point in the history
Sqs distributed trace
  • Loading branch information
obenkenobi authored Jan 21, 2025
2 parents 88a742b + d68e95d commit d5e2196
Show file tree
Hide file tree
Showing 17 changed files with 526 additions and 53 deletions.
5 changes: 3 additions & 2 deletions instrumentation/aws-java-sdk-sqs-1.10.44/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ dependencies {


jar {
manifest { attributes 'Implementation-Title': 'com.newrelic.instrumentation.aws-java-sdk-sqs-1.10.44' }
manifest { attributes 'Implementation-Title': 'com.newrelic.instrumentation.aws-java-sdk-sqs-spans-1.10.44', 'Enabled': 'false',
'Implementation-Title-Alias': 'aws-java-sdk-sqs' }
}

verifyInstrumentation {
Expand All @@ -25,4 +26,4 @@ verifyInstrumentation {
site {
title 'AWS SQS'
type 'Messaging'
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.SendMessageBatchResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageResult;
Expand All @@ -20,29 +21,48 @@
import com.newrelic.api.agent.weaver.MatchType;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import com.newrelic.utils.MetricUtil;
import com.newrelic.utils.SQSBatchRequestHeaders;
import com.newrelic.utils.SQSRequestHeaders;
import com.newrelic.utils.SqsV1Util;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@Weave(type = MatchType.Interface, originalName = "com.amazonaws.services.sqs.AmazonSQS")
public class AmazonSQS_Instrumentation {

@Trace
public SendMessageBatchResult sendMessageBatch(SendMessageBatchRequest sendMessageBatchRequest) {
MessageProduceParameters messageProduceParameters = MetricUtil.generateExternalProduceMetrics(sendMessageBatchRequest.getQueueUrl());
for (SendMessageBatchRequestEntry request : sendMessageBatchRequest.getEntries()) {
SQSBatchRequestHeaders headers = new SQSBatchRequestHeaders(request);
NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(headers);
}

MessageProduceParameters messageProduceParameters = SqsV1Util.generateExternalProduceMetrics(sendMessageBatchRequest.getQueueUrl());
NewRelic.getAgent().getTracedMethod().reportAsExternal(messageProduceParameters);
return Weaver.callOriginal();
}

@Trace
public SendMessageResult sendMessage(SendMessageRequest sendMessageRequest) {
MessageProduceParameters messageProduceParameters = MetricUtil.generateExternalProduceMetrics(sendMessageRequest.getQueueUrl());
SQSRequestHeaders headers = new SQSRequestHeaders(sendMessageRequest);
NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(headers);

MessageProduceParameters messageProduceParameters = SqsV1Util.generateExternalProduceMetrics(sendMessageRequest.getQueueUrl());
NewRelic.getAgent().getTracedMethod().reportAsExternal(messageProduceParameters);
return Weaver.callOriginal();
}

@Trace
public ReceiveMessageResult receiveMessage(ReceiveMessageRequest receiveMessageRequest) {
MessageConsumeParameters messageConsumeParameters = MetricUtil.generateExternalConsumeMetrics(receiveMessageRequest.getQueueUrl());
List<String> updatedMessageAttrNames = new ArrayList<>(receiveMessageRequest.getMessageAttributeNames());
Collections.addAll(updatedMessageAttrNames, SqsV1Util.DT_HEADERS);
receiveMessageRequest.setMessageAttributeNames(updatedMessageAttrNames);

MessageConsumeParameters messageConsumeParameters = SqsV1Util.generateExternalConsumeMetrics(receiveMessageRequest.getQueueUrl());
NewRelic.getAgent().getTracedMethod().reportAsExternal(messageConsumeParameters);
return Weaver.callOriginal();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package com.newrelic.utils;

import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
import com.newrelic.api.agent.HeaderType;
import com.newrelic.api.agent.Headers;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class SQSBatchRequestHeaders implements Headers {
private final SendMessageBatchRequestEntry requestEntry;

public SQSBatchRequestHeaders(SendMessageBatchRequestEntry re) {
requestEntry = re;
}

@Override
public HeaderType getHeaderType() {
return HeaderType.MESSAGE;
}

@Override
public String getHeader(String name) {
Map<String, MessageAttributeValue> attributes = requestEntry.getMessageAttributes();
if (attributes != null) {
MessageAttributeValue value = attributes.get(name);
if (value != null) {
String dataType = value.getDataType();
if (dataType.equalsIgnoreCase("String")) {
String stringValue = value.getStringValue();
if (stringValue != null) {
return stringValue;
}
}
}
}
return null;
}

@Override
public Collection<String> getHeaders(String name) {
List<String> list = new ArrayList<String>();
String value = getHeader(name);
if (value != null && !value.isEmpty()) {
list.add(value);
}
return list;
}

@Override
public void setHeader(String name, String value) {
if (requestEntry != null) {
requestEntry.addMessageAttributesEntry(name, new MessageAttributeValue().withDataType("String").withStringValue(value));
}
}

@Override
public void addHeader(String name, String value) {
if (requestEntry != null) {
requestEntry.addMessageAttributesEntry(name, new MessageAttributeValue().withDataType("String").withStringValue(value));
}
}

@Override
public Collection<String> getHeaderNames() {
if (requestEntry != null) {
Map<String, MessageAttributeValue> attributes = requestEntry.getMessageAttributes();
if (attributes != null) {
return attributes.keySet();
}
}
return Collections.emptyList();
}

@Override
public boolean containsHeader(String name) {
return getHeaderNames().contains(name);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package com.newrelic.utils;

import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.newrelic.api.agent.HeaderType;
import com.newrelic.api.agent.Headers;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;

public class SQSRequestHeaders implements Headers {
private final SendMessageRequest request;

public SQSRequestHeaders(SendMessageRequest req) {
request = req;
}

@Override
public HeaderType getHeaderType() {
return HeaderType.MESSAGE;
}

@Override
public String getHeader(String name) {
Map<String, MessageAttributeValue> messageAttributes = request.getMessageAttributes();
if (messageAttributes != null) {
MessageAttributeValue value = messageAttributes.get(name);
if (value != null && value.getDataType().equalsIgnoreCase("string")) {
return value.getStringValue();
}
}
Map<String, String> customRequestHeaders = request.getCustomRequestHeaders();
if (customRequestHeaders != null) {
return customRequestHeaders.get(name);
}
return null;
}

@Override
public Collection<String> getHeaders(String name) {
String value = getHeader(name);
if (value != null) {
return Collections.singletonList(value);
}
return Collections.emptyList();
}

@Override
public void setHeader(String name, String value) {
if (request != null) {
Map<String, MessageAttributeValue> existingAttributes = request.getMessageAttributes();
if (!existingAttributes.containsKey(name)) {
request.addMessageAttributesEntry(name, new MessageAttributeValue().withDataType("String").withStringValue(value));
}
}
}

@Override
public void addHeader(String name, String value) {
if (request != null) {
Map<String, MessageAttributeValue> existingAttributes = request.getMessageAttributes();
if (!existingAttributes.containsKey(name)) {
request.addMessageAttributesEntry(name, new MessageAttributeValue().withDataType("String").withStringValue(value));
}
}
}

@Override
public Collection<String> getHeaderNames() {
Map<String, MessageAttributeValue> messageAttributes = request.getMessageAttributes();
return messageAttributes.keySet();
}

@Override
public boolean containsHeader(String name) {
return getHeaderNames().contains(name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@
import com.newrelic.api.agent.Segment;
import com.newrelic.api.agent.weaver.Weaver;

public class MetricUtil {
public class SqsV1Util {

public static final String LIBRARY = "SQS";
public static final String OTEL_LIBRARY = "aws_sqs";
public static final String[] DT_HEADERS = new String[] {"newrelic","NEWRELIC","NewRelic","tracestate","TraceState","TRACESTATE"};

public static MessageProduceParameters generateExternalProduceMetrics(String queueUrl) {
DestinationData destinationData = DestinationData.parse(queueUrl);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
import com.amazonaws.services.sqs.model.CreateQueueResult;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.newrelic.agent.introspec.InstrumentationTestConfig;
import com.newrelic.agent.introspec.InstrumentationTestRunner;
import com.newrelic.agent.introspec.Introspector;
import com.newrelic.agent.introspec.TracedMetricData;
import com.newrelic.api.agent.Trace;
import com.newrelic.utils.SqsV1Util;
import org.elasticmq.NodeAddress;
import org.elasticmq.rest.sqs.SQSRestServer;
import org.elasticmq.rest.sqs.SQSRestServerBuilder;
Expand All @@ -27,10 +29,16 @@
import org.junit.Test;
import org.junit.runner.RunWith;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

@RunWith(InstrumentationTestRunner.class)
@InstrumentationTestConfig(includePrefixes = { "com.amazonaws.services.sqs" }, configName = "dt_enabled.yml")
@InstrumentationTestConfig(includePrefixes = { "com.amazonaws.services.sqs", "com.newrelic.utils" }, configName = "dt_enabled.yml")
public class SqsClientTest {

private static AmazonSQSClient sqsClient;
Expand Down Expand Up @@ -67,7 +75,13 @@ public static void afterClass() {
@Test
public void testSendMessage() {
Introspector introspector = InstrumentationTestRunner.getIntrospector();
sendMessageRequest();
SendMessageRequest request = sendMessageRequest();

Set<String> dtHeaders = new HashSet<>(Arrays.asList(SqsV1Util.DT_HEADERS));
boolean containsDtHeaders = request.getMessageAttributes().entrySet().stream().anyMatch(e ->
dtHeaders.contains(e.getKey()) && e.getValue() != null && !e.getValue().getStringValue().isEmpty());
assertTrue("Message request must contain headers", containsDtHeaders);

assertEquals(1, introspector.getFinishedTransactionCount(10000));

String txName = introspector.getTransactionNames().iterator().next();
Expand All @@ -77,9 +91,17 @@ public void testSendMessage() {
@Test
public void testSendMessageBatch() {
Introspector introspector = InstrumentationTestRunner.getIntrospector();
sendMessageBatch();
SendMessageBatchRequest request = sendMessageBatch();
assertEquals(1, introspector.getFinishedTransactionCount(10000));

Set<String> dtHeaders = new HashSet<>(Arrays.asList(SqsV1Util.DT_HEADERS));
assertFalse("Batch request must contain at least one entry", request.getEntries().isEmpty());
for (SendMessageBatchRequestEntry entry: request.getEntries()) {
boolean containsDtHeaders = entry.getMessageAttributes().entrySet().stream().anyMatch(e ->
dtHeaders.contains(e.getKey()) && e.getValue() != null && !e.getValue().getStringValue().isEmpty());
assertTrue("Message entry must contain headers", containsDtHeaders);
}

String txName = introspector.getTransactionNames().iterator().next();
checkScopedMetricCount(txName, "MessageBroker/SQS/Queue/Produce/Named/" + QUEUE_NAME, 1);
}
Expand All @@ -95,15 +117,24 @@ public void testReceiveMessage() {
}

@Trace(dispatcher = true)
private void sendMessageRequest() {
private SendMessageRequest sendMessageRequest() {
SendMessageRequest request = (new SendMessageRequest()).withQueueUrl(queueUrl).withMessageBody("body");
sqsClient.sendMessage(request);
return request;
}

@Trace(dispatcher = true)
private void sendMessageBatch() {
SendMessageBatchRequest request = (new SendMessageBatchRequest()).withQueueUrl(queueUrl);
sqsClient.sendMessageBatch(request);
private SendMessageBatchRequest sendMessageBatch() {
SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
SendMessageBatchRequest request = (new SendMessageBatchRequest()).withQueueUrl(queueUrl)
.withEntries(entry);
try {
sqsClient.sendMessageBatch(request);
} catch (Exception e) {
// Do nothing
}

return request;
}

@Trace(dispatcher = true)
Expand Down
Loading

0 comments on commit d5e2196

Please sign in to comment.