Skip to content

Commit

Permalink
Allow for consumers with separated event and consumer queues (#40)
Browse files Browse the repository at this point in the history
* separate queue code

* config spec

* specs

* remarks and specs
  • Loading branch information
mensfeld authored Nov 7, 2023
1 parent 2531433 commit 0b00bc0
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 3 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Rdkafka Changelog

## 0.13.9 (Unreleased)
- [Enhancement] Expose alternative way of managing consumer events via a separate queue.
- [Enhancement] Allow for setting `statistics_callback` as nil to reset predefined settings configured by a different gem.

## 0.13.8 (2023-10-31)
- [Enhancement] Get consumer position (thijsc & mensfeld)

Expand Down
23 changes: 20 additions & 3 deletions lib/rdkafka/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def self.logger=(logger)
#
# @return [nil]
def self.statistics_callback=(callback)
raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call)
raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call) || callback == nil
@@statistics_callback = callback
end

Expand Down Expand Up @@ -112,6 +112,7 @@ def self.opaques
def initialize(config_hash = {})
@config_hash = DEFAULT_CONFIG.merge(config_hash)
@consumer_rebalance_listener = nil
@consumer_poll_set = true
end

# Set a config option.
Expand Down Expand Up @@ -140,6 +141,22 @@ def consumer_rebalance_listener=(listener)
@consumer_rebalance_listener = listener
end

# Should we use a single queue for the underlying consumer and events.
#
# This is an advanced API that allows for more granular control of the polling process.
# When this value is set to `false` (`true` by defualt), there will be two queues that need to
# be polled:
# - main librdkafka queue for events
# - consumer queue with messages and rebalances
#
# It is recommended to use the defaults and only set it to `false` in advance multi-threaded
# and complex cases where granular events handling control is needed.
#
# @param poll_set [Boolean]
def consumer_poll_set=(poll_set)
@consumer_poll_set = poll_set
end

# Creates a consumer with this configuration.
#
# @return [Consumer] The created consumer
Expand All @@ -158,8 +175,8 @@ def consumer
# Create native client
kafka = native_kafka(config, :rd_kafka_consumer)

# Redirect the main queue to the consumer
Rdkafka::Bindings.rd_kafka_poll_set_consumer(kafka)
# Redirect the main queue to the consumer queue
Rdkafka::Bindings.rd_kafka_poll_set_consumer(kafka) if @consumer_poll_set

# Return consumer with Kafka client
Rdkafka::Consumer.new(
Expand Down
26 changes: 26 additions & 0 deletions lib/rdkafka/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,32 @@ def poll(timeout_ms)
end
end

# Polls the main rdkafka queue (not the consumer one). Do **NOT** use it if `consumer_poll_set`
# was set to `true`.
#
# Events will cause application-provided callbacks to be called.
#
# Events (in the context of the consumer):
# - error callbacks
# - stats callbacks
# - any other callbacks supported by librdkafka that are not part of the consumer_poll, that
# would have a callback configured and activated.
#
# This method needs to be called at regular intervals to serve any queued callbacks waiting to
# be called. When in use, does **NOT** replace `#poll` but needs to run complementary with it.
#
# @param timeout_ms [Integer] poll timeout. If set to 0 will run async, when set to -1 will
# block until any events available.
#
# @note This method technically should be called `#poll` and the current `#poll` should be
# called `#consumer_poll` though we keep the current naming convention to make it backward
# compatible.
def events_poll(timeout_ms = 0)
@native_kafka.with_inner do |inner|
Rdkafka::Bindings.rd_kafka_poll(inner, timeout_ms)
end
end

# Poll for new messages and yield for each received one. Iteration
# will end when the consumer is closed.
#
Expand Down
8 changes: 8 additions & 0 deletions spec/rdkafka/config_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,14 @@ def call(stats); end
consumer.close
end

it "should create a consumer with consumer_poll_set set to false" do
config = rdkafka_consumer_config
config.consumer_poll_set = false
consumer = config.consumer
expect(consumer).to be_a Rdkafka::Consumer
consumer.close
end

it "should raise an error when creating a consumer with invalid config" do
config = Rdkafka::Config.new('invalid.key' => 'value')
expect {
Expand Down
47 changes: 47 additions & 0 deletions spec/rdkafka/consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,30 @@
consumer.subscription
}.to raise_error(Rdkafka::RdkafkaError)
end

context "when using consumer without the poll set" do
let(:consumer) do
config = rdkafka_consumer_config
config.consumer_poll_set = false
config.consumer
end

it "should subscribe, unsubscribe and return the subscription" do
expect(consumer.subscription).to be_empty

consumer.subscribe("consume_test_topic")

expect(consumer.subscription).not_to be_empty
expected_subscription = Rdkafka::Consumer::TopicPartitionList.new.tap do |list|
list.add_topic("consume_test_topic")
end
expect(consumer.subscription).to eq expected_subscription

consumer.unsubscribe

expect(consumer.subscription).to be_empty
end
end
end

describe "#pause and #resume" do
Expand Down Expand Up @@ -1076,6 +1100,29 @@ def send_one_message(val)
end
end

# Only relevant in case of a consumer with separate queues
describe '#events_poll' do
let(:stats) { [] }

before { Rdkafka::Config.statistics_callback = ->(published) { stats << published } }

after { Rdkafka::Config.statistics_callback = nil }

let(:consumer) do
config = rdkafka_consumer_config('statistics.interval.ms': 100)
config.consumer_poll_set = false
config.consumer
end

it "expect to run events_poll, operate and propagate stats on events_poll and not poll" do
consumer.subscribe("consume_test_topic")
consumer.poll(1_000)
expect(stats).to be_empty
consumer.events_poll(-1)
expect(stats).not_to be_empty
end
end

describe "a rebalance listener" do
let(:consumer) do
config = rdkafka_consumer_config
Expand Down

0 comments on commit 0b00bc0

Please sign in to comment.