From 91cfdd5b37c4206a8c0983663b4f8be0c21d0a9b Mon Sep 17 00:00:00 2001 From: Marcelo Lazaroni <lazaronijunior@gmail.com> Date: Fri, 8 Mar 2024 23:32:08 +0000 Subject: [PATCH] Clean-up partition list on storeoffsetMessage and storeOffsets --- src/Kafka/Consumer.hs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Kafka/Consumer.hs b/src/Kafka/Consumer.hs index 6b3a14c..f84484b 100644 --- a/src/Kafka/Consumer.hs +++ b/src/Kafka/Consumer.hs @@ -82,7 +82,7 @@ import Data.Set (Set) import qualified Data.Set as Set import qualified Data.Text as Text import Foreign hiding (void) -import Kafka.Consumer.Convert (fromMessagePtr, fromNativeTopicPartitionList'', offsetCommitToBool, offsetToInt64, toMap, toNativeTopicPartitionList, toNativeTopicPartitionList', toNativeTopicPartitionListNoDispose, topicPartitionFromMessageForCommit) +import Kafka.Consumer.Convert (fromMessagePtr, fromNativeTopicPartitionList'', offsetCommitToBool, offsetToInt64, toMap, toNativeTopicPartitionList, toNativeTopicPartitionList', topicPartitionFromMessageForCommit) import Kafka.Consumer.Types (KafkaConsumer (..)) import Kafka.Internal.RdKafka (RdKafkaRespErrT (..), RdKafkaTopicPartitionListTPtr, RdKafkaTypeT (..), rdKafkaSeekPartitions, rdKafkaErrorDestroy, rdKafkaErrorCode, newRdKafkaT, newRdKafkaTopicPartitionListT, newRdKafkaTopicT, rdKafkaAssign, rdKafkaAssignment, rdKafkaCommit, rdKafkaCommitted, rdKafkaConfSetDefaultTopicConf, rdKafkaConsumeBatchQueue, rdKafkaConsumeQueue, rdKafkaConsumerClose, rdKafkaConsumerPoll, rdKafkaOffsetsStore, rdKafkaPausePartitions, rdKafkaPollSetConsumer, rdKafkaPosition, rdKafkaQueueDestroy, rdKafkaQueueNew, rdKafkaResumePartitions, rdKafkaSeek, rdKafkaSetLogLevel, rdKafkaSubscribe, rdKafkaSubscription, rdKafkaTopicConfDup, rdKafkaTopicPartitionListAdd) import Kafka.Internal.Setup (CallbackPollStatus (..), Kafka (..), KafkaConf (..), KafkaProps (..), TopicConf (..), TopicProps (..), getKafkaConf, getRdKafka, kafkaConf, topicConf, Callback(..)) @@ -185,7 +185,7 @@ storeOffsetMessage :: MonadIO m -> ConsumerRecord k v -> m (Maybe KafkaError) storeOffsetMessage k m = - liftIO $ toNativeTopicPartitionListNoDispose [topicPartitionFromMessageForCommit m] >>= commitOffsetsStore k + liftIO $ toNativeTopicPartitionList [topicPartitionFromMessageForCommit m] >>= commitOffsetsStore k -- | Stores offsets locally storeOffsets :: MonadIO m @@ -193,7 +193,7 @@ storeOffsets :: MonadIO m -> [TopicPartition] -> m (Maybe KafkaError) storeOffsets k ps = - liftIO $ toNativeTopicPartitionListNoDispose ps >>= commitOffsetsStore k + liftIO $ toNativeTopicPartitionList ps >>= commitOffsetsStore k -- | Commit offsets for all currently assigned partitions. commitAllOffsets :: MonadIO m