Skip to content

Commit

Permalink
upstream merge
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld committed Nov 3, 2023
2 parents d4b1fa1 + 7fc07fa commit d761230
Show file tree
Hide file tree
Showing 27 changed files with 65 additions and 67 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# Ignore bundler config.
/.bundle

Gemfile.lock
ext/ports
ext/tmp
Expand All @@ -6,3 +9,4 @@ ext/librdkafka.*
.yardoc
doc
coverage
vendor
1 change: 1 addition & 0 deletions .rspec
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
--require spec_helper
--format documentation
5 changes: 5 additions & 0 deletions lib/rdkafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require "rdkafka/version"

require "rdkafka/helpers/time"
require "rdkafka/abstract_handle"
require "rdkafka/admin"
require "rdkafka/admin/create_topic_handle"
Expand All @@ -24,3 +25,7 @@
require "rdkafka/producer"
require "rdkafka/producer/delivery_handle"
require "rdkafka/producer/delivery_report"

# Main Rdkafka namespace of this gem
module Rdkafka
end
59 changes: 37 additions & 22 deletions lib/rdkafka/abstract_handle.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,37 @@
require "ffi"

module Rdkafka
# This class serves as an abstract base class to represent handles within the Rdkafka module.
# As a subclass of `FFI::Struct`, this class provides a blueprint for other specific handle
# classes to inherit from, ensuring they adhere to a particular structure and behavior.
#
# Subclasses must define their own layout, and the layout must start with:
#
# layout :pending, :bool,
# :response, :int
class AbstractHandle < FFI::Struct
# Subclasses must define their own layout, and the layout must start with:
#
# layout :pending, :bool,
# :response, :int
include Helpers::Time

# Registry for registering all the handles.
REGISTRY = {}

CURRENT_TIME = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }.freeze

private_constant :CURRENT_TIME
class << self
# Adds handle to the register
#
# @param handle [AbstractHandle] any handle we want to register
def register(handle)
address = handle.to_ptr.address
REGISTRY[address] = handle
end

def self.register(handle)
address = handle.to_ptr.address
REGISTRY[address] = handle
# Removes handle from the register based on the handle address
#
# @param address [Integer] address of the registered handle we want to remove
def remove(address)
REGISTRY.delete(address)
end
end

def self.remove(address)
REGISTRY.delete(address)
end

# Whether the handle is still pending.
#
Expand All @@ -32,27 +43,31 @@ def pending?
end

# Wait for the operation to complete or raise an error if this takes longer than the timeout.
# If there is a timeout this does not mean the operation failed, rdkafka might still be working on the operation.
# In this case it is possible to call wait again.
# If there is a timeout this does not mean the operation failed, rdkafka might still be working
# on the operation. In this case it is possible to call wait again.
#
# @param max_wait_timeout [Numeric, nil] Amount of time to wait before timing out. If this is nil it does not time out.
# @param wait_timeout [Numeric] Amount of time we should wait before we recheck if the operation has completed
# @param max_wait_timeout [Numeric, nil] Amount of time to wait before timing out.
# If this is nil it does not time out.
# @param wait_timeout [Numeric] Amount of time we should wait before we recheck if the
# operation has completed
# @param raise_response_error [Boolean] should we raise error when waiting finishes
#
# @return [Object] Operation-specific result
#
# @raise [RdkafkaError] When the operation failed
# @raise [WaitTimeoutError] When the timeout has been reached and the handle is still pending
#
# @return [Object] Operation-specific result
def wait(max_wait_timeout: 60, wait_timeout: 0.1, raise_response_error: true)
timeout = if max_wait_timeout
CURRENT_TIME.call + max_wait_timeout
monotonic_now + max_wait_timeout
else
nil
end
loop do
if pending?
if timeout && timeout <= CURRENT_TIME.call
raise WaitTimeoutError.new("Waiting for #{operation_name} timed out after #{max_wait_timeout} seconds")
if timeout && timeout <= monotonic_now
raise WaitTimeoutError.new(
"Waiting for #{operation_name} timed out after #{max_wait_timeout} seconds"
)
end
sleep wait_timeout
elsif self[:response] != 0 && raise_response_error
Expand Down
5 changes: 1 addition & 4 deletions lib/rdkafka/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ module Rdkafka
# `each_slice` to consume batches of messages.
class Consumer
include Enumerable
include Helpers::Time

# @private
def initialize(native_kafka)
Expand Down Expand Up @@ -673,10 +674,6 @@ def each_batch(max_items: 100, bytes_threshold: Float::INFINITY, timeout_ms: 250
end

private
def monotonic_now
# needed because Time.now can go backwards
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

def closed_consumer_check(method)
raise Rdkafka::ClosedConsumerError.new(method) if closed?
Expand Down
14 changes: 14 additions & 0 deletions lib/rdkafka/helpers/time.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# frozen_string_literal: true

module Rdkafka
# Namespace for some small utilities used in multiple components
module Helpers
# Time related methods used across Karafka
module Time
# @return [Float] current monotonic time in seconds with microsecond precision
def monotonic_now
::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
end
end
end
end
9 changes: 3 additions & 6 deletions lib/rdkafka/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
module Rdkafka
# A producer for Kafka messages. To create a producer set up a {Config} and call {Config#producer producer} on that.
class Producer
include Helpers::Time

# Cache partitions count for 30 seconds
PARTITIONS_COUNT_TTL = 30

Expand Down Expand Up @@ -177,7 +179,7 @@ def purge
# This prevents us in case someone uses `partition_key` from querying for the count with
# each message. Instead we query once every 30 seconds at most
#
# @param topic [String] topic name
# @param [String] topic name
# @return [Integer] partition count for a given topic
def partition_count(topic)
closed_producer_check(__method__)
Expand Down Expand Up @@ -308,11 +310,6 @@ def arity(callback)

private

def monotonic_now
# needed because Time.now can go backwards
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

def closed_producer_check(method)
raise Rdkafka::ClosedProducerError.new(method) if closed?
end
Expand Down
2 changes: 0 additions & 2 deletions spec/rdkafka/abstract_handle_spec.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# frozen_string_literal: true

require "spec_helper"

describe Rdkafka::AbstractHandle do
let(:response) { 0 }
let(:result) { -1 }
Expand Down
2 changes: 0 additions & 2 deletions spec/rdkafka/admin/create_topic_handle_spec.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# frozen_string_literal: true

require "spec_helper"

describe Rdkafka::Admin::CreateTopicHandle do
let(:response) { 0 }

Expand Down
2 changes: 0 additions & 2 deletions spec/rdkafka/admin/create_topic_report_spec.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# frozen_string_literal: true

require "spec_helper"

describe Rdkafka::Admin::CreateTopicReport do
subject { Rdkafka::Admin::CreateTopicReport.new(
FFI::MemoryPointer.from_string("error string"),
Expand Down
2 changes: 0 additions & 2 deletions spec/rdkafka/admin/delete_topic_handle_spec.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# frozen_string_literal: true

require "spec_helper"

describe Rdkafka::Admin::DeleteTopicHandle do
let(:response) { 0 }

Expand Down
2 changes: 0 additions & 2 deletions spec/rdkafka/admin/delete_topic_report_spec.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# frozen_string_literal: true

require "spec_helper"

describe Rdkafka::Admin::DeleteTopicReport do
subject { Rdkafka::Admin::DeleteTopicReport.new(
FFI::MemoryPointer.from_string("error string"),
Expand Down
1 change: 0 additions & 1 deletion spec/rdkafka/admin_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# frozen_string_literal: true

require "spec_helper"
require "ostruct"

describe Rdkafka::Admin do
Expand Down
1 change: 0 additions & 1 deletion spec/rdkafka/bindings_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# frozen_string_literal: true

require "spec_helper"
require 'zlib'

describe Rdkafka::Bindings do
Expand Down
2 changes: 0 additions & 2 deletions spec/rdkafka/callbacks_spec.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# frozen_string_literal: true

require "spec_helper"

describe Rdkafka::Callbacks do

# The code in the call back functions is 100% covered by other specs. Due to
Expand Down
2 changes: 0 additions & 2 deletions spec/rdkafka/config_spec.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# frozen_string_literal: true

require "spec_helper"

describe Rdkafka::Config do
context "logger" do
it "should have a default logger" do
Expand Down
2 changes: 0 additions & 2 deletions spec/rdkafka/consumer/headers_spec.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# frozen_string_literal: true

require "spec_helper"

describe Rdkafka::Consumer::Headers do
let(:headers) do
{ # Note String keys!
Expand Down
2 changes: 0 additions & 2 deletions spec/rdkafka/consumer/message_spec.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# frozen_string_literal: true

require "spec_helper"

describe Rdkafka::Consumer::Message do
let(:native_client) { new_native_client }
let(:native_topic) { new_native_topic(native_client: native_client) }
Expand Down
2 changes: 0 additions & 2 deletions spec/rdkafka/consumer/partition_spec.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# frozen_string_literal: true

require "spec_helper"

describe Rdkafka::Consumer::Partition do
let(:offset) { 100 }
let(:err) { 0 }
Expand Down
2 changes: 0 additions & 2 deletions spec/rdkafka/consumer/topic_partition_list_spec.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# frozen_string_literal: true

require "spec_helper"

describe Rdkafka::Consumer::TopicPartitionList do
it "should create a new list and add unassigned topics" do
list = Rdkafka::Consumer::TopicPartitionList.new
Expand Down
1 change: 0 additions & 1 deletion spec/rdkafka/consumer_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# frozen_string_literal: true

require "spec_helper"
require "ostruct"
require 'securerandom'

Expand Down
2 changes: 0 additions & 2 deletions spec/rdkafka/error_spec.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# frozen_string_literal: true

require "spec_helper"

describe Rdkafka::RdkafkaError do
it "should raise a type error for a nil response" do
expect {
Expand Down
1 change: 0 additions & 1 deletion spec/rdkafka/metadata_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# frozen_string_literal: true

require "spec_helper"
require "securerandom"

describe Rdkafka::Metadata do
Expand Down
2 changes: 0 additions & 2 deletions spec/rdkafka/native_kafka_spec.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# frozen_string_literal: true

require "spec_helper"

describe Rdkafka::NativeKafka do
let(:config) { rdkafka_producer_config }
let(:native) { config.send(:native_kafka, config.send(:native_config), :rd_kafka_producer) }
Expand Down
2 changes: 0 additions & 2 deletions spec/rdkafka/producer/delivery_handle_spec.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# frozen_string_literal: true

require "spec_helper"

describe Rdkafka::Producer::DeliveryHandle do
let(:response) { 0 }

Expand Down
2 changes: 0 additions & 2 deletions spec/rdkafka/producer/delivery_report_spec.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# frozen_string_literal: true

require "spec_helper"

describe Rdkafka::Producer::DeliveryReport do
subject { Rdkafka::Producer::DeliveryReport.new(2, 100, "topic", -1) }

Expand Down
1 change: 0 additions & 1 deletion spec/rdkafka/producer_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# frozen_string_literal: true

require "spec_helper"
require "zlib"

describe Rdkafka::Producer do
Expand Down

0 comments on commit d761230

Please sign in to comment.