diff --git a/smallrye-reactive-messaging-kafka-test-companion/src/test/java/io/smallrye/reactive/messaging/kafka/companion/SerdesTest.java b/smallrye-reactive-messaging-kafka-test-companion/src/test/java/io/smallrye/reactive/messaging/kafka/companion/SerdesTest.java index 1219b8b04..6796c0a4b 100644 --- a/smallrye-reactive-messaging-kafka-test-companion/src/test/java/io/smallrye/reactive/messaging/kafka/companion/SerdesTest.java +++ b/smallrye-reactive-messaging-kafka-test-companion/src/test/java/io/smallrye/reactive/messaging/kafka/companion/SerdesTest.java @@ -2,15 +2,53 @@ import static org.assertj.core.api.Assertions.assertThat; +import io.smallrye.reactive.messaging.kafka.companion.test.KafkaCompanionTestBase; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serializer; import org.junit.jupiter.api.Test; -import io.smallrye.reactive.messaging.kafka.companion.test.KafkaCompanionTestBase; +import java.util.Map; public class SerdesTest extends KafkaCompanionTestBase { + @Test + void testRegisteredSerdeShouldBeConfigured() { + // Custom Serde which Serialize/Deserializer throw exception if not configured + companion.registerSerde(Person.class, new Serde<>() { + private PersonDeserializer personDeserializer = new PersonDeserializer(); + private PersonSerializer personSerializer = new PersonSerializer(); + + @Override + public Serializer serializer() { + return personSerializer; + } + + @Override + public Deserializer deserializer() { + return personDeserializer; + } + + @Override + public void configure(Map configs, boolean isKey) { + // Configure the serializer and deserializer + personSerializer.configure(configs, isKey); + personDeserializer.configure(configs, isKey); + } + }); + + companion.produce(Person.class).fromRecords( + new ProducerRecord<>(topic, new Person("1", 30)), + new ProducerRecord<>(topic, new Person("2", 25)), + new ProducerRecord<>(topic, new Person("3", 18))).awaitCompletion(); + + ConsumerBuilder consumer = companion.consumeWithDeserializers(PersonDeserializer.class.getName()); + ConsumerTask task = consumer.fromTopics(topic, 3).awaitCompletion(); + assertThat(task.getRecords()).hasSize(3); + } + @Test void testProduceRegisteredSerdeConsumeWithSerdeName() { companion.registerSerde(Person.class, new PersonSerializer(), new PersonDeserializer()); @@ -80,9 +118,20 @@ public Person(String id, int age) { public static class PersonSerializer implements Serializer { + private boolean isConfigured = false; + @Override public byte[] serialize(String s, Person person) { - return (person.id + "|" + person.age).getBytes(); + if (isConfigured) { + return (person.id + "|" + person.age).getBytes(); + } else { + throw new SerializationException("Serializer not configured"); + } + } + + @Override + public void configure(Map configs, boolean isKey) { + isConfigured = true; } } @@ -102,11 +151,21 @@ public T deserialize(String s, byte[] bytes) { } public static class PersonDeserializer implements Deserializer { + private boolean isConfigured = false; @Override public Person deserialize(String s, byte[] bytes) { - String[] split = new String(bytes).split("\\|"); - return new Person(split[0], Integer.parseInt(split[1])); + if (isConfigured) { + String[] split = new String(bytes).split("\\|"); + return new Person(split[0], Integer.parseInt(split[1])); + } else { + throw new SerializationException("Deserializer not configured"); + } + } + + @Override + public void configure(Map configs, boolean isKey) { + isConfigured = true; } } }