From aacbcee187d592a0c07a6ab7ed20cd8def3612c3 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Tue, 1 Oct 2024 14:08:11 +0100 Subject: [PATCH] add Akka Scheduler context propagation Update AkkaScheduleInstrumentation.java Update AkkaSchedulerTest.scala --- .../AkkaActorInstrumentationModule.java | 3 +- .../AkkaScheduleInstrumentation.java | 64 +++++++++++++++++++ .../akkaactor/AkkaSchedulerTest.scala | 48 ++++++++++++++ 3 files changed, 114 insertions(+), 1 deletion(-) create mode 100644 instrumentation/akka/akka-actor-2.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkaactor/AkkaScheduleInstrumentation.java create mode 100644 instrumentation/akka/akka-actor-2.3/javaagent/src/test/scala/io/opentelemetry/instrumentation/akkaactor/AkkaSchedulerTest.scala diff --git a/instrumentation/akka/akka-actor-2.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkaactor/AkkaActorInstrumentationModule.java b/instrumentation/akka/akka-actor-2.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkaactor/AkkaActorInstrumentationModule.java index 0bae483ce168..6ac3e4d2b9d9 100644 --- a/instrumentation/akka/akka-actor-2.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkaactor/AkkaActorInstrumentationModule.java +++ b/instrumentation/akka/akka-actor-2.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkaactor/AkkaActorInstrumentationModule.java @@ -23,6 +23,7 @@ public List typeInstrumentations() { return asList( new AkkaDispatcherInstrumentation(), new AkkaActorCellInstrumentation(), - new AkkaDefaultSystemMessageQueueInstrumentation()); + new AkkaDefaultSystemMessageQueueInstrumentation(), + new AkkaScheduleInstrumentation()); } } diff --git a/instrumentation/akka/akka-actor-2.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkaactor/AkkaScheduleInstrumentation.java b/instrumentation/akka/akka-actor-2.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkaactor/AkkaScheduleInstrumentation.java new file mode 100644 index 000000000000..7a2c6230dcd7 --- /dev/null +++ b/instrumentation/akka/akka-actor-2.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkaactor/AkkaScheduleInstrumentation.java @@ -0,0 +1,64 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.akkaactor; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import io.opentelemetry.context.Context; +import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +public class AkkaScheduleInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return named("akka.actor.LightArrayRevolverScheduler"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + named("schedule") + .and(takesArgument(0, named("scala.concurrent.duration.FiniteDuration"))) + .and(takesArgument(1, named("scala.concurrent.duration.FiniteDuration"))) + .and(takesArgument(2, named("java.lang.Runnable"))) + .and(takesArgument(3, named("scala.concurrent.ExecutionContext"))), + AkkaScheduleInstrumentation.class.getName() + "$ScheduleAdvice"); + transformer.applyAdviceToMethod( + named("scheduleOnce") + .and(takesArgument(0, named("scala.concurrent.duration.FiniteDuration"))) + .and(takesArgument(1, named("java.lang.Runnable"))) + .and(takesArgument(2, named("scala.concurrent.ExecutionContext"))), + AkkaScheduleInstrumentation.class.getName() + "$ScheduleOnceAdvice"); + } + + @SuppressWarnings("unused") + public static class ScheduleAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void enterSchedule( + @Advice.Argument(value = 2, readOnly = false) Runnable runnable) { + Context context = Java8BytecodeBridge.currentContext(); + runnable = context.wrap(runnable); + } + } + + @SuppressWarnings("unused") + public static class ScheduleOnceAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void enterScheduleOnce( + @Advice.Argument(value = 1, readOnly = false) Runnable runnable) { + Context context = Java8BytecodeBridge.currentContext(); + runnable = context.wrap(runnable); + } + } +} diff --git a/instrumentation/akka/akka-actor-2.3/javaagent/src/test/scala/io/opentelemetry/instrumentation/akkaactor/AkkaSchedulerTest.scala b/instrumentation/akka/akka-actor-2.3/javaagent/src/test/scala/io/opentelemetry/instrumentation/akkaactor/AkkaSchedulerTest.scala new file mode 100644 index 000000000000..a8276ba8f627 --- /dev/null +++ b/instrumentation/akka/akka-actor-2.3/javaagent/src/test/scala/io/opentelemetry/instrumentation/akkaactor/AkkaSchedulerTest.scala @@ -0,0 +1,48 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.akkaactor + +import akka.pattern.after +import io.opentelemetry.api.GlobalOpenTelemetry +import io.opentelemetry.api.trace.Span +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import scala.concurrent.{Await, Future} +import scala.concurrent.duration.DurationInt + +class AkkaSchedulerTest { + + @Test + def checkThatSpanWorksWithAkkaScheduledEvents(): Unit = { + val system = AkkaActors.system + implicit val executionContext = system.dispatcher + val tracer = GlobalOpenTelemetry.get.getTracer("test-tracer") + val initialSpan = tracer.spanBuilder("test").startSpan() + val scope = initialSpan.makeCurrent() + try { + val futureResult = for { + result1 <- Future { + compareSpanContexts(Span.current(), initialSpan) + 1 + } + _ = compareSpanContexts(Span.current(), initialSpan) + result2 <- after(200.millis, system.scheduler)(Future.successful(2)) + _ = compareSpanContexts(Span.current(), initialSpan) + } yield result1 + result2 + assertThat(Await.result(futureResult, 5.seconds)).isEqualTo(3) + } finally { + scope.close() + initialSpan.end() + } + } + + private def compareSpanContexts(span1: Span, span2: Span): Unit = { + assertThat(span1.getSpanContext().getTraceId()) + .isEqualTo(span2.getSpanContext().getTraceId()) + assertThat(span1.getSpanContext().getSpanId()) + .isEqualTo(span2.getSpanContext().getSpanId()) + } +}