diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 74b93b1e826dc..21486978f8290 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -954,6 +954,7 @@ private Future doSend(ProducerRecord record, Callback call try { serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key()); } catch (ClassCastException cce) { + sender.sendError(record.topic(), 1); throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() + " specified in key.serializer", cce); @@ -962,6 +963,7 @@ private Future doSend(ProducerRecord record, Callback call try { serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value()); } catch (ClassCastException cce) { + sender.sendError(record.topic(), 1); throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + " specified in value.serializer", cce); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 2fccc6aab59a1..95694d416581d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -846,6 +846,13 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo log.trace("Sent produce request to {}: {}", nodeId, requestBuilder); } + /** + * Send record error as error metric to sensor + */ + public void sendError(String topic, int count) { + this.sensors.recordErrors(topic, count); + } + /** * Wake up the selector associated with this send thread */