Skip to content

Commit

Permalink
Merge branch 'main' of github.com:appsignal/rdkafka-ruby
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld committed Nov 4, 2023
2 parents d761230 + 9709183 commit 476b118
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 87 deletions.
6 changes: 5 additions & 1 deletion lib/rdkafka.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
# frozen_string_literal: true

require "rdkafka/version"
require "logger"
require "objspace"
require "ffi"
require "json"

require "rdkafka/version"
require "rdkafka/helpers/time"
require "rdkafka/abstract_handle"
require "rdkafka/admin"
Expand Down
2 changes: 0 additions & 2 deletions lib/rdkafka/abstract_handle.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# frozen_string_literal: true

require "ffi"

module Rdkafka
# This class serves as an abstract base class to represent handles within the Rdkafka module.
# As a subclass of `FFI::Struct`, this class provides a blueprint for other specific handle
Expand Down
13 changes: 6 additions & 7 deletions lib/rdkafka/admin.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# frozen_string_literal: true

require "objspace"

module Rdkafka
class Admin
# @private
Expand Down Expand Up @@ -30,11 +28,12 @@ def closed?

# Create a topic with the given partition count and replication factor
#
# @return [CreateTopicHandle] Create topic handle that can be used to wait for the result of
# creating the topic
#
# @raise [ConfigError] When the partition count or replication factor are out of valid range
# @raise [RdkafkaError] When the topic name is invalid or the topic already exists
# @raise [RdkafkaError] When the topic configuration is invalid
#
# @return [CreateTopicHandle] Create topic handle that can be used to wait for the result of creating the topic
def create_topic(topic_name, partition_count, replication_factor, topic_config={})
closed_admin_check(__method__)

Expand Down Expand Up @@ -107,11 +106,11 @@ def create_topic(topic_name, partition_count, replication_factor, topic_config={
create_topic_handle
end

# Delete the named topic
# Deletes the named topic
#
# @return [DeleteTopicHandle] Delete topic handle that can be used to wait for the result of
# deleting the topic
# @raise [RdkafkaError] When the topic name is invalid or the topic does not exist
#
# @return [DeleteTopicHandle] Delete topic handle that can be used to wait for the result of deleting the topic
def delete_topic(topic_name)
closed_admin_check(__method__)

Expand Down
4 changes: 0 additions & 4 deletions lib/rdkafka/bindings.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
# frozen_string_literal: true

require "ffi"
require "json"
require "logger"

module Rdkafka
# @private
#
Expand Down
18 changes: 8 additions & 10 deletions lib/rdkafka/config.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# frozen_string_literal: true

require "logger"

module Rdkafka
# Configuration for a Kafka consumer or producer. You can create an instance and use
# the consumer and producer methods to create a client. Documentation of the available
Expand Down Expand Up @@ -142,12 +140,12 @@ def consumer_rebalance_listener=(listener)
@consumer_rebalance_listener = listener
end

# Create a consumer with this configuration.
# Creates a consumer with this configuration.
#
# @return [Consumer] The created consumer
#
# @raise [ConfigError] When the configuration contains invalid options
# @raise [ClientCreationError] When the native client cannot be created
#
# @return [Consumer] The created consumer
def consumer
opaque = Opaque.new
config = native_config(opaque)
Expand Down Expand Up @@ -175,10 +173,10 @@ def consumer

# Create a producer with this configuration.
#
# @return [Producer] The created producer
#
# @raise [ConfigError] When the configuration contains invalid options
# @raise [ClientCreationError] When the native client cannot be created
#
# @return [Producer] The created producer
def producer
# Create opaque
opaque = Opaque.new
Expand All @@ -200,12 +198,12 @@ def producer
end
end

# Create an admin instance with this configuration.
# Creates an admin instance with this configuration.
#
# @return [Admin] The created admin instance
#
# @raise [ConfigError] When the configuration contains invalid options
# @raise [ClientCreationError] When the native client cannot be created
#
# @return [Admin] The created admin instance
def admin
opaque = Opaque.new
config = native_config(opaque)
Expand Down
72 changes: 23 additions & 49 deletions lib/rdkafka/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,11 @@ def closed?
@native_kafka.closed?
end

# Subscribe to one or more topics letting Kafka handle partition assignments.
# Subscribes to one or more topics letting Kafka handle partition assignments.
#
# @param topics [Array<String>] One or more topic names
#
# @raise [RdkafkaError] When subscribing fails
#
# @return [nil]
# @raise [RdkafkaError] When subscribing fails
def subscribe(*topics)
closed_consumer_check(__method__)

Expand All @@ -84,9 +82,8 @@ def subscribe(*topics)

# Unsubscribe from all subscribed topics.
#
# @raise [RdkafkaError] When unsubscribing fails
#
# @return [nil]
# @raise [RdkafkaError] When unsubscribing fails
def unsubscribe
closed_consumer_check(__method__)

Expand All @@ -102,10 +99,8 @@ def unsubscribe
# Pause producing or consumption for the provided list of partitions
#
# @param list [TopicPartitionList] The topic with partitions to pause
#
# @raise [RdkafkaTopicPartitionListError] When pausing subscription fails.
#
# @return [nil]
# @raise [RdkafkaTopicPartitionListError] When pausing subscription fails.
def pause(list)
closed_consumer_check(__method__)

Expand All @@ -129,13 +124,11 @@ def pause(list)
end
end

# Resume producing consumption for the provided list of partitions
# Resumes producing or consumption for the provided list of partitions
#
# @param list [TopicPartitionList] The topic with partitions to resume
#
# @raise [RdkafkaError] When resume subscription fails.
#
# @return [nil]
# @raise [RdkafkaError] When resume subscription fails.
def resume(list)
closed_consumer_check(__method__)

Expand All @@ -158,11 +151,10 @@ def resume(list)
end
end

# Return the current subscription to topics and partitions
#
# @raise [RdkafkaError] When getting the subscription fails.
# Returns the current subscription to topics and partitions
#
# @return [TopicPartitionList]
# @raise [RdkafkaError] When getting the subscription fails.
def subscription
closed_consumer_check(__method__)

Expand All @@ -185,7 +177,6 @@ def subscription
# Atomic assignment of partitions to consume
#
# @param list [TopicPartitionList] The topic with partitions to assign
#
# @raise [RdkafkaError] When assigning fails
def assign(list)
closed_consumer_check(__method__)
Expand All @@ -209,9 +200,8 @@ def assign(list)

# Returns the current partition assignment.
#
# @raise [RdkafkaError] When getting the assignment fails.
#
# @return [TopicPartitionList]
# @raise [RdkafkaError] When getting the assignment fails.
def assignment
closed_consumer_check(__method__)

Expand Down Expand Up @@ -245,14 +235,14 @@ def assignment_lost?
end

# Return the current committed offset per partition for this consumer group.
# The offset field of each requested partition will either be set to stored offset or to -1001 in case there was no stored offset for that partition.
# The offset field of each requested partition will either be set to stored offset or to -1001
# in case there was no stored offset for that partition.
#
# @param list [TopicPartitionList, nil] The topic with partitions to get the offsets for or nil to use the current subscription.
# @param list [TopicPartitionList, nil] The topic with partitions to get the offsets for or nil
# to use the current subscription.
# @param timeout_ms [Integer] The timeout for fetching this information.
#
# @raise [RdkafkaError] When getting the committed positions fails.
#
# @return [TopicPartitionList]
# @raise [RdkafkaError] When getting the committed positions fails.
def committed(list=nil, timeout_ms=1200)
closed_consumer_check(__method__)

Expand Down Expand Up @@ -308,10 +298,8 @@ def position(list=nil)
# @param topic [String] The topic to query
# @param partition [Integer] The partition to query
# @param timeout_ms [Integer] The timeout for querying the broker
#
# @raise [RdkafkaError] When querying the broker fails.
#
# @return [Integer] The low and high watermark
# @raise [RdkafkaError] When querying the broker fails.
def query_watermark_offsets(topic, partition, timeout_ms=200)
closed_consumer_check(__method__)

Expand Down Expand Up @@ -344,10 +332,9 @@ def query_watermark_offsets(topic, partition, timeout_ms=200)
#
# @param topic_partition_list [TopicPartitionList] The list to calculate lag for.
# @param watermark_timeout_ms [Integer] The timeout for each query watermark call.
#
# @return [Hash<String, Hash<Integer, Integer>>] A hash containing all topics with the lag
# per partition
# @raise [RdkafkaError] When querying the broker fails.
#
# @return [Hash<String, Hash<Integer, Integer>>] A hash containing all topics with the lag per partition
def lag(topic_partition_list, watermark_timeout_ms=100)
out = {}

Expand Down Expand Up @@ -396,10 +383,8 @@ def member_id
# When using this `enable.auto.offset.store` should be set to `false` in the config.
#
# @param message [Rdkafka::Consumer::Message] The message which offset will be stored
#
# @raise [RdkafkaError] When storing the offset fails
#
# @return [nil]
# @raise [RdkafkaError] When storing the offset fails
def store_offset(message)
closed_consumer_check(__method__)

Expand Down Expand Up @@ -431,10 +416,8 @@ def store_offset(message)
# message at the given offset.
#
# @param message [Rdkafka::Consumer::Message] The message to which to seek
#
# @raise [RdkafkaError] When seeking fails
#
# @return [nil]
# @raise [RdkafkaError] When seeking fails
def seek(message)
closed_consumer_check(__method__)

Expand Down Expand Up @@ -504,10 +487,8 @@ def offsets_for_times(list, timeout_ms = 1000)
#
# @param list [TopicPartitionList,nil] The topic with partitions to commit
# @param async [Boolean] Whether to commit async or wait for the commit to finish
#
# @raise [RdkafkaError] When committing fails
#
# @return [nil]
# @raise [RdkafkaError] When committing fails
def commit(list=nil, async=false)
closed_consumer_check(__method__)

Expand All @@ -533,10 +514,8 @@ def commit(list=nil, async=false)
# Poll for the next message on one of the subscribed topics
#
# @param timeout_ms [Integer] Timeout of this poll
#
# @raise [RdkafkaError] When polling fails
#
# @return [Message, nil] A message or nil if there was no new message within the timeout
# @raise [RdkafkaError] When polling fails
def poll(timeout_ms)
closed_consumer_check(__method__)

Expand Down Expand Up @@ -565,14 +544,11 @@ def poll(timeout_ms)
# Poll for new messages and yield for each received one. Iteration
# will end when the consumer is closed.
#
# If `enable.partition.eof` is turned on in the config this will raise an
# error when an eof is reached, so you probably want to disable that when
# using this method of iteration.
# If `enable.partition.eof` is turned on in the config this will raise an error when an eof is
# reached, so you probably want to disable that when using this method of iteration.
#
# @raise [RdkafkaError] When polling fails
#
# @yieldparam message [Message] Received message
#
# @return [nil]
def each
loop do
Expand Down Expand Up @@ -625,9 +601,7 @@ def each
# that you may or may not see again.
#
# @param max_items [Integer] Maximum size of the yielded array of messages
#
# @param bytes_threshold [Integer] Threshold number of total message bytes in the yielded array of messages
#
# @param timeout_ms [Integer] max time to wait for up to max_items
#
# @raise [RdkafkaError] When polling fails
Expand Down
6 changes: 2 additions & 4 deletions lib/rdkafka/consumer/headers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@ def [](key)

# Reads a librdkafka native message's headers and returns them as a Ruby Hash
#
# @param [librdkafka message] native_message
# @private
#
# @param [librdkafka message] native_message
# @return [Hash<String, String>] headers Hash for the native_message
#
# @raise [Rdkafka::RdkafkaError] when fail to read headers
#
# @private
def self.from_native(native_message)
headers_ptrptr = FFI::MemoryPointer.new(:pointer)
err = Rdkafka::Bindings.rd_kafka_message_headers(native_message, headers_ptrptr)
Expand Down
16 changes: 6 additions & 10 deletions lib/rdkafka/producer.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# frozen_string_literal: true

require "objspace"

module Rdkafka
# A producer for Kafka messages. To create a producer set up a {Config} and call {Config#producer producer} on that.
class Producer
Expand Down Expand Up @@ -169,18 +167,16 @@ def purge
end

# Partition count for a given topic.
# NOTE: If 'allow.auto.create.topics' is set to true in the broker, the topic will be auto-created after returning nil.
#
# @param topic [String] The topic name.
# @return [Integer] partition count for a given topic
#
# @return partition count [Integer,nil]
#
# We cache the partition count for a given topic for given time
# This prevents us in case someone uses `partition_key` from querying for the count with
# each message. Instead we query once every 30 seconds at most
# @note If 'allow.auto.create.topics' is set to true in the broker, the topic will be
# auto-created after returning nil.
#
# @param [String] topic name
# @return [Integer] partition count for a given topic
# @note We cache the partition count for a given topic for given time.
# This prevents us in case someone uses `partition_key` from querying for the count with
# each message. Instead we query once every 30 seconds at most
def partition_count(topic)
closed_producer_check(__method__)

Expand Down

0 comments on commit 476b118

Please sign in to comment.