Skip to content

Commit

Permalink
Make the msgSendLatencySensor non-static (#499)
Browse files Browse the repository at this point in the history
The msgSendLatencySensor sensor is used to add the following metrics together with client-id tags to
a single KafkaProducer:

MetricName [name=message-produce-latency-avg, group=producer-metrics, description=The average
latency between record queuing and get acknowledged in ms, tags={client-id=client-id1}],
MetricName [name=message-produce-latency-max, group=producer-metrics, description=The average latency between record queuing and get acknowledged in ms, tags={client-id=client-id1}],

The current implematation uses a static variable for the msgSendLatencySensor field,
which means all KafkaProducer objects in the same process share the same sensor,
and hence can interfere with each other.

This PR fixes the problem by making the msgSendLatencySensor a non-static field.
  • Loading branch information
gitlw authored Dec 22, 2023
1 parent ce545b4 commit 84e7571
Showing 1 changed file with 11 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
private static final String JMX_PREFIX = "kafka.producer";
public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics";
private static Sensor msgSendLatencySensor = null;
private final Sensor msgSendLatencySensor;

private final String clientId;
// Visible for testing
Expand Down Expand Up @@ -955,7 +955,7 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback call
log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
}
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = new InterceptorCallback<>(callback, startTime, time, this.interceptors, tp);
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp, startTime, time, this.msgSendLatencySensor);

if (transactionManager != null && transactionManager.isTransactional()) {
transactionManager.failIfNotReadyForSend();
Expand All @@ -972,7 +972,7 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback call
log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
}
// producer callback will make sure to call both 'callback' and interceptor callback
interceptCallback = new InterceptorCallback<>(callback, startTime, time, this.interceptors, tp);
interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp, startTime, time, this.msgSendLatencySensor);

result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, false);
Expand Down Expand Up @@ -1343,9 +1343,6 @@ private static class ClusterAndWaitTime {
}
}

public static void recordMsgProducingLatency(long start, long now) {
msgSendLatencySensor.record(now - start, now);
}

private static class FutureFailure implements Future<RecordMetadata> {

Expand Down Expand Up @@ -1393,16 +1390,20 @@ private static class InterceptorCallback<K, V> implements Callback {
private final Time time;
private final long startTime;

private InterceptorCallback(Callback userCallback, ProducerInterceptors<K, V> interceptors, TopicPartition tp) {
this(userCallback, Long.MIN_VALUE, null, interceptors, tp);
}
private final Sensor msgSendLatencySensor;


private InterceptorCallback(Callback userCallback, long startTime, Time time, ProducerInterceptors<K, V> interceptors, TopicPartition tp) {
private InterceptorCallback(Callback userCallback, ProducerInterceptors<K, V> interceptors, TopicPartition tp, long startTime, Time time, Sensor msgSendLatencySensor) {
this.userCallback = userCallback;
this.interceptors = interceptors;
this.tp = tp;
this.time = time;
this.startTime = startTime;
this.msgSendLatencySensor = msgSendLatencySensor;
}

public void recordMsgProducingLatency(long start, long now) {
msgSendLatencySensor.record(now - start, now);
}

public void onCompletion(RecordMetadata metadata, Exception exception) {
Expand Down

0 comments on commit 84e7571

Please sign in to comment.