Skip to content

Commit

Permalink
smallrye#2756 Add test cases which fail due to serializer/deserialize…
Browse files Browse the repository at this point in the history
…r not being initialized.
  • Loading branch information
diversit committed Sep 16, 2024
1 parent 8ad49bc commit c2ed366
Showing 1 changed file with 63 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,53 @@

import static org.assertj.core.api.Assertions.assertThat;

import io.smallrye.reactive.messaging.kafka.companion.test.KafkaCompanionTestBase;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.junit.jupiter.api.Test;

import io.smallrye.reactive.messaging.kafka.companion.test.KafkaCompanionTestBase;
import java.util.Map;

public class SerdesTest extends KafkaCompanionTestBase {

@Test
void testRegisteredSerdeShouldBeConfigured() {
// Custom Serde which Serialize/Deserializer throw exception if not configured
companion.registerSerde(Person.class, new Serde<>() {
private PersonDeserializer personDeserializer = new PersonDeserializer();
private PersonSerializer personSerializer = new PersonSerializer();

@Override
public Serializer<Person> serializer() {
return personSerializer;
}

@Override
public Deserializer<Person> deserializer() {
return personDeserializer;
}

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// Configure the serializer and deserializer
personSerializer.configure(configs, isKey);
personDeserializer.configure(configs, isKey);
}
});

companion.produce(Person.class).fromRecords(
new ProducerRecord<>(topic, new Person("1", 30)),
new ProducerRecord<>(topic, new Person("2", 25)),
new ProducerRecord<>(topic, new Person("3", 18))).awaitCompletion();

ConsumerBuilder<String, Person> consumer = companion.consumeWithDeserializers(PersonDeserializer.class.getName());
ConsumerTask<String, Person> task = consumer.fromTopics(topic, 3).awaitCompletion();
assertThat(task.getRecords()).hasSize(3);
}

@Test
void testProduceRegisteredSerdeConsumeWithSerdeName() {
companion.registerSerde(Person.class, new PersonSerializer(), new PersonDeserializer());
Expand Down Expand Up @@ -80,9 +118,20 @@ public Person(String id, int age) {

public static class PersonSerializer implements Serializer<Person> {

private boolean isConfigured = false;

@Override
public byte[] serialize(String s, Person person) {
return (person.id + "|" + person.age).getBytes();
if (isConfigured) {
return (person.id + "|" + person.age).getBytes();
} else {
throw new SerializationException("Serializer not configured");
}
}

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
isConfigured = true;
}
}

Expand All @@ -102,11 +151,21 @@ public T deserialize(String s, byte[] bytes) {
}

public static class PersonDeserializer implements Deserializer<Person> {
private boolean isConfigured = false;

@Override
public Person deserialize(String s, byte[] bytes) {
String[] split = new String(bytes).split("\\|");
return new Person(split[0], Integer.parseInt(split[1]));
if (isConfigured) {
String[] split = new String(bytes).split("\\|");
return new Person(split[0], Integer.parseInt(split[1]));
} else {
throw new SerializationException("Deserializer not configured");
}
}

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
isConfigured = true;
}
}
}

0 comments on commit c2ed366

Please sign in to comment.