Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add topic creation and deletion #202

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ To be able to run tests locally, `$KAFKA_TEST_BROKER` environment variable is ex

`$KAFKA_TEST_BROKER` should contain an IP address of an accessible Kafka broker that will be used to run integration tests against.

With [Docker Compose](./docker-compose.yml) this variable is used to configure Kafka broker to listen on this address:
With [Docker Compose](./docker-compose.yml) this variable is used to configure a Kafka broker with a UI on localhost:8080 to listen on this address:

```
$ docker-compose up
Expand Down
73 changes: 53 additions & 20 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,24 +1,57 @@
version: "3.8"

version: '3.7'
services:
zookeeper:
image: confluentinc/cp-zookeeper
hostname: zookeeper
ports:
- 2182:2181
environment:
SERVICE_NAME: zookeeper
ZOOKEEPER_CLIENT_PORT: 2181

kafka:
image: confluentinc/cp-kafka:latest
hostname: localhost
# Redpanda cluster
redpanda-1:
image: docker.redpanda.com/redpandadata/redpanda:v23.1.1
container_name: redpanda-1
command:
- redpanda
- start
- --smp
- '1'
- --reserve-memory
- 0M
- --overprovisioned
- --node-id
- '1'
- --kafka-addr
- PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
- --advertise-kafka-addr
- PLAINTEXT://redpanda-1:29092,OUTSIDE://localhost:9092
- --pandaproxy-addr
- PLAINTEXT://0.0.0.0:28082,OUTSIDE://0.0.0.0:8082
- --advertise-pandaproxy-addr
- PLAINTEXT://redpanda-1:28082,OUTSIDE://localhost:8082
- --rpc-addr
- 0.0.0.0:33145
- --advertise-rpc-addr
- redpanda-1:33145
ports:
- 8082:8082
- 9092:9092
links:
- zookeeper:zookeeper
- 9644:9644
- 28082:28082
- 29092:29092

redpanda-console:
image: docker.redpanda.com/redpandadata/console:v2.2.2
container_name: redpanda-console
entrypoint: /bin/sh
command: -c "echo \"$$CONSOLE_CONFIG_FILE\" > /tmp/config.yml; /app/console"
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://$KAFKA_TEST_BROKER:9092"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CREATE_TOPICS:
CONFIG_FILEPATH: /tmp/config.yml
CONSOLE_CONFIG_FILE: |
kafka:
brokers: ["redpanda-1:29092"]
schemaRegistry:
enabled: false
redpanda:
adminApi:
enabled: true
urls: ["http://redpanda-1:9644"]
connect:
enabled: false
ports:
- 8080:8080
depends_on:
- redpanda-1
6 changes: 4 additions & 2 deletions hw-kafka-client.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ extra-source-files: README.md

source-repository head
type: git
location: git://github.com/haskell-works/hw-kafka-client.git
location: https://github.com/haskell-works/hw-kafka-client.git

flag examples
description: Also compile examples
Expand Down Expand Up @@ -55,7 +55,9 @@ library
build-tool-depends: c2hs:c2hs
if impl(ghc <8.0)
build-depends: semigroups
exposed-modules: Kafka.Consumer
exposed-modules: Kafka.Topic
Kafka.Topic.Types
Kafka.Consumer
Kafka.Consumer.ConsumerProperties
Kafka.Consumer.Subscription
Kafka.Consumer.Types
Expand Down
161 changes: 141 additions & 20 deletions src/Kafka/Internal/RdKafka.chs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@ import qualified Data.Text as Text
import Control.Monad (liftM)
import Data.Int (Int32, Int64)
import Data.Word (Word8)
import Foreign.Concurrent (newForeignPtr)
import qualified Foreign.Concurrent as Concurrent
import Foreign.Marshal.Alloc (alloca, allocaBytes)
import Foreign.Marshal.Array (peekArray, allocaArray, withArrayLen)
import Foreign.Storable (Storable(..))
import Foreign.Ptr (Ptr, FunPtr, castPtr, nullPtr)
import Foreign.ForeignPtr (FinalizerPtr, addForeignPtrFinalizer, newForeignPtr_, withForeignPtr)
import Foreign.ForeignPtr (FinalizerPtr, addForeignPtrFinalizer, newForeignPtr_, withForeignPtr, ForeignPtr, newForeignPtr)
import Foreign.C.Error (Errno(..), getErrno)
import Foreign.C.String (CString, newCString, withCAString, peekCAString, peekCString)
import Foreign.C.String (CString, newCString, withCString, withCAString, peekCAString, peekCString)
import Foreign.C.Types (CFile, CInt(..), CSize, CChar, CLong)
import System.IO (Handle, stdin, stdout, stderr)
import System.Posix.IO (handleToFd)
Expand Down Expand Up @@ -271,14 +270,12 @@ instance Storable RdKafkaMetadataT where
{`Int'} -> `RdKafkaTopicPartitionListTPtr' #}

foreign import ccall unsafe "rdkafka.h &rd_kafka_topic_partition_list_destroy"
rdKafkaTopicPartitionListDestroyF :: FinalizerPtr RdKafkaTopicPartitionListT
foreign import ccall unsafe "rdkafka.h rd_kafka_topic_partition_list_destroy"
rdKafkaTopicPartitionListDestroy :: Ptr RdKafkaTopicPartitionListT -> IO ()
rdKafkaTopicPartitionListDestroy :: FinalizerPtr RdKafkaTopicPartitionListT

newRdKafkaTopicPartitionListT :: Int -> IO RdKafkaTopicPartitionListTPtr
newRdKafkaTopicPartitionListT size = do
ret <- rdKafkaTopicPartitionListNew size
addForeignPtrFinalizer rdKafkaTopicPartitionListDestroyF ret
addForeignPtrFinalizer rdKafkaTopicPartitionListDestroy ret
return ret

{# fun rd_kafka_topic_partition_list_add as ^
Expand All @@ -293,7 +290,7 @@ newRdKafkaTopicPartitionListT size = do
copyRdKafkaTopicPartitionList :: RdKafkaTopicPartitionListTPtr -> IO RdKafkaTopicPartitionListTPtr
copyRdKafkaTopicPartitionList pl = do
cp <- rdKafkaTopicPartitionListCopy pl
addForeignPtrFinalizer rdKafkaTopicPartitionListDestroyF cp
addForeignPtrFinalizer rdKafkaTopicPartitionListDestroy cp
return cp

{# fun rd_kafka_topic_partition_list_set_offset as ^
Expand Down Expand Up @@ -521,25 +518,43 @@ rdKafkaTopicConfSetPartitionerCb conf cb = do
{#fun rd_kafka_resume_partitions as ^
{`RdKafkaTPtr', `RdKafkaTopicPartitionListTPtr'} -> `RdKafkaRespErrT' cIntToEnum #}

---- EVENT
foreign import ccall unsafe "rdkafka.h &rd_kafka_event_destroy"
rdKafkaEventDestroyF :: FinalizerPtr RdKafkaEventT

data RdKafkaEventT
{#pointer *rd_kafka_event_t as RdKafkaEventTPtr foreign -> RdKafkaEventT #}

{#fun rd_kafka_event_destroy as ^
{`RdKafkaEventTPtr'} -> `()'#}

---- QUEUE
data RdKafkaQueueT
{#pointer *rd_kafka_queue_t as RdKafkaQueueTPtr foreign -> RdKafkaQueueT #}

{#fun rd_kafka_queue_new as ^
{`RdKafkaTPtr'} -> `RdKafkaQueueTPtr' #}

foreign import ccall unsafe "rdkafka.h &rd_kafka_queue_destroy"
rdKafkaQueueDestroyF :: FinalizerPtr RdKafkaQueueT

{#fun rd_kafka_queue_destroy as ^
{`RdKafkaQueueTPtr'} -> `()'#}

foreign import ccall unsafe "rdkafka.h &rd_kafka_queue_destroy"
rdKafkaQueueDestroyF :: FinalizerPtr RdKafkaQueueT

newRdKafkaQueue :: RdKafkaTPtr -> IO RdKafkaQueueTPtr
newRdKafkaQueue k = do
q <- rdKafkaQueueNew k
addForeignPtrFinalizer rdKafkaQueueDestroyF q
return q

rdKafkaQueuePoll :: RdKafkaQueueTPtr -> Int -> IO (Maybe RdKafkaEventTPtr)
rdKafkaQueuePoll qPtr timeout =
withForeignPtr qPtr $ \qPtr' -> do
res <- {#call rd_kafka_queue_poll#} qPtr' (fromIntegral timeout)
if res == nullPtr
then pure Nothing
else Just <$> newForeignPtr rdKafkaEventDestroyF res

{#fun rd_kafka_consume_queue as ^
{`RdKafkaQueueTPtr', `Int'} -> `RdKafkaMessageTPtr' #}

Expand All @@ -566,7 +581,7 @@ rdKafkaConsumeBatchQueue :: RdKafkaQueueTPtr -> Int -> Int -> IO [RdKafkaMessage
rdKafkaConsumeBatchQueue qptr timeout batchSize = do
allocaArray batchSize $ \pArr -> do
rSize <- rdKafkaConsumeBatchQueue' qptr timeout pArr (fromIntegral batchSize)
peekArray (fromIntegral rSize) pArr >>= traverse (flip newForeignPtr (return ()))
peekArray (fromIntegral rSize) pArr >>= traverse newForeignPtr_

-------------------------------------------------------------------------------------------------
---- High-level KafkaConsumer
Expand All @@ -588,7 +603,7 @@ rdKafkaSubscription k = do
(err, sub) <- rdKafkaSubscription' k
case err of
RdKafkaRespErrNoError ->
Right <$> newForeignPtr sub (rdKafkaTopicPartitionListDestroy sub)
Right <$> newForeignPtr rdKafkaTopicPartitionListDestroy sub
e -> return (Left e)

{#fun rd_kafka_consumer_poll as ^
Expand Down Expand Up @@ -623,7 +638,7 @@ rdKafkaAssignment k = do
(err, ass) <- rdKafkaAssignment' k
case err of
RdKafkaRespErrNoError ->
Right <$> newForeignPtr ass (rdKafkaTopicPartitionListDestroy ass)
Right <$> newForeignPtr rdKafkaTopicPartitionListDestroy ass
e -> return (Left e)

{#fun rd_kafka_commit as ^
Expand Down Expand Up @@ -732,8 +747,8 @@ instance Storable RdKafkaGroupListT where
foreign import ccall "rdkafka.h &rd_kafka_group_list_destroy"
rdKafkaGroupListDestroyF :: FinalizerPtr RdKafkaGroupListT

foreign import ccall "rdkafka.h rd_kafka_group_list_destroy"
rdKafkaGroupListDestroy :: Ptr RdKafkaGroupListT -> IO ()
foreign import ccall "rdkafka.h &rd_kafka_group_list_destroy"
rdKafkaGroupListDestroy :: FinalizerPtr RdKafkaGroupListT

rdKafkaListGroups :: RdKafkaTPtr -> Maybe String -> Int -> IO (Either RdKafkaRespErrT RdKafkaGroupListTPtr)
rdKafkaListGroups k g t = case g of
Expand All @@ -743,7 +758,7 @@ rdKafkaListGroups k g t = case g of
listGroups grp = do
(err, res) <- rdKafkaListGroups' k grp t
case err of
RdKafkaRespErrNoError -> Right <$> newForeignPtr res (rdKafkaGroupListDestroy res)
RdKafkaRespErrNoError -> Right <$> newForeignPtr rdKafkaGroupListDestroy res
e -> return $ Left e
-------------------------------------------------------------------------------------------------

Expand Down Expand Up @@ -924,15 +939,15 @@ rdKafkaConsumeStop topicPtr partition = do
alloca- `Ptr RdKafkaMetadataT' peekPtr*, `Int'}
-> `RdKafkaRespErrT' cIntToEnum #}

foreign import ccall unsafe "rdkafka.h rd_kafka_metadata_destroy"
rdKafkaMetadataDestroy :: Ptr RdKafkaMetadataT -> IO ()
foreign import ccall unsafe "rdkafka.h &rd_kafka_metadata_destroy"
rdKafkaMetadataDestroy :: FinalizerPtr RdKafkaMetadataT

rdKafkaMetadata :: RdKafkaTPtr -> Bool -> Maybe RdKafkaTopicTPtr -> Int -> IO (Either RdKafkaRespErrT RdKafkaMetadataTPtr)
rdKafkaMetadata k allTopics mt timeout = do
tptr <- maybe (newForeignPtr_ nullPtr) pure mt
(err, res) <- rdKafkaMetadata' k allTopics tptr timeout
case err of
RdKafkaRespErrNoError -> Right <$> newForeignPtr res (rdKafkaMetadataDestroy res)
RdKafkaRespErrNoError -> Right <$> newForeignPtr rdKafkaMetadataDestroy res
e -> return (Left e)

{#fun rd_kafka_poll as ^
Expand Down Expand Up @@ -1146,6 +1161,86 @@ rdKafkaErrorIsRetriable ptr = boolFromCInt <$> rdKafkaErrorIsRetriable' ptr
rdKafkaErrorTxnRequiresAbort :: RdKafkaErrorTPtr -> IO Bool
rdKafkaErrorTxnRequiresAbort ptr = boolFromCInt <$> rdKafkaErrorTxnRequiresAbort' ptr

-- Topics
{#enum rd_kafka_admin_op_t as ^ {underscoreToCase} deriving (Show, Eq) #}



data RdKafkaAdminOptionsT
{#pointer *rd_kafka_AdminOptions_t as RdKafkaAdminOptionsTPtr foreign -> RdKafkaAdminOptionsT #}

{#fun rd_kafka_AdminOptions_new as ^
{`RdKafkaTPtr', enumToCInt `RdKafkaAdminOpT'} -> `RdKafkaAdminOptionsTPtr' #}

data RdKafkaNewTopicT
{#pointer *rd_kafka_NewTopic_t as RdKafkaNewTopicTPtr foreign -> RdKafkaNewTopicT #}

{#fun rd_kafka_NewTopic_new as ^ {`String', `Int', `Int', id `Ptr CChar', cIntConv `CSize'} -> `RdKafkaNewTopicTPtr' #}

foreign import ccall unsafe "rdkafka.h &rd_kafka_AdminOptions_destroy" -- prevent memory leak
finalRdKafkaAdminOptionsDestroy :: FinalizerPtr RdKafkaAdminOptionsT

newRdKAdminOptions :: RdKafkaTPtr -> RdKafkaAdminOpT -> IO RdKafkaAdminOptionsTPtr
newRdKAdminOptions kafkaPtr opt = do
res <- rdKafkaAdminOptionsNew kafkaPtr opt
addForeignPtrFinalizer finalRdKafkaAdminOptionsDestroy res
pure res

rdKafkaNewTopicDestroy :: RdKafkaNewTopicTPtr -> IO () -- prevent memory leak
rdKafkaNewTopicDestroy tPtr = do
withForeignPtr tPtr {#call rd_kafka_NewTopic_destroy#}

foreign import ccall "&rd_kafka_NewTopic_destroy"
rdKafkaNewTopicDestroyFinalizer :: FinalizerPtr RdKafkaNewTopicT

newRdKafkaNewTopic :: String -> Int -> Int -> IO (Either String RdKafkaNewTopicTPtr)
newRdKafkaNewTopic topicName topicPartitions topicReplicationFactor = do
allocaBytes nErrorBytes $ \ptr -> do
res <- rdKafkaNewTopicNew topicName topicPartitions topicReplicationFactor ptr (fromIntegral nErrorBytes)
withForeignPtr res $ \realPtr -> do
if realPtr == nullPtr
then peekCString ptr >>= pure . Left
else addForeignPtrFinalizer rdKafkaNewTopicDestroyFinalizer res >> pure (Right res)

--- Create topic
rdKafkaCreateTopic :: RdKafkaTPtr
-> RdKafkaNewTopicTPtr
-> RdKafkaAdminOptionsTPtr
-> RdKafkaQueueTPtr
-> IO ()
rdKafkaCreateTopic kafkaPtr topic opts queue = do
let topics = [topic]
withForeignPtrs kafkaPtr opts queue $ \kPtr oPtr qPtr ->
withForeignPtrsArrayLen topics $ \tLen tPtr -> do
{#call rd_kafka_CreateTopics#} kPtr tPtr (fromIntegral tLen) oPtr qPtr

--- Delete topic
foreign import ccall unsafe "rdkafka.h &rd_kafka_DeleteTopic_destroy"
rdKafkaDeleteTopicDestroy :: FinalizerPtr RdKafkaDeleteTopicT

data RdKafkaDeleteTopicT
{#pointer *rd_kafka_DeleteTopic_t as RdKafkaDeleteTopicTPtr foreign -> RdKafkaDeleteTopicT #}

data RdKafkaDeleteTopicsResultT
{#pointer *rd_kafka_DeleteTopics_result_t as RdKafkaDeleteTopicResultTPtr foreign -> RdKafkaDeleteTopicsResultT #}

newRdKafkaDeleteTopic :: String -> IO (Either String RdKafkaDeleteTopicTPtr)
newRdKafkaDeleteTopic topicNameStr =
withCString topicNameStr $ \topicNameStrPtr -> do
res <- {#call rd_kafka_DeleteTopic_new#} topicNameStrPtr
if (res == nullPtr)
then return $ Left $ "Something went wrong while deleting topic " ++ topicNameStr
else Right <$> newForeignPtr rdKafkaDeleteTopicDestroy res

rdKafkaDeleteTopics :: RdKafkaTPtr
-> [RdKafkaDeleteTopicTPtr]
-> RdKafkaAdminOptionsTPtr
-> RdKafkaQueueTPtr
-> IO ()
rdKafkaDeleteTopics kafkaPtr topics opts queue = do
withForeignPtrs kafkaPtr opts queue $ \kPtr oPtr qPtr ->
withForeignPtrsArrayLen topics $ \tLen tPtr -> do
{#call rd_kafka_DeleteTopics#} kPtr tPtr (fromIntegral tLen) oPtr qPtr

-- Marshall / Unmarshall
enumToCInt :: Enum a => a -> CInt
Expand All @@ -1170,6 +1265,9 @@ boolFromCInt (CInt 0) = False
boolFromCInt (CInt _) = True
{-# INLINE boolFromCInt #-}

peekIntConv :: (Storable a, Integral a, Integral b) => Ptr a -> IO b
peekIntConv = liftM fromIntegral . peek

peekInt64Conv :: (Storable a, Integral a) => Ptr a -> IO Int64
peekInt64Conv = liftM cIntConv . peek
{-# INLINE peekInt64Conv #-}
Expand All @@ -1194,3 +1292,26 @@ c_stdout :: IO CFilePtr
c_stdout = handleToCFile stdout "w"
c_stderr :: IO CFilePtr
c_stderr = handleToCFile stderr "w"


withForeignPtrs :: ForeignPtr kafkaPtr
-> ForeignPtr optPtr
-> ForeignPtr queuePtr
-> (Ptr kafkaPtr -> Ptr optPtr -> Ptr queuePtr -> IO x)
-> IO x
withForeignPtrs kafkaPtr optPtr queuePtr f =
withForeignPtr kafkaPtr $ \kafkaPtr' ->
withForeignPtr optPtr $ \optPtr' ->
withForeignPtr queuePtr $ \queuePtr' -> f kafkaPtr' optPtr' queuePtr'

withForeignPtrsArrayLen :: [ForeignPtr a]
-> (Int -> Ptr (Ptr a) -> IO b)
-> IO b
withForeignPtrsArrayLen as f =
let withForeignPtrsList [] g = g []
withForeignPtrsList (x:xs) g =
withForeignPtr x $ \x' ->
withForeignPtrsList xs $ \xs' ->
g (x' : xs')
in withForeignPtrsList as $ \ptrs ->
withArrayLen ptrs $ \llen pptrs -> f llen pptrs
2 changes: 1 addition & 1 deletion src/Kafka/Producer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -209,4 +209,4 @@ withBS (Just bs) f =
in withForeignPtr d $ \p -> f (p `plusPtr` o) l

outboundQueueLength :: Kafka -> IO Int
outboundQueueLength (Kafka k) = rdKafkaOutqLen k
outboundQueueLength (Kafka k) = rdKafkaOutqLen k
Loading
Loading