Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix monoid instance for consumer. #181

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/Kafka/Consumer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ newConsumer :: MonadIO m
-> Subscription
-> m (Either KafkaError KafkaConsumer)
newConsumer props (Subscription ts tp) = liftIO $ do
let cp = case cpCallbackPollMode props of
let cp = case getCallbackPollMode props of
CallbackPollModeAsync -> setCallback (rebalanceCallback (\_ _ -> return ())) <> props
CallbackPollModeSync -> props
kc@(KafkaConf kc' qref _) <- newConsumerConf cp
Expand All @@ -127,7 +127,7 @@ newConsumer props (Subscription ts tp) = liftIO $ do
case rdk of
Left err -> return . Left $ KafkaError err
Right rdk' -> do
when (cpCallbackPollMode props == CallbackPollModeAsync) $ do
when (getCallbackPollMode props == CallbackPollModeAsync) $ do
msgq <- rdKafkaQueueNew rdk'
writeIORef qref (Just msgq)
let kafka = KafkaConsumer (Kafka rdk') kc
Expand All @@ -138,7 +138,7 @@ newConsumer props (Subscription ts tp) = liftIO $ do
forM_ (cpLogLevel cp) (setConsumerLogLevel kafka)
sub <- subscribe kafka ts
case sub of
Nothing -> (when (cpCallbackPollMode props == CallbackPollModeAsync) $
Nothing -> (when (getCallbackPollMode props == CallbackPollModeAsync) $
runConsumerLoop kafka (Just $ Timeout 100)) >> return (Right kafka)
Just err -> closeConsumer kafka >> return (Left err)

Expand Down
17 changes: 11 additions & 6 deletions src/Kafka/Consumer/ConsumerProperties.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
-----------------------------------------------------------------------------
module Kafka.Consumer.ConsumerProperties
( ConsumerProperties(..)
, CallbackPollMode(..)
, CallbackPollMode(..), getCallbackPollMode
, brokersList
, autoCommit
, noAutoCommit
Expand All @@ -30,6 +30,7 @@ where
import Control.Monad (MonadPlus (mplus))
import Data.Map (Map)
import qualified Data.Map as M
import Data.Maybe (fromMaybe)
import Data.Semigroup as Sem
import Data.Text (Text)
import qualified Data.Text as Text
Expand All @@ -54,12 +55,12 @@ data ConsumerProperties = ConsumerProperties
{ cpProps :: Map Text Text
, cpLogLevel :: Maybe KafkaLogLevel
, cpCallbacks :: [Callback]
, cpCallbackPollMode :: CallbackPollMode
, cpCallbackPollMode :: Maybe CallbackPollMode
}

instance Sem.Semigroup ConsumerProperties where
(ConsumerProperties m1 ll1 cb1 _) <> (ConsumerProperties m2 ll2 cb2 cup2) =
ConsumerProperties (M.union m2 m1) (ll2 `mplus` ll1) (cb1 `mplus` cb2) cup2
(ConsumerProperties m1 ll1 cb1 cup1) <> (ConsumerProperties m2 ll2 cb2 cup2) =
ConsumerProperties (M.union m2 m1) (ll2 `mplus` ll1) (cb1 `mplus` cb2) (cup1 `mplus` cup2)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't it be (cup2 `mplus` cup1) instead? Otherwise it'll be impossible to override once set...

{-# INLINE (<>) #-}

-- | /Right biased/ so we prefer newer properties over older ones.
Expand All @@ -68,12 +69,16 @@ instance Monoid ConsumerProperties where
{ cpProps = M.empty
, cpLogLevel = Nothing
, cpCallbacks = []
, cpCallbackPollMode = CallbackPollModeAsync
, cpCallbackPollMode = Nothing
}
{-# INLINE mempty #-}
mappend = (Sem.<>)
{-# INLINE mappend #-}

-- | The actual poll mode, or async if non set explicitly.
getCallbackPollMode :: ConsumerProperties -> CallbackPollMode
getCallbackPollMode cp = fromMaybe CallbackPollModeAsync (cpCallbackPollMode cp)

-- | Set the <https://kafka.apache.org/documentation/#bootstrap.servers list of brokers> to contact to connect to the Kafka cluster.
brokersList :: [BrokerAddress] -> ConsumerProperties
brokersList bs =
Expand Down Expand Up @@ -171,4 +176,4 @@ queuedMaxMessagesKBytes kBytes =

-- | Set the callback poll mode. Default value is 'CallbackPollModeAsync'.
callbackPollMode :: CallbackPollMode -> ConsumerProperties
callbackPollMode mode = mempty { cpCallbackPollMode = mode }
callbackPollMode mode = mempty { cpCallbackPollMode = Just mode }