diff --git a/CHANGELOG.md b/CHANGELOG.md index c718aa56..07bfbe90 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ # 0.13.6 +* Provide `#purge` to remove any outstanding requests from the producer. * Fix `#flush` does not handle the timeouts errors by making it return true if all flushed or false if failed. We do **not** raise an exception here to keep it backwards compatible. # 0.13.5 diff --git a/lib/rdkafka/bindings.rb b/lib/rdkafka/bindings.rb index 1a0e88c9..435cba9f 100644 --- a/lib/rdkafka/bindings.rb +++ b/lib/rdkafka/bindings.rb @@ -255,10 +255,13 @@ class TopicPartitionList < FFI::Struct RD_KAFKA_VTYPE_TIMESTAMP = 8 RD_KAFKA_VTYPE_HEADER = 9 RD_KAFKA_VTYPE_HEADERS = 10 + RD_KAFKA_PURGE_F_QUEUE = 1 + RD_KAFKA_PURGE_F_INFLIGHT = 2 RD_KAFKA_MSG_F_COPY = 0x2 attach_function :rd_kafka_producev, [:pointer, :varargs], :int, blocking: true + attach_function :rd_kafka_purge, [:pointer, :int], :int, blocking: true callback :delivery_cb, [:pointer, :pointer, :pointer], :void attach_function :rd_kafka_conf_set_dr_msg_cb, [:pointer, :delivery_cb], :void diff --git a/lib/rdkafka/producer.rb b/lib/rdkafka/producer.rb index 61b60973..03fbe4df 100644 --- a/lib/rdkafka/producer.rb +++ b/lib/rdkafka/producer.rb @@ -105,6 +105,32 @@ def flush(timeout_ms=5_000) raise(error) end + # Purges the outgoing queue and releases all resources. + # + # Useful when closing the producer with outgoing messages to unstable clusters or when for + # any other reasons waiting cannot go on anymore. This purges both the queue and all the + # inflight requests + updates the delivery handles statuses so they can be materialized into + # `purge_queue` errors. + def purge + closed_producer_check(__method__) + + code = nil + + @native_kafka.with_inner do |inner| + code = Bindings.rd_kafka_purge( + inner, + Bindings::RD_KAFKA_PURGE_F_QUEUE | Bindings::RD_KAFKA_PURGE_F_INFLIGHT + ) + end + + code.zero? || raise(Rdkafka::RdkafkaError.new(code)) + + # Wait for the purge to affect everything + sleep(0.001) until flush(100) + + true + end + # Partition count for a given topic. # NOTE: If 'allow.auto.create.topics' is set to true in the broker, the topic will be auto-created after returning nil. # diff --git a/spec/rdkafka/producer_spec.rb b/spec/rdkafka/producer_spec.rb index 969c9d3c..7d9de11e 100644 --- a/spec/rdkafka/producer_spec.rb +++ b/spec/rdkafka/producer_spec.rb @@ -689,4 +689,69 @@ def call(_, handle) end end end + + describe '#purge' do + context 'when no outgoing messages' do + it { expect(producer.purge).to eq(true) } + end + + context 'when librdkafka purge returns an error' do + before { expect(Rdkafka::Bindings).to receive(:rd_kafka_purge).and_return(-153) } + + it 'expect to raise an error' do + expect { producer.purge }.to raise_error(Rdkafka::RdkafkaError, /retry/) + end + end + + context 'when there are outgoing things in the queue' do + let(:producer) do + rdkafka_producer_config( + "bootstrap.servers": "localhost:9093", + "message.timeout.ms": 2_000 + ).producer + end + + it "should should purge and move forward" do + producer.produce( + topic: "produce_test_topic", + payload: "payload headers" + ) + + expect(producer.purge).to eq(true) + expect(producer.flush(1_000)).to eq(true) + end + + it "should materialize the delivery handles" do + handle = producer.produce( + topic: "produce_test_topic", + payload: "payload headers" + ) + + expect(producer.purge).to eq(true) + + expect { handle.wait }.to raise_error(Rdkafka::RdkafkaError, /purge_queue/) + end + + context "when using delivery_callback" do + let(:delivery_reports) { [] } + + let(:delivery_callback) do + ->(delivery_report) { delivery_reports << delivery_report } + end + + before { producer.delivery_callback = delivery_callback } + + it "should run the callback" do + handle = producer.produce( + topic: "produce_test_topic", + payload: "payload headers" + ) + + expect(producer.purge).to eq(true) + # queue purge + expect(delivery_reports[0].error).to eq(-152) + end + end + end + end end