diff --git a/build.sbt b/build.sbt
index a30f8f0..9230b3a 100644
--- a/build.sbt
+++ b/build.sbt
@@ -22,6 +22,7 @@ lazy val commonDependencies = Seq(
   Dependencies.Libraries.nsqClient,
   Dependencies.Libraries.netty,
   Dependencies.Libraries.jacksonCbor,
+  Dependencies.Libraries.kafkaConnectApi,
   // Scala
   Dependencies.Libraries.catsRetry,
   Dependencies.Libraries.circeOptics,
diff --git a/config/config.kafka.minimal.hocon b/config/config.kafka.minimal.hocon
new file mode 100644
index 0000000..6eb40bd
--- /dev/null
+++ b/config/config.kafka.minimal.hocon
@@ -0,0 +1,36 @@
+# The minimum required config options for loading from kafka
+{
+  "input": {
+    "type": "kafka"
+    "brokers": "localhost:9092"
+    "topicName": "enriched-events"
+    "retries": 10
+    "groupId": "kafka-consumer"
+    "buffer": {
+      "recordLimit": 500
+    }
+  }
+  "output": {
+    "good": {
+      "client": {
+        "endpoint": "localhost"
+      }
+      "cluster": {
+        "index": "good"
+      }
+    }
+    "bad" {
+      "type": "kafka"
+      "brokers": "localhost:9092"
+      "topicName": "bad-events"
+
+      "retries": 10
+      "groupId": "kafka-consumer"
+      "buffer": {
+        "byteLimit": 4500000
+        "timeLimit": 500
+      }
+    }
+  }
+  "purpose": "JSON"
+}
diff --git a/config/config.kafka.reference.hocon b/config/config.kafka.reference.hocon
new file mode 100644
index 0000000..f961306
--- /dev/null
+++ b/config/config.kafka.reference.hocon
@@ -0,0 +1,129 @@
+{
+  "input": {
+    # Sources currently supported are:
+    # "kinesis" for reading records from a Kinesis stream
+    # "stdin" for reading unencoded tab-separated events from stdin
+    #   If set to "stdin", JSON documents will not be sent to Elasticsearch
+    #   but will be written to stdout.
+    # "nsq" for reading unencoded tab-separated events from NSQ
+    "type": "kafka"
+
+    # Topic name for incoming data
+    "topicName": "enriched-events"
+
+
+    # Kafka brokers
+    "brokers": "localhost:9092"
+
+    # Events are accumulated in a buffer before being sent to Elasticsearch.
+    # The buffer is emptied whenever the number of stored records exceeds recordLimit
+    "buffer": {
+      "recordLimit": 499 # Default value 500
+    }
+  }
+
+  "output": {
+    "good": {
+      # Good sinks currently supported are:
+      # "elasticsearch" for writing good records to Elasticsearch
+      # "stdout" for writing good records to stdout
+      # Default value "elasticsearch"
+      "type": "elasticsearch"
+
+      # Events are indexed using an Elasticsearch Client
+      # - endpoint: the cluster endpoint
+      # - port (optional, default value 9200): the port the cluster can be accessed on
+      #   - for http this is usually 9200
+      #   - for transport this is usually 9300
+      # - username (optional, remove if not active): http basic auth username
+      # - password (optional, remove if not active): http basic auth password
+      # - shardDateFormat (optional, remove if not needed): formatting used for sharding good stream, i.e. _yyyy-MM-dd
+      # - shardDateField (optional, if not specified derived_tstamp is used): timestamp field for sharding good stream
+      # - maxTimeout: the maximum attempt time before a client restart
+      # - maxRetries (optional, default value 6): the maximum number of request attempts before giving up
+      # - ssl (optional, default value false): if using the http client, whether to use ssl or not
+      "client": {
+        "endpoint": "localhost"
+        "port": 9200
+        "username": "es-user"
+        "password": "es-pass"
+        "shardDateFormat": "_yyyy-MM-dd"
+        "shardDateField": "derived_tstamp"
+        "maxTimeout": 9999
+        "maxRetries": 5
+        "ssl": true
+      }
+
+      # When using the AWS ES service
+      # - signing: if using the http client and the AWS ES service you can sign your requests
+      #   http://docs.aws.amazon.com/general/latest/gr/signing_aws_api_requests.html
+      # - region where the AWS ES service is located
+      # These values are optional.
+      "aws": {
+        "signing": true # Default value false
+        "region": "eu-central-1" # Default value empty string
+      }
+
+      "cluster": {
+        # The Elasticsearch index name
+        # Default value "good"
+        "index": "good"
+        # The Elasticsearch index type.
+        # Index types are deprecated in ES >=7.x
+        # Therefore, it shouldn't be set with ES >=7.x
+        "documentType": "good-doc"
+      }
+
+      # Bulk requests to Elasticsearch will be split into
+      # chunks according to the given limits.
+      # These values are optional.
+      "chunk": {
+        "byteLimit": 999999 # Default value is 1000000
+        "recordLimit": 499 # Default value is 500
+      }
+    }
+    "bad" {
+      # Bad sinks currently supported are:
+      # "kinesis" for writing bad records to Kinesis
+      # "stderr" for writing bad records to stderr
+      # "nsq" for writing bad records to NSQ
+      # "kafka" for writing bad records to Kafka
+      # "none" for ignoring bad records
+      "type": "kafka"
+
+      # Topic name for events which are rejected by Elasticsearch
+      "topicName": "bad-events"
+
+      # Kafka brokers
+      "brokers": "localhost:9092"
+
+      # Events are accumulated in a buffer before being sent to Kafka.
+      # The buffer is emptied whenever:
+      # - the combined size of the stored records reaches byteLimit or
+      # - the time in milliseconds since it was last emptied exceeds timeLimit when
+      #   a new event enters the buffer
+      buffer {
+        byteLimit = 50000
+        timeLimit = 5000
+      }
+    }
+  }
+
+  # "ENRICHED_EVENTS" for a stream of successfully enriched events
+  # "BAD_ROWS" for a stream of bad events
+  # "JSON" for writing plain json
+  "purpose": "ENRICHED_EVENTS"
+
+  # Optional section for tracking endpoints
+  "monitoring": {
+    "snowplow": {
+      "collector": "localhost:14322"
+      "appId": "test-app-id"
+    }
+
+    "metrics": {
+      # Optional, cloudwatch metrics are enabled by default.
+      "cloudWatch": false
+    }
+  }
+}
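As a quick illustration (not part of the patch itself), the minimal file above can be parsed with the Typesafe Config loader that already backs pureconfig in this project; the object name and assertions below are hypothetical and assume the snippet runs from the repository root.

import java.io.File
import com.typesafe.config.ConfigFactory

// Sketch: parse config/config.kafka.minimal.hocon and spot-check a few of the
// values shown above; nothing here is wired into the loader itself.
object KafkaConfigSmokeCheck {
  def main(args: Array[String]): Unit = {
    val cfg = ConfigFactory.parseFile(new File("config/config.kafka.minimal.hocon")).resolve()
    assert(cfg.getString("input.type") == "kafka")
    assert(cfg.getString("input.brokers") == "localhost:9092")
    assert(cfg.getLong("output.bad.buffer.byteLimit") == 4500000L)
    println("config.kafka.minimal.hocon parses as expected")
  }
}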
+ "cloudWatch": false + } + } +} diff --git a/core/src/main/scala/com.snowplowanalytics.stream/loader/Config.scala b/core/src/main/scala/com.snowplowanalytics.stream/loader/Config.scala index 0913575..b407be8 100644 --- a/core/src/main/scala/com.snowplowanalytics.stream/loader/Config.scala +++ b/core/src/main/scala/com.snowplowanalytics.stream/loader/Config.scala @@ -20,18 +20,12 @@ package com.snowplowanalytics.stream.loader import java.nio.file.{Files, Path} import java.text.SimpleDateFormat - import scala.util.Try - import com.monovore.decline.{Command, Opts} - import cats.syntax.either._ import cats.syntax.validated._ - import com.amazonaws.regions.{DefaultAwsRegionProviderChain, Regions} - import com.typesafe.config.ConfigOrigin - import pureconfig._ import pureconfig.generic.{FieldCoproductHint, ProductHint} import pureconfig.generic.semiauto._ @@ -53,6 +47,18 @@ object Config { object Source { final case object Stdin extends Source + final case class Kafka( + brokers: String, + topicName: String, + groupId: String, + sourceConf: Option[Map[String, String]], + buffer: Kafka.Buffer + ) extends Source + + object Kafka { + final case class Buffer(recordLimit: Long) + } + final case class Nsq( streamName: String, channelName: String, @@ -138,6 +144,19 @@ object Config { final case object Stderr extends BadSink + final case class Kafka( + brokers: String, + retries: Int, + topicName: String, + groupId: String, + producerConf: Option[Map[String, String]], + buffer: Kafka.Buffer + ) extends BadSink + + object Kafka { + final case class Buffer(byteLimit: Long, timeLimit: Long) + } + final case class Nsq( streamName: String, nsqdHost: String, @@ -225,6 +244,10 @@ object Config { deriveReader[Source.Nsq] implicit val sourceNsqBufferConfigReader: ConfigReader[Source.Nsq.Buffer] = deriveReader[Source.Nsq.Buffer] + implicit val sourceKafkaConfigReader: ConfigReader[Source.Kafka] = + deriveReader[Source.Kafka] + implicit val sourceKafkaBufferConfigReader: ConfigReader[Source.Kafka.Buffer] = + deriveReader[Source.Kafka.Buffer] implicit val sourceKinesisConfigReader: ConfigReader[Source.Kinesis] = deriveReader[Source.Kinesis] implicit val sourceKinesisConfigBufferReader: ConfigReader[Source.Kinesis.Buffer] = @@ -257,6 +280,10 @@ object Config { deriveReader[Sink.BadSink.Stderr.type] implicit val sinkBadNsqConfigReader: ConfigReader[Sink.BadSink.Nsq] = deriveReader[Sink.BadSink.Nsq] + implicit val sinkBadKafkaConfigReader: ConfigReader[Sink.BadSink.Kafka] = + deriveReader[Sink.BadSink.Kafka] + implicit val sinkBadKafkaBufferConfigReader: ConfigReader[Sink.BadSink.Kafka.Buffer] = + deriveReader[Sink.BadSink.Kafka.Buffer] implicit val monitoringConfigReader: ConfigReader[Monitoring] = deriveReader[Monitoring] implicit val snowplowMonitoringConfig: ConfigReader[Monitoring.SnowplowMonitoring] = diff --git a/core/src/main/scala/com.snowplowanalytics.stream/loader/executors/KafkaSourceExecutor.scala b/core/src/main/scala/com.snowplowanalytics.stream/loader/executors/KafkaSourceExecutor.scala new file mode 100644 index 0000000..144983a --- /dev/null +++ b/core/src/main/scala/com.snowplowanalytics.stream/loader/executors/KafkaSourceExecutor.scala @@ -0,0 +1,137 @@ +/** + * Copyright (c) 2014-2021 Snowplow Analytics Ltd. + * All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache + * License Version 2.0. 
diff --git a/core/src/main/scala/com.snowplowanalytics.stream/loader/executors/KafkaSourceExecutor.scala b/core/src/main/scala/com.snowplowanalytics.stream/loader/executors/KafkaSourceExecutor.scala
new file mode 100644
index 0000000..144983a
--- /dev/null
+++ b/core/src/main/scala/com.snowplowanalytics.stream/loader/executors/KafkaSourceExecutor.scala
@@ -0,0 +1,137 @@
+/**
+ * Copyright (c) 2014-2021 Snowplow Analytics Ltd.
+ * All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache
+ * License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at
+ * http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied.
+ *
+ * See the Apache License Version 2.0 for the specific language
+ * governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.stream.loader
+package executors
+
+// Kafka
+import org.apache.kafka.clients.consumer.KafkaConsumer
+
+import java.nio.charset.StandardCharsets.UTF_8
+
+// Java
+import java.time.Duration
+import java.util
+import java.util.Properties
+
+// Scala
+import scala.collection.mutable.ListBuffer
+import scala.collection.JavaConverters._
+
+// Logging
+import org.slf4j.LoggerFactory
+
+// This project
+import com.snowplowanalytics.stream.loader.Config._
+import com.snowplowanalytics.stream.loader.clients._
+import com.snowplowanalytics.stream.loader.sinks._
+import com.snowplowanalytics.stream.loader.transformers.{
+  BadEventTransformer,
+  EnrichedEventJsonTransformer,
+  JsonTransformer
+}
+
+/**
+ * Kafka source executor
+ *
+ * @param purpose kind of data stored, good, bad or plain-json
+ * @param kafka Kafka source configuration
+ * @param goodSink the configured GoodSink
+ * @param badSink the configured BadSink
+ */
+class KafkaSourceExecutor(
+  purpose: Purpose,
+  kafka: Source.Kafka,
+  goodSink: Either[ISink, BulkSender[EmitterJsonInput]],
+  badSink: ISink,
+  shardDateField: Option[String],
+  shardDateFormat: Option[String]
+) extends Runnable {
+
+  private lazy val log = LoggerFactory.getLogger(getClass)
+
+  // Kafka records are buffered in msgBuffer until its size reaches the configured recordLimit
+  private val msgBuffer = new ListBuffer[EmitterJsonInput]()
+  // Emitter instance used to send records to the good and bad sinks
+  private val emitter =
+    new Emitter(
+      goodSink,
+      badSink
+    )
+  private val transformer = purpose match {
+    case Purpose.Enriched => new EnrichedEventJsonTransformer(shardDateField, shardDateFormat)
+    case Purpose.Json     => new JsonTransformer
+    case Purpose.Bad      => new BadEventTransformer
+  }
+
+  /**
+   * Creates a new Kafka consumer with the given
+   * configuration options
+   *
+   * @return a new Kafka consumer
+   */
+  private def createConsumer: KafkaConsumer[String, Array[Byte]] = {
+
+    log.info(s"Connect Kafka Consumer to brokers: ${kafka.brokers}")
+
+    val props = new Properties()
+    props.put("bootstrap.servers", kafka.brokers)
+    props.put("group.id", kafka.groupId)
+    props.put("enable.auto.commit", "true")
+    props.put("auto.commit.interval.ms", "1000")
+    props.put("auto.offset.reset", "earliest")
+    props.put("session.timeout.ms", "30000")
+    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
+    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+
+    kafka.sourceConf.getOrElse(Map()).foreach { case (k, v) => props.setProperty(k, v) }
+
+    new KafkaConsumer[String, Array[Byte]](props)
+  }
+
+  /** Never-ending processing loop over source stream. */
+  override def run(): Unit = {
+    log.info(s"Running Kafka consumer group: ${kafka.groupId}.")
+    log.info(s"Processing raw input Kafka topic: ${kafka.topicName}")
+    val kafkaBufferSize = kafka.buffer.recordLimit
+    val consumer = createConsumer
+
+    consumer.subscribe(util.Collections.singletonList(kafka.topicName))
+
+    while (true) {
+      val recordValues = consumer
+        .poll(Duration.ofMillis(100)) // Wait 100 ms if data is not available
+        .asScala
+        .toList
+        .map(_.value)
+      msgBuffer.synchronized {
+        for (record <- recordValues) {
+          val msgStr = new String(record, UTF_8)
+          val emitterInput = transformer.consumeLine(msgStr)
+          msgBuffer += emitterInput
+
+          if (msgBuffer.size == kafkaBufferSize) {
+            val rejectedRecords = emitter.attemptEmit(msgBuffer.toList)
+            emitter.fail(rejectedRecords.asJava)
+            msgBuffer.clear()
+          }
+        }
+      }
+    }
+  }
+}
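Because createConsumer copies every key/value pair under input.sourceConf verbatim onto the consumer Properties, additional Kafka client settings can be supplied through configuration alone. A sketch of the corresponding Source.Kafka value follows; the broker addresses and the SASL/TLS settings are purely illustrative, not something this patch requires.

import com.snowplowanalytics.stream.loader.Config.Source

// Hypothetical example of the sourceConf pass-through: the security.* and sasl.*
// keys are standard Kafka consumer settings and end up on the Properties object
// built in createConsumer above.
object SecureKafkaSourceExample {
  val source: Source.Kafka = Source.Kafka(
    brokers = "broker-1:9093,broker-2:9093",
    topicName = "enriched-events",
    groupId = "es-loader",
    sourceConf = Some(
      Map(
        "security.protocol" -> "SASL_SSL",
        "sasl.mechanism" -> "SCRAM-SHA-512"
      )
    ),
    buffer = Source.Kafka.Buffer(recordLimit = 500)
  )
}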
diff --git a/core/src/main/scala/com.snowplowanalytics.stream/loader/sinks/KafkaSink.scala b/core/src/main/scala/com.snowplowanalytics.stream/loader/sinks/KafkaSink.scala
new file mode 100644
index 0000000..c1c9208
--- /dev/null
+++ b/core/src/main/scala/com.snowplowanalytics.stream/loader/sinks/KafkaSink.scala
@@ -0,0 +1,91 @@
+/**
+ * Copyright (c) 2014-2021 Snowplow Analytics Ltd.
+ * All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache
+ * License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at
+ * http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied.
+ *
+ * See the Apache License Version 2.0 for the specific language
+ * governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.stream.loader
+package sinks
+
+// Kafka
+import org.apache.kafka.clients.producer._
+
+// Java
+import java.util.Properties
+import org.slf4j.LoggerFactory
+
+// Scala
+import scala.util.Random
+import java.nio.charset.StandardCharsets.UTF_8
+
+// This project
+import com.snowplowanalytics.stream.loader.Config.Sink.BadSink.{Kafka => KafkaSinkConfig}
+
+/**
+ * Kafka Sink
+ */
+class KafkaSink(conf: KafkaSinkConfig) extends ISink {
+  private lazy val log = LoggerFactory.getLogger(getClass)
+
+  private val kafkaProducer = createProducer
+
+  /**
+   * Creates a new Kafka producer with the given
+   * configuration options
+   *
+   * @return a new Kafka producer
+   */
+  private def createProducer: KafkaProducer[String, Array[Byte]] = {
+
+    log.info(s"Create Kafka Producer to brokers: ${conf.brokers}")
+
+    val props = new Properties()
+    props.setProperty("bootstrap.servers", conf.brokers)
+    props.setProperty("acks", "all")
+    props.setProperty("retries", conf.retries.toString)
+    props.setProperty("buffer.memory", conf.buffer.byteLimit.toString)
+    props.setProperty("linger.ms", conf.buffer.timeLimit.toString)
+    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
+    props.setProperty(
+      "value.serializer",
+      "org.apache.kafka.common.serialization.ByteArraySerializer")
+
+    conf.producerConf.getOrElse(Map()).foreach { case (k, v) => props.setProperty(k, v) }
+
+    new KafkaProducer[String, Array[Byte]](props)
+  }
+
+  /**
+   * Writes a string to Kafka
+   *
+   * @param output The string to write
+   * @param key A hash of the key determines the partition the
+   *            record is assigned to. Defaults to a random string.
+   * @param good Unused parameter which exists to extend ISink
+   */
+  override def store(output: String, key: Option[String], good: Boolean): Unit = {
+    kafkaProducer.send(
+      new ProducerRecord(
+        conf.topicName,
+        key.getOrElse(Random.nextInt.toString),
+        output.getBytes(UTF_8)),
+      new Callback {
+        override def onCompletion(metadata: RecordMetadata, e: Exception): Unit =
+          if (e != null) log.error(s"Sending event failed: ${e.getMessage}")
+      }
+    )
+    ()
+  }
+}
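A usage sketch for the new sink (illustrative only; the values mirror config.kafka.minimal.hocon). Note how the bad-sink buffer maps onto producer settings in createProducer: byteLimit becomes buffer.memory and timeLimit becomes linger.ms.

import com.snowplowanalytics.stream.loader.Config.Sink.BadSink
import com.snowplowanalytics.stream.loader.sinks.KafkaSink

// Sketch: build a BadSink.Kafka config by hand and write one bad row through the sink.
object KafkaSinkExample {
  def main(args: Array[String]): Unit = {
    val conf = BadSink.Kafka(
      brokers = "localhost:9092",
      retries = 10,
      topicName = "bad-events",
      groupId = "kafka-consumer",
      producerConf = None,
      buffer = BadSink.Kafka.Buffer(byteLimit = 4500000L, timeLimit = 500L)
    )
    val sink = new KafkaSink(conf)
    // The key is optional; KafkaSink falls back to a random key, as shown above.
    sink.store("""{"example":"bad row"}""", key = None, good = false)
  }
}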
diff --git a/elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/ElasticsearchLoader.scala b/elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/ElasticsearchLoader.scala
index 010066f..a168f13 100644
--- a/elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/ElasticsearchLoader.scala
+++ b/elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/ElasticsearchLoader.scala
@@ -20,13 +20,9 @@ package com.snowplowanalytics.stream.loader

 import cats.Id
 import com.snowplowanalytics.snowplow.scalatracker.Tracker

-import com.snowplowanalytics.stream.loader.sinks.ISink
+import com.snowplowanalytics.stream.loader.sinks.{ISink, KafkaSink}
 import com.snowplowanalytics.stream.loader.clients.{BulkSender, ElasticsearchBulkSender}
-import com.snowplowanalytics.stream.loader.executors.{
-  KinesisSourceExecutor,
-  NsqSourceExecutor,
-  StdinExecutor
-}
+import com.snowplowanalytics.stream.loader.executors.{KafkaSourceExecutor, KinesisSourceExecutor, NsqSourceExecutor, StdinExecutor}
 import com.snowplowanalytics.stream.loader.Config._
 import com.snowplowanalytics.stream.loader.Config.Sink.{BadSink, GoodSink}
@@ -62,6 +58,7 @@
       case Source.Stdin => System.exit(1)
       case _: Source.Kinesis => System.exit(1)
       case _: Source.Nsq => ()
+      case _: Source.Kafka => ()
     }
   }

@@ -102,9 +99,20 @@
          shardDateFormat
        )

+      // Read records from Kafka
+      case c: Source.Kafka =>
+        new KafkaSourceExecutor(
+          config.purpose,
+          c,
+          goodSink,
+          badSink,
+          shardDateField,
+          shardDateFormat
+        )
+
      // Run locally, reading from stdin and sending events to stdout / stderr rather than Elasticsearch / Kinesis
      case Source.Stdin => new StdinExecutor(config, goodSink, badSink)
-      case _ => throw new RuntimeException("Source must be set to 'stdin', 'kinesis' or 'nsq'")
+      case _ => throw new RuntimeException("Source must be set to 'stdin', 'kinesis', 'nsq' or 'kafka'")
    }
  }

@@ -113,6 +121,7 @@ object ElasticsearchLoader {
      case BadSink.None => new sinks.NullSink
      case BadSink.Stderr => new sinks.StdouterrSink
      case c: BadSink.Nsq => new sinks.NsqSink(c)
+      case c: BadSink.Kafka => new KafkaSink(c)
      case c: BadSink.Kinesis => new sinks.KinesisSink(c)
    }
  }
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index e5b159d..4b2614c 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -24,6 +24,7 @@ object Dependencies {
     val nsqClient = "1.1.0-rc1"
     val netty = "4.1.67.Final" // Override provided version to fix security vulnerability
     val jackson = "2.12.5"
+    val kafkaConnectApi = "3.0.0"
     // Scala
     val catsRetry = "0.3.2"
     val circe = "0.14.1"
@@ -50,6 +51,7 @@ object Dependencies {
     val nsqClient = "com.snowplowanalytics" % "nsq-java-client_2.10" % V.nsqClient
     val netty = "io.netty" % "netty-all" % V.netty
     val jacksonCbor = "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % V.jackson // Override provided version to fix security vulnerability
+    val kafkaConnectApi = "org.apache.kafka" % "connect-api" % V.kafkaConnectApi
     // Scala
     val catsRetry = "com.github.cb372" %% "cats-retry-cats-effect" % V.catsRetry
     val circeOptics = "io.circe" %% "circe-optics" % V.circe
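One dependency note: the KafkaConsumer and KafkaProducer used above live in kafka-clients, which connect-api appears to pull in transitively, so the build resolves as written. If a direct dependency were preferred, a hypothetical alternative line for Dependencies.scala (not part of this patch) could look like:

// Hypothetical: depend on kafka-clients directly rather than relying on the
// transitive dependency of connect-api; reuses the version value added above.
val kafkaClients = "org.apache.kafka" % "kafka-clients" % V.kafkaConnectApi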