Skip to content

Commit

Permalink
Add Kafka sink (close #51)
Browse files Browse the repository at this point in the history
  • Loading branch information
peel authored and colmsnowplow committed Aug 15, 2023
1 parent b7b925e commit a9c81dd
Show file tree
Hide file tree
Showing 8 changed files with 451 additions and 1 deletion.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ Alternatively you can write events directly to a S3 bucket:
./snowplow-event-generator --output pubsub://projects/my-project/topics/my-topic
```

...or directly to a Kafka topic:

```bash
./snowplow-event-generator --output kafka://my-topic?brokers=my-broker:9092,my-broker-2:9092&my-property=my-value
```


By default, it generates 1000 events with no duplicates. The generated events are _deterministic_, which means if you re-run the app multiple times with the same configuration then you will generate the same events each time.

#### Configuration
Expand Down
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ lazy val sinks = project
Dependencies.Libraries.specs2,
Dependencies.Libraries.kcl,
Dependencies.Libraries.fs2Pubsub,
Dependencies.Libraries.awsRegions
Dependencies.Libraries.fs2Kafka,
Dependencies.Libraries.awsRegions,
Dependencies.Libraries.scalaParsec
)
)
.dependsOn(core)
Expand Down
238 changes: 238 additions & 0 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 55 additions & 0 deletions flake.nix
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
{
description = "applications for recovering snowplow bad rows";

inputs = {
nixpkgs.url = "github:nixos/nixpkgs/nixpkgs-unstable";
flake-utils.url = "github:numtide/flake-utils";
flake-utils.inputs.nixpkgs.follows = "nixpkgs";
devenv.url = "github:cachix/devenv";
devenv.inputs.nixpkgs.follows = "nixpkgs";
};

outputs = {
nixpkgs,
flake-utils,
devenv,
...
} @ inputs:
flake-utils.lib.eachDefaultSystem (
system: let
pkgs = import nixpkgs {
inherit system;
config.allowUnfree = true;
config.allowUnsupportedSystem = true;
};
jre = pkgs.openjdk11;
sbt = pkgs.sbt.override {inherit jre;};
coursier = pkgs.coursier.override {inherit jre;};
metals = pkgs.metals.override {inherit coursier jre;};
in {
devShell = devenv.lib.mkShell {
inherit inputs pkgs;
modules = [
{
packages = [
jre
metals
sbt
pkgs.nodePackages.snyk
];
languages.nix.enable = true;
pre-commit.hooks = {
alejandra.enable = true;
deadnix.enable = true;
gitleaks = {
enable = true;
name = "gitleaks";
entry = "${pkgs.gitleaks}/bin/gitleaks detect --source . -v";
};
};
}
];
};
}
);
}
4 changes: 4 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ object Dependencies {
val circeConfig = "0.8.0"
val circe = "0.14.1"
val fs2Pubsub = "0.22.0"
val fs2Kafka = "3.0.1"
val awsRegions = "2.20.69"
val scalaParsec = "2.3.0"
// Scala (test only)
val specs2 = "4.12.3"
val scalaCheck = "1.14.0"
Expand All @@ -51,6 +53,7 @@ object Dependencies {
val fs2 = "co.fs2" %% "fs2-core" % V.fs2
val fs2file = "co.fs2" %% "fs2-io" % V.fs2
val fs2Pubsub = "com.permutive" %% "fs2-google-pubsub-grpc" % V.fs2Pubsub
val fs2Kafka = "com.github.fd4s" %% "fs2-kafka" % V.fs2Kafka
val decline = "com.monovore" %% "decline" % V.decline
val blobstore = "com.github.fs2-blobstore" %% "s3" % V.blobstore
val circeCore = "io.circe" %% "circe-core" % V.circe
Expand All @@ -64,6 +67,7 @@ object Dependencies {
val slf4j = "org.slf4j" % "slf4j-simple" % V.slf4j
val kcl = "software.amazon.kinesis" % "amazon-kinesis-client" % V.kcl
val awsRegions = "software.amazon.awssdk" % "regions" % V.awsRegions
val scalaParsec = "org.scala-lang.modules" %% "scala-parser-combinators" % V.scalaParsec

// Scala (test only)
val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test
Expand Down
Loading

0 comments on commit a9c81dd

Please sign in to comment.