Skip to content

Commit

Permalink
chore: use HasKafka in favor of KAdmin
Browse files Browse the repository at this point in the history
  • Loading branch information
JoranVanBelle committed Jan 14, 2025
1 parent ba286d3 commit 7bc649a
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 115 deletions.
5 changes: 2 additions & 3 deletions hw-kafka-client.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ library
build-tool-depends: c2hs:c2hs
if impl(ghc <8.0)
build-depends: semigroups
exposed-modules: Kafka.Admin
Kafka.Admin.AdminProperties
Kafka.Admin.Types
exposed-modules: Kafka.Topic
Kafka.Topic.Types
Kafka.Consumer
Kafka.Consumer.ConsumerProperties
Kafka.Consumer.Subscription
Expand Down
43 changes: 0 additions & 43 deletions src/Kafka/Admin/AdminProperties.hs

This file was deleted.

5 changes: 2 additions & 3 deletions src/Kafka/Internal/RdKafka.chs
Original file line number Diff line number Diff line change
Expand Up @@ -1161,11 +1161,10 @@ rdKafkaErrorIsRetriable ptr = boolFromCInt <$> rdKafkaErrorIsRetriable' ptr
rdKafkaErrorTxnRequiresAbort :: RdKafkaErrorTPtr -> IO Bool
rdKafkaErrorTxnRequiresAbort ptr = boolFromCInt <$> rdKafkaErrorTxnRequiresAbort' ptr

-- Admin API
-- Topics
{#enum rd_kafka_admin_op_t as ^ {underscoreToCase} deriving (Show, Eq) #}

data RdKafkaTopicResultT
{#pointer *rd_kafka_topic_result_t as RdKafkaTopicResultTPtr foreign -> RdKafkaTopicResultT #}


data RdKafkaAdminOptionsT
{#pointer *rd_kafka_AdminOptions_t as RdKafkaAdminOptionsTPtr foreign -> RdKafkaAdminOptionsT #}
Expand Down
35 changes: 9 additions & 26 deletions src/Kafka/Admin.hs → src/Kafka/Topic.hs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
module Kafka.Admin(
module Kafka.Topic(
module X
, newKAdmin
, createTopic
, deleteTopic
, closeKAdmin
) where

import Control.Monad

Check warning on line 7 in src/Kafka/Topic.hs

View workflow job for this annotation

GitHub Actions / build (9.10.1, ubuntu-latest)

The import of ‘Control.Monad’ is redundant

Check warning on line 7 in src/Kafka/Topic.hs

View workflow job for this annotation

GitHub Actions / build (9.2.4, ubuntu-latest)

The import of ‘Control.Monad’ is redundant

Check warning on line 7 in src/Kafka/Topic.hs

View workflow job for this annotation

GitHub Actions / build (9.0.2, ubuntu-latest)

The import of ‘Control.Monad’ is redundant

Check warning on line 7 in src/Kafka/Topic.hs

View workflow job for this annotation

GitHub Actions / build (9.4.2, ubuntu-latest)

The import of ‘Control.Monad’ is redundant
Expand All @@ -17,28 +15,12 @@ import Kafka.Internal.RdKafka
import Kafka.Internal.Setup

import Kafka.Types as X
import Kafka.Admin.AdminProperties as X
import Kafka.Admin.Types as X
import Kafka.Topic.Types as X

newKAdmin ::( MonadIO m )
=> AdminProperties
-> m (Either KafkaError KAdmin)
newKAdmin properties = liftIO $ do
kafkaConfig@(KafkaConf kafkaConf' _ _) <- kafkaConf ( KafkaProps $ adminProps properties)
maybeKafka <- newRdKafkaT RdKafkaConsumer kafkaConf'
case maybeKafka of
Left err -> pure $ Left $ KafkaError err
Right kafka -> pure $ Right $ KAdmin (Kafka kafka) kafkaConfig

closeKAdmin :: KAdmin
-> IO ()
closeKAdmin ka = void $ rdKafkaConsumerClose (getRdKafka ka)
--- CREATE TOPIC ---
createTopic :: KAdmin
-> NewTopic
-> IO (Either KafkaError TopicName)
createTopic kAdmin topic = liftIO $ do
let kafkaPtr = getRdKafka kAdmin
createTopic :: HasKafka k => k -> NewTopic -> IO (Either KafkaError TopicName)
createTopic k topic = do
let kafkaPtr = getRdKafka k
queue <- newRdKafkaQueue kafkaPtr
opts <- newRdKAdminOptions kafkaPtr RdKafkaAdminOpAny

Expand All @@ -50,11 +32,12 @@ createTopic kAdmin topic = liftIO $ do
pure $ Right $ topicName topic

--- DELETE TOPIC ---
deleteTopic :: KAdmin
deleteTopic :: HasKafka k
=> k
-> TopicName
-> IO (Either KafkaError TopicName)
deleteTopic kAdmin topic = liftIO $ do
let kafkaPtr = getRdKafka kAdmin
deleteTopic k topic = liftIO $ do
let kafkaPtr = getRdKafka k
queue <- newRdKafkaQueue kafkaPtr
opts <- newRdKAdminOptions kafkaPtr RdKafkaAdminOpAny

Expand Down
18 changes: 2 additions & 16 deletions src/Kafka/Admin/Types.hs → src/Kafka/Topic/Types.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
module Kafka.Admin.Types (
KAdmin(..)
, PartitionCount (..)
module Kafka.Topic.Types (
PartitionCount (..)
, ReplicationFactor (..)
, NewTopic (..)
) where
Expand All @@ -10,19 +9,6 @@ import Data.Map
import Kafka.Types
import Kafka.Internal.Setup

Check warning on line 10 in src/Kafka/Topic/Types.hs

View workflow job for this annotation

GitHub Actions / build (8.8.4, ubuntu-latest)

The import of ‘Kafka.Internal.Setup’ is redundant

Check warning on line 10 in src/Kafka/Topic/Types.hs

View workflow job for this annotation

GitHub Actions / build (8.6.5, ubuntu-latest)

The import of ‘Kafka.Internal.Setup’ is redundant

Check warning on line 10 in src/Kafka/Topic/Types.hs

View workflow job for this annotation

GitHub Actions / build (8.10.7, ubuntu-latest)

The import of ‘Kafka.Internal.Setup’ is redundant

Check warning on line 10 in src/Kafka/Topic/Types.hs

View workflow job for this annotation

GitHub Actions / build (9.10.1, ubuntu-latest)

The import of ‘Kafka.Internal.Setup’ is redundant

Check warning on line 10 in src/Kafka/Topic/Types.hs

View workflow job for this annotation

GitHub Actions / build (9.2.4, ubuntu-latest)

The import of ‘Kafka.Internal.Setup’ is redundant

Check warning on line 10 in src/Kafka/Topic/Types.hs

View workflow job for this annotation

GitHub Actions / build (9.0.2, ubuntu-latest)

The import of ‘Kafka.Internal.Setup’ is redundant

Check warning on line 10 in src/Kafka/Topic/Types.hs

View workflow job for this annotation

GitHub Actions / build (9.4.2, ubuntu-latest)

The import of ‘Kafka.Internal.Setup’ is redundant

data KAdmin = KAdmin {
kaKafkaPtr :: !Kafka
, kaKafkaConf :: !KafkaConf
}

instance HasKafka KAdmin where
getKafka = kaKafkaPtr
{-# INLINE getKafka #-}

instance HasKafkaConf KAdmin where
getKafkaConf = kaKafkaConf
{-# INLINE getKafkaConf #-}

newtype PartitionCount = PartitionCount { unPartitionCount :: Int } deriving (Show, Eq)
newtype ReplicationFactor = ReplicationFactor { unReplicationFactor :: Int } deriving (Show, Eq)

Expand Down
20 changes: 9 additions & 11 deletions tests-it/Kafka/IntegrationSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import Kafka.Consumer
import qualified Data.Text as T
import Kafka.Metadata
import Kafka.Producer
import Kafka.Admin
import Kafka.Topic
import Kafka.TestEnv
import Test.Hspec

Expand Down Expand Up @@ -174,20 +174,19 @@ spec = do
forM_ res $ \rcs ->
forM_ rcs ((`shouldBe` Set.fromList (headersToList testHeaders)) . Set.fromList . headersToList . crHeaders)

describe "Kafka.Admin.Spec" $ do
describe "Kafka.Topic.Spec" $ do
let topicName = addRandomChars "admin.topic.created." 5

topicsMVar <- runIO newEmptyMVar

specWithAdmin "Create topic" $ do
it "should create a new topic" $ \(admin :: KAdmin) -> do
specWithConsumer "Read all topics" consumerProps $ do

it "should create a topic" $ \(consumer :: KafkaConsumer) -> do
tName <- topicName
let newTopic = mkNewTopic (TopicName ( T.pack(tName) ))
result <- createTopic admin newTopic
result <- createTopic consumer newTopic
result `shouldSatisfy` isRight

specWithConsumer "Read all topics" consumerProps $ do

it "should return all the topics" $ \(consumer :: KafkaConsumer) -> do
res <- allTopicsMetadata consumer (Timeout 1000)
Expand All @@ -205,13 +204,12 @@ spec = do
topicsLen `shouldSatisfy` (>0)
hasTopic `shouldBe` True

specWithAdmin "Remove topics" $ do

it "should delete all the topics currently existing" $ \(admin ::KAdmin) -> do
it "should delete all the topics currently existing" $ \(consumer :: KafkaConsumer) -> do
topics <- takeMVar topicsMVar
forM_ topics $ \topic -> do
result <- deleteTopic admin topic
result <- deleteTopic consumer topic
result `shouldSatisfy` isRight

----------------------------------------------------------------------------------------------------------------

data ReadState = Skip | Read
Expand Down
13 changes: 0 additions & 13 deletions tests-it/Kafka/TestEnv.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import qualified System.Random as Rnd
import Control.Concurrent
import Kafka.Consumer as C
import Kafka.Producer as P
import Kafka.Admin as A

import Test.Hspec

Expand Down Expand Up @@ -58,9 +57,6 @@ producerProps = P.brokersList [brokerAddress]
<> P.setCallback (logCallback (\l s1 s2 -> print $ "[Producer] " <> show l <> ": " <> s1 <> ", " <> s2))
<> P.setCallback (errorCallback (\e r -> print $ "[Producer] " <> show e <> ": " <> r))

adminProperties :: AdminProperties
adminProperties = A.brokers [brokerAddress]

testSubscription :: TopicName -> Subscription
testSubscription t = topics [t]
<> offsetReset Earliest
Expand All @@ -80,9 +76,6 @@ mkConsumerWith props = do
(RebalanceAssign _) -> putMVar var True
_ -> pure ()

mkAdmin :: IO KAdmin
mkAdmin = newKAdmin adminProperties >>= \(Right k) -> pure k

specWithConsumer :: String -> ConsumerProperties -> SpecWith KafkaConsumer -> Spec
specWithConsumer s p f =
beforeAll (mkConsumerWith p)
Expand All @@ -97,9 +90,3 @@ specWithKafka s p f =
beforeAll ((,) <$> mkConsumerWith p <*> mkProducer)
$ afterAll (\(consumer, producer) -> void $ closeProducer producer >> closeConsumer consumer)
$ describe s f

specWithAdmin :: String -> SpecWith KAdmin -> Spec
specWithAdmin s f =
beforeAll mkAdmin
$ afterAll (void . closeKAdmin)
$ describe s f

0 comments on commit 7bc649a

Please sign in to comment.