From dc3bd4fafdcc2571f1be48d8b80b0dae0c8b89a0 Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Mon, 6 Jan 2025 10:02:11 +0100 Subject: [PATCH] remove each_batch method due to data consistency issues (#541) * remove each_batch method due to data consistency issues * fix spec --- CHANGELOG.md | 3 +- lib/rdkafka/consumer.rb | 96 +++----------- spec/rdkafka/consumer_spec.rb | 235 +--------------------------------- 3 files changed, 22 insertions(+), 312 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 98ec0b83..f22ab2d2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,8 @@ # Rdkafka Changelog ## 0.20.0 (Unreleased) -- [Enhancement] Bump librdkafka to 2.6.1 +- **[Breaking]** Deprecate and remove `#each_batch` due to data consistency concerns. +- [Enhancement] Bump librdkafka to `2.6.1` - [Enhancement] Expose `rd_kafka_global_init` to mitigate macos forking issues. - [Enhancement] Avoid clobbering LDFLAGS and CPPFLAGS if in a nix prepared environment (secobarbital). - [Patch] Retire no longer needed cooperative-sticky patch. diff --git a/lib/rdkafka/consumer.rb b/lib/rdkafka/consumer.rb index 8c62c348..6933c98a 100644 --- a/lib/rdkafka/consumer.rb +++ b/lib/rdkafka/consumer.rb @@ -609,87 +609,23 @@ def each end end - # Poll for new messages and yield them in batches that may contain - # messages from more than one partition. - # - # Rather than yield each message immediately as soon as it is received, - # each_batch will attempt to wait for as long as `timeout_ms` in order - # to create a batch of up to but no more than `max_items` in size. - # - # Said differently, if more than `max_items` are available within - # `timeout_ms`, then `each_batch` will yield early with `max_items` in the - # array, but if `timeout_ms` passes by with fewer messages arriving, it - # will yield an array of fewer messages, quite possibly zero. - # - # In order to prevent wrongly auto committing many messages at once across - # possibly many partitions, callers must explicitly indicate which messages - # have been successfully processed as some consumed messages may not have - # been yielded yet. To do this, the caller should set - # `enable.auto.offset.store` to false and pass processed messages to - # {store_offset}. It is also possible, though more complex, to set - # 'enable.auto.commit' to false and then pass a manually assembled - # TopicPartitionList to {commit}. - # - # As with `each`, iteration will end when the consumer is closed. - # - # Exception behavior is more complicated than with `each`, in that if - # :yield_on_error is true, and an exception is raised during the - # poll, and messages have already been received, they will be yielded to - # the caller before the exception is allowed to propagate. - # - # If you are setting either auto.commit or auto.offset.store to false in - # the consumer configuration, then you should let yield_on_error keep its - # default value of false because you are guaranteed to see these messages - # again. However, if both auto.commit and auto.offset.store are set to - # true, you should set yield_on_error to true so you can process messages - # that you may or may not see again. - # - # @param max_items [Integer] Maximum size of the yielded array of messages - # @param bytes_threshold [Integer] Threshold number of total message bytes in the yielded array of messages - # @param timeout_ms [Integer] max time to wait for up to max_items - # - # @yieldparam messages [Array] An array of received Message - # @yieldparam pending_exception [Exception] normally nil, or an exception - # - # @yield [messages, pending_exception] - # which will be propagated after processing of the partial batch is complete. - # - # @return [nil] - # - # @raise [RdkafkaError] When polling fails + # Deprecated. Please read the error message for more details. def each_batch(max_items: 100, bytes_threshold: Float::INFINITY, timeout_ms: 250, yield_on_error: false, &block) - closed_consumer_check(__method__) - slice = [] - bytes = 0 - end_time = monotonic_now + timeout_ms / 1000.0 - loop do - break if closed? - max_wait = end_time - monotonic_now - max_wait_ms = if max_wait <= 0 - 0 # should not block, but may retrieve a message - else - (max_wait * 1000).floor - end - message = nil - begin - message = poll max_wait_ms - rescue Rdkafka::RdkafkaError => error - raise unless yield_on_error - raise if slice.empty? - yield slice.dup, error - raise - end - if message - slice << message - bytes += message.payload.bytesize if message.payload - end - if slice.size == max_items || bytes >= bytes_threshold || monotonic_now >= end_time - 0.001 - yield slice.dup, nil - slice.clear - bytes = 0 - end_time = monotonic_now + timeout_ms / 1000.0 - end - end + raise NotImplementedError, <<~ERROR + `each_batch` has been removed due to data consistency concerns. + + This method was removed because it did not properly handle partition reassignments, + which could lead to processing messages from partitions that were no longer owned + by this consumer, resulting in duplicate message processing and data inconsistencies. + + Recommended alternatives: + + 1. Implement your own batching logic using rebalance callbacks to properly handle + partition revocations and ensure message processing correctness. + + 2. Use a high-level batching library that supports proper partition reassignment + handling out of the box (such as the Karafka framework). + ERROR end # Returns pointer to the consumer group metadata. It is used only in the context of diff --git a/spec/rdkafka/consumer_spec.rb b/spec/rdkafka/consumer_spec.rb index e5da460d..062f37d3 100644 --- a/spec/rdkafka/consumer_spec.rb +++ b/spec/rdkafka/consumer_spec.rb @@ -921,236 +921,10 @@ def send_one_message(val) end describe "#each_batch" do - let(:message_payload) { 'a' * 10 } - - before do - @topic = SecureRandom.base64(10).tr('+=/', '') - end - - after do - @topic = nil - end - - def topic_name - @topic - end - - def produce_n(n) - handles = [] - n.times do |i| - handles << producer.produce( - topic: topic_name, - payload: i % 10 == 0 ? nil : Time.new.to_f.to_s, - key: i.to_s, - partition: 0 - ) - end - handles.each(&:wait) - end - - def new_message - instance_double("Rdkafka::Consumer::Message").tap do |message| - allow(message).to receive(:payload).and_return(message_payload) - end - end - - it "retrieves messages produced into a topic" do - # This is the only each_batch test that actually produces real messages - # into a topic in the real kafka of the container. - # - # The other tests stub 'poll' which makes them faster and more reliable, - # but it makes sense to keep a single test with a fully integrated flow. - # This will help to catch breaking changes in the behavior of 'poll', - # libdrkafka, or Kafka. - # - # This is, in effect, an integration test and the subsequent specs are - # unit tests. - admin = rdkafka_config.admin - create_topic_handle = admin.create_topic(topic_name, 1, 1) - create_topic_handle.wait(max_wait_timeout: 15.0) - consumer.subscribe(topic_name) - produce_n 42 - all_yields = [] - consumer.each_batch(max_items: 10) do |batch| - all_yields << batch - break if all_yields.flatten.size >= 42 - end - expect(all_yields.flatten.first).to be_a Rdkafka::Consumer::Message - expect(all_yields.flatten.size).to eq 42 - expect(all_yields.size).to be > 4 - expect(all_yields.flatten.map(&:key)).to eq (0..41).map { |x| x.to_s } - admin.close - end - - it "should batch poll results and yield arrays of messages" do - consumer.subscribe(topic_name) - all_yields = [] - expect(consumer) - .to receive(:poll) - .exactly(10).times - .and_return(new_message) - consumer.each_batch(max_items: 10) do |batch| - all_yields << batch - break if all_yields.flatten.size >= 10 - end - expect(all_yields.first).to be_instance_of(Array) - expect(all_yields.flatten.size).to eq 10 - non_empty_yields = all_yields.reject { |batch| batch.empty? } - expect(non_empty_yields.size).to be < 10 - end - - it "should yield a partial batch if the timeout is hit with some messages" do - consumer.subscribe(topic_name) - poll_count = 0 - expect(consumer) - .to receive(:poll) - .at_least(3).times do - poll_count = poll_count + 1 - if poll_count > 2 - sleep 0.1 - nil - else - new_message - end - end - all_yields = [] - consumer.each_batch(max_items: 10) do |batch| - all_yields << batch - break if all_yields.flatten.size >= 2 - end - expect(all_yields.flatten.size).to eq 2 - end - - it "should yield [] if nothing is received before the timeout" do - admin = rdkafka_config.admin - create_topic_handle = admin.create_topic(topic_name, 1, 1) - create_topic_handle.wait(max_wait_timeout: 15.0) - consumer.subscribe(topic_name) - consumer.each_batch do |batch| - expect(batch).to eq([]) - break - end - admin.close - end - - it "should yield batchs of max_items in size if messages are already fetched" do - yielded_batches = [] - expect(consumer) - .to receive(:poll) - .with(anything) - .exactly(20).times - .and_return(new_message) - - consumer.each_batch(max_items: 10, timeout_ms: 500) do |batch| - yielded_batches << batch - break if yielded_batches.flatten.size >= 20 - break if yielded_batches.size >= 20 # so failure doesn't hang - end - expect(yielded_batches.size).to eq 2 - expect(yielded_batches.map(&:size)).to eq 2.times.map { 10 } - end - - it "should yield batchs as soon as bytes_threshold is hit" do - yielded_batches = [] - expect(consumer) - .to receive(:poll) - .with(anything) - .exactly(20).times - .and_return(new_message) - - consumer.each_batch(bytes_threshold: message_payload.size * 4, timeout_ms: 500) do |batch| - yielded_batches << batch - break if yielded_batches.flatten.size >= 20 - break if yielded_batches.size >= 20 # so failure doesn't hang - end - expect(yielded_batches.size).to eq 5 - expect(yielded_batches.map(&:size)).to eq 5.times.map { 4 } - end - - context "error raised from poll and yield_on_error is true" do - it "should yield buffered exceptions on rebalance, then break" do - config = rdkafka_consumer_config( - { - :"enable.auto.commit" => false, - :"enable.auto.offset.store" => false - } - ) - consumer = config.consumer - consumer.subscribe(topic_name) - batches_yielded = [] - exceptions_yielded = [] - each_batch_iterations = 0 - poll_count = 0 - expect(consumer) - .to receive(:poll) - .with(anything) - .exactly(3).times - .and_wrap_original do |method, *args| - poll_count = poll_count + 1 - if poll_count == 3 - raise Rdkafka::RdkafkaError.new(27, - "partitions ... too ... heavy ... must ... rebalance") - else - new_message - end - end - expect { - consumer.each_batch(max_items: 30, yield_on_error: true) do |batch, pending_error| - batches_yielded << batch - exceptions_yielded << pending_error - each_batch_iterations = each_batch_iterations + 1 - end - }.to raise_error(Rdkafka::RdkafkaError) - expect(poll_count).to eq 3 - expect(each_batch_iterations).to eq 1 - expect(batches_yielded.size).to eq 1 - expect(batches_yielded.first.size).to eq 2 - expect(exceptions_yielded.flatten.size).to eq 1 - expect(exceptions_yielded.flatten.first).to be_instance_of(Rdkafka::RdkafkaError) - consumer.close - end - end - - context "error raised from poll and yield_on_error is false" do - it "should yield buffered exceptions on rebalance, then break" do - config = rdkafka_consumer_config( - { - :"enable.auto.commit" => false, - :"enable.auto.offset.store" => false - } - ) - consumer = config.consumer - consumer.subscribe(topic_name) - batches_yielded = [] - exceptions_yielded = [] - each_batch_iterations = 0 - poll_count = 0 - expect(consumer) - .to receive(:poll) - .with(anything) - .exactly(3).times - .and_wrap_original do |method, *args| - poll_count = poll_count + 1 - if poll_count == 3 - raise Rdkafka::RdkafkaError.new(27, - "partitions ... too ... heavy ... must ... rebalance") - else - new_message - end - end - expect { - consumer.each_batch(max_items: 30, yield_on_error: false) do |batch, pending_error| - batches_yielded << batch - exceptions_yielded << pending_error - each_batch_iterations = each_batch_iterations + 1 - end - }.to raise_error(Rdkafka::RdkafkaError) - expect(poll_count).to eq 3 - expect(each_batch_iterations).to eq 0 - expect(batches_yielded.size).to eq 0 - expect(exceptions_yielded.size).to eq 0 - consumer.close - end + it 'expect to raise an error' do + expect do + consumer.each_batch {} + end.to raise_error(NotImplementedError) end end @@ -1317,7 +1091,6 @@ def on_partitions_revoked(list) { :subscribe => [ nil ], :unsubscribe => nil, - :each_batch => nil, :pause => [ nil ], :resume => [ nil ], :subscription => nil,