From 580ae9f7492abde47eea2feae5352d3507e42239 Mon Sep 17 00:00:00 2001 From: Robin Palotai Date: Sun, 26 Sep 2021 08:50:29 +0200 Subject: [PATCH] Can alter offsets on RebalanceBeforeAssign. This is useful if you don't want the broker to manage group offsets on your behalf, but you rather store the offsets locally (typically in a transaction together with the effects of the processing of that batch of messages). See the documentation of rd_kafka_conf_set_rebalance_cb (https://docs.confluent.io/2.0.0/clients/librdkafka/rdkafka_8h.html#a10db731dc1a295bd9884e4f8cb199311). --- src/Kafka/Consumer.hs | 2 +- src/Kafka/Consumer/Callbacks.hs | 29 +++++++++++++++++++---------- src/Kafka/Consumer/Types.hs | 10 +++++----- 3 files changed, 25 insertions(+), 16 deletions(-) diff --git a/src/Kafka/Consumer.hs b/src/Kafka/Consumer.hs index 8aa04fc..1e99f12 100644 --- a/src/Kafka/Consumer.hs +++ b/src/Kafka/Consumer.hs @@ -118,7 +118,7 @@ newConsumer :: MonadIO m -> m (Either KafkaError KafkaConsumer) newConsumer props (Subscription ts tp) = liftIO $ do let cp = case cpCallbackPollMode props of - CallbackPollModeAsync -> setCallback (rebalanceCallback (\_ _ -> return ())) <> props + CallbackPollModeAsync -> setCallback (rebalanceCallback (\_ _ -> return Nothing)) <> props CallbackPollModeSync -> props kc@(KafkaConf kc' qref _) <- newConsumerConf cp tp' <- topicConf (TopicProps tp) diff --git a/src/Kafka/Consumer/Callbacks.hs b/src/Kafka/Consumer/Callbacks.hs index 347dd76..c4860be 100644 --- a/src/Kafka/Consumer/Callbacks.hs +++ b/src/Kafka/Consumer/Callbacks.hs @@ -11,7 +11,7 @@ import Control.Monad (forM_, void) import Foreign.ForeignPtr (newForeignPtr_) import Foreign.Ptr (nullPtr) import Kafka.Callbacks as X -import Kafka.Consumer.Convert (fromNativeTopicPartitionList', fromNativeTopicPartitionList'') +import Kafka.Consumer.Convert (fromNativeTopicPartitionList', fromNativeTopicPartitionList'', toNativeTopicPartitionList) import Kafka.Consumer.Types (KafkaConsumer (..), RebalanceEvent (..), TopicPartition (..)) import Kafka.Internal.RdKafka import Kafka.Internal.Setup (HasKafka (..), HasKafkaConf (..), Kafka (..), KafkaConf (..), getRdMsgQueue, Callback (..)) @@ -19,8 +19,12 @@ import Kafka.Types (KafkaError (..), PartitionId (..), TopicName (..) import qualified Data.Text as Text --- | Sets a callback that is called when rebalance is needed. -rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> Callback +-- | Sets a callback that is called when rebalance is happening. +-- +-- If you want to store the offsets locally, return the TopicPartition list with +-- modified offsets from RebalanceBeforeAssign. Callback return value on other +-- events is ignored. +rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO (Maybe [TopicPartition])) -> Callback rebalanceCallback callback = Callback $ \kc@(KafkaConf con _ _) -> rdKafkaConfSetRebalanceCb con (realCb kc) where @@ -54,19 +58,24 @@ redirectPartitionQueue (Kafka k) (TopicName t) (PartitionId p) q = do Nothing -> return () Just pq -> rdKafkaQueueForward pq q -setRebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) +setRebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO (Maybe [TopicPartition])) -> KafkaConsumer -> KafkaError -> RdKafkaTopicPartitionListTPtr -> IO () setRebalanceCallback f k e pls = do ps <- fromNativeTopicPartitionList'' pls - let assignment = (tpTopicName &&& tpPartition) <$> ps let (Kafka kptr) = getKafka k case e of KafkaResponseError RdKafkaRespErrAssignPartitions -> do - f k (RebalanceBeforeAssign assignment) - void $ rdKafkaAssign kptr pls + -- Consumer may want to alter the offsets if they are stored locally. + mbAltered <- f k (RebalanceBeforeAssign ps) + (pls', assigned) <- case mbAltered of + Nothing -> pure (pls, ps) + Just alt -> do + pls' <- toNativeTopicPartitionList alt + pure (pls', alt) + void $ rdKafkaAssign kptr pls' mbq <- getRdMsgQueue $ getKafkaConf k case mbq of @@ -80,10 +89,10 @@ setRebalanceCallback f k e pls = do forM_ ps (\tp -> redirectPartitionQueue (getKafka k) (tpTopicName tp) (tpPartition tp) mq) void $ rdKafkaResumePartitions kptr pls - f k (RebalanceAssign assignment) + void $ f k (RebalanceAssign assigned) KafkaResponseError RdKafkaRespErrRevokePartitions -> do - f k (RebalanceBeforeRevoke assignment) + void $ f k (RebalanceBeforeRevoke ps) void $ newForeignPtr_ nullPtr >>= rdKafkaAssign kptr - f k (RebalanceRevoke assignment) + void $ f k (RebalanceRevoke ps) x -> error $ "Rebalance: UNKNOWN response: " <> show x diff --git a/src/Kafka/Consumer/Types.hs b/src/Kafka/Consumer/Types.hs index e0da633..3bdb5dd 100644 --- a/src/Kafka/Consumer/Types.hs +++ b/src/Kafka/Consumer/Types.hs @@ -72,20 +72,20 @@ newtype ConsumerGroupId = ConsumerGroupId newtype Offset = Offset { unOffset :: Int64 } deriving (Show, Eq, Ord, Read, Generic) -- | Where to reset the offset when there is no initial offset in Kafka --- +-- -- See data OffsetReset = Earliest | Latest deriving (Show, Eq, Generic) -- | A set of events which happen during the rebalancing process data RebalanceEvent = -- | Happens before Kafka Client confirms new assignment - RebalanceBeforeAssign [(TopicName, PartitionId)] + RebalanceBeforeAssign [TopicPartition] -- | Happens after the new assignment is confirmed - | RebalanceAssign [(TopicName, PartitionId)] + | RebalanceAssign [TopicPartition] -- | Happens before Kafka Client confirms partitions rejection - | RebalanceBeforeRevoke [(TopicName, PartitionId)] + | RebalanceBeforeRevoke [TopicPartition] -- | Happens after the rejection is confirmed - | RebalanceRevoke [(TopicName, PartitionId)] + | RebalanceRevoke [TopicPartition] deriving (Eq, Show, Generic) -- | The partition offset