Add Kafka Support
ahmad-marzouq committed Apr 2, 2022
1 parent 402f187 commit 78e70ed
Showing 8 changed files with 446 additions and 14 deletions.
1 change: 1 addition & 0 deletions build.sbt
@@ -22,6 +22,7 @@ lazy val commonDependencies = Seq(
Dependencies.Libraries.nsqClient,
Dependencies.Libraries.netty,
Dependencies.Libraries.jacksonCbor,
Dependencies.Libraries.kafkaConnectApi,
// Scala
Dependencies.Libraries.catsRetry,
Dependencies.Libraries.circeOptics,
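The only build change is the new kafkaConnectApi library. The commit does not show the corresponding entry in project/Dependencies.scala, so the sketch below is only an assumption about how it might be declared (artifact and version are illustrative, not taken from this commit); connect-api pulls in kafka-clients, which provides the KafkaConsumer used by the new executor further down.

import sbt._

object Dependencies {
  object V {
    val kafka = "2.8.1" // assumed version, not visible in this diff
  }
  object Libraries {
    // connect-api depends on kafka-clients, which supplies
    // org.apache.kafka.clients.consumer.KafkaConsumer
    val kafkaConnectApi = "org.apache.kafka" % "connect-api" % V.kafka
  }
}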
36 changes: 36 additions & 0 deletions config/config.kafka.minimal.hocon
@@ -0,0 +1,36 @@
# The minimum required config options for loading from Kafka
{
"input": {
"type": "kafka"
"brokers": "localhost:9092"
"topicName":"enriched-events"
"retries": 10
"groupId": "kafka-consumer"
"buffer": {
"recordLimit": 500
}
}
"output": {
"good": {
"client": {
"endpoint": "localhost"
}
"cluster": {
"index": "good"
}
}
"bad" {
"type": "kafka"
"brokers": "localhost:9092"
"topicName":"bad-events"

"retries": 10
"groupId": "kafka-consumer"
"buffer": {
"byteLimit" : 4500000
"timeLimit" : 500
}
}
}
"purpose": "JSON"
}
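Before handing a file like the one above to the loader, it can be sanity-checked as HOCON with the Typesafe Config library that pureconfig builds on. This only verifies syntax and path lookups, not the loader's schema; the object name here is made up for the example.

import java.io.File
import com.typesafe.config.ConfigFactory

object CheckMinimalConfig extends App {
  val conf = ConfigFactory.parseFile(new File("config/config.kafka.minimal.hocon")).resolve()
  println(conf.getString("input.topicName"))      // enriched-events
  println(conf.getString("output.bad.topicName")) // bad-events
}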
129 changes: 129 additions & 0 deletions config/config.kafka.reference.hocon
@@ -0,0 +1,129 @@
{
"input": {
# Sources currently supported are:
# "kafka" for reading records from a Kafka topic
# "kinesis" for reading records from a Kinesis stream
# "stdin" for reading unencoded tab-separated events from stdin
# If set to "stdin", JSON documents will not be sent to Elasticsearch
# but will be written to stdout.
# "nsq" for reading unencoded tab-separated events from NSQ
"type": "kafka"

# Topic name for incoming data
"topicName": "enriched-events"


# Kafka brokers
"brokers": "localhost:9092"

# Events are accumulated in a buffer before being sent to Elasticsearch.
# The buffer is emptied whenever the number of stored records exceeds recordLimit
"buffer": {
"recordLimit": 499 # Default value 500
}
}

"output": {
"good": {
# Good sinks currently supported are:
# "elasticsearch" for writing good records to Elasticsearch
# "stdout" for writing good records to stdout
# Default value "elasticsearch"
"type": "elasticsearch"

# Events are indexed using an Elasticsearch Client
# - endpoint: the cluster endpoint
# - port (optional, default value 9200): the port the cluster can be accessed on
# - for http this is usually 9200
# - for transport this is usually 9300
# - username (optional, remove if not active): http basic auth username
# - password (optional, remove if not active): http basic auth password
# - shardDateFormat (optional, remove if not needed): formatting used for sharding good stream, i.e. _yyyy-MM-dd
# - shardDateField (optional, if not specified derived_tstamp is used): timestamp field for sharding good stream
# - maxTimeout: the maximum attempt time before a client restart
# - maxRetries (optional, default value 6): the maximum number of request attempts before giving up
# - ssl (optional, default value false): if using the http client, whether to use ssl or not
"client": {
"endpoint": "localhost"
"port": 9200
"username": "es-user"
"password": "es-pass"
"shardDateFormat": "_yyyy-MM-dd"
"shardDateField": "derived_tstamp"
"maxTimeout": 9999
"maxRetries": 5
"ssl": true
}

# When using the AWS ES service
# - signing: if using the http client and the AWS ES service you can sign your requests
# http://docs.aws.amazon.com/general/latest/gr/signing_aws_api_requests.html
# - region where the AWS ES service is located
# These values are optional.
"aws": {
"signing": true # Default value false
"region": "eu-central-1" # Default value empty string
}

"cluster": {
# The Elasticsearch index name
# Default value "good"
"index": "good"
# The Elasticsearch index type.
# Index types are deprecated in ES >=7.x
# Therefore, it shouldn't be set with ES >=7.x
"documentType": "good-doc"
}

# Bulk requests to Elasticsearch will be split into
# chunks according to the given limits.
# These values are optional.
"chunk": {
"byteLimit": 999999 # Default value is 1000000
"recordLimit": 499 # Default value is 500
}
}
"bad" {
# Bad sinks currently supported are:
# "kinesis" for writing bad records to Kinesis
# "stderr" for writing bad records to stderr
# "nsq" for writing bad records to NSQ
# "kafka" for writing bad records to Kafka
# "none" for ignoring bad records
"type": "kafka"

# Topic name for events which are rejected by Elasticsearch
"topicName": "bad-events"

# Kafka brokers
"brokers": "localhost:9092"

# Events are accumulated in a buffer before being sent to Kafka.
# The buffer is emptied whenever:
# - the combined size of the stored records reaches byteLimit or
# - the time in milliseconds since it was last emptied exceeds timeLimit when
# a new event enters the buffer
buffer {
byteLimit = 50000
timeLimit = 5000
}
}
}

# "ENRICHED_EVENTS" for a stream of successfully enriched events
# "BAD_ROWS" for a stream of bad events
# "JSON" for writing plain json
"purpose": "ENRICHED_EVENTS"

# Optional section for tracking endpoints
"monitoring": {
"snowplow": {
"collector": "localhost:14322"
"appId": "test-app-id"
}

"metrics": {
# Optional, cloudwatch metrics are enabled by default.
"cloudWatch": false
}
}
}
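To exercise a loader running with the configuration above, something has to publish to the enriched-events topic. The snippet below is an illustrative producer built on the plain kafka-clients API, assuming the local broker from the reference config; with "purpose": "JSON" the payload can be arbitrary JSON, whereas "ENRICHED_EVENTS" expects Snowplow's tab-separated enriched format.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ProduceTestEvent extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  // Throwaway payload; a real pipeline publishes enriched events here.
  producer.send(new ProducerRecord[String, String]("enriched-events", """{"app_id":"test-app-id"}"""))
  producer.close()
}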
@@ -20,18 +20,12 @@ package com.snowplowanalytics.stream.loader

import java.nio.file.{Files, Path}
import java.text.SimpleDateFormat

import scala.util.Try

import com.monovore.decline.{Command, Opts}

import cats.syntax.either._
import cats.syntax.validated._

import com.amazonaws.regions.{DefaultAwsRegionProviderChain, Regions}

import com.typesafe.config.ConfigOrigin

import pureconfig._
import pureconfig.generic.{FieldCoproductHint, ProductHint}
import pureconfig.generic.semiauto._
@@ -53,6 +47,18 @@ object Config {
object Source {
final case object Stdin extends Source

final case class Kafka(
brokers: String,
topicName: String,
groupId: String,
sourceConf: Option[Map[String, String]],
buffer: Kafka.Buffer
) extends Source

object Kafka {
final case class Buffer(recordLimit: Long)
}

final case class Nsq(
streamName: String,
channelName: String,
@@ -138,6 +144,19 @@

final case object Stderr extends BadSink

final case class Kafka(
brokers: String,
retries: Int,
topicName: String,
groupId: String,
producerConf: Option[Map[String, String]],
buffer: Kafka.Buffer
) extends BadSink

object Kafka {
final case class Buffer(byteLimit: Long, timeLimit: Long)
}

final case class Nsq(
streamName: String,
nsqdHost: String,
@@ -225,6 +244,10 @@ object Config {
deriveReader[Source.Nsq]
implicit val sourceNsqBufferConfigReader: ConfigReader[Source.Nsq.Buffer] =
deriveReader[Source.Nsq.Buffer]
implicit val sourceKafkaConfigReader: ConfigReader[Source.Kafka] =
deriveReader[Source.Kafka]
implicit val sourceKafkaBufferConfigReader: ConfigReader[Source.Kafka.Buffer] =
deriveReader[Source.Kafka.Buffer]
implicit val sourceKinesisConfigReader: ConfigReader[Source.Kinesis] =
deriveReader[Source.Kinesis]
implicit val sourceKinesisConfigBufferReader: ConfigReader[Source.Kinesis.Buffer] =
@@ -257,6 +280,10 @@ object Config {
deriveReader[Sink.BadSink.Stderr.type]
implicit val sinkBadNsqConfigReader: ConfigReader[Sink.BadSink.Nsq] =
deriveReader[Sink.BadSink.Nsq]
implicit val sinkBadKafkaConfigReader: ConfigReader[Sink.BadSink.Kafka] =
deriveReader[Sink.BadSink.Kafka]
implicit val sinkBadKafkaBufferConfigReader: ConfigReader[Sink.BadSink.Kafka.Buffer] =
deriveReader[Sink.BadSink.Kafka.Buffer]
implicit val monitoringConfigReader: ConfigReader[Monitoring] =
deriveReader[Monitoring]
implicit val snowplowMonitoringConfig: ConfigReader[Monitoring.SnowplowMonitoring] =
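The new ConfigReader instances mean a "kafka" input block decodes directly into Source.Kafka. The self-contained sketch below is not the loader's actual Config object (class and object names are invented); it shows the same pureconfig derivation, including the CamelCase field mapping that camel-cased keys such as topicName require, since pureconfig expects kebab-case by default.

import pureconfig._
import pureconfig.generic.ProductHint
import pureconfig.generic.semiauto.deriveReader

final case class KafkaBuffer(recordLimit: Long)
final case class KafkaSource(
  brokers: String,
  topicName: String,
  groupId: String,
  sourceConf: Option[Map[String, String]],
  buffer: KafkaBuffer
)

object KafkaConfigExample extends App {
  // Map camel-cased HOCON keys onto camel-cased field names
  implicit def hint[A]: ProductHint[A] = ProductHint[A](ConfigFieldMapping(CamelCase, CamelCase))
  implicit val bufferReader: ConfigReader[KafkaBuffer] = deriveReader[KafkaBuffer]
  implicit val sourceReader: ConfigReader[KafkaSource] = deriveReader[KafkaSource]

  val hocon =
    """
      |brokers   = "localhost:9092"
      |topicName = "enriched-events"
      |groupId   = "kafka-consumer"
      |buffer { recordLimit = 500 }
      |""".stripMargin

  // Prints Right(KafkaSource(...)) on success, Left(ConfigReaderFailures) otherwise
  println(ConfigSource.string(hocon).load[KafkaSource])
}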
@@ -0,0 +1,137 @@
/**
* Copyright (c) 2014-2021 Snowplow Analytics Ltd.
* All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache
* License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied.
*
* See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/
package com.snowplowanalytics.stream.loader
package executors

// Kafka
import org.apache.kafka.clients.consumer.KafkaConsumer

import java.nio.charset.StandardCharsets.UTF_8

//Java
import java.time.Duration
import java.util
import java.util.Properties

// Scala
import scala.collection.mutable.ListBuffer
import scala.collection.JavaConverters._

// Logging
import org.slf4j.LoggerFactory

// This project
import com.snowplowanalytics.stream.loader.Config._
import com.snowplowanalytics.stream.loader.clients._
import com.snowplowanalytics.stream.loader.sinks._
import com.snowplowanalytics.stream.loader.transformers.{
BadEventTransformer,
EnrichedEventJsonTransformer,
JsonTransformer
}

/**
* KafkaSourceExecutor
*
* @param purpose kind of data stored, good, bad or plain-json
* @param kafka the Kafka source configuration
* @param goodSink the configured GoodSink
* @param badSink the configured BadSink
* @param shardDateField (optional) timestamp field used for sharding the good stream
* @param shardDateFormat (optional) date format used for sharding the good stream
*/
class KafkaSourceExecutor(
purpose: Purpose,
kafka: Source.Kafka,
goodSink: Either[ISink, BulkSender[EmitterJsonInput]],
badSink: ISink,
shardDateField: Option[String],
shardDateFormat: Option[String]
) extends Runnable {

private lazy val log = LoggerFactory.getLogger(getClass)

// Kafka messages are buffered in msgBuffer until its size reaches the configured recordLimit
private val msgBuffer = new ListBuffer[EmitterJsonInput]()
// Emitter used to flush buffered records to the good and bad sinks
private val emitter =
new Emitter(
goodSink,
badSink
)
private val transformer = purpose match {
case Purpose.Enriched => new EnrichedEventJsonTransformer(shardDateField, shardDateFormat)
case Purpose.Json => new JsonTransformer
case Purpose.Bad => new BadEventTransformer
}

/**
* Creates a new Kafka Consumer with the given
* configuration options
*
* @return a new Kafka Consumer
*/
private def createConsumer: KafkaConsumer[String, Array[Byte]] = {

log.info(s"Connect Kafka Consumer to brokers: ${kafka.brokers}")

val props = new Properties()
props.put("bootstrap.servers", kafka.brokers)
props.put("group.id", kafka.groupId)
props.put("enable.auto.commit", "true")
props.put("auto.commit.interval.ms", "1000")
props.put("auto.offset.reset", "earliest")
props.put("session.timeout.ms", "30000")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

kafka.sourceConf.getOrElse(Map()).foreach { case (k, v) => props.setProperty(k, v) }

new KafkaConsumer[String, Array[Byte]](props)
}

/** Never-ending processing loop over source stream. */
override def run(): Unit = {
log.info(s"Running Kafka consumer group: ${kafka.groupId}.")
log.info(s"Processing raw input Kafka topic: ${kafka.topicName}")
val kafkaBufferSize = kafka.buffer.recordLimit
val consumer = createConsumer

consumer.subscribe(util.Collections.singletonList(kafka.topicName))

while (true) {
val recordValues = consumer
.poll(Duration.ofMillis(100)) // Wait 100 ms if data is not available
.asScala
.toList
.map(_.value)
msgBuffer.synchronized {
for (record <- recordValues) {
val msgStr = new String(record, UTF_8)
val emitterInput = transformer.consumeLine(msgStr)
msgBuffer += emitterInput

if (msgBuffer.size == kafkaBufferSize) {
val rejectedRecords = emitter.attemptEmit(msgBuffer.toList)
emitter.fail(rejectedRecords.asJava)
msgBuffer.clear()
}
}
}
}
}
}
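One detail worth noting in createConsumer: the entries of sourceConf are copied into the consumer Properties after the hard-coded defaults, so extra consumer settings (TLS, SASL, tuning) can be injected, and defaults such as auto.offset.reset overridden, without code changes. A hypothetical construction with illustrative values:

import com.snowplowanalytics.stream.loader.Config

object SecureKafkaSourceExample {
  val secureKafkaSource: Config.Source.Kafka = Config.Source.Kafka(
    brokers    = "broker1:9093,broker2:9093",
    topicName  = "enriched-events",
    groupId    = "kafka-consumer",
    sourceConf = Some(Map(
      "security.protocol"       -> "SSL", // standard Kafka consumer properties
      "ssl.truststore.location" -> "/etc/kafka/truststore.jks"
    )),
    buffer = Config.Source.Kafka.Buffer(recordLimit = 500)
  )
}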