Skip to content

Commit

Permalink
[enhance][pulsar] add apache-pulsar client support (#5926)
Browse files Browse the repository at this point in the history
Fix:
#2107

Motivation:
Support apache pulsar client from version 2.8.0 to lastest.

---------

Co-authored-by: daojun <[email protected]>
  • Loading branch information
tjiuming and dao-jun authored Mar 7, 2023
1 parent deafc5f commit dc2c4f6
Show file tree
Hide file tree
Showing 16 changed files with 1,459 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
plugins {
id("otel.javaagent-instrumentation")
}

muzzle {
pass {
group.set("org.apache.pulsar")
module.set("pulsar-client")
versions.set("[2.8.0,)")
assertInverse.set(true)
}
}

dependencies {
library("org.apache.pulsar:pulsar-client:2.8.0")

testImplementation("javax.annotation:javax.annotation-api:1.3.2")
testImplementation("org.testcontainers:pulsar:1.17.1")
testImplementation("org.apache.pulsar:pulsar-client-admin:2.8.0")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.pulsar.v28;

import static io.opentelemetry.javaagent.instrumentation.pulsar.v28.telemetry.PulsarSingletons.startAndEndConsumerReceive;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isProtected;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.context.Context;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.concurrent.CompletableFuture;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.PulsarClientImpl;

public class ConsumerImplInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return namedOneOf(
"org.apache.pulsar.client.impl.ConsumerImpl",
"org.apache.pulsar.client.impl.MultiTopicsConsumerImpl");
}

@Override
public void transform(TypeTransformer transformer) {
String klassName = ConsumerImplInstrumentation.class.getName();

transformer.applyAdviceToMethod(isConstructor(), klassName + "$ConsumerConstructorAdviser");

// internalReceive will apply to Consumer#receive(long,TimeUnit)
// and called before MessageListener#receive.
transformer.applyAdviceToMethod(
isMethod()
.and(isProtected())
.and(named("internalReceive"))
.and(takesArguments(2))
.and(takesArgument(1, named("java.util.concurrent.TimeUnit"))),
klassName + "$ConsumerInternalReceiveAdviser");
// receive/batchReceive will apply to Consumer#receive()/Consumer#batchReceive()
transformer.applyAdviceToMethod(
isMethod()
.and(isPublic())
.and(namedOneOf("receive", "batchReceive"))
.and(takesArguments(0)),
klassName + "$ConsumerSyncReceiveAdviser");
// receiveAsync/batchReceiveAsync will apply to
// Consumer#receiveAsync()/Consumer#batchReceiveAsync()
transformer.applyAdviceToMethod(
isMethod()
.and(isPublic())
.and(namedOneOf("receiveAsync", "batchReceiveAsync"))
.and(takesArguments(0)),
klassName + "$ConsumerAsyncReceiveAdviser");
}

@SuppressWarnings("unused")
public static class ConsumerConstructorAdviser {
private ConsumerConstructorAdviser() {}

@Advice.OnMethodExit(suppress = Throwable.class)
public static void after(
@Advice.This Consumer<?> consumer, @Advice.Argument(value = 0) PulsarClient client) {

PulsarClientImpl pulsarClient = (PulsarClientImpl) client;
String url = pulsarClient.getLookup().getServiceUrl();
VirtualFieldStore.inject(consumer, url);
}
}

@SuppressWarnings("unused")
public static class ConsumerInternalReceiveAdviser {
private ConsumerInternalReceiveAdviser() {}

@Advice.OnMethodEnter
public static void before(@Advice.Local(value = "startTime") long startTime) {
startTime = System.currentTimeMillis();
}

@Advice.OnMethodExit(onThrowable = Throwable.class)
public static void after(
@Advice.This Consumer<?> consumer,
@Advice.Return Message<?> message,
@Advice.Thrown Throwable t,
@Advice.Local(value = "startTime") long startTime) {
if (t != null) {
return;
}

Context parent = Context.current();
Context current = startAndEndConsumerReceive(parent, message, startTime, consumer);
if (current != null) {
// ConsumerBase#internalReceive(long,TimeUnit) will be called before
// ConsumerListener#receive(Consumer,Message), so, need to inject Context into Message.
VirtualFieldStore.inject(message, current);
}
}
}

@SuppressWarnings("unused")
public static class ConsumerSyncReceiveAdviser {
private ConsumerSyncReceiveAdviser() {}

@Advice.OnMethodEnter
public static void before(@Advice.Local(value = "startTime") long startTime) {
startTime = System.currentTimeMillis();
}

@Advice.OnMethodExit(onThrowable = Throwable.class)
public static void after(
@Advice.This Consumer<?> consumer,
@Advice.Return Message<?> message,
@Advice.Thrown Throwable t,
@Advice.Local(value = "startTime") long startTime) {
if (t != null) {
return;
}

Context parent = Context.current();
startAndEndConsumerReceive(parent, message, startTime, consumer);
// No need to inject context to message.
}
}

@SuppressWarnings("unused")
public static class ConsumerAsyncReceiveAdviser {
private ConsumerAsyncReceiveAdviser() {}

@Advice.OnMethodEnter
public static void before(@Advice.Local(value = "startTime") long startTime) {
startTime = System.currentTimeMillis();
}

@Advice.OnMethodExit(onThrowable = Throwable.class)
public static void after(
@Advice.This Consumer<?> consumer,
@Advice.Return CompletableFuture<Message<?>> future,
@Advice.Local(value = "startTime") long startTime) {
future.whenComplete(
(message, t) -> {
if (t != null) {
return;
}

Context parent = Context.current();
startAndEndConsumerReceive(parent, message, startTime, consumer);
// No need to inject context to message.
});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.pulsar.v28;

import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.pulsar.client.api.Message;

public class MessageInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.pulsar.client.impl.MessageImpl");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod().and(isPublic()).and(named("recycle")).and(takesArguments(0)),
MessageInstrumentation.class.getName() + "$MessageRecycleAdvice");
}

@SuppressWarnings("unused")
public static class MessageRecycleAdvice {
private MessageRecycleAdvice() {}

@Advice.OnMethodExit
public static void after(@Advice.This Message<?> message) {
// Clean context to prevent memory leak.
VirtualFieldStore.inject(message, null);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.pulsar.v28;

import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.pulsar.v28.telemetry.PulsarSingletons;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.implementation.bytecode.assign.Assigner;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;

public class MessageListenerInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
// return hasSuperType(named("org.apache.pulsar.client.api.MessageListener"));
// can't enhance MessageListener here like above due to jvm can't enhance lambda.
return named("org.apache.pulsar.client.impl.conf.ConsumerConfigurationData");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod().and(isPublic()).and(named("getMessageListener")),
MessageListenerInstrumentation.class.getName() + "$ConsumerConfigurationDataMethodAdviser");
}

@SuppressWarnings("unused")
public static class ConsumerConfigurationDataMethodAdviser {
private ConsumerConfigurationDataMethodAdviser() {}

@Advice.OnMethodExit
public static void after(
@Advice.This ConsumerConfigurationData<?> data,
@Advice.Return(readOnly = false, typing = Assigner.Typing.DYNAMIC)
MessageListener<?> listener) {
if (listener == null) {
return;
}

listener = new MessageListenerWrapper<>(listener);
}
}

public static class MessageListenerWrapper<T> implements MessageListener<T> {
private static final long serialVersionUID = 1L;

private final MessageListener<T> delegator;

public MessageListenerWrapper(MessageListener<T> messageListener) {
this.delegator = messageListener;
}

@Override
public void received(Consumer<T> consumer, Message<T> msg) {
Context parent = VirtualFieldStore.extract(msg);

Instrumenter<Message<?>, Attributes> instrumenter =
PulsarSingletons.consumerListenerInstrumenter();
if (!instrumenter.shouldStart(parent, msg)) {
this.delegator.received(consumer, msg);
return;
}

Context current = instrumenter.start(parent, msg);
try (Scope scope = current.makeCurrent()) {
this.delegator.received(consumer, msg);
instrumenter.end(current, msg, null, null);
} catch (Throwable t) {
instrumenter.end(current, msg, null, t);
throw t;
}
}

@Override
public void reachedEndOfTopic(Consumer<T> consumer) {
this.delegator.reachedEndOfTopic(consumer);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.pulsar.v28;

public class ProducerData {
public final String url;
public final String topic;

private ProducerData(String url, String topic) {
this.url = url;
this.topic = topic;
}

public static ProducerData create(String url, String topic) {
return new ProducerData(url, topic);
}
}
Loading

0 comments on commit dc2c4f6

Please sign in to comment.