diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java index 0b288b19cb..48223860cb 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java @@ -100,7 +100,7 @@ @ConnectorAttribute(name = "exchange.declare", direction = INCOMING_AND_OUTGOING, description = "Whether to declare the exchange; set to false if the exchange is expected to be set up independently", type = "boolean", defaultValue = "true") // Queue -@ConnectorAttribute(name = "queue.name", direction = INCOMING, description = "The queue from which messages are consumed.", type = "string", mandatory = true) +@ConnectorAttribute(name = "queue.name", direction = INCOMING, description = "The queue from which messages are consumed. If not set, the channel name is used.", type = "string") @ConnectorAttribute(name = "queue.durable", direction = INCOMING, description = "Whether the queue is durable", type = "boolean", defaultValue = "true") @ConnectorAttribute(name = "queue.exclusive", direction = INCOMING, description = "Whether the queue is for exclusive use", type = "boolean", defaultValue = "false") @ConnectorAttribute(name = "queue.auto-delete", direction = INCOMING, description = "Whether the queue should be deleted after use", type = "boolean", defaultValue = "false") @@ -202,6 +202,10 @@ public static String getExchangeName(final RabbitMQConnectorCommonConfiguration return config.getExchangeName().map(s -> "\"\"".equals(s) ? "" : s).orElse(config.getChannel()); } + public static String getQueueName(final RabbitMQConnectorIncomingConfiguration config) { + return config.getQueueName().orElse(config.getChannel()); + } + private Multi> getStreamOfMessages( RabbitMQConsumer receiver, ConnectionHolder holder, @@ -209,7 +213,7 @@ private Multi> getStreamOfMessages( RabbitMQFailureHandler onNack, RabbitMQAckHandler onAck) { - final String queueName = ic.getQueueName(); + final String queueName = getQueueName(ic); final boolean isTracingEnabled = ic.getTracingEnabled(); final String contentTypeOverride = ic.getContentTypeOverride().orElse(null); log.receiverListeningAddress(queueName); @@ -292,7 +296,7 @@ public Flow.Publisher> getPublisher(final Config config) { } private Uni createConsumer(RabbitMQConnectorIncomingConfiguration ic, RabbitMQClient client) { - return client.basicConsumer(serverQueueName(ic.getQueueName()), new QueueOptions() + return client.basicConsumer(serverQueueName(getQueueName(ic)), new QueueOptions() .setAutoAck(ic.getAutoAcknowledgement()) .setMaxInternalQueueSize(ic.getMaxIncomingInternalQueueSize()) .setKeepMostRecent(ic.getKeepMostRecent())) @@ -309,7 +313,7 @@ private Uni createConsumer(RabbitMQConnectorIncomingConfigurat private Uni establishDLQ(final RabbitMQClient client, final RabbitMQConnectorIncomingConfiguration ic) { final String deadLetterQueueName = ic.getDeadLetterQueueName().orElse(String.format("%s.dlq", ic.getQueueName())); final String deadLetterExchangeName = ic.getDeadLetterExchange(); - final String deadLetterRoutingKey = ic.getDeadLetterRoutingKey().orElse(ic.getQueueName()); + final String deadLetterRoutingKey = ic.getDeadLetterRoutingKey().orElse(getQueueName(ic)); // Declare the exchange if we have been asked to do so final Uni dlxFlow = Uni.createFrom() @@ -510,7 +514,7 @@ private Uni establishExchange( private Uni establishQueue( final RabbitMQClient client, final RabbitMQConnectorIncomingConfiguration ic) { - final String queueName = ic.getQueueName(); + final String queueName = getQueueName(ic); // Declare the queue (and its binding(s) to the exchange, and TTL) if we have been asked to do so final JsonObject queueArgs = new JsonObject(); @@ -573,7 +577,7 @@ private Multi establishBindings( final RabbitMQClient client, final RabbitMQConnectorIncomingConfiguration ic) { final String exchangeName = getExchangeName(ic); - final String queueName = ic.getQueueName(); + final String queueName = getQueueName(ic); final List routingKeys = Arrays.stream(ic.getRoutingKeys().split(",")) .map(String::trim).collect(Collectors.toList()); final Map arguments = parseArguments(ic.getArguments());