diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingBatchMessage.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingBatchMessage.java index daae983c3f..9f01746408 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingBatchMessage.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingBatchMessage.java @@ -73,7 +73,7 @@ public CompletionStage ack() { return stream; } }) - .onItem().transformToUniAndConcatenate(m -> Uni.createFrom().completionStage(m.getAck())) + .onItem().transformToUniAndMerge(m -> Uni.createFrom().completionStage(m.getAck())) .toUni().subscribeAsCompletionStage(); } @@ -85,7 +85,7 @@ public Supplier> getAck() { @Override public CompletionStage nack(Throwable reason, Metadata metadata) { return Multi.createFrom().iterable(incomingMessages) - .onItem().transformToUniAndConcatenate(m -> Uni.createFrom().completionStage(m.nack(reason, metadata))) + .onItem().transformToUniAndMerge(m -> Uni.createFrom().completionStage(m.nack(reason, metadata))) .toUni().subscribeAsCompletionStage(); } diff --git a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/base/PulsarContainer.java b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/base/PulsarContainer.java index 108e7dfbb9..e31837e619 100644 --- a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/base/PulsarContainer.java +++ b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/base/PulsarContainer.java @@ -47,6 +47,10 @@ protected void containerIsStarting(InspectContainerResponse containerInfo, boole command += "export PULSAR_PREFIX_advertisedListeners=" + advertisedListeners + " \n"; command += "export PULSAR_PREFIX_transactionCoordinatorEnabled=true\n"; command += "export PULSAR_PREFIX_systemTopicEnabled=true\n"; + command += "export PULSAR_PREFIX_brokerDeduplicationEnabled=true\n"; + command += "export PULSAR_PREFIX_brokerDeduplicationEntriesInterval=100\n"; + command += "export PULSAR_PREFIX_brokerDeduplicationSnapshotIntervalSeconds=3000\n"; + command += "export PULSAR_PREFIX_acknowledgmentAtBatchIndexLevelEnabled=true\n"; command += "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone -nfw -nss"; copyFileToContainer( Transferable.of(command.getBytes(StandardCharsets.UTF_8), 700), diff --git a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarNackTest.java b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarNackTest.java index 9781573f52..c7a4dfb37a 100644 --- a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarNackTest.java +++ b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarNackTest.java @@ -117,7 +117,6 @@ void testReconsumeLater() throws PulsarClientException { @Test void testDeadLetterTopic() throws PulsarClientException { - addBeans(PulsarReconsumeLater.Factory.class); // Run app FailingConsumingApp app = runApplication(config() .with("mp.messaging.incoming.data.failure-strategy", "nack") @@ -126,13 +125,14 @@ void testDeadLetterTopic() throws PulsarClientException { .with("mp.messaging.incoming.data.deadLetterPolicy.maxRedeliverCount", 2) .with("mp.messaging.incoming.data.deadLetterPolicy.deadLetterTopic", topic + "-dlq") .with("mp.messaging.incoming.data.deadLetterPolicy.initialSubscriptionName", "initial-dlq-sub") + .with("mp.messaging.incoming.data.deadLetterPolicy.batchIndexAckEnabled", true) .with("mp.messaging.incoming.data.subscriptionType", "Shared"), FailingConsumingApp.class); // Produce messages send(client.newProducer(Schema.INT32) .producerName("test-producer") .enableBatching(false) .topic(topic) - .create(), NUMBER_OF_MESSAGES, i -> i); + .create(), NUMBER_OF_MESSAGES, (i, p) -> p.newMessage().sequenceId(i).value(i)); // Check for consumed messages in app await().untilAsserted(() -> { @@ -142,9 +142,11 @@ void testDeadLetterTopic() throws PulsarClientException { List> retries = new CopyOnWriteArrayList<>(); receive(client.newConsumer(Schema.INT32) + .consumerName("dlq-consumer") .topic(topic + "-dlq") .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscriptionName("initial-dlq-sub") + .enableBatchIndexAcknowledgment(true) .subscribe()).subscribe().with(retries::add); // Check for retried messages diff --git a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/transactions/ExactlyOnceProcessingBatchTest.java b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/transactions/ExactlyOnceProcessingBatchTest.java index 0b4fc529ee..009a303ef0 100644 --- a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/transactions/ExactlyOnceProcessingBatchTest.java +++ b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/transactions/ExactlyOnceProcessingBatchTest.java @@ -25,9 +25,6 @@ import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Incoming; import org.eclipse.microprofile.reactive.messaging.OnOverflow; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.RepeatedTest; -import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import io.smallrye.common.annotation.Identifier; @@ -35,7 +32,6 @@ import io.smallrye.reactive.messaging.pulsar.PulsarConnector; import io.smallrye.reactive.messaging.pulsar.PulsarIncomingBatchMessage; import io.smallrye.reactive.messaging.pulsar.PulsarMessage; -import io.smallrye.reactive.messaging.pulsar.TestTags; import io.smallrye.reactive.messaging.pulsar.base.WeldTestBase; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; @@ -92,17 +88,13 @@ Uni process(PulsarIncomingBatchMessage batch) { } /** - * TODO Disabled test * For exactly-once processing the broker needs to enable the Message deduplication with * broker config brokerDeduplicationEnabled=true and producer `sendTimeoutMs` needs to be `0`. * - * However only this doesn't solve the issue of duplicated items. - * * There is also batch index level acknowledgement to avoid duplicated items which can be enabled with, * The broker config acknowledgmentAtBatchIndexLevelEnabled=true and consumer config `batchIndexAckEnable` to * `true. * - * There are still duplicate items delivered to the consumer batch after an transaction abort. */ @Test void testExactlyOnceProcessorWithProcessingError() throws PulsarAdminException, PulsarClientException { @@ -125,8 +117,6 @@ void testExactlyOnceProcessorWithProcessingError() throws PulsarAdminException, .enableBatchIndexAcknowledgment(true) .subscribe()).subscribe().with(messages -> { for (Message message : messages) { - System.out.println( - "-received " + message.getSequenceId() + " - " + message.getKey() + ":" + message.getValue()); list.add(message.getValue()); } });