Skip to content

Commit

Permalink
Merge pull request smallrye#2318 from xstefank/2317-rabbitmq-queue-name
Browse files Browse the repository at this point in the history
Make queue.name default to channel name in RabbitMQ
  • Loading branch information
ozangunalp authored Oct 6, 2023
2 parents 488eef1 + 35ad3e4 commit 60ac56d
Showing 1 changed file with 10 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@
@ConnectorAttribute(name = "exchange.declare", direction = INCOMING_AND_OUTGOING, description = "Whether to declare the exchange; set to false if the exchange is expected to be set up independently", type = "boolean", defaultValue = "true")

// Queue
@ConnectorAttribute(name = "queue.name", direction = INCOMING, description = "The queue from which messages are consumed.", type = "string", mandatory = true)
@ConnectorAttribute(name = "queue.name", direction = INCOMING, description = "The queue from which messages are consumed. If not set, the channel name is used.", type = "string")
@ConnectorAttribute(name = "queue.durable", direction = INCOMING, description = "Whether the queue is durable", type = "boolean", defaultValue = "true")
@ConnectorAttribute(name = "queue.exclusive", direction = INCOMING, description = "Whether the queue is for exclusive use", type = "boolean", defaultValue = "false")
@ConnectorAttribute(name = "queue.auto-delete", direction = INCOMING, description = "Whether the queue should be deleted after use", type = "boolean", defaultValue = "false")
Expand Down Expand Up @@ -202,14 +202,18 @@ public static String getExchangeName(final RabbitMQConnectorCommonConfiguration
return config.getExchangeName().map(s -> "\"\"".equals(s) ? "" : s).orElse(config.getChannel());
}

public static String getQueueName(final RabbitMQConnectorIncomingConfiguration config) {
return config.getQueueName().orElse(config.getChannel());
}

private Multi<? extends Message<?>> getStreamOfMessages(
RabbitMQConsumer receiver,
ConnectionHolder holder,
RabbitMQConnectorIncomingConfiguration ic,
RabbitMQFailureHandler onNack,
RabbitMQAckHandler onAck) {

final String queueName = ic.getQueueName();
final String queueName = getQueueName(ic);
final boolean isTracingEnabled = ic.getTracingEnabled();
final String contentTypeOverride = ic.getContentTypeOverride().orElse(null);
log.receiverListeningAddress(queueName);
Expand Down Expand Up @@ -292,7 +296,7 @@ public Flow.Publisher<? extends Message<?>> getPublisher(final Config config) {
}

private Uni<RabbitMQConsumer> createConsumer(RabbitMQConnectorIncomingConfiguration ic, RabbitMQClient client) {
return client.basicConsumer(serverQueueName(ic.getQueueName()), new QueueOptions()
return client.basicConsumer(serverQueueName(getQueueName(ic)), new QueueOptions()
.setAutoAck(ic.getAutoAcknowledgement())
.setMaxInternalQueueSize(ic.getMaxIncomingInternalQueueSize())
.setKeepMostRecent(ic.getKeepMostRecent()))
Expand All @@ -309,7 +313,7 @@ private Uni<RabbitMQConsumer> createConsumer(RabbitMQConnectorIncomingConfigurat
private Uni<?> establishDLQ(final RabbitMQClient client, final RabbitMQConnectorIncomingConfiguration ic) {
final String deadLetterQueueName = ic.getDeadLetterQueueName().orElse(String.format("%s.dlq", ic.getQueueName()));
final String deadLetterExchangeName = ic.getDeadLetterExchange();
final String deadLetterRoutingKey = ic.getDeadLetterRoutingKey().orElse(ic.getQueueName());
final String deadLetterRoutingKey = ic.getDeadLetterRoutingKey().orElse(getQueueName(ic));

// Declare the exchange if we have been asked to do so
final Uni<String> dlxFlow = Uni.createFrom()
Expand Down Expand Up @@ -510,7 +514,7 @@ private Uni<String> establishExchange(
private Uni<String> establishQueue(
final RabbitMQClient client,
final RabbitMQConnectorIncomingConfiguration ic) {
final String queueName = ic.getQueueName();
final String queueName = getQueueName(ic);

// Declare the queue (and its binding(s) to the exchange, and TTL) if we have been asked to do so
final JsonObject queueArgs = new JsonObject();
Expand Down Expand Up @@ -573,7 +577,7 @@ private Multi<String> establishBindings(
final RabbitMQClient client,
final RabbitMQConnectorIncomingConfiguration ic) {
final String exchangeName = getExchangeName(ic);
final String queueName = ic.getQueueName();
final String queueName = getQueueName(ic);
final List<String> routingKeys = Arrays.stream(ic.getRoutingKeys().split(","))
.map(String::trim).collect(Collectors.toList());
final Map<String, Object> arguments = parseArguments(ic.getArguments());
Expand Down

0 comments on commit 60ac56d

Please sign in to comment.