Skip to content

Commit

Permalink
separate queue code
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld committed Nov 6, 2023
1 parent 2531433 commit a55b3d3
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 2 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Rdkafka Changelog

## 0.13.9 (Unreleased)
- [Enhancement] Expose alternative way of managing consumer events via a separate queue.

## 0.13.8 (2023-10-31)
- [Enhancement] Get consumer position (thijsc & mensfeld)

Expand Down
21 changes: 19 additions & 2 deletions lib/rdkafka/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ def self.opaques
def initialize(config_hash = {})
@config_hash = DEFAULT_CONFIG.merge(config_hash)
@consumer_rebalance_listener = nil
@consumer_poll_set = true
end

# Set a config option.
Expand Down Expand Up @@ -140,6 +141,22 @@ def consumer_rebalance_listener=(listener)
@consumer_rebalance_listener = listener
end

# Should we use a single queue for the underlying consumer and events.
#
# This is an advanced API that allows for more granular control of the polling process.
# When this value is set to `false` (`true` by defualt), there will be two queues that need to
# be polled:
# - main librdkafka queue for events
# - consumer queue with messages and rebalances
#
# It is recommended to use the defaults and only set it to `false` in advance multi-threaded
# and complex cases where granular events handling control is needed.
#
# @param poll_set [Boolean]
def consumer_poll_set=(poll_set)
@consumer_poll_set = poll_set
end

# Creates a consumer with this configuration.
#
# @return [Consumer] The created consumer
Expand All @@ -158,8 +175,8 @@ def consumer
# Create native client
kafka = native_kafka(config, :rd_kafka_consumer)

# Redirect the main queue to the consumer
Rdkafka::Bindings.rd_kafka_poll_set_consumer(kafka)
# Redirect the main queue to the consumer queue
Rdkafka::Bindings.rd_kafka_poll_set_consumer(kafka) if @consumer_poll_set

# Return consumer with Kafka client
Rdkafka::Consumer.new(
Expand Down
26 changes: 26 additions & 0 deletions lib/rdkafka/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,32 @@ def poll(timeout_ms)
end
end

# Polls the main rdkafka queue (not the consumer one). Do **NOT** use it if `consumer_poll_set`
# was set to `true`.
#
# Events will cause application-provided callbacks to be called.
#
# Events (in the context of the consumer):
# - error callbacks
# - stats callbacks
# - any other callbacks supported by librdkafka that are not part of the consumer_poll, that
# would have a callback configured and activated.
#
# This method needs to be called at regular intervals to serve any queued callbacks waiting to
# be called. When in use, does **NOT** replace `#poll` but needs to run complementary with it.
#
# @param timeout_ms [Integer] poll timeout. If set to 0 will run async, when set to -1 will
# block until any events available.
#
# @note This method technically should be called `#poll` and the current `#poll` should be
# called `#consumer_poll` though we keep the current naming convention to make it backward
# compatible.
def events_poll(timeout_ms = 0)
@native_kafka.with_inner do |inner|
Rdkafka::Bindings.rd_kafka_poll(inner, timeout_ms)
end
end

# Poll for new messages and yield for each received one. Iteration
# will end when the consumer is closed.
#
Expand Down

0 comments on commit a55b3d3

Please sign in to comment.