Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement purge for producer #23

Merged
merged 2 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# 0.13.6
* Provide `#purge` to remove any outstanding requests from the producer.
* Fix `#flush` does not handle the timeouts errors by making it return true if all flushed or false if failed. We do **not** raise an exception here to keep it backwards compatible.

# 0.13.5
Expand Down
3 changes: 3 additions & 0 deletions lib/rdkafka/bindings.rb
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,13 @@ class TopicPartitionList < FFI::Struct
RD_KAFKA_VTYPE_TIMESTAMP = 8
RD_KAFKA_VTYPE_HEADER = 9
RD_KAFKA_VTYPE_HEADERS = 10
RD_KAFKA_PURGE_F_QUEUE = 1
RD_KAFKA_PURGE_F_INFLIGHT = 2

RD_KAFKA_MSG_F_COPY = 0x2

attach_function :rd_kafka_producev, [:pointer, :varargs], :int, blocking: true
attach_function :rd_kafka_purge, [:pointer, :int], :int, blocking: true
callback :delivery_cb, [:pointer, :pointer, :pointer], :void
attach_function :rd_kafka_conf_set_dr_msg_cb, [:pointer, :delivery_cb], :void

Expand Down
26 changes: 26 additions & 0 deletions lib/rdkafka/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,32 @@ def flush(timeout_ms=5_000)
raise(error)
end

# Purges the outgoing queue and releases all resources.
#
# Useful when closing the producer with outgoing messages to unstable clusters or when for
# any other reasons waiting cannot go on anymore. This purges both the queue and all the
# inflight requests + updates the delivery handles statuses so they can be materialized into
# `purge_queue` errors.
def purge
closed_producer_check(__method__)

code = nil

@native_kafka.with_inner do |inner|
code = Bindings.rd_kafka_purge(
inner,
Bindings::RD_KAFKA_PURGE_F_QUEUE | Bindings::RD_KAFKA_PURGE_F_INFLIGHT
)
end

code.zero? || Rdkafka::RdkafkaError.new(code)

# Wait for the purge to affect everything
sleep(0.001) until flush(100)

true
end

# Partition count for a given topic.
# NOTE: If 'allow.auto.create.topics' is set to true in the broker, the topic will be auto-created after returning nil.
#
Expand Down
57 changes: 57 additions & 0 deletions spec/rdkafka/producer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -689,4 +689,61 @@ def call(_, handle)
end
end
end

describe '#purge' do
context 'when no outgoing messages' do
it { expect(producer.purge).to eq(true) }
end

context 'when there are outgoing things in the queue' do
let(:producer) do
rdkafka_producer_config(
"bootstrap.servers": "localhost:9093",
"message.timeout.ms": 2_000
).producer
end

it "should should purge and move forward" do
producer.produce(
topic: "produce_test_topic",
payload: "payload headers"
)

expect(producer.purge).to eq(true)
expect(producer.flush(1_000)).to eq(true)
end

it "should materialize the delivery handles" do
handle = producer.produce(
topic: "produce_test_topic",
payload: "payload headers"
)

expect(producer.purge).to eq(true)

expect { handle.wait }.to raise_error(Rdkafka::RdkafkaError, /purge_queue/)
end

context "when using delivery_callback" do
let(:delivery_reports) { [] }

let(:delivery_callback) do
->(delivery_report) { delivery_reports << delivery_report }
end

before { producer.delivery_callback = delivery_callback }

it "should run the callback" do
handle = producer.produce(
topic: "produce_test_topic",
payload: "payload headers"
)

expect(producer.purge).to eq(true)
# queue purge
expect(delivery_reports[0].error).to eq(-152)
end
end
end
end
end