From ead439324775e152e9eb199a9d3409ed4d77a2d7 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Mon, 9 Dec 2024 16:51:26 +0100 Subject: [PATCH] SQS connector batch send #2672, #2884 --- .../main/docs/sqs/sending-aws-sqs-messages.md | 14 ++ .../aws/sqs/BatchResultErrorException.java | 16 ++ .../messaging/aws/sqs/SqsConnector.java | 3 + .../messaging/aws/sqs/SqsOutboundChannel.java | 100 ++++++++- .../aws/sqs/BatchOutboundMessageTest.java | 204 ++++++++++++++++++ .../aws/sqs/OutboundMessageTest.java | 4 +- .../messaging/aws/sqs/SqsTestBase.java | 33 ++- 7 files changed, 365 insertions(+), 9 deletions(-) create mode 100644 smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/BatchResultErrorException.java create mode 100644 smallrye-reactive-messaging-aws-sqs/src/test/java/io/smallrye/reactive/messaging/aws/sqs/BatchOutboundMessageTest.java diff --git a/documentation/src/main/docs/sqs/sending-aws-sqs-messages.md b/documentation/src/main/docs/sqs/sending-aws-sqs-messages.md index 809b87492..14c8cfd11 100644 --- a/documentation/src/main/docs/sqs/sending-aws-sqs-messages.md +++ b/documentation/src/main/docs/sqs/sending-aws-sqs-messages.md @@ -21,6 +21,20 @@ explicitly specify metadata on the outgoing message: {{ insert('sqs/outbound/SqsMessageStringProducer.java') }} ``` +## Sending messages in batch + +You can configure the outbound channel to send messages in batch of maximum 10 messages (AWS SQS limitation). + +You can customize the size of batches, `10` being the default batch size, and the delay to wait for new messages to be added to the batch, 3000ms being the default delay: + +``` java +mp.messaging.outgoing.prices.connector=smallrye-sqs +mp.messaging.outgoing.prices.queue=prices +mp.messaging.outgoing.prices.batch=true +mp.messaging.outgoing.prices.batch-size=5 +mp.messaging.outgoing.prices.batch-delay=3000 +``` + ## Serialization When sending a `Message`, the connector converts the message into a AWS SQS Message. diff --git a/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/BatchResultErrorException.java b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/BatchResultErrorException.java new file mode 100644 index 000000000..1773fa2ec --- /dev/null +++ b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/BatchResultErrorException.java @@ -0,0 +1,16 @@ +package io.smallrye.reactive.messaging.aws.sqs; + +import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; + +/** + * Exception thrown when a send message batch result contains an error. + * + * @see BatchResultErrorEntry + */ +public class BatchResultErrorException extends Exception { + + public BatchResultErrorException(BatchResultErrorEntry entry) { + super("BatchResultError " + entry.code() + " " + entry.message() + ", senderFault = " + entry.senderFault()); + } + +} diff --git a/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsConnector.java b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsConnector.java index 10026ab79..95f48a7e6 100644 --- a/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsConnector.java +++ b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsConnector.java @@ -40,6 +40,9 @@ @ConnectorAttribute(name = "health-enabled", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether health reporting is enabled (default) or disabled", defaultValue = "true") @ConnectorAttribute(name = "group.id", type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "When set, sends messages with the specified group id") +@ConnectorAttribute(name = "batch", type = "boolean", direction = ConnectorAttribute.Direction.OUTGOING, description = "When set, sends messages in batches of maximum 10 messages", defaultValue = "false") +@ConnectorAttribute(name = "batch-size", type = "int", direction = ConnectorAttribute.Direction.OUTGOING, description = "In batch send mode, the maximum number of messages to include in batch, currently SQS maximum is 10 messages", defaultValue = "10") +@ConnectorAttribute(name = "batch-delay", type = "int", direction = ConnectorAttribute.Direction.OUTGOING, description = "In batch send mode, the maximum delay in milliseconds to wait for messages to be included in the batch", defaultValue = "3000") @ConnectorAttribute(name = "wait-time-seconds", type = "int", direction = ConnectorAttribute.Direction.INCOMING, description = "The maximum amount of time in seconds to wait for messages to be received", defaultValue = "1") @ConnectorAttribute(name = "max-number-of-messages", type = "int", direction = ConnectorAttribute.Direction.INCOMING, description = "The maximum number of messages to receive", defaultValue = "10") diff --git a/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsOutboundChannel.java b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsOutboundChannel.java index a5ef36beb..29d627cbc 100644 --- a/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsOutboundChannel.java +++ b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsOutboundChannel.java @@ -1,5 +1,6 @@ package io.smallrye.reactive.messaging.aws.sqs; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -8,9 +9,11 @@ import java.util.concurrent.Flow; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.eclipse.microprofile.reactive.messaging.Message; +import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.OutgoingMessageMetadata; import io.smallrye.reactive.messaging.aws.sqs.i18n.AwsSqsLogging; @@ -18,8 +21,13 @@ import io.smallrye.reactive.messaging.json.JsonMapping; import io.smallrye.reactive.messaging.providers.helpers.MultiUtils; import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchResultEntry; import software.amazon.awssdk.services.sqs.model.SendMessageRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageResponse; public class SqsOutboundChannel { @@ -32,17 +40,31 @@ public class SqsOutboundChannel { private final List failures = new ArrayList<>(); private final boolean healthEnabled; private final String groupId; + private final boolean batch; + private final Duration batchDelay; + private final int batchSize; public SqsOutboundChannel(SqsConnectorOutgoingConfiguration conf, SqsManager sqsManager, JsonMapping jsonMapping) { this.channel = conf.getChannel(); this.healthEnabled = conf.getHealthEnabled(); this.client = sqsManager.getClient(conf); + this.batch = conf.getBatch(); + this.batchSize = conf.getBatchSize(); + this.batchDelay = Duration.ofMillis(conf.getBatchDelay()); this.queueUrlUni = sqsManager.getQueueUrl(conf).memoize().indefinitely(); this.groupId = conf.getGroupId().orElse(null); this.jsonMapping = jsonMapping; this.subscriber = MultiUtils.via(multi -> multi .onSubscription().call(s -> queueUrlUni) - .call(m -> publishMessage(this.client, m)) + .plug(stream -> { + if (batch) { + return stream.group().intoLists().of(batchSize, batchDelay) + .call(l -> publishMessage(this.client, l)) + .onItem().transformToMultiAndConcatenate(l -> Multi.createFrom().iterable(l)); + } else { + return stream.call(m -> publishMessage(this.client, m)); + } + }) .onFailure().invoke(f -> { AwsSqsLogging.log.unableToDispatch(channel, f); reportFailure(f); @@ -73,6 +95,82 @@ private Uni publishMessage(SqsAsyncClient client, Message m) { }); } + private Uni publishMessage(SqsAsyncClient client, List> messages) { + if (closed.get()) { + return Uni.createFrom().voidItem(); + } + if (messages.isEmpty()) { + return Uni.createFrom().nullItem(); + } + if (messages.size() == 1) { + return publishMessage(client, messages.get(0)); + } + return queueUrlUni.map(queueUrl -> getSendMessageRequest(queueUrl, messages)) + .chain(request -> Uni.createFrom().completionStage(() -> client.sendMessageBatch(request))) + .onItem().transformToUni(response -> { + List> results = new ArrayList<>(); + for (BatchResultErrorEntry entry : response.failed()) { + int index = Integer.parseInt(entry.id()); + if (messages.size() > index) { + Message m = messages.get(index); + results.add(Uni.createFrom().completionStage(m.nack(new BatchResultErrorException(entry)))); + } + } + for (SendMessageBatchResultEntry entry : response.successful()) { + int index = Integer.parseInt(entry.id()); + if (messages.size() > index) { + Message m = messages.get(index); + SendMessageResponse r = SendMessageResponse.builder() + .messageId(entry.messageId()) + .sequenceNumber(entry.sequenceNumber()) + .md5OfMessageBody(entry.md5OfMessageBody()) + .md5OfMessageAttributes(entry.md5OfMessageAttributes()) + .md5OfMessageSystemAttributes(entry.md5OfMessageSystemAttributes()) + .build(); + AwsSqsLogging.log.messageSentToChannel(channel, r.messageId(), r.sequenceNumber()); + OutgoingMessageMetadata.setResultOnMessage(m, r); + results.add(Uni.createFrom().completionStage(m.ack())); + } + } + return Uni.combine().all().unis(results).discardItems(); + }) + .onFailure().recoverWithUni(t -> { + List> results = new ArrayList<>(); + for (Message m : messages) { + results.add(Uni.createFrom().completionStage(m.nack(t))); + } + return Uni.combine().all().unis(results).discardItems(); + }); + } + + private SendMessageBatchRequest getSendMessageRequest(String channelQueueUrl, List> messages) { + List entries = getSendMessageBatchEntry(channelQueueUrl, messages); + return SendMessageBatchRequest.builder() + .entries(entries) + .queueUrl(channelQueueUrl) + .build(); + } + + private List getSendMessageBatchEntry(String channelQueueUrl, List> messages) { + // Use message index in the list as the id to identify the message in the batch result. + return IntStream.range(0, messages.size()) + .mapToObj(i -> sendMessageBatchRequestEntry(channelQueueUrl, String.valueOf(i), messages.get(i))) + .collect(Collectors.toList()); + } + + private SendMessageBatchRequestEntry sendMessageBatchRequestEntry(String channelQueueUrl, String id, Message message) { + SendMessageRequest request = getSendMessageRequest(channelQueueUrl, message); + return SendMessageBatchRequestEntry.builder() + .id(id) + .delaySeconds(request.delaySeconds()) + .messageAttributes(request.messageAttributes()) + .messageGroupId(request.messageGroupId()) + .messageDeduplicationId(request.messageDeduplicationId()) + .messageSystemAttributes(request.messageSystemAttributes()) + .messageBody(request.messageBody()) + .build(); + } + private SendMessageRequest getSendMessageRequest(String channelQueueUrl, Message m) { Object payload = m.getPayload(); String queueUrl = channelQueueUrl; diff --git a/smallrye-reactive-messaging-aws-sqs/src/test/java/io/smallrye/reactive/messaging/aws/sqs/BatchOutboundMessageTest.java b/smallrye-reactive-messaging-aws-sqs/src/test/java/io/smallrye/reactive/messaging/aws/sqs/BatchOutboundMessageTest.java new file mode 100644 index 000000000..a1cda65ae --- /dev/null +++ b/smallrye-reactive-messaging-aws-sqs/src/test/java/io/smallrye/reactive/messaging/aws/sqs/BatchOutboundMessageTest.java @@ -0,0 +1,204 @@ +package io.smallrye.reactive.messaging.aws.sqs; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Metadata; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.junit.jupiter.api.Test; + +import io.smallrye.mutiny.Multi; +import io.smallrye.reactive.messaging.GenericPayload; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; +import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; +import software.amazon.awssdk.services.sqs.model.QueueAttributeName; +import software.amazon.awssdk.services.sqs.model.SendMessageRequest; + +public class BatchOutboundMessageTest extends SqsTestBase { + + @Test + void testOutboundMessage() { + SqsClientProvider.client = getSqsClient(); + addBeans(SqsClientProvider.class); + int expected = 10; + String queueUrl = createQueue(queue); + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.outgoing.sink.connector", SqsConnector.CONNECTOR_NAME) + .with("mp.messaging.outgoing.sink.queue", queue) + .with("mp.messaging.outgoing.sink.batch", true); + + runApplication(config, RequestBuilderProducingApp.class); + var received = receiveMessages(queueUrl, r -> r.messageAttributeNames("key"), expected, Duration.ofSeconds(10)); + assertThat(received).hasSize(10) + .allSatisfy(m -> assertThat(m.messageAttributes()).containsKey("key")) + .extracting(m -> m.body()).containsExactly("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + } + + @Test + void testOutboundMessageBatchSizeDelay() { + SqsClientProvider.client = getSqsClient(); + addBeans(SqsClientProvider.class); + int expected = 10; + String queueUrl = createQueue(queue); + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.outgoing.sink.connector", SqsConnector.CONNECTOR_NAME) + .with("mp.messaging.outgoing.sink.queue", queue) + .with("mp.messaging.outgoing.sink.batch", true) + .with("mp.messaging.outgoing.sink.batch-size", 3) + .with("mp.messaging.outgoing.sink.batch-delay", 1000); + + runApplication(config, RequestBuilderProducingApp.class); + var received = receiveMessages(queueUrl, r -> r.messageAttributeNames("key"), expected, Duration.ofSeconds(10)); + assertThat(received).hasSize(10) + .allSatisfy(m -> assertThat(m.messageAttributes()).containsKey("key")) + .extracting(m -> m.body()).containsExactly("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + } + + @Test + void testOutboundMetadataMessage() { + SqsClientProvider.client = getSqsClient(); + addBeans(SqsClientProvider.class); + int expected = 10; + String queueUrl = createQueue(queue); + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.outgoing.sink.connector", SqsConnector.CONNECTOR_NAME) + .with("mp.messaging.outgoing.sink.queue", queue) + .with("mp.messaging.outgoing.sink.batch", true); + + runApplication(config, OutgoingMetadataProducingApp.class); + var received = receiveMessages(queueUrl, r -> r.messageAttributeNames("key"), expected, Duration.ofSeconds(10)); + assertThat(received).hasSize(10) + .allSatisfy(m -> assertThat(m.messageAttributes()).containsKey("key")) + .extracting(m -> m.body()).containsExactly("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + } + + @Test + void testMessage() { + SqsClientProvider.client = getSqsClient(); + addBeans(SqsClientProvider.class); + int expected = 10; + String queueSink = queue + "-sink.fifo"; + queue = queue + ".fifo"; + String queueUrl = createQueue(queue, r -> r.attributes( + Map.of(QueueAttributeName.FIFO_QUEUE, "true", + QueueAttributeName.DEDUPLICATION_SCOPE, "messageGroup"))); + + sendMessage(queueUrl, 10, (i, r) -> r.messageGroupId("group") + .messageDeduplicationId("m-" + i) + .messageAttributes(Map.of("key", MessageAttributeValue.builder() + .dataType("String").stringValue("value").build())) + .messageBody(String.valueOf(i))); + + String queueSinkUrl = createQueue(queueSink, r -> r.attributes(Map.of(QueueAttributeName.FIFO_QUEUE, "true", + QueueAttributeName.DEDUPLICATION_SCOPE, "messageGroup"))); + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.incoming.data.connector", SqsConnector.CONNECTOR_NAME) + .with("mp.messaging.incoming.data.queue", queue) + .with("mp.messaging.outgoing.sink.connector", SqsConnector.CONNECTOR_NAME) + .with("mp.messaging.outgoing.sink.queue", queueSink) + .with("mp.messaging.outgoing.sink.batch", true); + + runApplication(config, MessageProducingApp.class); + await().pollDelay(3, TimeUnit.SECONDS).until(() -> true); + var received = receiveMessages(queueSinkUrl, r -> r.messageAttributeNames("key"), expected, Duration.ofMinutes(1)); + assertThat(received).hasSizeGreaterThanOrEqualTo(10) + .allSatisfy(m -> assertThat(m.messageAttributes()).containsKey("key")) + .extracting(m -> m.body()).contains("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + } + + @Test + void testOutboundMessageNacking() { + SqsClientProvider.client = getSqsClient(); + addBeans(SqsClientProvider.class); + createQueue(queue); + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.outgoing.sink.connector", SqsConnector.CONNECTOR_NAME) + .with("mp.messaging.outgoing.sink.queue", queue); + + MessageAckingApp app = runApplication(config, MessageAckingApp.class); + await().until(() -> app.nacked().size() == 10); + } + + @ApplicationScoped + public static class RequestBuilderProducingApp { + @Outgoing("sink") + public Multi produce() { + return Multi.createFrom().range(0, 10) + .map(i -> SendMessageRequest.builder() + .messageAttributes(Map.of("key", MessageAttributeValue.builder() + .dataType("String").stringValue("value").build())) + .messageBody(String.valueOf(i))); + } + + } + + @ApplicationScoped + public static class OutgoingMetadataProducingApp { + @Outgoing("sink") + public Multi> produce() { + return Multi.createFrom().range(0, 10) + .map(i -> Message.of(String.valueOf(i), Metadata.of(SqsOutboundMetadata.builder() + .messageAttributes(Map.of("key", MessageAttributeValue.builder() + .dataType("String").stringValue("value").build())) + .build()))); + } + + } + + @ApplicationScoped + public static class MessageProducingApp { + @Incoming("data") + @Outgoing("sink") + public GenericPayload process( + software.amazon.awssdk.services.sqs.model.Message message) { + return GenericPayload.of(message, Metadata.of(SqsOutboundMetadata.builder() + .groupId("group") + .deduplicationId(message.messageId()) + .messageAttributes(Map.of("key", MessageAttributeValue.builder() + .dataType("String").stringValue("value").build())) + .build())); + } + + } + + @ApplicationScoped + public static class MessageAckingApp { + + List acked = new CopyOnWriteArrayList<>(); + List nacked = new CopyOnWriteArrayList<>(); + + @Outgoing("sink") + Multi> produce() { + return Multi.createFrom().range(0, 10) + .map(i -> Message.of(i, Metadata.of(SqsOutboundMetadata.builder() + .messageAttributes(Map.of("key", MessageAttributeValue.builder() + .stringValue("value").build())) + .build())).withAck(() -> { + acked.add(i); + return CompletableFuture.completedFuture(null); + }).withNack(throwable -> { + nacked.add(i); + return CompletableFuture.completedFuture(null); + })); + } + + public List acked() { + return acked; + } + + public List nacked() { + return nacked; + } + } +} diff --git a/smallrye-reactive-messaging-aws-sqs/src/test/java/io/smallrye/reactive/messaging/aws/sqs/OutboundMessageTest.java b/smallrye-reactive-messaging-aws-sqs/src/test/java/io/smallrye/reactive/messaging/aws/sqs/OutboundMessageTest.java index a12523c1b..cfe9ed8af 100644 --- a/smallrye-reactive-messaging-aws-sqs/src/test/java/io/smallrye/reactive/messaging/aws/sqs/OutboundMessageTest.java +++ b/smallrye-reactive-messaging-aws-sqs/src/test/java/io/smallrye/reactive/messaging/aws/sqs/OutboundMessageTest.java @@ -8,6 +8,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; import jakarta.enterprise.context.ApplicationScoped; @@ -86,8 +87,9 @@ void testMessage() { .with("mp.messaging.outgoing.sink.queue", queueSink); runApplication(config, MessageProducingApp.class); + await().pollDelay(3, TimeUnit.SECONDS).until(() -> true); var received = receiveMessages(queueSinkUrl, r -> r.messageAttributeNames("key"), expected, Duration.ofMinutes(1)); - assertThat(received).hasSizeGreaterThan(10) + assertThat(received).hasSizeGreaterThanOrEqualTo(10) .allSatisfy(m -> assertThat(m.messageAttributes()).containsKey("key")) .extracting(m -> m.body()).contains("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); } diff --git a/smallrye-reactive-messaging-aws-sqs/src/test/java/io/smallrye/reactive/messaging/aws/sqs/SqsTestBase.java b/smallrye-reactive-messaging-aws-sqs/src/test/java/io/smallrye/reactive/messaging/aws/sqs/SqsTestBase.java index 06d70e09a..6e75caa47 100644 --- a/smallrye-reactive-messaging-aws-sqs/src/test/java/io/smallrye/reactive/messaging/aws/sqs/SqsTestBase.java +++ b/smallrye-reactive-messaging-aws-sqs/src/test/java/io/smallrye/reactive/messaging/aws/sqs/SqsTestBase.java @@ -3,6 +3,7 @@ import java.lang.reflect.Method; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.CopyOnWriteArrayList; @@ -28,8 +29,11 @@ import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.sqs.SqsAsyncClient; import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; import software.amazon.awssdk.services.sqs.model.Message; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; import software.amazon.awssdk.services.sqs.model.SendMessageRequest; @Testcontainers @@ -102,17 +106,32 @@ public synchronized SqsAsyncClient getSqsClient() { return client; } - public List receiveMessages(String queueUrl, + public List receiveAndDeleteMessages(String queueUrl, Consumer receiveMessageRequest, Predicate> stopCondition) { var sqsClient = getSqsClient(); var received = new CopyOnWriteArrayList(); while (!stopCondition.test(received)) { try { - received.addAll(sqsClient - .receiveMessage(r -> receiveMessageRequest.accept(r.queueUrl(queueUrl).maxNumberOfMessages(10))) - .get() - .messages()); + ReceiveMessageResponse receiveMessageResponse = sqsClient + .receiveMessage(r -> receiveMessageRequest + .accept(r.queueUrl(queueUrl).waitTimeSeconds(1).maxNumberOfMessages(10))) + .get(); + if (receiveMessageResponse.hasMessages()) { + List entries = new ArrayList<>(); + for (Message message : receiveMessageResponse.messages()) { + entries.add(DeleteMessageBatchRequestEntry.builder() + .id(message.messageId()) + .receiptHandle(message.receiptHandle()) + .build()); + } + sqsClient.deleteMessageBatch(DeleteMessageBatchRequest.builder() + .queueUrl(queueUrl) + .entries(entries) + .build()) + .get(); + received.addAll(receiveMessageResponse.messages()); + } } catch (ExecutionException | InterruptedException e) { throw new RuntimeException(e); } @@ -121,7 +140,7 @@ public List receiveMessages(String queueUrl, } public List receiveMessages(String queueUrl, Predicate> stopCondition) { - return receiveMessages(queueUrl, r -> { + return receiveAndDeleteMessages(queueUrl, r -> { }, stopCondition); } @@ -133,7 +152,7 @@ public List receiveMessages(String queueUrl, int numberOfMessages, Dura public List receiveMessages(String queueUrl, Consumer receiveMessageRequest, int numberOfMessages, Duration timeout) { Instant timeoutTs = Instant.now().plus(timeout); - return receiveMessages(queueUrl, receiveMessageRequest, + return receiveAndDeleteMessages(queueUrl, receiveMessageRequest, messages -> messages.size() >= numberOfMessages || Instant.now().isAfter(timeoutTs)); }