diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index f0d0a52ea7869..99dbbd0c3365b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -927,6 +927,7 @@ private Future doSend(ProducerRecord record, Callback call try { serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key()); } catch (ClassCastException cce) { + sender.sendError(record.topic(), 1); throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() + " specified in key.serializer", cce); @@ -935,6 +936,7 @@ private Future doSend(ProducerRecord record, Callback call try { serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value()); } catch (ClassCastException cce) { + sender.sendError(record.topic(), 1); throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + " specified in value.serializer", cce); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index c4811a573790c..d739eccca6d04 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -811,6 +811,13 @@ public void onComplete(ClientResponse response) { log.trace("Sent produce request to {}: {}", nodeId, requestBuilder); } + /** + * Send record error as error metric to sensor + */ + public void sendError(String topic, int count) { + this.sensors.recordErrors(topic, count); + } + /** * Wake up the selector associated with this send thread */