diff --git a/hw-kafka-client.cabal b/hw-kafka-client.cabal index 474cacf..4f43ef3 100644 --- a/hw-kafka-client.cabal +++ b/hw-kafka-client.cabal @@ -55,9 +55,8 @@ library build-tool-depends: c2hs:c2hs if impl(ghc <8.0) build-depends: semigroups - exposed-modules: Kafka.Admin - Kafka.Admin.AdminProperties - Kafka.Admin.Types + exposed-modules: Kafka.Topic + Kafka.Topic.Types Kafka.Consumer Kafka.Consumer.ConsumerProperties Kafka.Consumer.Subscription diff --git a/src/Kafka/Admin/AdminProperties.hs b/src/Kafka/Admin/AdminProperties.hs deleted file mode 100644 index 30d858f..0000000 --- a/src/Kafka/Admin/AdminProperties.hs +++ /dev/null @@ -1,43 +0,0 @@ -{-# LANGUAGE OverloadedStrings #-} - -module Kafka.Admin.AdminProperties where - -import Data.Map -import qualified Data.Map as M -import Data.Text - -import Kafka.Types - -newtype AdminProperties = AdminProperties { - adminProps :: Map Text Text -} - -instance Semigroup AdminProperties where - ( AdminProperties props1 ) <> ( AdminProperties props2 ) = - AdminProperties ( props2 `union` props1 ) - {-# INLINE (<>) #-} - -instance Monoid AdminProperties where - mempty = AdminProperties { - adminProps = M.empty - } - {-# INLINE mempty #-} - mappend = (<>) - {-# INLINE mappend #-} - -brokers :: [BrokerAddress] -> AdminProperties -brokers b = - let b' = intercalate "," ((\( BrokerAddress i ) -> i ) <$> b ) - in extraProps $ fromList [("bootstrap.servers", b')] - -clientId :: ClientId -> AdminProperties -clientId (ClientId cid) = - extraProps $ M.fromList [("client.id", cid)] - -timeOut :: Timeout -> AdminProperties -timeOut (Timeout to) = - let to' = ( pack $ show to ) - in extraProps $ fromList [("request.timeout.ms", to')] - -extraProps :: Map Text Text -> AdminProperties -extraProps m = mempty { adminProps = m } diff --git a/src/Kafka/Internal/RdKafka.chs b/src/Kafka/Internal/RdKafka.chs index 9ec9f52..e408a6d 100644 --- a/src/Kafka/Internal/RdKafka.chs +++ b/src/Kafka/Internal/RdKafka.chs @@ -1161,11 +1161,10 @@ rdKafkaErrorIsRetriable ptr = boolFromCInt <$> rdKafkaErrorIsRetriable' ptr rdKafkaErrorTxnRequiresAbort :: RdKafkaErrorTPtr -> IO Bool rdKafkaErrorTxnRequiresAbort ptr = boolFromCInt <$> rdKafkaErrorTxnRequiresAbort' ptr --- Admin API +-- Topics {#enum rd_kafka_admin_op_t as ^ {underscoreToCase} deriving (Show, Eq) #} -data RdKafkaTopicResultT -{#pointer *rd_kafka_topic_result_t as RdKafkaTopicResultTPtr foreign -> RdKafkaTopicResultT #} + data RdKafkaAdminOptionsT {#pointer *rd_kafka_AdminOptions_t as RdKafkaAdminOptionsTPtr foreign -> RdKafkaAdminOptionsT #} diff --git a/src/Kafka/Admin.hs b/src/Kafka/Topic.hs similarity index 62% rename from src/Kafka/Admin.hs rename to src/Kafka/Topic.hs index 50f1ab6..d03cb98 100644 --- a/src/Kafka/Admin.hs +++ b/src/Kafka/Topic.hs @@ -1,60 +1,42 @@ -module Kafka.Admin( +module Kafka.Topic( module X -, newKAdmin , createTopic , deleteTopic -, closeKAdmin ) where -import Control.Monad -import Control.Monad.IO.Class -import Data.Text -import Data.List.NonEmpty -import qualified Data.List.NonEmpty as NEL -import qualified Data.Text as T +import Control.Monad.IO.Class +import Data.List.NonEmpty +import qualified Data.List.NonEmpty as NEL +import Data.Text +import qualified Data.Text as T import Kafka.Internal.RdKafka import Kafka.Internal.Setup -import Kafka.Types as X -import Kafka.Admin.AdminProperties as X -import Kafka.Admin.Types as X +import Kafka.Topic.Types as X +import Kafka.Types as X -newKAdmin ::( MonadIO m ) - => AdminProperties - -> m (Either KafkaError KAdmin) -newKAdmin properties = liftIO $ do - kafkaConfig@(KafkaConf kafkaConf' _ _) <- kafkaConf ( KafkaProps $ adminProps properties) - maybeKafka <- newRdKafkaT RdKafkaConsumer kafkaConf' - case maybeKafka of - Left err -> pure $ Left $ KafkaError err - Right kafka -> pure $ Right $ KAdmin (Kafka kafka) kafkaConfig - -closeKAdmin :: KAdmin - -> IO () -closeKAdmin ka = void $ rdKafkaConsumerClose (getRdKafka ka) --- CREATE TOPIC --- -createTopic :: KAdmin - -> NewTopic - -> IO (Either KafkaError TopicName) -createTopic kAdmin topic = liftIO $ do - let kafkaPtr = getRdKafka kAdmin +createTopic :: HasKafka k => k -> NewTopic -> IO (Either KafkaError TopicName) +createTopic k topic = do + let kafkaPtr = getRdKafka k queue <- newRdKafkaQueue kafkaPtr - opts <- newRdKAdminOptions kafkaPtr RdKafkaAdminOpAny + opts <- newRdKAdminOptions kafkaPtr RdKafkaAdminOpAny topicRes <- withNewTopic topic $ \topic' -> rdKafkaCreateTopic kafkaPtr topic' opts queue case topicRes of - Left err -> do + Left err -> do pure $ Left (NEL.head err) Right _ -> do pure $ Right $ topicName topic --- DELETE TOPIC --- -deleteTopic :: KAdmin +deleteTopic :: HasKafka k + => k -> TopicName -> IO (Either KafkaError TopicName) -deleteTopic kAdmin topic = liftIO $ do - let kafkaPtr = getRdKafka kAdmin +deleteTopic k topic = liftIO $ do + let kafkaPtr = getRdKafka k queue <- newRdKafkaQueue kafkaPtr opts <- newRdKAdminOptions kafkaPtr RdKafkaAdminOpAny @@ -65,7 +47,7 @@ deleteTopic kAdmin topic = liftIO $ do Right _ -> do pure $ Right topic -withNewTopic :: NewTopic +withNewTopic :: NewTopic -> (RdKafkaNewTopicTPtr -> IO a) -> IO (Either (NonEmpty KafkaError) a) withNewTopic t transform = do @@ -73,7 +55,7 @@ withNewTopic t transform = do case mkNewTopicRes of Left err -> do return $ Left err - Right topic -> do + Right topic -> do res <- transform topic return $ Right res @@ -93,23 +75,23 @@ newTopicPtr :: NewTopic -> IO (Either KafkaError RdKafkaNewTopicTPtr) newTopicPtr topic = do ptrRes <- newRdKafkaNewTopic (unpack $ unTopicName $ topicName topic) (unPartitionCount $ topicPartitionCount topic) (unReplicationFactor $ topicReplicationFactor topic) case ptrRes of - Left str -> pure $ Left (KafkaError $ T.pack str) + Left str -> pure $ Left (KafkaError $ T.pack str) Right ptr -> pure $ Right ptr oldTopicPtr :: TopicName -> IO (Either KafkaError RdKafkaDeleteTopicTPtr) oldTopicPtr tName = do res <- newRdKafkaDeleteTopic $ unpack . unTopicName $ tName case res of - Left str -> pure $ Left (KafkaError $ T.pack str) + Left str -> pure $ Left (KafkaError $ T.pack str) Right ptr -> pure $ Right ptr -mkNewTopic :: NewTopic +mkNewTopic :: NewTopic -> (NewTopic -> IO (Either KafkaError a)) -> IO (Either (NonEmpty KafkaError) a) mkNewTopic topic create = do res <- create topic case res of - Left err -> pure $ Left (NEL.singleton err) + Left err -> pure $ Left (singletonList err) Right resource -> pure $ Right resource rmOldTopic :: TopicName @@ -118,5 +100,8 @@ rmOldTopic :: TopicName rmOldTopic tName remove = do res <- remove tName case res of - Left err -> pure $ Left (NEL.singleton err) + Left err -> pure $ Left (singletonList err) Right resource -> pure $ Right resource + +singletonList :: a -> NonEmpty a +singletonList x = x :| [] diff --git a/src/Kafka/Admin/Types.hs b/src/Kafka/Topic/Types.hs similarity index 57% rename from src/Kafka/Admin/Types.hs rename to src/Kafka/Topic/Types.hs index 54f606e..e42e2c3 100644 --- a/src/Kafka/Admin/Types.hs +++ b/src/Kafka/Topic/Types.hs @@ -1,6 +1,5 @@ -module Kafka.Admin.Types ( -KAdmin(..) -, PartitionCount (..) +module Kafka.Topic.Types ( +PartitionCount (..) , ReplicationFactor (..) , NewTopic (..) ) where @@ -8,20 +7,6 @@ KAdmin(..) import Data.Map import Kafka.Types -import Kafka.Internal.Setup - -data KAdmin = KAdmin { - kaKafkaPtr :: !Kafka - , kaKafkaConf :: !KafkaConf -} - -instance HasKafka KAdmin where - getKafka = kaKafkaPtr - {-# INLINE getKafka #-} - -instance HasKafkaConf KAdmin where - getKafkaConf = kaKafkaConf - {-# INLINE getKafkaConf #-} newtype PartitionCount = PartitionCount { unPartitionCount :: Int } deriving (Show, Eq) newtype ReplicationFactor = ReplicationFactor { unReplicationFactor :: Int } deriving (Show, Eq) diff --git a/tests-it/Kafka/IntegrationSpec.hs b/tests-it/Kafka/IntegrationSpec.hs index 5fc2ea0..6ed1853 100644 --- a/tests-it/Kafka/IntegrationSpec.hs +++ b/tests-it/Kafka/IntegrationSpec.hs @@ -18,7 +18,7 @@ import Kafka.Consumer import qualified Data.Text as T import Kafka.Metadata import Kafka.Producer -import Kafka.Admin +import Kafka.Topic import Kafka.TestEnv import Test.Hspec @@ -174,20 +174,19 @@ spec = do forM_ res $ \rcs -> forM_ rcs ((`shouldBe` Set.fromList (headersToList testHeaders)) . Set.fromList . headersToList . crHeaders) - describe "Kafka.Admin.Spec" $ do + describe "Kafka.Topic.Spec" $ do let topicName = addRandomChars "admin.topic.created." 5 topicsMVar <- runIO newEmptyMVar - specWithAdmin "Create topic" $ do - - it "should create a new topic" $ \(admin :: KAdmin) -> do + specWithConsumer "Read all topics" consumerProps $ do + + it "should create a topic" $ \(consumer :: KafkaConsumer) -> do tName <- topicName let newTopic = mkNewTopic (TopicName ( T.pack(tName) )) - result <- createTopic admin newTopic + result <- createTopic consumer newTopic result `shouldSatisfy` isRight - specWithConsumer "Read all topics" consumerProps $ do it "should return all the topics" $ \(consumer :: KafkaConsumer) -> do res <- allTopicsMetadata consumer (Timeout 1000) @@ -205,13 +204,12 @@ spec = do topicsLen `shouldSatisfy` (>0) hasTopic `shouldBe` True - specWithAdmin "Remove topics" $ do - - it "should delete all the topics currently existing" $ \(admin ::KAdmin) -> do + it "should delete all the topics currently existing" $ \(consumer :: KafkaConsumer) -> do topics <- takeMVar topicsMVar forM_ topics $ \topic -> do - result <- deleteTopic admin topic + result <- deleteTopic consumer topic result `shouldSatisfy` isRight + ---------------------------------------------------------------------------------------------------------------- data ReadState = Skip | Read diff --git a/tests-it/Kafka/TestEnv.hs b/tests-it/Kafka/TestEnv.hs index e588137..daee4a2 100644 --- a/tests-it/Kafka/TestEnv.hs +++ b/tests-it/Kafka/TestEnv.hs @@ -15,7 +15,6 @@ import qualified System.Random as Rnd import Control.Concurrent import Kafka.Consumer as C import Kafka.Producer as P -import Kafka.Admin as A import Test.Hspec @@ -58,9 +57,6 @@ producerProps = P.brokersList [brokerAddress] <> P.setCallback (logCallback (\l s1 s2 -> print $ "[Producer] " <> show l <> ": " <> s1 <> ", " <> s2)) <> P.setCallback (errorCallback (\e r -> print $ "[Producer] " <> show e <> ": " <> r)) -adminProperties :: AdminProperties -adminProperties = A.brokers [brokerAddress] - testSubscription :: TopicName -> Subscription testSubscription t = topics [t] <> offsetReset Earliest @@ -80,9 +76,6 @@ mkConsumerWith props = do (RebalanceAssign _) -> putMVar var True _ -> pure () -mkAdmin :: IO KAdmin -mkAdmin = newKAdmin adminProperties >>= \(Right k) -> pure k - specWithConsumer :: String -> ConsumerProperties -> SpecWith KafkaConsumer -> Spec specWithConsumer s p f = beforeAll (mkConsumerWith p) @@ -97,9 +90,3 @@ specWithKafka s p f = beforeAll ((,) <$> mkConsumerWith p <*> mkProducer) $ afterAll (\(consumer, producer) -> void $ closeProducer producer >> closeConsumer consumer) $ describe s f - -specWithAdmin :: String -> SpecWith KAdmin -> Spec -specWithAdmin s f = - beforeAll mkAdmin - $ afterAll (void . closeKAdmin) - $ describe s f