Skip to content

Commit

Permalink
Add http sink (close #67)
Browse files Browse the repository at this point in the history
  • Loading branch information
colmsnowplow committed Sep 12, 2023
1 parent b3e2423 commit a0c08ff
Show file tree
Hide file tree
Showing 11 changed files with 287 additions and 33 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/snyk.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ jobs:
uses: snyk/actions/scala@master
with:
command: monitor
args: --project-name=snowplow-event-generator
args: --project-name=snowplow-event-generator --prune-repeated-subdependencies
env:
SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
39 changes: 38 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ Alternatively you can write events directly to a S3 bucket:
}
```

...or a Snowplow collector:

```
"output": {
"type": "Http"
"endpoint": "https://my.collector.endpoint.com"
}
```

Then run:
```bash
./snowplow-event-generator --config my-config.hocon
Expand Down Expand Up @@ -186,7 +195,31 @@ Aside from "output" configuration, all fields in the configuration file are opti
// Only used for "Fixed" timestamps. Change this to generate more recent or more historic events.
"at": "2022-02-01T01:01:01z"
},
}
// Set weights for the distributions of event types.
// Setting a fequency to 0 results in that event type not being produced at all
// Setting equal values results in an even distribution of event types.
"eventFrequencies": {
"struct": 1
"unstruct": 1
"pageView": 1
"pagePing": 1
"unstructEventFrequencies": {
"changeForm": 1
"funnelInteraction": 1
"linkClick": 1
}
}
// HTTP output only - set weights for the distributions of request methods.
// Setting a fequency to 0 results in that method not being produced at all
// Note that head requests are currently not implemented and will result in an exception.
"methodFrequencies": {
"post": 1
"get": 1
"head": 0
}
// Required: Storage to sink generated events into
// Currently only a single output at a time is supported
Expand Down Expand Up @@ -216,6 +249,10 @@ Aside from "output" configuration, all fields in the configuration file are opti
// "type": "PubSub"
// Required: PubSub stream URI
// "uri": "pubsub://projects/my-project/topics/my-topic"
// "type": "Http"
// Required: Snowplow collector endpoint, including protocol
// "endpoint": "https://my.collector.endpoint.com"
}
}
```
Expand Down
10 changes: 8 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,14 @@ lazy val sinks = project
Dependencies.Libraries.kcl,
Dependencies.Libraries.fs2Pubsub,
Dependencies.Libraries.fs2Kafka,
Dependencies.Libraries.awsRegions
)
Dependencies.Libraries.awsRegions,
"org.http4s" %% "http4s-ember-client" % "0.23.15",
"org.http4s" %% "http4s-circe" % "0.23.15"
// TODO move this
),
//libraryDependencies += "org.typelevel" %% "cats-effect" % "3.4.6",
// libraryDependencies += "org.http4s" %% "http4s-ember-client" % "0.23.15",
// libraryDependencies += "org.http4s" %% "http4s-circe" % "0.23.15"
)
.dependsOn(core)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,24 @@ object HttpRequest {
val path: Api
}

case class MethodFrequencies(
get: Int,
post: Int,
head: Int
)

object Method {
final case class Post(path: Api) extends Method
final case class Get(path: Api) extends Method
final case class Head(path: Api) extends Method

def gen: Gen[Method] = Gen.oneOf(genPost, genGet, genHead)
def gen(freq: MethodFrequencies): Gen[Method] = {
Gen.frequency(
(freq.post, genPost),
(freq.get, genGet),
(freq.head, genHead)
)
}

private def genPost: Gen[Method.Post] = Gen.oneOf(genApi(0), genApi(200)).map(Method.Post)
private def genGet: Gen[Method.Get] = Gen.oneOf(fixedApis, genApi(0), genApi(1)).map(Method.Get)
Expand All @@ -48,16 +60,48 @@ object HttpRequest {
eventPerPayloadMin: Int,
eventPerPayloadMax: Int,
now: Instant,
frequencies: EventFrequencies
): Gen[HttpRequest] =
frequencies: EventFrequencies,
methodFrequencies: Option[MethodFrequencies]
): Gen[HttpRequest] = {
// MethodFrequencies is an option here, at the entrypoint in order not to force a breaking change where this is a lib.
// If it's not provided, we give equal distribution to each to achieve behaviour parity
// From here in it's not an option, just to make the code a bit cleaner
val methodFreq = methodFrequencies.getOrElse(new MethodFrequencies(1, 1, 1))
genWithParts(
HttpRequestQuerystring.gen(now, frequencies),
HttpRequestBody.gen(eventPerPayloadMin, eventPerPayloadMax, now, frequencies)
HttpRequestBody.gen(eventPerPayloadMin, eventPerPayloadMax, now, frequencies),
methodFreq
)
}

def genDup(
natProb: Float,
synProb: Float,
natTotal: Int,
synTotal: Int,
eventPerPayloadMin: Int,
eventPerPayloadMax: Int,
now: Instant,
frequencies: EventFrequencies,
methodFrequencies: Option[MethodFrequencies]
): Gen[HttpRequest] = {
// MethodFrequencies is an option here, at the entrypoint in order not to force a breaking change where this is a lib.
// If it's not provided, we give equal distribution to each to achieve behaviour parity
// From here in it's not an option, just to make the code a bit cleaner
val methodFreq = methodFrequencies.getOrElse(new MethodFrequencies(1, 1, 1))
genWithParts(
// qs doesn't do duplicates?
HttpRequestQuerystring.gen(now, frequencies),
HttpRequestBody
.genDup(natProb, synProb, natTotal, synTotal, eventPerPayloadMin, eventPerPayloadMax, now, frequencies),
methodFreq
)
}


private def genWithParts(qsGen: Gen[HttpRequestQuerystring], bodyGen: Gen[HttpRequestBody]) =
private def genWithParts(qsGen: Gen[HttpRequestQuerystring], bodyGen: Gen[HttpRequestBody], methodFreq: MethodFrequencies) =
for {
method <- Method.gen
method <- Method.gen(methodFreq)
qs <- Gen.option(qsGen)
body <- method match {
case Method.Head(_) => Gen.const(None) // HEAD requests can't have a message body
Expand Down
2 changes: 2 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ object Dependencies {
val badRows = "2.1.1"
val httpClient = "4.5.13"
val thrift = "0.15.0" // override transitive dependency to mitigate security vulnerabilities
val http4s = "0.23.15"
}

object Libraries {
Expand All @@ -66,6 +67,7 @@ object Dependencies {
val slf4j = "org.slf4j" % "slf4j-simple" % V.slf4j
val kcl = "software.amazon.kinesis" % "amazon-kinesis-client" % V.kcl
val awsRegions = "software.amazon.awssdk" % "regions" % V.awsRegions
val http4sClient = "org.http4s" %% "http4s-blaze-client" % V.http4s

// Scala (test only)
val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test
Expand Down
8 changes: 7 additions & 1 deletion sinks/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,11 @@
"linkClick": 1
}
}

"methodFrequencies": {
# Setting these defaults becuase HEAD sink is not implemented yet,
# but it is still possibly in use as a library in our module tests
"post": 1
"get": 1
"head": 0
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import io.circe.generic.extras.semiauto._

import com.snowplowanalytics.snowplow.eventgen.protocol.event.EventFrequencies
import com.snowplowanalytics.snowplow.eventgen.protocol.event.UnstructEventFrequencies
import com.snowplowanalytics.snowplow.eventgen.tracker.HttpRequest.MethodFrequencies

final case class Config(
payloadsTotal: Int,
Expand All @@ -41,6 +42,7 @@ final case class Config(
duplicates: Option[Config.Duplicates],
timestamps: Config.Timestamps,
eventFrequencies: EventFrequencies,
methodFrequencies: Option[MethodFrequencies],
output: Config.Output
)

Expand Down Expand Up @@ -69,6 +71,7 @@ object Config {
case class File(path: URI) extends Output
case class PubSub(subscription: String) extends Output
case class Kafka(brokers: String, topic: String, producerConf: Map[String, String] = Map.empty) extends Output
case class Http(endpoint: org.http4s.Uri) extends Output
}

val configOpt = Opts.option[Path]("config", "Path to the configuration HOCON").orNone
Expand All @@ -85,9 +88,12 @@ object Config {
implicit val duplicatesDecoder: Decoder[Duplicates] =
deriveConfiguredDecoder[Duplicates]

implicit val frequenciesDecoder: Decoder[EventFrequencies] =
implicit val eventFrequenciesDecoder: Decoder[EventFrequencies] =
deriveConfiguredDecoder[EventFrequencies]

implicit val methodFrequenciesDecoder: Decoder[MethodFrequencies] =
deriveConfiguredDecoder [MethodFrequencies]

implicit val unstructEventFrequenciesDecoder: Decoder[UnstructEventFrequencies] =
deriveConfiguredDecoder[UnstructEventFrequencies]

Expand All @@ -97,6 +103,10 @@ object Config {
Either.catchOnly[IllegalArgumentException](URI.create(str)).leftMap(_.getMessage)
}

implicit val httpUriDecoder: Decoder[org.http4s.Uri] = Decoder[String].emap { str =>
org.http4s.Uri.fromString(str).leftMap(_.getMessage)
}

implicit val kafkaDecoder: Decoder[Output.Kafka] =
deriveConfiguredDecoder[Output.Kafka]

Expand All @@ -109,6 +119,9 @@ object Config {
implicit val pubSubDecoder: Decoder[Output.PubSub] =
deriveConfiguredDecoder[Output.PubSub]

implicit val httpDecoder: Decoder[Output.Http] =
deriveConfiguredDecoder[Output.Http]

implicit val outputDecoder: Decoder[Output] =
deriveConfiguredDecoder[Output]

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright (c) 2021-2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/

package com.snowplowanalytics.snowplow.eventgen

import com.snowplowanalytics.snowplow.eventgen.tracker.HttpRequest
import com.snowplowanalytics.snowplow.eventgen.tracker.HttpRequest.{Method => TrackerMethod}

import fs2.{Pipe, Stream}

import cats.syntax.all._

import cats.effect.Async

import org.http4s.ember.client.EmberClientBuilder
import org.http4s.Request
import org.http4s.Header.Raw
import org.http4s.Method

import org.typelevel.ci._
import com.snowplowanalytics.snowplow.eventgen.tracker.HttpRequestQuerystring


object Http {

def sink[F[_]: Async](properties: Config.Output.Http): Pipe[F, Main.GenOutput, Unit] = {

def buildRequesst(
generatedRequest: HttpRequest
): Request[F] = {

// Origin headers are given to us a key and a comma separated string of values,
// but we wish to attach each value as a header with the provided key ("Origin")
// This function provides a Seq of Raw headers (the type that putHeaders expects)
// containing an item for each origin header, and an item each for the other headers.
def parseHeaders(headers: Map[String, String]): Seq[Raw] = {
headers
.map(kv => (kv._1, kv._2) match {
case (k, v) if k == "Origin" =>
v.split(",")
.map(v => Raw(CIString(k), v))
.toSeq

// Raw(CIString(k), v)
case (_, _) => Seq(Raw(CIString(kv._1), kv._2))
})
.toSeq
.flatten
}

// since /i requests have empty version, we pattern match to add the path.
// address is a var since we modify it when adding querystring
var address = generatedRequest.method.path.version match {
case "" => properties.endpoint / generatedRequest.method.path.vendor
case version: String => properties.endpoint / generatedRequest.method.path.vendor / version
}

val body = generatedRequest.body match {
case Some(b) => b.toString()
case _ => ""
}

val req = generatedRequest.method match {

// POST requests: attach body, use the uri we already have, without adding querysring
case TrackerMethod.Post(_) =>
Request[F](method = Method.POST, uri = address).withEntity(body).putHeaders(parseHeaders(generatedRequest.headers))

// GET requests: add querystring, ignore body field
case TrackerMethod.Get(_) =>

// iterate querystring and add as kv pairs to the address
generatedRequest.qs match {
case None => address
case Some(querystring: HttpRequestQuerystring) =>
// val newUri = address
querystring.toProto
querystring.toProto.foreach(kv =>
address = address.withQueryParam(kv.getName(), kv.getValue())
)
}

Request[F](method = Method.GET, uri = address).putHeaders(parseHeaders(generatedRequest.headers))

// HEAD requests induce server errors that will take some digging to figure out.
// In the interest of getting to a usable tool quickly, for now we just throw an error for these.
case TrackerMethod.Head(_) => throw new NotImplementedError ( "HEAD requests not implemented" )
}

return req
}

val httpClient = EmberClientBuilder.default[F].build

st: Stream[F, Main.GenOutput] => {
Stream
.resource(httpClient)
.flatMap(client =>
st.map(_._3).map(buildRequesst).evalMap(req => client.status(req)).void)
}
}
}
Loading

0 comments on commit a0c08ff

Please sign in to comment.