Skip to content

Commit

Permalink
Merge pull request #287 from uds5501/feat/acl-integration-gopay
Browse files Browse the repository at this point in the history
Feature : ACL integration features
  • Loading branch information
Vruttant1403 authored Dec 3, 2024
2 parents 0ac5e04 + dad1197 commit 7e89b4f
Show file tree
Hide file tree
Showing 15 changed files with 514 additions and 141 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
All notable changes to this project will be documented in this file. This change log follows the conventions
of [keepachangelog.com](http://keepachangelog.com/).

## 4.12.0
- Adds support for ACL auth for kafka streams.

## 4.11.1
- Fix retry-count returning nil if empty. Returns 0 by default now.

Expand Down
110 changes: 85 additions & 25 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,40 +1,100 @@
KAFKA_TOPICS = topic another-test-topic
KAFKA_BROKERS = kafka1:9095 kafka2:9096 kafka3:9097
ADMIN_CONFIG = /etc/kafka/secrets/config-admin.properties
KAFKA_CONTAINER = ziggurat_kafka1_1

.PHONY: all
all: test

topic="topic"
another_test_topic="another-test-topic"
# Main target to setup the entire cluster
setup-cluster: down up wait-for-kafka create-scram-credentials create-topics setup-acls

setup:
docker-compose down
lein deps
docker-compose up -d
sleep 10
docker exec ziggurat_kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic $(topic) --partitions 3 --replication-factor 1 --zookeeper ziggurat_zookeeper
docker exec ziggurat_kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic $(another_test_topic) --partitions 3 --replication-factor 1 --zookeeper ziggurat_zookeeper
# Bring down all containers and clean volumes
down:
@echo "Bringing down all containers..."
docker-compose -f docker-compose-cluster.yml down -v

test: setup
TESTING_TYPE=local lein test
docker-compose down
# Start all containers
up:
@echo "Starting all containers..."
docker-compose -f docker-compose-cluster.yml up -d

setup-cluster:
rm -rf /tmp/ziggurat_kafka_cluster_data
docker-compose -f docker-compose-cluster.yml -p ziggurat down
lein deps
docker-compose -f docker-compose-cluster.yml -p ziggurat up -d
sleep 30
# Sleeping for 30s to allow the cluster to come up
docker exec ziggurat_kafka1_1 kafka-topics --create --topic $(topic) --partitions 3 --replication-factor 3 --if-not-exists --zookeeper ziggurat_zookeeper_1
docker exec ziggurat_kafka1_1 kafka-topics --create --topic $(another_test_topic) --partitions 3 --replication-factor 3 --if-not-exists --zookeeper ziggurat_zookeeper_1
# Wait for Kafka to be ready
wait-for-kafka:
@echo "Waiting for Kafka to be ready..."
@sleep 30

# Restart everything
restart: down up wait-for-kafka

# Create SCRAM credentials for admin user
create-scram-credentials:
@echo "Creating SCRAM credentials for admin user..."
@docker exec $(KAFKA_CONTAINER) kafka-configs \
--alter \
--zookeeper zookeeper:2181 \
--add-config 'SCRAM-SHA-256=[password=admin]' \
--entity-type users \
--entity-name admin

# Create all required topics
create-topics:
@for topic in $(KAFKA_TOPICS); do \
echo "Creating topic: $$topic"; \
docker exec $(KAFKA_CONTAINER) kafka-topics \
--create \
--zookeeper zookeeper:2181 \
--if-not-exists \
--topic $$topic \
--partitions 3 \
--replication-factor 3; \
done

# Setup ACLs for admin user on all brokers
setup-acls:
@for broker in $(KAFKA_BROKERS); do \
case $$broker in \
kafka1:9095) \
container="ziggurat_kafka1_1" ;; \
kafka2:9096) \
container="ziggurat_kafka2_1" ;; \
kafka3:9097) \
container="ziggurat_kafka3_1" ;; \
esac; \
for topic in $(KAFKA_TOPICS); do \
echo "Setting up ACLs for topic: $$topic on broker: $$broker using container: $$container"; \
docker exec $$container kafka-acls \
--bootstrap-server $$broker \
--command-config $(ADMIN_CONFIG) \
--add \
--allow-principal User:admin \
--operation All \
--topic $$topic; \
done \
done

# Clean up topics (can be used during development)
clean-topics:
@for topic in $(KAFKA_TOPICS); do \
echo "Deleting topic: $$topic"; \
docker exec $(KAFKA_CONTAINER) kafka-topics --bootstrap-server kafka1:9095 \
--delete \
--topic $$topic; \
done

# Show logs
logs:
docker-compose -f docker-compose-cluster.yml logs -f

test-cluster: setup-cluster
TESTING_TYPE=cluster lein test
docker-compose -f docker-compose-cluster.yml down
rm -rf /tmp/ziggurat_kafka_cluster_data

coverage: setup
coverage: setup-cluster
lein code-coverage
docker-compose down
docker-compose -f docker-compose-cluster.yml down


proto:
protoc -I=resources --java_out=test/ resources/proto/example.proto
protoc -I=resources --java_out=test/ resources/proto/person.proto
protoc -I=resources --java_out=test/ resources/proto/person.proto
5 changes: 5 additions & 0 deletions config-admin.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="admin";
71 changes: 57 additions & 14 deletions docker-compose-cluster.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,18 @@ services:
container_name: 'ziggurat_rabbitmq'

zookeeper:
image: zookeeper:3.4.9
image: confluentinc/cp-zookeeper:5.5.0
hostname: zookeeper
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_SERVERS: server.1=zookeeper:2888:3888
ZOO_TICK_TIME: 2000
ZOOKEEPER_CLIENT_PORT: 2181
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/zookeeper_server_jaas.conf
-Dzookeeper.authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
-Dzookeeper.allowSaslFailedClients=true
-Dzookeeper.requireClientAuthScheme=sasl"
volumes:
- ./zookeeper_server_jaas.conf:/etc/kafka/zookeeper_server_jaas.conf
- /tmp/ziggurat_kafka_cluster_data/zookeeper/data:/data
- /tmp/ziggurat_kafka_cluster_data/zookeeper/datalog:/datalog

Expand All @@ -28,17 +30,32 @@ services:
- SYS_ADMIN
hostname: kafka1
ports:
- "9091:9091"
- "9094:9094"
- "9095:9095"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9091
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19094,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,SASL_PLAINTEXT://${DOCKER_HOST_IP:-127.0.0.1}:9095
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 60000
KAFKA_BROKER_ID: 1
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_NUM_PARTITIONS: 3
KAFKA_SASL_ENABLED_MECHANISMS: SCRAM-SHA-256
KAFKA_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.scram.ScramLoginModule required \
username=\"client\" \
password=\"client-secret\";"
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
KAFKA_SUPER_USERS: User:ANONYMOUS;User:admin
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "false"
KAFKA_SECURITY_PROTOCOL: SASL_PLAINTEXT
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/secrets/kafka_server_jaas.conf -Dzookeeper.sasl.client=true -Dzookeeper.sasl.clientconfig=Client"
KAFKA_ZOOKEEPER_SET_ACL: "true"
KAFKA_ZOOKEEPER_SASL_ENABLED: "true"
volumes:
- /tmp/ziggurat_kafka_cluster_data/kafka1/data:/var/lib/kafka/data
- ./kafka_server_jaas.conf:/etc/kafka/secrets/kafka_server_jaas.conf
- ./config-admin.properties:/etc/kafka/secrets/config-admin.properties
depends_on:
- zookeeper

Expand All @@ -50,16 +67,29 @@ services:
hostname: kafka2
ports:
- "9092:9092"
- "9096:9096"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,SASL_PLAINTEXT://${DOCKER_HOST_IP:-127.0.0.1}:9096
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 60000
KAFKA_BROKER_ID: 2
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_NUM_PARTITIONS: 3
KAFKA_SASL_ENABLED_MECHANISMS: SCRAM-SHA-256
KAFKA_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.scram.ScramLoginModule required \
username=\"client\" \
password=\"client-secret\";"
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
KAFKA_SUPER_USERS: User:ANONYMOUS;User:admin
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "false"
KAFKA_SECURITY_PROTOCOL: SASL_PLAINTEXT
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/secrets/kafka_server_jaas.conf"
volumes:
- /tmp/ziggurat_kafka_cluster_data/kafka2/data:/var/lib/kafka/data
- ./kafka_server_jaas.conf:/etc/kafka/secrets/kafka_server_jaas.conf
- ./config-admin.properties:/etc/kafka/secrets/config-admin.properties
depends_on:
- zookeeper

Expand All @@ -71,15 +101,28 @@ services:
hostname: kafka3
ports:
- "9093:9093"
- "9097:9097"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,SASL_PLAINTEXT://${DOCKER_HOST_IP:-127.0.0.1}:9097
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 60000
KAFKA_BROKER_ID: 3
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_NUM_PARTITIONS: 3
KAFKA_SASL_ENABLED_MECHANISMS: SCRAM-SHA-256
KAFKA_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.scram.ScramLoginModule required \
username=\"client\" \
password=\"client-secret\";"
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
KAFKA_SUPER_USERS: User:ANONYMOUS;User:admin
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "false"
KAFKA_SECURITY_PROTOCOL: SASL_PLAINTEXT
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/secrets/kafka_server_jaas.conf"
volumes:
- /tmp/ziggurat_kafka_cluster_data/kafka3/data:/var/lib/kafka/data
- ./kafka_server_jaas.conf:/etc/kafka/secrets/kafka_server_jaas.conf
- ./config-admin.properties:/etc/kafka/secrets/config-admin.properties
depends_on:
- zookeeper
17 changes: 17 additions & 0 deletions kafka_server_jaas.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin";
};

Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="admin"
password="admin";
};

KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="client"
password="client-secret";
};
11 changes: 5 additions & 6 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
(cemerick.pomegranate.aether/register-wagon-factory!
"http" #(org.apache.maven.wagon.providers.http.HttpWagon.))

(defproject tech.gojek/ziggurat "4.11.1"
(defproject tech.gojek/ziggurat "4.12.0"
:description "A stream processing framework to build stateless applications on kafka"
:url "https://github.com/gojektech/ziggurat"
:license {:name "Apache License, Version 2.0"
Expand All @@ -15,7 +15,7 @@
[com.cemerick/url "0.1.1"]
[com.datadoghq/java-dogstatsd-client "2.4"]
[com.fasterxml.jackson.core/jackson-databind "2.9.9"]
[com.novemberain/langohr "5.2.0" :exclusions [org.clojure/clojure]]
[com.novemberain/langohr "5.2.0" :exclusions [org.clojure/clojure org.slf4j/slf4j-api]]
[com.taoensso/nippy "3.1.1"]
[io.dropwizard.metrics5/metrics-core "5.0.0" :scope "compile"]
[medley "1.3.0" :exclusions [org.clojure/clojure]]
Expand All @@ -41,10 +41,7 @@
[com.newrelic.agent.java/newrelic-api "6.5.0"]
[yleisradio/new-reliquary "1.1.0" :exclusions [org.clojure/clojure]]
[metosin/ring-swagger "0.26.2"
:exclusions [cheshire
com.fasterxml.jackson.core/jackson-core
com.fasterxml.jackson.dataformat/jackson-dataformat-smile
com.fasterxml.jackson.dataformat/jackson-dataformat-cbor]]
:exclusions [org.mozilla/rhino com.fasterxml.jackson.dataformat/jackson-dataformat-smile com.fasterxml.jackson.dataformat/jackson-dataformat-cbor cheshire com.google.code.findbugs/jsr305 com.fasterxml.jackson.core/jackson-core]]
[metosin/ring-swagger-ui "3.46.0"]
[cambium/cambium.core "1.1.0"]
[cambium/cambium.codec-cheshire "1.0.0"]
Expand Down Expand Up @@ -72,8 +69,10 @@
:dependencies [[com.google.protobuf/protobuf-java "3.17.0"]
[junit/junit "4.13.2"]
[org.hamcrest/hamcrest-core "2.2"]
[org.apache.kafka/kafka_2.12 "2.8.0"]
[org.apache.kafka/kafka-streams "2.8.2" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.kafka/kafka-clients "2.8.2" :classifier "test"]
[org.apache.kafka/kafka-streams-test-utils "2.8.2" :classifier "test"]
[org.clojure/test.check "1.1.0"]]
:plugins [[lein-cloverage "1.2.2" :exclusions [org.clojure/clojure]]]
:cloverage {:exclude-call ['cambium.core/info
Expand Down
1 change: 1 addition & 0 deletions resources/config.test.edn
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,5 @@
:new-relic {:report-errors false}
:prometheus {:port 8002
:enabled false}
:ssl {:enabled false}
:log-format "text"}}
Loading

0 comments on commit 7e89b4f

Please sign in to comment.