Skip to content

Commit

Permalink
Pulsar EOP Fix
Browse files Browse the repository at this point in the history
Enables message deduplication and batch index level ack on broker level, as recommended for exactly-once processing.

But without activating those two EOP and dead letter queue tests seems to be stable with Pulsar 3.0.0
  • Loading branch information
ozangunalp committed Jun 14, 2023
1 parent 3b0caa0 commit 5de92d2
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public CompletionStage<Void> ack() {
return stream;
}
})
.onItem().transformToUniAndConcatenate(m -> Uni.createFrom().completionStage(m.getAck()))
.onItem().transformToUniAndMerge(m -> Uni.createFrom().completionStage(m.getAck()))
.toUni().subscribeAsCompletionStage();
}

Expand All @@ -85,7 +85,7 @@ public Supplier<CompletionStage<Void>> getAck() {
@Override
public CompletionStage<Void> nack(Throwable reason, Metadata metadata) {
return Multi.createFrom().iterable(incomingMessages)
.onItem().transformToUniAndConcatenate(m -> Uni.createFrom().completionStage(m.nack(reason, metadata)))
.onItem().transformToUniAndMerge(m -> Uni.createFrom().completionStage(m.nack(reason, metadata)))
.toUni().subscribeAsCompletionStage();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ protected void containerIsStarting(InspectContainerResponse containerInfo, boole
command += "export PULSAR_PREFIX_advertisedListeners=" + advertisedListeners + " \n";
command += "export PULSAR_PREFIX_transactionCoordinatorEnabled=true\n";
command += "export PULSAR_PREFIX_systemTopicEnabled=true\n";
command += "export PULSAR_PREFIX_brokerDeduplicationEnabled=true\n";
command += "export PULSAR_PREFIX_brokerDeduplicationEntriesInterval=100\n";
command += "export PULSAR_PREFIX_brokerDeduplicationSnapshotIntervalSeconds=3000\n";
command += "export PULSAR_PREFIX_acknowledgmentAtBatchIndexLevelEnabled=true\n";
command += "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone -nfw -nss";
copyFileToContainer(
Transferable.of(command.getBytes(StandardCharsets.UTF_8), 700),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ void testReconsumeLater() throws PulsarClientException {

@Test
void testDeadLetterTopic() throws PulsarClientException {
addBeans(PulsarReconsumeLater.Factory.class);
// Run app
FailingConsumingApp app = runApplication(config()
.with("mp.messaging.incoming.data.failure-strategy", "nack")
Expand All @@ -126,13 +125,14 @@ void testDeadLetterTopic() throws PulsarClientException {
.with("mp.messaging.incoming.data.deadLetterPolicy.maxRedeliverCount", 2)
.with("mp.messaging.incoming.data.deadLetterPolicy.deadLetterTopic", topic + "-dlq")
.with("mp.messaging.incoming.data.deadLetterPolicy.initialSubscriptionName", "initial-dlq-sub")
.with("mp.messaging.incoming.data.deadLetterPolicy.batchIndexAckEnabled", true)
.with("mp.messaging.incoming.data.subscriptionType", "Shared"), FailingConsumingApp.class);
// Produce messages
send(client.newProducer(Schema.INT32)
.producerName("test-producer")
.enableBatching(false)
.topic(topic)
.create(), NUMBER_OF_MESSAGES, i -> i);
.create(), NUMBER_OF_MESSAGES, (i, p) -> p.newMessage().sequenceId(i).value(i));

// Check for consumed messages in app
await().untilAsserted(() -> {
Expand All @@ -142,9 +142,11 @@ void testDeadLetterTopic() throws PulsarClientException {

List<Message<Integer>> retries = new CopyOnWriteArrayList<>();
receive(client.newConsumer(Schema.INT32)
.consumerName("dlq-consumer")
.topic(topic + "-dlq")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionName("initial-dlq-sub")
.enableBatchIndexAcknowledgment(true)
.subscribe()).subscribe().with(retries::add);

// Check for retried messages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,13 @@
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.pulsar.PulsarConnector;
import io.smallrye.reactive.messaging.pulsar.PulsarIncomingBatchMessage;
import io.smallrye.reactive.messaging.pulsar.PulsarMessage;
import io.smallrye.reactive.messaging.pulsar.TestTags;
import io.smallrye.reactive.messaging.pulsar.base.WeldTestBase;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;

Expand Down Expand Up @@ -92,17 +88,13 @@ Uni<Void> process(PulsarIncomingBatchMessage<Integer> batch) {
}

/**
* TODO Disabled test
* For exactly-once processing the broker needs to enable the Message deduplication with
* broker config <code>brokerDeduplicationEnabled=true</code> and producer `sendTimeoutMs` needs to be `0`.
*
* However only this doesn't solve the issue of duplicated items.
*
* There is also batch index level acknowledgement to avoid duplicated items which can be enabled with,
* The broker config <code>acknowledgmentAtBatchIndexLevelEnabled=true</code> and consumer config `batchIndexAckEnable` to
* `true.
*
* There are still duplicate items delivered to the consumer batch after an transaction abort.
*/
@Test
void testExactlyOnceProcessorWithProcessingError() throws PulsarAdminException, PulsarClientException {
Expand All @@ -125,8 +117,6 @@ void testExactlyOnceProcessorWithProcessingError() throws PulsarAdminException,
.enableBatchIndexAcknowledgment(true)
.subscribe()).subscribe().with(messages -> {
for (Message<Integer> message : messages) {
System.out.println(
"-received " + message.getSequenceId() + " - " + message.getKey() + ":" + message.getValue());
list.add(message.getValue());
}
});
Expand Down

0 comments on commit 5de92d2

Please sign in to comment.