Add Kafka Support #210

Open
wants to merge 1 commit into master
1 change: 1 addition & 0 deletions build.sbt
@@ -22,6 +22,7 @@ lazy val commonDependencies = Seq(
Dependencies.Libraries.nsqClient,
Dependencies.Libraries.netty,
Dependencies.Libraries.jacksonCbor,
Dependencies.Libraries.kafkaConnectApi,
// Scala
Dependencies.Libraries.catsRetry,
Dependencies.Libraries.circeOptics,
36 changes: 36 additions & 0 deletions config/config.kafka.minimal.hocon
@@ -0,0 +1,36 @@
# The minimum required config options for loading from Kafka
{
"input": {
"type": "kafka"
"brokers": "localhost:9092"
"topicName":"enriched-events"
"retries": 10
"groupId": "kafka-consumer"
"buffer": {
"recordLimit": 500
}
}
"output": {
"good": {
"client": {
"endpoint": "localhost"
}
"cluster": {
"index": "good"
}
}
"bad" {
"type": "kafka"
"brokers": "localhost:9092"
"topicName":"bad-events"

"retries": 10
"groupId": "kafka-consumer"
"buffer": {
"byteLimit" : 4500000
"timeLimit" : 500
}
}
}
"purpose": "JSON"
}
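
As a sanity check, here is a minimal sketch of loading this file with pureconfig. It assumes the loader's top-level Config case class and an implicit ConfigReader[Config] derived alongside the section readers added in this PR (only the section readers are visible in the diff below).

import java.nio.file.Paths

import pureconfig.ConfigSource

import com.snowplowanalytics.stream.loader.Config

object LoadKafkaConfigExample {
  def main(args: Array[String]): Unit =
    // Sketch only: relies on an implicit ConfigReader[Config] being in scope.
    ConfigSource.file(Paths.get("config/config.kafka.minimal.hocon")).load[Config] match {
      case Right(config)  => println(s"Loaded: $config")
      case Left(failures) => System.err.println(failures.prettyPrint())
    }
}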
129 changes: 129 additions & 0 deletions config/config.kafka.reference.hocon
@@ -0,0 +1,129 @@
{
"input": {
# Sources currently supported are:
# "kinesis" for reading records from a Kinesis stream
# "kafka" for reading records from a Kafka topic
# "stdin" for reading unencoded tab-separated events from stdin
# If set to "stdin", JSON documents will not be sent to Elasticsearch
# but will be written to stdout.
# "nsq" for reading unencoded tab-separated events from NSQ
"type": "kafka"

# Topic name for incoming data
"topicName": "enriched-events"


# Kafka brokers
"brokers": "localhost:9092"

# Kafka consumer group id
"groupId": "kafka-consumer"

# Events are accumulated in a buffer before being sent to Elasticsearch.
# The buffer is emptied whenever the number of stored records exceeds recordLimit
"buffer": {
"recordLimit": 499 # Default value 500
}
}

"output": {
"good": {
# Good sinks currently supported are:
# "elasticsearch" for writing good records to Elasticsearch
# "stdout" for writing good records to stdout
# Default value "elasticsearch"
"type": "elasticsearch"

# Events are indexed using an Elasticsearch Client
# - endpoint: the cluster endpoint
# - port (optional, default value 9200): the port the cluster can be accessed on
# - for http this is usually 9200
# - for transport this is usually 9300
# - username (optional, remove if not active): http basic auth username
# - password (optional, remove if not active): http basic auth password
# - shardDateFormat (optional, remove if not needed): formatting used for sharding good stream, e.g. _yyyy-MM-dd
# - shardDateField (optional, if not specified derived_tstamp is used): timestamp field for sharding good stream
# - maxTimeout: the maximum attempt time before a client restart
# - maxRetries (optional, default value 6): the maximum number of request attempts before giving up
# - ssl (optional, default value false): if using the http client, whether to use ssl or not
"client": {
"endpoint": "localhost"
"port": 9200
"username": "es-user"
"password": "es-pass"
"shardDateFormat": "_yyyy-MM-dd"
"shardDateField": "derived_tstamp"
"maxTimeout": 9999
"maxRetries": 5
"ssl": true
}

# When using the AWS ES service
# - signing: if using the http client and the AWS ES service you can sign your requests
# http://docs.aws.amazon.com/general/latest/gr/signing_aws_api_requests.html
# - region where the AWS ES service is located
# These values are optional.
"aws": {
"signing": true # Default value false
"region": "eu-central-1" # Default value empty string
}

"cluster": {
# The Elasticsearch index name
# Default value "good"
"index": "good"
# The Elasticsearch index type.
# Index types are deprecated in ES >=7.x
# Therefore, it shouldn't be set with ES >=7.x
"documentType": "good-doc"
}

# Bulk requests to Elasticsearch will be split into
# chunks according to the given limits.
# These values are optional.
"chunk": {
"byteLimit": 999999 # Default value is 1000000
"recordLimit": 499 # Default value is 500
}
}
"bad" {
# Bad sinks currently supported are:
# "kinesis" for writing bad records to Kinesis
# "stderr" for writing bad records to stderr
# "nsq" for writing bad records to NSQ
# "kafka" for writing bad records to Kafka
# "none" for ignoring bad records
"type": "kafka"

# Topic name for events which are rejected by Elasticsearch
"topicName": "bad-events"

# Kafka brokers
"brokers": "localhost:9092"

# Number of retries when sending to Kafka
"retries": 10

# Kafka group id
"groupId": "kafka-consumer"

# Events are accumulated in a buffer before being sent to Kafka.
# The buffer is emptied whenever:
# - the combined size of the stored records reaches byteLimit or
# - the time in milliseconds since it was last emptied exceeds timeLimit when
# a new event enters the buffer
"buffer": {
"byteLimit": 50000
"timeLimit": 5000
}
}
}

# "ENRICHED_EVENTS" for a stream of successfully enriched events
# "BAD_ROWS" for a stream of bad events
# "JSON" for writing plain json
"purpose": "ENRICHED_EVENTS"

# Optional section for tracking endpoints
"monitoring": {
"snowplow": {
"collector": "localhost:14322"
"appId": "test-app-id"
}

"metrics": {
# Optional, CloudWatch metrics are enabled by default.
"cloudWatch": false
}
}
}
@@ -20,18 +20,12 @@ package com.snowplowanalytics.stream.loader

import java.nio.file.{Files, Path}
import java.text.SimpleDateFormat

import scala.util.Try

import com.monovore.decline.{Command, Opts}

import cats.syntax.either._
import cats.syntax.validated._

import com.amazonaws.regions.{DefaultAwsRegionProviderChain, Regions}

import com.typesafe.config.ConfigOrigin

import pureconfig._
import pureconfig.generic.{FieldCoproductHint, ProductHint}
import pureconfig.generic.semiauto._
@@ -53,6 +47,18 @@ object Config {
object Source {
final case object Stdin extends Source

final case class Kafka(
brokers: String,
topicName: String,
groupId: String,
sourceConf: Option[Map[String, String]],
buffer: Kafka.Buffer
) extends Source

object Kafka {
final case class Buffer(recordLimit: Long)
}

final case class Nsq(
streamName: String,
channelName: String,
@@ -138,6 +144,19 @@ object Config {

final case object Stderr extends BadSink

final case class Kafka(
brokers: String,
retries: Int,
topicName: String,
groupId: String,
producerConf: Option[Map[String, String]],
buffer: Kafka.Buffer
) extends BadSink

object Kafka {
final case class Buffer(byteLimit: Long, timeLimit: Long)
}

final case class Nsq(
streamName: String,
nsqdHost: String,
@@ -225,6 +244,10 @@ object Config {
deriveReader[Source.Nsq]
implicit val sourceNsqBufferConfigReader: ConfigReader[Source.Nsq.Buffer] =
deriveReader[Source.Nsq.Buffer]
implicit val sourceKafkaConfigReader: ConfigReader[Source.Kafka] =
deriveReader[Source.Kafka]
implicit val sourceKafkaBufferConfigReader: ConfigReader[Source.Kafka.Buffer] =
deriveReader[Source.Kafka.Buffer]
implicit val sourceKinesisConfigReader: ConfigReader[Source.Kinesis] =
deriveReader[Source.Kinesis]
implicit val sourceKinesisConfigBufferReader: ConfigReader[Source.Kinesis.Buffer] =
@@ -257,6 +280,10 @@ object Config {
deriveReader[Sink.BadSink.Stderr.type]
implicit val sinkBadNsqConfigReader: ConfigReader[Sink.BadSink.Nsq] =
deriveReader[Sink.BadSink.Nsq]
implicit val sinkBadKafkaConfigReader: ConfigReader[Sink.BadSink.Kafka] =
deriveReader[Sink.BadSink.Kafka]
implicit val sinkBadKafkaBufferConfigReader: ConfigReader[Sink.BadSink.Kafka.Buffer] =
deriveReader[Sink.BadSink.Kafka.Buffer]
implicit val monitoringConfigReader: ConfigReader[Monitoring] =
deriveReader[Monitoring]
implicit val snowplowMonitoringConfig: ConfigReader[Monitoring.SnowplowMonitoring] =
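
Note that the new Source.Kafka and Sink.BadSink.Kafka case classes also carry optional sourceConf / producerConf maps that do not appear in the reference HOCON above. Below is a short illustrative sketch (values are made up) of passing extra consumer properties through sourceConf; since createConsumer in the executor below applies these entries after its own defaults, they can add new properties or override the defaults.

import com.snowplowanalytics.stream.loader.Config.Source

object KafkaSourceConfExample {
  // Sketch only: illustrative values. sourceConf entries are applied on top of the
  // defaults set in KafkaSourceExecutor.createConsumer, so they can add or override
  // arbitrary Kafka consumer properties.
  val kafkaSource: Source.Kafka =
    Source.Kafka(
      brokers    = "localhost:9092",
      topicName  = "enriched-events",
      groupId    = "kafka-consumer",
      sourceConf = Some(
        Map(
          "security.protocol" -> "SSL",    // extra consumer property
          "auto.offset.reset" -> "latest"  // overrides the "earliest" default in createConsumer
        )
      ),
      buffer = Source.Kafka.Buffer(recordLimit = 500)
    )
}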
@@ -0,0 +1,137 @@
/**
* Copyright (c) 2014-2021 Snowplow Analytics Ltd.
* All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache
* License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied.
*
* See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/
package com.snowplowanalytics.stream.loader
package executors

// Kafka
import org.apache.kafka.clients.consumer.KafkaConsumer

import java.nio.charset.StandardCharsets.UTF_8

// Java
import java.time.Duration
import java.util
import java.util.Properties

// Scala
import scala.collection.mutable.ListBuffer
import scala.collection.JavaConverters._

// Logging
import org.slf4j.LoggerFactory

// This project
import com.snowplowanalytics.stream.loader.Config._
import com.snowplowanalytics.stream.loader.clients._
import com.snowplowanalytics.stream.loader.sinks._
import com.snowplowanalytics.stream.loader.transformers.{
BadEventTransformer,
EnrichedEventJsonTransformer,
JsonTransformer
}

/**
* KafkaSourceExecutor reads events from a Kafka topic and forwards them to the configured sinks
*
* @param purpose kind of data stored, good, bad or plain-json
* @param kafka the Kafka source configuration
* @param goodSink the configured GoodSink
* @param badSink the configured BadSink
* @param shardDateField the timestamp field used to shard the good stream, if any
* @param shardDateFormat the date format used to shard the good stream, if any
*/
class KafkaSourceExecutor(
purpose: Purpose,
kafka: Source.Kafka,
goodSink: Either[ISink, BulkSender[EmitterJsonInput]],
badSink: ISink,
shardDateField: Option[String],
shardDateFormat: Option[String]
) extends Runnable {

private lazy val log = LoggerFactory.getLogger(getClass)

// Kafka messages are buffered in msgBuffer until the buffer size reaches the configured recordLimit
private val msgBuffer = new ListBuffer[EmitterJsonInput]()
// Emitter instance used to flush the buffer to the good and bad sinks
private val emitter =
new Emitter(
goodSink,
badSink
)
private val transformer = purpose match {
case Purpose.Enriched => new EnrichedEventJsonTransformer(shardDateField, shardDateFormat)
case Purpose.Json => new JsonTransformer
case Purpose.Bad => new BadEventTransformer
}

/**
* Creates a new Kafka consumer with the given
* configuration options
*
* @return a new Kafka consumer
*/
private def createConsumer: KafkaConsumer[String, Array[Byte]] = {

log.info(s"Connect Kafka Consumer to brokers: ${kafka.brokers}")

val props = new Properties()
props.put("bootstrap.servers", kafka.brokers)
props.put("group.id", kafka.groupId)
props.put("enable.auto.commit", "true")
props.put("auto.commit.interval.ms", "1000")
props.put("auto.offset.reset", "earliest")
props.put("session.timeout.ms", "30000")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

kafka.sourceConf.getOrElse(Map()).foreach { case (k, v) => props.setProperty(k, v) }

new KafkaConsumer[String, Array[Byte]](props)
}

/** Never-ending processing loop over source stream. */
override def run(): Unit = {
log.info(s"Running Kafka consumer group: ${kafka.groupId}.")
log.info(s"Processing raw input Kafka topic: ${kafka.topicName}")
val kafkaBufferSize = kafka.buffer.recordLimit
val consumer = createConsumer

consumer.subscribe(util.Collections.singletonList(kafka.topicName))

while (true) {
val recordValues = consumer
.poll(Duration.ofMillis(100)) // Wait 100 ms if data is not available
.asScala
.toList
.map(_.value)
msgBuffer.synchronized {
for (record <- recordValues) {
val msgStr = new String(record, UTF_8)
val emitterInput = transformer.consumeLine(msgStr)
msgBuffer += emitterInput

if (msgBuffer.size == kafkaBufferSize) {
val rejectedRecords = emitter.attemptEmit(msgBuffer.toList)
emitter.fail(rejectedRecords.asJava)
msgBuffer.clear()
}
}
}
}
}
}
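
For context, here is a minimal sketch of starting the executor on a dedicated thread. The good and bad sinks are assumed to be built elsewhere from the loaded configuration (their construction is outside this diff), and the import locations follow the packages referenced in the file above.

import com.snowplowanalytics.stream.loader.EmitterJsonInput
import com.snowplowanalytics.stream.loader.Config.{Purpose, Source}
import com.snowplowanalytics.stream.loader.clients.BulkSender
import com.snowplowanalytics.stream.loader.executors.KafkaSourceExecutor
import com.snowplowanalytics.stream.loader.sinks.ISink

object KafkaExecutorWiring {
  // Sketch only: starts the never-ending run() loop on a dedicated thread.
  def start(
    kafka: Source.Kafka,
    goodSink: Either[ISink, BulkSender[EmitterJsonInput]],
    badSink: ISink
  ): Thread = {
    val executor = new KafkaSourceExecutor(
      purpose         = Purpose.Enriched,
      kafka           = kafka,
      goodSink        = goodSink,
      badSink         = badSink,
      shardDateField  = Some("derived_tstamp"),
      shardDateFormat = Some("_yyyy-MM-dd")
    )
    val thread = new Thread(executor, "kafka-source-executor")
    thread.start()
    thread
  }
}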