From b91d442b544712fa9f125e6ba14ca394c87213ab Mon Sep 17 00:00:00 2001 From: Qi Liu Date: Tue, 21 Feb 2023 18:25:46 -0500 Subject: [PATCH] [LI-HOTFIX] Add serialization error metric in record-errors (#437) TICKET = LIKAFKA-49998 LI_DESCRIPTION = Add producer metric for serialization error EXIT_CRITERIA = N/A Co-authored-by: Qi Liu --- .../org/apache/kafka/clients/producer/KafkaProducer.java | 2 ++ .../apache/kafka/clients/producer/internals/Sender.java | 7 +++++++ 2 files changed, 9 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index f0d0a52ea7869..99dbbd0c3365b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -927,6 +927,7 @@ private Future doSend(ProducerRecord record, Callback call try { serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key()); } catch (ClassCastException cce) { + sender.sendError(record.topic(), 1); throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() + " specified in key.serializer", cce); @@ -935,6 +936,7 @@ private Future doSend(ProducerRecord record, Callback call try { serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value()); } catch (ClassCastException cce) { + sender.sendError(record.topic(), 1); throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + " specified in value.serializer", cce); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index c4811a573790c..d739eccca6d04 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -811,6 +811,13 @@ public void onComplete(ClientResponse response) { log.trace("Sent produce request to {}: {}", nodeId, requestBuilder); } + /** + * Send record error as error metric to sensor + */ + public void sendError(String topic, int count) { + this.sensors.recordErrors(topic, count); + } + /** * Wake up the selector associated with this send thread */