Skip to content

Commit

Permalink
upstream merge
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld committed Jan 7, 2025
2 parents 296e7b0 + dc3bd4f commit 44959ca
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 311 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Rdkafka Changelog

## 0.19.0 (Unreleased)
- **[Breaking]** Deprecate and remove `#each_batch` due to data consistency concerns.
- [Fix] Restore `Rdkafka::Bindings.rd_kafka_global_init` as it was not the source of the original issue.

## 0.18.1 (2024-12-04)
- [Fix] Do not run `Rdkafka::Bindings.rd_kafka_global_init` on require to prevent some macOS versions from hanging on Puma fork.

Expand Down
2 changes: 2 additions & 0 deletions lib/rdkafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,5 @@
# Main Rdkafka namespace of this gem
module Rdkafka
end

Rdkafka::Bindings.rd_kafka_global_init
96 changes: 16 additions & 80 deletions lib/rdkafka/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -619,87 +619,23 @@ def each
end
end

# Poll for new messages and yield them in batches that may contain
# messages from more than one partition.
#
# Rather than yield each message immediately as soon as it is received,
# each_batch will attempt to wait for as long as `timeout_ms` in order
# to create a batch of up to but no more than `max_items` in size.
#
# Said differently, if more than `max_items` are available within
# `timeout_ms`, then `each_batch` will yield early with `max_items` in the
# array, but if `timeout_ms` passes by with fewer messages arriving, it
# will yield an array of fewer messages, quite possibly zero.
#
# In order to prevent wrongly auto committing many messages at once across
# possibly many partitions, callers must explicitly indicate which messages
# have been successfully processed as some consumed messages may not have
# been yielded yet. To do this, the caller should set
# `enable.auto.offset.store` to false and pass processed messages to
# {store_offset}. It is also possible, though more complex, to set
# 'enable.auto.commit' to false and then pass a manually assembled
# TopicPartitionList to {commit}.
#
# As with `each`, iteration will end when the consumer is closed.
#
# Exception behavior is more complicated than with `each`, in that if
# :yield_on_error is true, and an exception is raised during the
# poll, and messages have already been received, they will be yielded to
# the caller before the exception is allowed to propagate.
#
# If you are setting either auto.commit or auto.offset.store to false in
# the consumer configuration, then you should let yield_on_error keep its
# default value of false because you are guaranteed to see these messages
# again. However, if both auto.commit and auto.offset.store are set to
# true, you should set yield_on_error to true so you can process messages
# that you may or may not see again.
#
# @param max_items [Integer] Maximum size of the yielded array of messages
# @param bytes_threshold [Integer] Threshold number of total message bytes in the yielded array of messages
# @param timeout_ms [Integer] max time to wait for up to max_items
#
# @yieldparam messages [Array] An array of received Message
# @yieldparam pending_exception [Exception] normally nil, or an exception
#   which will be propagated after processing of the partial batch is complete.
#
# @yield [messages, pending_exception]
#
# @return [nil]
#
# @raise [RdkafkaError] When polling fails
# Deprecated. Please read the error message for more details.
def each_batch(max_items: 100, bytes_threshold: Float::INFINITY, timeout_ms: 250, yield_on_error: false, &block)
closed_consumer_check(__method__)
slice = []
bytes = 0
end_time = monotonic_now + timeout_ms / 1000.0
loop do
break if closed?
max_wait = end_time - monotonic_now
max_wait_ms = if max_wait <= 0
0 # should not block, but may retrieve a message
else
(max_wait * 1000).floor
end
message = nil
begin
message = poll max_wait_ms
rescue Rdkafka::RdkafkaError => error
raise unless yield_on_error
raise if slice.empty?
yield slice.dup, error
raise
end
if message
slice << message
bytes += message.payload.bytesize if message.payload
end
if slice.size == max_items || bytes >= bytes_threshold || monotonic_now >= end_time - 0.001
yield slice.dup, nil
slice.clear
bytes = 0
end_time = monotonic_now + timeout_ms / 1000.0
end
end
raise NotImplementedError, <<~ERROR
`each_batch` has been removed due to data consistency concerns.
This method was removed because it did not properly handle partition reassignments,
which could lead to processing messages from partitions that were no longer owned
by this consumer, resulting in duplicate message processing and data inconsistencies.
Recommended alternatives:
1. Implement your own batching logic using rebalance callbacks to properly handle
partition revocations and ensure message processing correctness.
2. Use a high-level batching library that supports proper partition reassignment
handling out of the box (such as the Karafka framework).
ERROR
end

# Returns pointer to the consumer group metadata. It is used only in the context of
Expand Down
235 changes: 4 additions & 231 deletions spec/rdkafka/consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -948,236 +948,10 @@ def send_one_message(val)
end

describe "#each_batch" do
let(:message_payload) { 'a' * 10 }

before do
@topic = SecureRandom.base64(10).tr('+=/', '')
end

after do
@topic = nil
end

def topic_name
@topic
end

def produce_n(n)
handles = []
n.times do |i|
handles << producer.produce(
topic: topic_name,
payload: i % 10 == 0 ? nil : Time.new.to_f.to_s,
key: i.to_s,
partition: 0
)
end
handles.each(&:wait)
end

def new_message
instance_double("Rdkafka::Consumer::Message").tap do |message|
allow(message).to receive(:payload).and_return(message_payload)
end
end

it "retrieves messages produced into a topic" do
# This is the only each_batch test that actually produces real messages
# into a topic in the real kafka of the container.
#
# The other tests stub 'poll' which makes them faster and more reliable,
# but it makes sense to keep a single test with a fully integrated flow.
# This will help to catch breaking changes in the behavior of 'poll',
# librdkafka, or Kafka.
#
# This is, in effect, an integration test and the subsequent specs are
# unit tests.
admin = rdkafka_config.admin
create_topic_handle = admin.create_topic(topic_name, 1, 1)
create_topic_handle.wait(max_wait_timeout: 15.0)
consumer.subscribe(topic_name)
produce_n 42
all_yields = []
consumer.each_batch(max_items: 10) do |batch|
all_yields << batch
break if all_yields.flatten.size >= 42
end
expect(all_yields.flatten.first).to be_a Rdkafka::Consumer::Message
expect(all_yields.flatten.size).to eq 42
expect(all_yields.size).to be > 4
expect(all_yields.flatten.map(&:key)).to eq (0..41).map { |x| x.to_s }
admin.close
end

it "should batch poll results and yield arrays of messages" do
consumer.subscribe(topic_name)
all_yields = []
expect(consumer)
.to receive(:poll)
.exactly(10).times
.and_return(new_message)
consumer.each_batch(max_items: 10) do |batch|
all_yields << batch
break if all_yields.flatten.size >= 10
end
expect(all_yields.first).to be_instance_of(Array)
expect(all_yields.flatten.size).to eq 10
non_empty_yields = all_yields.reject { |batch| batch.empty? }
expect(non_empty_yields.size).to be < 10
end

it "should yield a partial batch if the timeout is hit with some messages" do
consumer.subscribe(topic_name)
poll_count = 0
expect(consumer)
.to receive(:poll)
.at_least(3).times do
poll_count = poll_count + 1
if poll_count > 2
sleep 0.1
nil
else
new_message
end
end
all_yields = []
consumer.each_batch(max_items: 10) do |batch|
all_yields << batch
break if all_yields.flatten.size >= 2
end
expect(all_yields.flatten.size).to eq 2
end

it "should yield [] if nothing is received before the timeout" do
admin = rdkafka_config.admin
create_topic_handle = admin.create_topic(topic_name, 1, 1)
create_topic_handle.wait(max_wait_timeout: 15.0)
consumer.subscribe(topic_name)
consumer.each_batch do |batch|
expect(batch).to eq([])
break
end
admin.close
end

it "should yield batchs of max_items in size if messages are already fetched" do
yielded_batches = []
expect(consumer)
.to receive(:poll)
.with(anything)
.exactly(20).times
.and_return(new_message)

consumer.each_batch(max_items: 10, timeout_ms: 500) do |batch|
yielded_batches << batch
break if yielded_batches.flatten.size >= 20
break if yielded_batches.size >= 20 # so failure doesn't hang
end
expect(yielded_batches.size).to eq 2
expect(yielded_batches.map(&:size)).to eq 2.times.map { 10 }
end

it "should yield batchs as soon as bytes_threshold is hit" do
yielded_batches = []
expect(consumer)
.to receive(:poll)
.with(anything)
.exactly(20).times
.and_return(new_message)

consumer.each_batch(bytes_threshold: message_payload.size * 4, timeout_ms: 500) do |batch|
yielded_batches << batch
break if yielded_batches.flatten.size >= 20
break if yielded_batches.size >= 20 # so failure doesn't hang
end
expect(yielded_batches.size).to eq 5
expect(yielded_batches.map(&:size)).to eq 5.times.map { 4 }
end

context "error raised from poll and yield_on_error is true" do
it "should yield buffered exceptions on rebalance, then break" do
config = rdkafka_consumer_config(
{
:"enable.auto.commit" => false,
:"enable.auto.offset.store" => false
}
)
consumer = config.consumer
consumer.subscribe(topic_name)
batches_yielded = []
exceptions_yielded = []
each_batch_iterations = 0
poll_count = 0
expect(consumer)
.to receive(:poll)
.with(anything)
.exactly(3).times
.and_wrap_original do |method, *args|
poll_count = poll_count + 1
if poll_count == 3
raise Rdkafka::RdkafkaError.new(27,
"partitions ... too ... heavy ... must ... rebalance")
else
new_message
end
end
expect {
consumer.each_batch(max_items: 30, yield_on_error: true) do |batch, pending_error|
batches_yielded << batch
exceptions_yielded << pending_error
each_batch_iterations = each_batch_iterations + 1
end
}.to raise_error(Rdkafka::RdkafkaError)
expect(poll_count).to eq 3
expect(each_batch_iterations).to eq 1
expect(batches_yielded.size).to eq 1
expect(batches_yielded.first.size).to eq 2
expect(exceptions_yielded.flatten.size).to eq 1
expect(exceptions_yielded.flatten.first).to be_instance_of(Rdkafka::RdkafkaError)
consumer.close
end
end

context "error raised from poll and yield_on_error is false" do
it "should yield buffered exceptions on rebalance, then break" do
config = rdkafka_consumer_config(
{
:"enable.auto.commit" => false,
:"enable.auto.offset.store" => false
}
)
consumer = config.consumer
consumer.subscribe(topic_name)
batches_yielded = []
exceptions_yielded = []
each_batch_iterations = 0
poll_count = 0
expect(consumer)
.to receive(:poll)
.with(anything)
.exactly(3).times
.and_wrap_original do |method, *args|
poll_count = poll_count + 1
if poll_count == 3
raise Rdkafka::RdkafkaError.new(27,
"partitions ... too ... heavy ... must ... rebalance")
else
new_message
end
end
expect {
consumer.each_batch(max_items: 30, yield_on_error: false) do |batch, pending_error|
batches_yielded << batch
exceptions_yielded << pending_error
each_batch_iterations = each_batch_iterations + 1
end
}.to raise_error(Rdkafka::RdkafkaError)
expect(poll_count).to eq 3
expect(each_batch_iterations).to eq 0
expect(batches_yielded.size).to eq 0
expect(exceptions_yielded.size).to eq 0
consumer.close
end
it 'expect to raise an error' do
expect do
consumer.each_batch {}
end.to raise_error(NotImplementedError)
end
end

Expand Down Expand Up @@ -1344,7 +1118,6 @@ def on_partitions_revoked(list)
{
:subscribe => [ nil ],
:unsubscribe => nil,
:each_batch => nil,
:pause => [ nil ],
:resume => [ nil ],
:subscription => nil,
Expand Down

0 comments on commit 44959ca

Please sign in to comment.