From 06bc5fd4c9a57e9f532a6ad8eab39667ce92678d Mon Sep 17 00:00:00 2001 From: Ben Pirt Date: Wed, 21 Feb 2024 12:43:23 +0000 Subject: [PATCH] Optimise the postgres queries for popping messages (#76) --- .../postgres/dao/PostgresQueueDAO.java | 44 ++++--------------- 1 file changed, 9 insertions(+), 35 deletions(-) diff --git a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresQueueDAO.java b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresQueueDAO.java index 693b48c06..71af60838 100644 --- a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresQueueDAO.java +++ b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresQueueDAO.java @@ -422,17 +422,19 @@ private boolean removeMessage(Connection connection, String queueName, String me q -> q.addParameter(queueName).addParameter(messageId).executeDelete()); } - private List peekMessages(Connection connection, String queueName, int count) { - if (count < 1) { - return Collections.emptyList(); - } + private List popMessages( + Connection connection, String queueName, int count, int timeout) { - final String PEEK_MESSAGES = - "SELECT message_id, priority, payload FROM queue_message WHERE queue_name = ? AND popped = false AND deliver_on <= (current_timestamp + (1000 ||' microseconds')::interval) ORDER BY priority DESC, deliver_on, created_on LIMIT ? FOR UPDATE SKIP LOCKED"; + String POP_QUERY = + "UPDATE queue_message SET popped = true WHERE message_id IN (" + + "SELECT message_id FROM queue_message WHERE queue_name = ? AND popped = false AND " + + "deliver_on <= (current_timestamp + (1000 ||' microseconds')::interval) " + + "ORDER BY priority DESC, deliver_on, created_on LIMIT ? FOR UPDATE SKIP LOCKED" + + ") RETURNING message_id, priority, payload"; return query( connection, - PEEK_MESSAGES, + POP_QUERY, p -> p.addParameter(queueName) .addParameter(count) @@ -450,34 +452,6 @@ private List peekMessages(Connection connection, String queueName, int })); } - private List popMessages( - Connection connection, String queueName, int count, int timeout) { - List messages = peekMessages(connection, queueName, count); - - if (messages.isEmpty()) { - return messages; - } - - List poppedMessages = new ArrayList<>(); - for (Message message : messages) { - final String POP_MESSAGE = - "UPDATE queue_message SET popped = true WHERE queue_name = ? AND message_id = ? AND popped = false"; - int result = - query( - connection, - POP_MESSAGE, - q -> - q.addParameter(queueName) - .addParameter(message.getId()) - .executeUpdate()); - - if (result == 1) { - poppedMessages.add(message); - } - } - return poppedMessages; - } - @Override public boolean containsMessage(String queueName, String messageId) { return getWithRetriedTransactions(tx -> existsMessage(tx, queueName, messageId));