Skip to content

Commit

Permalink
[LI-HOTFIX] Add serialization error metric in record-errors (#437) (#441
Browse files Browse the repository at this point in the history
)

TICKET = LIKAFKA-49998
LI_DESCRIPTION = Add producer metric for serialization error
EXIT_CRITERIA = N/A

Co-authored-by: Qi Liu <[email protected]>
  • Loading branch information
Q1Liu and Q1Liu authored Feb 22, 2023
1 parent dec729a commit d8e93c3
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -954,6 +954,7 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback call
try {
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
sender.sendError(record.topic(), 1);
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer", cce);
Expand All @@ -962,6 +963,7 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback call
try {
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
sender.sendError(record.topic(), 1);
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer", cce);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,13 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}

/**
* Send record error as error metric to sensor
*/
public void sendError(String topic, int count) {
this.sensors.recordErrors(topic, count);
}

/**
* Wake up the selector associated with this send thread
*/
Expand Down

0 comments on commit d8e93c3

Please sign in to comment.