From c35da2da725e9f5e31605e2733ec1c46ff71347b Mon Sep 17 00:00:00 2001
From: Mostafa Moradian
Date: Fri, 18 Jun 2021 11:51:39 +0200
Subject: [PATCH] Feature: SASL Auth (PLAIN and SCRAM) (#3)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Test and fix issues with SASL plain and scram authentication
* Return error on SASL SCRAM failure
* Reformat JS code
* Fix docstrings/comments in scripts
* Remove minBytes and maxBytes from reader
* Fix linter errors
* Add a slightly better implementation of credentials for SASL
* Update README

---

For testing this feature, I've created a test environment with SASL PLAIN and SASL SCRAM enabled, using Confluent's test environments:

```
$ git clone https://github.com/vdesabou/kafka-docker-playground
$ cd kafka-docker-playground/environment/sasl-plain
$ ./start.sh
```

I've compiled k6 with the xk6-kafka extension and copied the resulting binary, along with the test_sasl_auth.js script, into the broker container. Then I executed a shell inside the container and ran k6 with the test script.

```
$ xk6 build --with github.com/mostafa/xk6-kafka=.
$ docker cp k6 broker:/
$ docker cp test_sasl_auth.js broker:/
$ docker exec -it broker bash
[appuser@broker ~]$ cd /
[appuser@broker ~]$ ./k6 run --vus 50 --duration 60s test_sasl_auth.js

          /\      |‾‾| /‾‾/   /‾‾/
     /\  /  \     |  |/  /   /  /
    /  \/    \    |     (   /   ‾‾\
   /          \   |  |\  \ |  (‾)  |
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: test_sasl_auth.js
     output: -

  scenarios: (100.00%) 1 scenario, 50 max VUs, 1m30s max duration (incl. graceful stop):
           * default: 50 looping VUs for 1m0s (gracefulStop: 30s)


running (1m00.4s), 00/50 VUs, 3711 complete and 0 interrupted iterations
default ✓ [======================================] 50 VUs  1m0s

    ✓ is sent
    ✓ 10 messages returned

    █ teardown

    checks.........................: 100.00% ✓ 374811 ✗ 0
    data_received..................: 0 B     0 B/s
    data_sent......................: 0 B     0 B/s
    iteration_duration.............: avg=811.56ms min=7.13ms med=734.71ms max=2.34s p(90)=1.1s p(95)=1.34s
    iterations.....................: 3711    61.420675/s
    kafka.reader.dial.count........: 50      0.827549/s
    kafka.reader.error.count.......: 0       0/s
    kafka.reader.fetches.count.....: 50      0.827549/s
    kafka.reader.message.bytes.....: 7.3 MB  120 kB/s
    kafka.reader.message.count.....: 37160   615.034296/s
    kafka.reader.rebalance.count...: 0       0/s
    kafka.reader.timeouts.count....: 0       0/s
    kafka.writer.dial.count........: 50      0.827549/s
    kafka.writer.error.count.......: 0       0/s
    kafka.writer.message.bytes.....: 146 MB  2.4 MB/s
    kafka.writer.message.count.....: 742200  12284.134941/s
    kafka.writer.rebalance.count...: 250     4.137744/s
    kafka.writer.write.count.......: 742200  12284.134941/s
    vus............................: 50      min=50 max=50
    vus_max........................: 50      min=50 max=50
```
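For reference, the `auth` argument that `writer()` and `reader()` now take is a JSON-encoded credentials string, parsed by `unmarshalCredentials()` in `auth.go` below. A minimal sketch of a script's setup, using the `client` / `client-secret` credentials that the sasl-plain playground environment provisions, and the broker address and topic name from the other test scripts:

```javascript
import { writer, reader } from 'k6/x/kafka';

// Credentials are passed as a JSON-encoded string. "algorithm" selects the
// SASL mechanism: "plain" (the default when omitted), "sha256" or "sha512".
const auth = JSON.stringify({
    username: "client",
    password: "client-secret",
    algorithm: "plain"
});

// The auth string is the new, last argument of both writer() and reader();
// reader() takes the partition and offset before it.
const producer = writer(["localhost:9092"], "xk6_kafka_json_topic", auth);
const consumer = reader(["localhost:9092"], "xk6_kafka_json_topic", 0, 0, auth);
```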
---
 README.md         |  2 +
 auth.go           | 64 +++++++++++++++++++++++++++++++
 consumer.go       | 25 ++++++++----
 producer.go       | 24 ++++++++++--
 test_avro.js      | 98 +++++++++++++++++++++++------------------------
 test_json.js      | 50 ++++++++++++------------
 test_sasl_auth.js | 79 ++++++++++++++++++++++++++++++++++++++
 7 files changed, 257 insertions(+), 85 deletions(-)
 create mode 100644 auth.go
 create mode 100644 test_sasl_auth.js

diff --git a/README.md b/README.md
index af1d827..0e11ec2 100644
--- a/README.md
+++ b/README.md
@@ -54,6 +54,8 @@ $ docker exec -it lensesio bash
 (inside container)$ kafka-topics --create --topic xk6_kafka_json_topic --bootstrap-server localhost:9092
 ```
 
+If you want to test SASL authentication, have a look at [this commit message](https://github.com/mostafa/xk6-kafka/pull/3/commits/216ee0cd4f69864cb259445819541ef34fe2f2dd), where I describe how to run a test environment.
+
 ### k6 Test
 
 The following k6 test script is used to test this extension and Apache Kafka in turn. The script is available as `test_.js` with more code and commented sections. The scripts have 4 parts:

diff --git a/auth.go b/auth.go
new file mode 100644
index 0000000..8a94a4d
--- /dev/null
+++ b/auth.go
@@ -0,0 +1,64 @@
+package kafka
+
+import (
+	"encoding/json"
+	"time"
+
+	kafkago "github.com/segmentio/kafka-go"
+	"github.com/segmentio/kafka-go/sasl/plain"
+	"github.com/segmentio/kafka-go/sasl/scram"
+)
+
+const (
+	Plain  = "plain"
+	SHA256 = "sha256"
+	SHA512 = "sha512"
+)
+
+type Credentials struct {
+	Username  string `json:"username"`
+	Password  string `json:"password"`
+	Algorithm string `json:"algorithm"`
+}
+
+func unmarshalCredentials(auth string) (creds *Credentials, err error) {
+	creds = &Credentials{
+		Algorithm: Plain,
+	}
+
+	err = json.Unmarshal([]byte(auth), &creds)
+
+	return
+}
+
+func getDialer(creds *Credentials) (dialer *kafkago.Dialer) {
+	dialer = &kafkago.Dialer{
+		Timeout:   10 * time.Second,
+		DualStack: true,
+	}
+
+	if creds.Algorithm == Plain {
+		mechanism := plain.Mechanism{
+			Username: creds.Username,
+			Password: creds.Password,
+		}
+		dialer.SASLMechanism = mechanism
+		return
+	} else {
+		hashes := make(map[string]scram.Algorithm)
+		hashes[SHA256] = scram.SHA256
+		hashes[SHA512] = scram.SHA512
+
+		mechanism, err := scram.Mechanism(
+			hashes[creds.Algorithm],
+			creds.Username,
+			creds.Password,
+		)
+		if err != nil {
+			ReportError(err, "authentication failed")
+			return nil
+		}
+		dialer.SASLMechanism = mechanism
+		return
+	}
+}

diff --git a/consumer.go b/consumer.go
index c10fc89..55aa563 100644
--- a/consumer.go
+++ b/consumer.go
@@ -6,10 +6,10 @@ import (
 	"io"
 	"time"
 
+	kafkago "github.com/segmentio/kafka-go"
 	"go.k6.io/k6/js/modules"
 	"go.k6.io/k6/lib"
 	"go.k6.io/k6/stats"
-	kafkago "github.com/segmentio/kafka-go"
 )
 
 func init() {
@@ -19,22 +19,31 @@ func init() {
 type Kafka struct{}
 
 func (*Kafka) Reader(
-	brokers []string, topic string, partition int,
-	minBytes int, maxBytes int, offset int64) *kafkago.Reader {
+	brokers []string, topic string, partition int, offset int64, auth string) *kafkago.Reader {
+	var dialer *kafkago.Dialer
+
+	if auth != "" {
+		creds, err := unmarshalCredentials(auth)
+		if err != nil {
+			ReportError(err, "Unable to unmarshal credentials")
+			return nil
+		}
 
-	if maxBytes == 0 {
-		maxBytes = 10e6 // 10MB
+		dialer = getDialer(creds)
+		if dialer == nil {
+			ReportError(nil, "Dialer cannot authenticate")
+			return nil
+		}
 	}
 
 	reader := kafkago.NewReader(kafkago.ReaderConfig{
 		Brokers:          brokers,
 		Topic:            topic,
 		Partition:        partition,
-		MinBytes:         minBytes,
-		MaxBytes:         maxBytes,
 		MaxWait:          time.Millisecond * 200,
 		RebalanceTimeout: time.Second * 5,
 		QueueCapacity:    1,
+		Dialer:           dialer,
 	})
 
 	if offset > 0 {
@@ -102,7 +111,7 @@ func (*Kafka) Consume(
 func ReportReaderStats(ctx context.Context, currentStats kafkago.ReaderStats) error {
 	state := lib.GetState(ctx)
 
-	err := errors.New("State is nil")
+	err := errors.New("state is nil")
 
 	if state == nil {
 		ReportError(err, "Cannot determine state")

diff --git a/producer.go b/producer.go
index 58f428f..253fa05 100644
--- a/producer.go
+++ b/producer.go
@@ -10,12 +10,30 @@ import (
 	"go.k6.io/k6/stats"
 )
 
-func (*Kafka) Writer(brokers []string, topic string) *kafkago.Writer {
+func (*Kafka) Writer(brokers []string, topic string, auth string) *kafkago.Writer {
+	var dialer *kafkago.Dialer
+
+	if auth != "" {
+		creds, err := unmarshalCredentials(auth)
+		if err != nil {
+			ReportError(err, "Unable to unmarshal credentials")
+			return nil
+		}
+
+		dialer = getDialer(creds)
+		if dialer == nil {
+			ReportError(nil, "Dialer cannot authenticate")
+			return nil
+		}
+	}
+
 	return kafkago.NewWriter(kafkago.WriterConfig{
 		Brokers:   brokers,
 		Topic:     topic,
 		Balancer:  &kafkago.LeastBytes{},
 		BatchSize: 1,
+		Dialer:    dialer,
+		Async:     false,
 	})
 }
 
@@ -23,7 +41,7 @@ func (*Kafka) Produce(
 	ctx context.Context, writer *kafkago.Writer, messages []map[string]string,
 	keySchema string, valueSchema string) error {
 	state := lib.GetState(ctx)
-	err := errors.New("State is nil")
+	err := errors.New("state is nil")
 
 	if state == nil {
 		ReportError(err, "Cannot determine state")
@@ -66,7 +84,7 @@ func (*Kafka) Produce(
 func ReportWriterStats(ctx context.Context, currentStats kafkago.WriterStats) error {
 	state := lib.GetState(ctx)
 
-	err := errors.New("State is nil")
+	err := errors.New("state is nil")
 
 	if state == nil {
 		ReportError(err, "Cannot determine state")

diff --git a/test_avro.js b/test_avro.js
index f7630ce..97af045 100644
--- a/test_avro.js
+++ b/test_avro.js
@@ -1,7 +1,7 @@
 /*
 
 This is a k6 test script that imports the xk6-kafka and
-tests Kafka with a 100 Avro messages per iteration.
+tests Kafka with 200 Avro messages per iteration.
 
 */
 
@@ -36,60 +36,60 @@ const valueSchema = JSON.stringify({
     "name": "Value",
     "namespace": "dev.mostafa.xk6.kafka",
     "fields": [{
-        "name": "name",
-        "type": "string"
-    },
-    {
-        "name": "version",
-        "type": "string"
-    },
-    {
-        "name": "author",
-        "type": "string"
-    },
-    {
-        "name": "description",
-        "type": "string"
-    },
-    {
-        "name": "url",
-        "type": "string"
-    },
-    {
-        "name": "index",
-        "type": "int"
-    }
+            "name": "name",
+            "type": "string"
+        },
+        {
+            "name": "version",
+            "type": "string"
+        },
+        {
+            "name": "author",
+            "type": "string"
+        },
+        {
+            "name": "description",
+            "type": "string"
+        },
+        {
+            "name": "url",
+            "type": "string"
+        },
+        {
+            "name": "index",
+            "type": "int"
+        }
     ]
 });
 export default function () {
     for (let index = 0; index < 100; index++) {
         let messages = [{
-            key: JSON.stringify({
-                "correlationId": "test-id-abc-" + index
-            }),
-            value: JSON.stringify({
-                "name": "xk6-kafka",
-                "version": "0.2.1",
-                "author": "Mostafa Moradian",
-                "description": "k6 extension to load test Apache Kafka with support for Avro messages",
-                "url": "https://mostafa.dev",
-                "index": index
-            })
-        },
-        {
-            key: JSON.stringify({
-                "correlationId": "test-id-def-" + index
-            }),
-            value: JSON.stringify({
-                "name": "xk6-kafka",
-                "version": "0.2.1",
-                "author": "Mostafa Moradian",
-                "description": "k6 extension to load test Apache Kafka with support for Avro messages",
-                "url": "https://mostafa.dev",
-                "index": index
-            })
-        }
+                key: JSON.stringify({
+                    "correlationId": "test-id-abc-" + index
+                }),
+                value: JSON.stringify({
+                    "name": "xk6-kafka",
+                    "version": "0.2.1",
+                    "author": "Mostafa Moradian",
+                    "description": "k6 extension to load test Apache Kafka with support for Avro messages",
+                    "url": "https://mostafa.dev",
+                    "index": index
+                })
+            },
+            {
+                key: JSON.stringify({
+                    "correlationId": "test-id-def-" + index
+                }),
+                value: JSON.stringify({
+                    "name": "xk6-kafka",
+                    "version": "0.2.1",
+                    "author": "Mostafa Moradian",
+                    "description": "k6 extension to load test Apache Kafka with support for Avro messages",
+                    "url": "https://mostafa.dev",
+                    "index": index
+                })
+            }
         ]
 
         let error = produce(producer, messages, keySchema, valueSchema);
         check(error, {
diff --git a/test_json.js b/test_json.js
index 0136728..5f2861a 100644
--- a/test_json.js
+++ b/test_json.js
@@ -1,7 +1,7 @@
 /*
 
 This is a k6 test script that imports the xk6-kafka and
-tests Kafka with a 100 JSON messages per iteration.
+tests Kafka with 200 JSON messages per iteration.
 
 */
 
@@ -13,7 +13,7 @@ import {
     produce,
     reader,
     consume
-} from 'k6/x/kafka'; // import kafka plugin
+} from 'k6/x/kafka'; // import kafka extension
 
 const bootstrapServers = ["localhost:9092"];
 const kafkaTopic = "xk6_kafka_json_topic";
@@ -24,29 +24,29 @@ const consumer = reader(bootstrapServers, kafkaTopic);
 export default function () {
     for (let index = 0; index < 100; index++) {
         let messages = [{
-            key: JSON.stringify({
-                "correlationId": "test-id-abc-" + index
-            }),
-            value: JSON.stringify({
-                "name": "xk6-kafka",
-                "version": "0.2.1",
-                "author": "Mostafa Moradian",
-                "description": "k6 extension to load test Apache Kafka with support for Avro messages",
-                "index": index
-            })
-        },
-        {
-            key: JSON.stringify({
-                "correlationId": "test-id-def-" + index
-            }),
-            value: JSON.stringify({
-                "name": "xk6-kafka",
-                "version": "0.2.1",
-                "author": "Mostafa Moradian",
-                "description": "k6 extension to load test Apache Kafka with support for Avro messages",
-                "index": index
-            })
-        }
+                key: JSON.stringify({
+                    "correlationId": "test-id-abc-" + index
+                }),
+                value: JSON.stringify({
+                    "name": "xk6-kafka",
+                    "version": "0.2.1",
+                    "author": "Mostafa Moradian",
+                    "description": "k6 extension to load test Apache Kafka with support for Avro messages",
+                    "index": index
+                })
+            },
+            {
+                key: JSON.stringify({
+                    "correlationId": "test-id-def-" + index
+                }),
+                value: JSON.stringify({
+                    "name": "xk6-kafka",
+                    "version": "0.2.1",
+                    "author": "Mostafa Moradian",
+                    "description": "k6 extension to load test Apache Kafka with support for Avro messages",
+                    "index": index
+                })
+            }
         ]
 
         let error = produce(producer, messages);

diff --git a/test_sasl_auth.js b/test_sasl_auth.js
new file mode 100644
index 0000000..ac39edf
--- /dev/null
+++ b/test_sasl_auth.js
@@ -0,0 +1,79 @@
+/*
+
+This is a k6 test script that imports the xk6-kafka and
+tests Kafka with 200 JSON messages per iteration. It
+also uses SASL authentication.
+
+*/
+
+import {
+    check
+} from 'k6';
+import {
+    writer,
+    produce,
+    reader,
+    consume
+} from 'k6/x/kafka'; // import kafka extension
+
+const bootstrapServers = ["localhost:9092"];
+const kafkaTopic = "xk6_kafka_json_topic";
+const auth = JSON.stringify({
+    username: "client",
+    password: "client-secret",
+    // Possible values for the algorithm are:
+    // SASL Plain: "plain" (default if omitted)
+    // SASL SCRAM: "sha256", "sha512"
+    algorithm: "sha256"
+});
+const offset = 0;
+const partition = 1;
+
+const producer = writer(bootstrapServers, kafkaTopic, auth);
+const consumer = reader(bootstrapServers, kafkaTopic, partition, offset, auth);
+
+export default function () {
+    for (let index = 0; index < 100; index++) {
+        let messages = [{
+                key: JSON.stringify({
+                    "correlationId": "test-id-abc-" + index
+                }),
+                value: JSON.stringify({
+                    "name": "xk6-kafka",
+                    "version": "0.2.1",
+                    "author": "Mostafa Moradian",
+                    "description": "k6 extension to load test Apache Kafka with support for Avro messages",
+                    "index": index
+                })
+            },
+            {
+                key: JSON.stringify({
+                    "correlationId": "test-id-def-" + index
+                }),
+                value: JSON.stringify({
+                    "name": "xk6-kafka",
+                    "version": "0.2.1",
+                    "author": "Mostafa Moradian",
+                    "description": "k6 extension to load test Apache Kafka with support for Avro messages",
+                    "index": index
+                })
+            }
+        ]
+
+        let error = produce(producer, messages);
+        check(error, {
+            "is sent": err => err == undefined
+        });
+    }
+
+    // Read 10 messages only
+    let messages = consume(consumer, 10);
+    check(messages, {
+        "10 messages returned": msgs => msgs.length == 10
+    });
+}
+
+export function teardown(data) {
+    producer.close();
+    consumer.close();
+}
\ No newline at end of file
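
The same script exercises the SCRAM path, since its credentials set `algorithm: "sha256"` (or `"sha512"`). A sketch of the corresponding test-environment steps, assuming kafka-docker-playground ships an `environment/sasl-scram` setup analogous to the `sasl-plain` one used above:

```
$ cd kafka-docker-playground/environment/sasl-scram
$ ./start.sh
$ docker cp k6 broker:/
$ docker cp test_sasl_auth.js broker:/
$ docker exec -it broker bash
[appuser@broker ~]$ cd / && ./k6 run --vus 50 --duration 60s test_sasl_auth.js
```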