diff --git a/README.md b/README.md index 115c3d4..12d9216 100644 --- a/README.md +++ b/README.md @@ -223,7 +223,7 @@ To be able to run tests locally, `$KAFKA_TEST_BROKER` environment variable is ex `$KAFKA_TEST_BROKER` should contain an IP address of an accessible Kafka broker that will be used to run integration tests against. -With [Docker Compose](./docker-compose.yml) this variable is used to configure Kafka broker to listen on this address: +With [Docker Compose](./docker-compose.yml) this variable is used to configure a Kafka broker with a UI on localhost:8080 to listen on this address: ``` $ docker-compose up diff --git a/docker-compose.yml b/docker-compose.yml index 9c79ea9..fec8e00 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,24 +1,57 @@ -version: "3.8" - +version: '3.7' services: - zookeeper: - image: confluentinc/cp-zookeeper - hostname: zookeeper - ports: - - 2182:2181 - environment: - SERVICE_NAME: zookeeper - ZOOKEEPER_CLIENT_PORT: 2181 - - kafka: - image: confluentinc/cp-kafka:latest - hostname: localhost + # Redpanda cluster + redpanda-1: + image: docker.redpanda.com/redpandadata/redpanda:v23.1.1 + container_name: redpanda-1 + command: + - redpanda + - start + - --smp + - '1' + - --reserve-memory + - 0M + - --overprovisioned + - --node-id + - '1' + - --kafka-addr + - PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 + - --advertise-kafka-addr + - PLAINTEXT://redpanda-1:29092,OUTSIDE://localhost:9092 + - --pandaproxy-addr + - PLAINTEXT://0.0.0.0:28082,OUTSIDE://0.0.0.0:8082 + - --advertise-pandaproxy-addr + - PLAINTEXT://redpanda-1:28082,OUTSIDE://localhost:8082 + - --rpc-addr + - 0.0.0.0:33145 + - --advertise-rpc-addr + - redpanda-1:33145 ports: + - 8082:8082 - 9092:9092 - links: - - zookeeper:zookeeper + - 9644:9644 + - 28082:28082 + - 29092:29092 + + redpanda-console: + image: docker.redpanda.com/redpandadata/console:v2.2.2 + container_name: redpanda-console + entrypoint: /bin/sh + command: -c "echo \"$$CONSOLE_CONFIG_FILE\" > /tmp/config.yml; /app/console" environment: - KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" - KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://$KAFKA_TEST_BROKER:9092" - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_CREATE_TOPICS: + CONFIG_FILEPATH: /tmp/config.yml + CONSOLE_CONFIG_FILE: | + kafka: + brokers: ["redpanda-1:29092"] + schemaRegistry: + enabled: false + redpanda: + adminApi: + enabled: true + urls: ["http://redpanda-1:9644"] + connect: + enabled: false + ports: + - 8080:8080 + depends_on: + - redpanda-1 diff --git a/hw-kafka-client.cabal b/hw-kafka-client.cabal index e3e6729..083a0bd 100644 --- a/hw-kafka-client.cabal +++ b/hw-kafka-client.cabal @@ -25,7 +25,7 @@ extra-source-files: README.md source-repository head type: git - location: git://github.com/haskell-works/hw-kafka-client.git + location: https://github.com/haskell-works/hw-kafka-client.git flag examples description: Also compile examples @@ -55,7 +55,9 @@ library build-tool-depends: c2hs:c2hs if impl(ghc <8.0) build-depends: semigroups - exposed-modules: Kafka.Consumer + exposed-modules: Kafka.Topic + Kafka.Topic.Types + Kafka.Consumer Kafka.Consumer.ConsumerProperties Kafka.Consumer.Subscription Kafka.Consumer.Types diff --git a/src/Kafka/Internal/RdKafka.chs b/src/Kafka/Internal/RdKafka.chs index 86ed69a..e408a6d 100644 --- a/src/Kafka/Internal/RdKafka.chs +++ b/src/Kafka/Internal/RdKafka.chs @@ -10,15 +10,14 @@ import qualified Data.Text as Text import Control.Monad (liftM) import Data.Int (Int32, Int64) import Data.Word (Word8) -import Foreign.Concurrent (newForeignPtr) import qualified Foreign.Concurrent as Concurrent import Foreign.Marshal.Alloc (alloca, allocaBytes) import Foreign.Marshal.Array (peekArray, allocaArray, withArrayLen) import Foreign.Storable (Storable(..)) import Foreign.Ptr (Ptr, FunPtr, castPtr, nullPtr) -import Foreign.ForeignPtr (FinalizerPtr, addForeignPtrFinalizer, newForeignPtr_, withForeignPtr) +import Foreign.ForeignPtr (FinalizerPtr, addForeignPtrFinalizer, newForeignPtr_, withForeignPtr, ForeignPtr, newForeignPtr) import Foreign.C.Error (Errno(..), getErrno) -import Foreign.C.String (CString, newCString, withCAString, peekCAString, peekCString) +import Foreign.C.String (CString, newCString, withCString, withCAString, peekCAString, peekCString) import Foreign.C.Types (CFile, CInt(..), CSize, CChar, CLong) import System.IO (Handle, stdin, stdout, stderr) import System.Posix.IO (handleToFd) @@ -271,14 +270,12 @@ instance Storable RdKafkaMetadataT where {`Int'} -> `RdKafkaTopicPartitionListTPtr' #} foreign import ccall unsafe "rdkafka.h &rd_kafka_topic_partition_list_destroy" - rdKafkaTopicPartitionListDestroyF :: FinalizerPtr RdKafkaTopicPartitionListT -foreign import ccall unsafe "rdkafka.h rd_kafka_topic_partition_list_destroy" - rdKafkaTopicPartitionListDestroy :: Ptr RdKafkaTopicPartitionListT -> IO () + rdKafkaTopicPartitionListDestroy :: FinalizerPtr RdKafkaTopicPartitionListT newRdKafkaTopicPartitionListT :: Int -> IO RdKafkaTopicPartitionListTPtr newRdKafkaTopicPartitionListT size = do ret <- rdKafkaTopicPartitionListNew size - addForeignPtrFinalizer rdKafkaTopicPartitionListDestroyF ret + addForeignPtrFinalizer rdKafkaTopicPartitionListDestroy ret return ret {# fun rd_kafka_topic_partition_list_add as ^ @@ -293,7 +290,7 @@ newRdKafkaTopicPartitionListT size = do copyRdKafkaTopicPartitionList :: RdKafkaTopicPartitionListTPtr -> IO RdKafkaTopicPartitionListTPtr copyRdKafkaTopicPartitionList pl = do cp <- rdKafkaTopicPartitionListCopy pl - addForeignPtrFinalizer rdKafkaTopicPartitionListDestroyF cp + addForeignPtrFinalizer rdKafkaTopicPartitionListDestroy cp return cp {# fun rd_kafka_topic_partition_list_set_offset as ^ @@ -521,6 +518,16 @@ rdKafkaTopicConfSetPartitionerCb conf cb = do {#fun rd_kafka_resume_partitions as ^ {`RdKafkaTPtr', `RdKafkaTopicPartitionListTPtr'} -> `RdKafkaRespErrT' cIntToEnum #} +---- EVENT +foreign import ccall unsafe "rdkafka.h &rd_kafka_event_destroy" + rdKafkaEventDestroyF :: FinalizerPtr RdKafkaEventT + +data RdKafkaEventT +{#pointer *rd_kafka_event_t as RdKafkaEventTPtr foreign -> RdKafkaEventT #} + +{#fun rd_kafka_event_destroy as ^ + {`RdKafkaEventTPtr'} -> `()'#} + ---- QUEUE data RdKafkaQueueT {#pointer *rd_kafka_queue_t as RdKafkaQueueTPtr foreign -> RdKafkaQueueT #} @@ -528,18 +535,26 @@ data RdKafkaQueueT {#fun rd_kafka_queue_new as ^ {`RdKafkaTPtr'} -> `RdKafkaQueueTPtr' #} -foreign import ccall unsafe "rdkafka.h &rd_kafka_queue_destroy" - rdKafkaQueueDestroyF :: FinalizerPtr RdKafkaQueueT - {#fun rd_kafka_queue_destroy as ^ {`RdKafkaQueueTPtr'} -> `()'#} +foreign import ccall unsafe "rdkafka.h &rd_kafka_queue_destroy" + rdKafkaQueueDestroyF :: FinalizerPtr RdKafkaQueueT + newRdKafkaQueue :: RdKafkaTPtr -> IO RdKafkaQueueTPtr newRdKafkaQueue k = do q <- rdKafkaQueueNew k addForeignPtrFinalizer rdKafkaQueueDestroyF q return q +rdKafkaQueuePoll :: RdKafkaQueueTPtr -> Int -> IO (Maybe RdKafkaEventTPtr) +rdKafkaQueuePoll qPtr timeout = + withForeignPtr qPtr $ \qPtr' -> do + res <- {#call rd_kafka_queue_poll#} qPtr' (fromIntegral timeout) + if res == nullPtr + then pure Nothing + else Just <$> newForeignPtr rdKafkaEventDestroyF res + {#fun rd_kafka_consume_queue as ^ {`RdKafkaQueueTPtr', `Int'} -> `RdKafkaMessageTPtr' #} @@ -566,7 +581,7 @@ rdKafkaConsumeBatchQueue :: RdKafkaQueueTPtr -> Int -> Int -> IO [RdKafkaMessage rdKafkaConsumeBatchQueue qptr timeout batchSize = do allocaArray batchSize $ \pArr -> do rSize <- rdKafkaConsumeBatchQueue' qptr timeout pArr (fromIntegral batchSize) - peekArray (fromIntegral rSize) pArr >>= traverse (flip newForeignPtr (return ())) + peekArray (fromIntegral rSize) pArr >>= traverse newForeignPtr_ ------------------------------------------------------------------------------------------------- ---- High-level KafkaConsumer @@ -588,7 +603,7 @@ rdKafkaSubscription k = do (err, sub) <- rdKafkaSubscription' k case err of RdKafkaRespErrNoError -> - Right <$> newForeignPtr sub (rdKafkaTopicPartitionListDestroy sub) + Right <$> newForeignPtr rdKafkaTopicPartitionListDestroy sub e -> return (Left e) {#fun rd_kafka_consumer_poll as ^ @@ -623,7 +638,7 @@ rdKafkaAssignment k = do (err, ass) <- rdKafkaAssignment' k case err of RdKafkaRespErrNoError -> - Right <$> newForeignPtr ass (rdKafkaTopicPartitionListDestroy ass) + Right <$> newForeignPtr rdKafkaTopicPartitionListDestroy ass e -> return (Left e) {#fun rd_kafka_commit as ^ @@ -732,8 +747,8 @@ instance Storable RdKafkaGroupListT where foreign import ccall "rdkafka.h &rd_kafka_group_list_destroy" rdKafkaGroupListDestroyF :: FinalizerPtr RdKafkaGroupListT -foreign import ccall "rdkafka.h rd_kafka_group_list_destroy" - rdKafkaGroupListDestroy :: Ptr RdKafkaGroupListT -> IO () +foreign import ccall "rdkafka.h &rd_kafka_group_list_destroy" + rdKafkaGroupListDestroy :: FinalizerPtr RdKafkaGroupListT rdKafkaListGroups :: RdKafkaTPtr -> Maybe String -> Int -> IO (Either RdKafkaRespErrT RdKafkaGroupListTPtr) rdKafkaListGroups k g t = case g of @@ -743,7 +758,7 @@ rdKafkaListGroups k g t = case g of listGroups grp = do (err, res) <- rdKafkaListGroups' k grp t case err of - RdKafkaRespErrNoError -> Right <$> newForeignPtr res (rdKafkaGroupListDestroy res) + RdKafkaRespErrNoError -> Right <$> newForeignPtr rdKafkaGroupListDestroy res e -> return $ Left e ------------------------------------------------------------------------------------------------- @@ -924,15 +939,15 @@ rdKafkaConsumeStop topicPtr partition = do alloca- `Ptr RdKafkaMetadataT' peekPtr*, `Int'} -> `RdKafkaRespErrT' cIntToEnum #} -foreign import ccall unsafe "rdkafka.h rd_kafka_metadata_destroy" - rdKafkaMetadataDestroy :: Ptr RdKafkaMetadataT -> IO () +foreign import ccall unsafe "rdkafka.h &rd_kafka_metadata_destroy" + rdKafkaMetadataDestroy :: FinalizerPtr RdKafkaMetadataT rdKafkaMetadata :: RdKafkaTPtr -> Bool -> Maybe RdKafkaTopicTPtr -> Int -> IO (Either RdKafkaRespErrT RdKafkaMetadataTPtr) rdKafkaMetadata k allTopics mt timeout = do tptr <- maybe (newForeignPtr_ nullPtr) pure mt (err, res) <- rdKafkaMetadata' k allTopics tptr timeout case err of - RdKafkaRespErrNoError -> Right <$> newForeignPtr res (rdKafkaMetadataDestroy res) + RdKafkaRespErrNoError -> Right <$> newForeignPtr rdKafkaMetadataDestroy res e -> return (Left e) {#fun rd_kafka_poll as ^ @@ -1146,6 +1161,86 @@ rdKafkaErrorIsRetriable ptr = boolFromCInt <$> rdKafkaErrorIsRetriable' ptr rdKafkaErrorTxnRequiresAbort :: RdKafkaErrorTPtr -> IO Bool rdKafkaErrorTxnRequiresAbort ptr = boolFromCInt <$> rdKafkaErrorTxnRequiresAbort' ptr +-- Topics +{#enum rd_kafka_admin_op_t as ^ {underscoreToCase} deriving (Show, Eq) #} + + + +data RdKafkaAdminOptionsT +{#pointer *rd_kafka_AdminOptions_t as RdKafkaAdminOptionsTPtr foreign -> RdKafkaAdminOptionsT #} + +{#fun rd_kafka_AdminOptions_new as ^ + {`RdKafkaTPtr', enumToCInt `RdKafkaAdminOpT'} -> `RdKafkaAdminOptionsTPtr' #} + +data RdKafkaNewTopicT +{#pointer *rd_kafka_NewTopic_t as RdKafkaNewTopicTPtr foreign -> RdKafkaNewTopicT #} + +{#fun rd_kafka_NewTopic_new as ^ {`String', `Int', `Int', id `Ptr CChar', cIntConv `CSize'} -> `RdKafkaNewTopicTPtr' #} + +foreign import ccall unsafe "rdkafka.h &rd_kafka_AdminOptions_destroy" -- prevent memory leak + finalRdKafkaAdminOptionsDestroy :: FinalizerPtr RdKafkaAdminOptionsT + +newRdKAdminOptions :: RdKafkaTPtr -> RdKafkaAdminOpT -> IO RdKafkaAdminOptionsTPtr +newRdKAdminOptions kafkaPtr opt = do + res <- rdKafkaAdminOptionsNew kafkaPtr opt + addForeignPtrFinalizer finalRdKafkaAdminOptionsDestroy res + pure res + +rdKafkaNewTopicDestroy :: RdKafkaNewTopicTPtr -> IO () -- prevent memory leak +rdKafkaNewTopicDestroy tPtr = do + withForeignPtr tPtr {#call rd_kafka_NewTopic_destroy#} + +foreign import ccall "&rd_kafka_NewTopic_destroy" + rdKafkaNewTopicDestroyFinalizer :: FinalizerPtr RdKafkaNewTopicT + +newRdKafkaNewTopic :: String -> Int -> Int -> IO (Either String RdKafkaNewTopicTPtr) +newRdKafkaNewTopic topicName topicPartitions topicReplicationFactor = do + allocaBytes nErrorBytes $ \ptr -> do + res <- rdKafkaNewTopicNew topicName topicPartitions topicReplicationFactor ptr (fromIntegral nErrorBytes) + withForeignPtr res $ \realPtr -> do + if realPtr == nullPtr + then peekCString ptr >>= pure . Left + else addForeignPtrFinalizer rdKafkaNewTopicDestroyFinalizer res >> pure (Right res) + +--- Create topic +rdKafkaCreateTopic :: RdKafkaTPtr + -> RdKafkaNewTopicTPtr + -> RdKafkaAdminOptionsTPtr + -> RdKafkaQueueTPtr + -> IO () +rdKafkaCreateTopic kafkaPtr topic opts queue = do + let topics = [topic] + withForeignPtrs kafkaPtr opts queue $ \kPtr oPtr qPtr -> + withForeignPtrsArrayLen topics $ \tLen tPtr -> do + {#call rd_kafka_CreateTopics#} kPtr tPtr (fromIntegral tLen) oPtr qPtr + +--- Delete topic +foreign import ccall unsafe "rdkafka.h &rd_kafka_DeleteTopic_destroy" + rdKafkaDeleteTopicDestroy :: FinalizerPtr RdKafkaDeleteTopicT + +data RdKafkaDeleteTopicT +{#pointer *rd_kafka_DeleteTopic_t as RdKafkaDeleteTopicTPtr foreign -> RdKafkaDeleteTopicT #} + +data RdKafkaDeleteTopicsResultT +{#pointer *rd_kafka_DeleteTopics_result_t as RdKafkaDeleteTopicResultTPtr foreign -> RdKafkaDeleteTopicsResultT #} + +newRdKafkaDeleteTopic :: String -> IO (Either String RdKafkaDeleteTopicTPtr) +newRdKafkaDeleteTopic topicNameStr = + withCString topicNameStr $ \topicNameStrPtr -> do + res <- {#call rd_kafka_DeleteTopic_new#} topicNameStrPtr + if (res == nullPtr) + then return $ Left $ "Something went wrong while deleting topic " ++ topicNameStr + else Right <$> newForeignPtr rdKafkaDeleteTopicDestroy res + +rdKafkaDeleteTopics :: RdKafkaTPtr + -> [RdKafkaDeleteTopicTPtr] + -> RdKafkaAdminOptionsTPtr + -> RdKafkaQueueTPtr + -> IO () +rdKafkaDeleteTopics kafkaPtr topics opts queue = do + withForeignPtrs kafkaPtr opts queue $ \kPtr oPtr qPtr -> + withForeignPtrsArrayLen topics $ \tLen tPtr -> do + {#call rd_kafka_DeleteTopics#} kPtr tPtr (fromIntegral tLen) oPtr qPtr -- Marshall / Unmarshall enumToCInt :: Enum a => a -> CInt @@ -1170,6 +1265,9 @@ boolFromCInt (CInt 0) = False boolFromCInt (CInt _) = True {-# INLINE boolFromCInt #-} +peekIntConv :: (Storable a, Integral a, Integral b) => Ptr a -> IO b +peekIntConv = liftM fromIntegral . peek + peekInt64Conv :: (Storable a, Integral a) => Ptr a -> IO Int64 peekInt64Conv = liftM cIntConv . peek {-# INLINE peekInt64Conv #-} @@ -1194,3 +1292,26 @@ c_stdout :: IO CFilePtr c_stdout = handleToCFile stdout "w" c_stderr :: IO CFilePtr c_stderr = handleToCFile stderr "w" + + +withForeignPtrs :: ForeignPtr kafkaPtr + -> ForeignPtr optPtr + -> ForeignPtr queuePtr + -> (Ptr kafkaPtr -> Ptr optPtr -> Ptr queuePtr -> IO x) + -> IO x +withForeignPtrs kafkaPtr optPtr queuePtr f = + withForeignPtr kafkaPtr $ \kafkaPtr' -> + withForeignPtr optPtr $ \optPtr' -> + withForeignPtr queuePtr $ \queuePtr' -> f kafkaPtr' optPtr' queuePtr' + +withForeignPtrsArrayLen :: [ForeignPtr a] + -> (Int -> Ptr (Ptr a) -> IO b) + -> IO b +withForeignPtrsArrayLen as f = + let withForeignPtrsList [] g = g [] + withForeignPtrsList (x:xs) g = + withForeignPtr x $ \x' -> + withForeignPtrsList xs $ \xs' -> + g (x' : xs') + in withForeignPtrsList as $ \ptrs -> + withArrayLen ptrs $ \llen pptrs -> f llen pptrs diff --git a/src/Kafka/Producer.hs b/src/Kafka/Producer.hs index f07815e..35f052e 100644 --- a/src/Kafka/Producer.hs +++ b/src/Kafka/Producer.hs @@ -209,4 +209,4 @@ withBS (Just bs) f = in withForeignPtr d $ \p -> f (p `plusPtr` o) l outboundQueueLength :: Kafka -> IO Int -outboundQueueLength (Kafka k) = rdKafkaOutqLen k \ No newline at end of file +outboundQueueLength (Kafka k) = rdKafkaOutqLen k diff --git a/src/Kafka/Topic.hs b/src/Kafka/Topic.hs new file mode 100644 index 0000000..d03cb98 --- /dev/null +++ b/src/Kafka/Topic.hs @@ -0,0 +1,107 @@ +module Kafka.Topic( +module X +, createTopic +, deleteTopic +) where + +import Control.Monad.IO.Class +import Data.List.NonEmpty +import qualified Data.List.NonEmpty as NEL +import Data.Text +import qualified Data.Text as T + +import Kafka.Internal.RdKafka +import Kafka.Internal.Setup + +import Kafka.Topic.Types as X +import Kafka.Types as X + +--- CREATE TOPIC --- +createTopic :: HasKafka k => k -> NewTopic -> IO (Either KafkaError TopicName) +createTopic k topic = do + let kafkaPtr = getRdKafka k + queue <- newRdKafkaQueue kafkaPtr + opts <- newRdKAdminOptions kafkaPtr RdKafkaAdminOpAny + + topicRes <- withNewTopic topic $ \topic' -> rdKafkaCreateTopic kafkaPtr topic' opts queue + case topicRes of + Left err -> do + pure $ Left (NEL.head err) + Right _ -> do + pure $ Right $ topicName topic + +--- DELETE TOPIC --- +deleteTopic :: HasKafka k + => k + -> TopicName + -> IO (Either KafkaError TopicName) +deleteTopic k topic = liftIO $ do + let kafkaPtr = getRdKafka k + queue <- newRdKafkaQueue kafkaPtr + opts <- newRdKAdminOptions kafkaPtr RdKafkaAdminOpAny + + topicRes <- withOldTopic topic $ \topic' -> rdKafkaDeleteTopics kafkaPtr [topic'] opts queue + case topicRes of + Left err -> do + pure $ Left (NEL.head err) + Right _ -> do + pure $ Right topic + +withNewTopic :: NewTopic + -> (RdKafkaNewTopicTPtr -> IO a) + -> IO (Either (NonEmpty KafkaError) a) +withNewTopic t transform = do + mkNewTopicRes <- mkNewTopic t newTopicPtr + case mkNewTopicRes of + Left err -> do + return $ Left err + Right topic -> do + res <- transform topic + return $ Right res + +withOldTopic :: TopicName + -> (RdKafkaDeleteTopicTPtr -> IO a) + -> IO (Either (NonEmpty KafkaError) a) +withOldTopic tName transform = do + rmOldTopicRes <- rmOldTopic tName oldTopicPtr + case rmOldTopicRes of + Left err -> do + return $ Left err + Right topic -> do + res <- transform topic + return $ Right res + +newTopicPtr :: NewTopic -> IO (Either KafkaError RdKafkaNewTopicTPtr) +newTopicPtr topic = do + ptrRes <- newRdKafkaNewTopic (unpack $ unTopicName $ topicName topic) (unPartitionCount $ topicPartitionCount topic) (unReplicationFactor $ topicReplicationFactor topic) + case ptrRes of + Left str -> pure $ Left (KafkaError $ T.pack str) + Right ptr -> pure $ Right ptr + +oldTopicPtr :: TopicName -> IO (Either KafkaError RdKafkaDeleteTopicTPtr) +oldTopicPtr tName = do + res <- newRdKafkaDeleteTopic $ unpack . unTopicName $ tName + case res of + Left str -> pure $ Left (KafkaError $ T.pack str) + Right ptr -> pure $ Right ptr + +mkNewTopic :: NewTopic + -> (NewTopic -> IO (Either KafkaError a)) + -> IO (Either (NonEmpty KafkaError) a) +mkNewTopic topic create = do + res <- create topic + case res of + Left err -> pure $ Left (singletonList err) + Right resource -> pure $ Right resource + +rmOldTopic :: TopicName + -> (TopicName -> IO (Either KafkaError a)) + -> IO (Either (NonEmpty KafkaError) a) +rmOldTopic tName remove = do + res <- remove tName + case res of + Left err -> pure $ Left (singletonList err) + Right resource -> pure $ Right resource + +singletonList :: a -> NonEmpty a +singletonList x = x :| [] diff --git a/src/Kafka/Topic/Types.hs b/src/Kafka/Topic/Types.hs new file mode 100644 index 0000000..e42e2c3 --- /dev/null +++ b/src/Kafka/Topic/Types.hs @@ -0,0 +1,19 @@ +module Kafka.Topic.Types ( +PartitionCount (..) +, ReplicationFactor (..) +, NewTopic (..) +) where + +import Data.Map + +import Kafka.Types + +newtype PartitionCount = PartitionCount { unPartitionCount :: Int } deriving (Show, Eq) +newtype ReplicationFactor = ReplicationFactor { unReplicationFactor :: Int } deriving (Show, Eq) + +data NewTopic = NewTopic { + topicName :: TopicName + , topicPartitionCount :: PartitionCount + , topicReplicationFactor :: ReplicationFactor + , topicConfig :: Map String String +} deriving (Show) diff --git a/tests-it/Kafka/IntegrationSpec.hs b/tests-it/Kafka/IntegrationSpec.hs index 601b29a..6ed1853 100644 --- a/tests-it/Kafka/IntegrationSpec.hs +++ b/tests-it/Kafka/IntegrationSpec.hs @@ -5,6 +5,8 @@ module Kafka.IntegrationSpec where +import System.Random (randomRIO) +import Control.Concurrent import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar) import Control.Monad (forM, forM_, void) import Control.Monad.Loops @@ -13,8 +15,10 @@ import Data.Map (fromList) import qualified Data.Set as Set import Data.Monoid ((<>)) import Kafka.Consumer +import qualified Data.Text as T import Kafka.Metadata import Kafka.Producer +import Kafka.Topic import Kafka.TestEnv import Test.Hspec @@ -170,6 +174,42 @@ spec = do forM_ res $ \rcs -> forM_ rcs ((`shouldBe` Set.fromList (headersToList testHeaders)) . Set.fromList . headersToList . crHeaders) + describe "Kafka.Topic.Spec" $ do + let topicName = addRandomChars "admin.topic.created." 5 + + topicsMVar <- runIO newEmptyMVar + + specWithConsumer "Read all topics" consumerProps $ do + + it "should create a topic" $ \(consumer :: KafkaConsumer) -> do + tName <- topicName + let newTopic = mkNewTopic (TopicName ( T.pack(tName) )) + result <- createTopic consumer newTopic + result `shouldSatisfy` isRight + + + it "should return all the topics" $ \(consumer :: KafkaConsumer) -> do + res <- allTopicsMetadata consumer (Timeout 1000) + res `shouldSatisfy` isRight + let filterUserTopics m = m { kmTopics = filter (\t -> topicType (tmTopicName t) == User) (kmTopics m) } + let res' = fmap filterUserTopics res + length . kmBrokers <$> res' `shouldBe` Right 1 + + let topics = either (const []) (map tmTopicName . kmTopics) res' + putMVar topicsMVar topics + + let topicsLen = either (const 0) (length . kmTopics) res' + let hasTopic = either (const False) (any (\t -> tmTopicName t == testTopic) . kmTopics) res' + + topicsLen `shouldSatisfy` (>0) + hasTopic `shouldBe` True + + it "should delete all the topics currently existing" $ \(consumer :: KafkaConsumer) -> do + topics <- takeMVar topicsMVar + forM_ topics $ \topic -> do + result <- deleteTopic consumer topic + result `shouldSatisfy` isRight + ---------------------------------------------------------------------------------------------------------------- data ReadState = Skip | Read @@ -311,3 +351,12 @@ runConsumerSpec = do res `shouldBe` Nothing msg <- pollMessage k (Timeout 2000) crOffset <$> msg `shouldBe` Right (Offset 0) + +mkNewTopic :: TopicName -> NewTopic +mkNewTopic name = NewTopic name (PartitionCount 1) (ReplicationFactor 1) mempty + + +addRandomChars :: String -> Int -> IO String +addRandomChars baseStr n = do + randomChars <- mapM (\_ -> randomRIO ('a', 'z')) [1..n] + return $ baseStr ++ randomChars diff --git a/tests-it/Kafka/TestEnv.hs b/tests-it/Kafka/TestEnv.hs index d31b405..daee4a2 100644 --- a/tests-it/Kafka/TestEnv.hs +++ b/tests-it/Kafka/TestEnv.hs @@ -76,7 +76,6 @@ mkConsumerWith props = do (RebalanceAssign _) -> putMVar var True _ -> pure () - specWithConsumer :: String -> ConsumerProperties -> SpecWith KafkaConsumer -> Spec specWithConsumer s p f = beforeAll (mkConsumerWith p)