From c22c6578156543b699a4e2c360618dbc92db5272 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Wed, 8 Jan 2025 21:21:15 +0100 Subject: [PATCH] Use MockSchemaRegistry and remove custom mock --- .../junit4/TestTopologyRule.java | 23 ++++++++--------- .../junit4/WordCountWitherTest.java | 5 +--- .../junit5/TestTopologyExtension.java | 25 +++++++++---------- .../junit5/WordCountWitherTest.java | 5 +--- .../TestTopology.java | 3 ++- 5 files changed, 27 insertions(+), 34 deletions(-) diff --git a/fluent-kafka-streams-tests-junit4/src/main/java/com/bakdata/fluent_kafka_streams_tests/junit4/TestTopologyRule.java b/fluent-kafka-streams-tests-junit4/src/main/java/com/bakdata/fluent_kafka_streams_tests/junit4/TestTopologyRule.java index 198df5a..9e840f0 100644 --- a/fluent-kafka-streams-tests-junit4/src/main/java/com/bakdata/fluent_kafka_streams_tests/junit4/TestTopologyRule.java +++ b/fluent-kafka-streams-tests-junit4/src/main/java/com/bakdata/fluent_kafka_streams_tests/junit4/TestTopologyRule.java @@ -25,7 +25,6 @@ package com.bakdata.fluent_kafka_streams_tests.junit4; import com.bakdata.fluent_kafka_streams_tests.TestTopology; -import com.bakdata.schemaregistrymock.SchemaRegistryMock; import java.util.Map; import java.util.function.Function; import java.util.function.Supplier; @@ -105,8 +104,8 @@ protected TestTopologyRule( final Function, ? extends Topology> topologyFactory, final Function> propertiesFactory, final Serde defaultKeySerde, final Serde defaultValueSerde, - final SchemaRegistryMock schemaRegistryMock) { - super(topologyFactory, propertiesFactory, defaultKeySerde, defaultValueSerde, schemaRegistryMock); + final String schemaRegistryUrl) { + super(topologyFactory, propertiesFactory, defaultKeySerde, defaultValueSerde, schemaRegistryUrl); } @Override @@ -125,13 +124,8 @@ public void evaluate() throws Throwable { } @Override - protected TestTopologyRule with( - final Function, ? extends Topology> topologyFactory, - final Function> propertiesFactory, final Serde defaultKeySerde, - final Serde defaultValueSerde, - final SchemaRegistryMock schemaRegistryMock) { - return new TestTopologyRule<>(topologyFactory, propertiesFactory, defaultKeySerde, defaultValueSerde, - schemaRegistryMock); + public TestTopologyRule withSchemaRegistryUrl(final String schemaRegistryUrl) { + return (TestTopologyRule) super.withSchemaRegistryUrl(schemaRegistryUrl); } @Override @@ -151,8 +145,13 @@ public TestTopologyRule withDefaultSerde(final Serde defaultKeyS } @Override - public TestTopologyRule withSchemaRegistryUrl(final SchemaRegistryMock schemaRegistryMock) { - return (TestTopologyRule) super.withSchemaRegistryUrl(schemaRegistryMock); + protected TestTopologyRule with( + final Function, ? extends Topology> topologyFactory, + final Function> propertiesFactory, final Serde defaultKeySerde, + final Serde defaultValueSerde, + final String schemaRegistryUrl) { + return new TestTopologyRule<>(topologyFactory, propertiesFactory, defaultKeySerde, defaultValueSerde, + schemaRegistryUrl); } } diff --git a/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/WordCountWitherTest.java b/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/WordCountWitherTest.java index 1f67b67..1e4f0bd 100644 --- a/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/WordCountWitherTest.java +++ b/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/WordCountWitherTest.java @@ -25,9 +25,6 @@ package com.bakdata.fluent_kafka_streams_tests.junit4; import com.bakdata.fluent_kafka_streams_tests.junit4.test_applications.WordCount; -import com.bakdata.schemaregistrymock.SchemaRegistryMock; -import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider; -import java.util.List; import org.apache.kafka.common.serialization.Serdes; import org.junit.Rule; import org.junit.Test; @@ -39,7 +36,7 @@ public class WordCountWitherTest { public final TestTopologyRule testTopology = new TestTopologyRule<>(this.app.getTopology(), WordCount.getKafkaProperties()) .withDefaultValueSerde(Serdes.String()) - .withSchemaRegistryUrl(new SchemaRegistryMock(List.of(new AvroSchemaProvider()))); + .withSchemaRegistryUrl("mock://my-scope"); @Test public void shouldAggregateSameWordStream() { diff --git a/fluent-kafka-streams-tests-junit5/src/main/java/com/bakdata/fluent_kafka_streams_tests/junit5/TestTopologyExtension.java b/fluent-kafka-streams-tests-junit5/src/main/java/com/bakdata/fluent_kafka_streams_tests/junit5/TestTopologyExtension.java index 8cc0010..a2e4f1a 100644 --- a/fluent-kafka-streams-tests-junit5/src/main/java/com/bakdata/fluent_kafka_streams_tests/junit5/TestTopologyExtension.java +++ b/fluent-kafka-streams-tests-junit5/src/main/java/com/bakdata/fluent_kafka_streams_tests/junit5/TestTopologyExtension.java @@ -25,7 +25,6 @@ package com.bakdata.fluent_kafka_streams_tests.junit5; import com.bakdata.fluent_kafka_streams_tests.TestTopology; -import com.bakdata.schemaregistrymock.SchemaRegistryMock; import java.util.Map; import java.util.function.Function; import java.util.function.Supplier; @@ -106,8 +105,8 @@ protected TestTopologyExtension( final Function, ? extends Topology> topologyFactory, final Function> propertiesFactory, final Serde defaultKeySerde, final Serde defaultValueSerde, - final SchemaRegistryMock schemaRegistryMock) { - super(topologyFactory, propertiesFactory, defaultKeySerde, defaultValueSerde, schemaRegistryMock); + final String schemaRegistryUrl) { + super(topologyFactory, propertiesFactory, defaultKeySerde, defaultValueSerde, schemaRegistryUrl); } @Override @@ -121,13 +120,9 @@ public void beforeEach(final ExtensionContext context) { } @Override - protected TestTopology with( - final Function, ? extends Topology> topologyFactory, - final Function> propertiesFactory, final Serde defaultKeySerde, - final Serde defaultValueSerde, - final SchemaRegistryMock schemaRegistry) { - return new TestTopologyExtension<>(topologyFactory, propertiesFactory, defaultKeySerde, defaultValueSerde, - schemaRegistry); + public TestTopologyExtension withSchemaRegistryUrl( + final String schemaRegistryUrl) { + return (TestTopologyExtension) super.withSchemaRegistryUrl(schemaRegistryUrl); } @Override @@ -147,8 +142,12 @@ public TestTopologyExtension withDefaultSerde(final Serde defaul } @Override - public TestTopologyExtension withSchemaRegistryUrl( - final SchemaRegistryMock schemaRegistryMock) { - return (TestTopologyExtension) super.withSchemaRegistryUrl(schemaRegistryMock); + protected TestTopology with( + final Function, ? extends Topology> topologyFactory, + final Function> propertiesFactory, final Serde defaultKeySerde, + final Serde defaultValueSerde, + final String schemaRegistryUrl) { + return new TestTopologyExtension<>(topologyFactory, propertiesFactory, defaultKeySerde, defaultValueSerde, + schemaRegistryUrl); } } diff --git a/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/WordCountWitherTest.java b/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/WordCountWitherTest.java index 9ad610a..0ab6437 100644 --- a/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/WordCountWitherTest.java +++ b/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/WordCountWitherTest.java @@ -25,9 +25,6 @@ package com.bakdata.fluent_kafka_streams_tests.junit5; import com.bakdata.fluent_kafka_streams_tests.junit5.test_applications.WordCount; -import com.bakdata.schemaregistrymock.SchemaRegistryMock; -import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider; -import java.util.List; import org.apache.kafka.common.serialization.Serdes; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -39,7 +36,7 @@ class WordCountWitherTest { final TestTopologyExtension testTopology = new TestTopologyExtension<>(this.app::getTopology, WordCount.getKafkaProperties()) .withDefaultValueSerde(Serdes.String()) - .withSchemaRegistryUrl(new SchemaRegistryMock(List.of(new AvroSchemaProvider()))); + .withSchemaRegistryUrl("mock://my-scope"); @Test void shouldAggregateSameWordStream() { diff --git a/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestTopology.java b/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestTopology.java index 1a13150..1c57641 100644 --- a/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestTopology.java +++ b/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestTopology.java @@ -107,6 +107,7 @@ */ @Getter public class TestTopology { + private static final String DEFAULT_SCHEMA_REGISTRY_URL = "mock://"; private final String schemaRegistryUrl; private final Function, ? extends Topology> topologyFactory; private final Map properties = new HashMap<>(); @@ -146,7 +147,7 @@ protected TestTopology(final Function, ? extends Top */ public TestTopology(final Function, ? extends Topology> topologyFactory, final Function> propertiesFactory) { - this(topologyFactory, propertiesFactory, null, null, "mock://"); + this(topologyFactory, propertiesFactory, null, null, DEFAULT_SCHEMA_REGISTRY_URL); } /**