diff --git a/pom.xml b/pom.xml
index 7cfbd37..97d7975 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,6 +18,11 @@
4.13.1
test
+
+ com.google.code.gson
+ gson
+ 2.8.9
+
com.amazonaws
amazon-kinesis-producer
diff --git a/src/main/java/com/warnermedia/kplserver/KinesisEventPublisher.java b/src/main/java/com/warnermedia/kplserver/KinesisEventPublisher.java
index 5131bd2..bbb5a37 100644
--- a/src/main/java/com/warnermedia/kplserver/KinesisEventPublisher.java
+++ b/src/main/java/com/warnermedia/kplserver/KinesisEventPublisher.java
@@ -8,6 +8,8 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -46,6 +48,25 @@ public void runOnce(String line) throws Exception {
ByteBuffer data = ByteBuffer.wrap(finalLine.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+ // Need to serialize this to an object to get the key.
+ String hashKey;
+
+ Gson gson = new Gson();
+ try {
+ MinimalKey minimal = gson.fromJson(line, MinimalKey.class);
+ if (minimal.kdsHashKey != null) {
+ hashKey = minimal.kdsHashKey;
+ log.debug("Using passed in hash key");
+ } else {
+ hashKey = randomExplicitHashKey();
+ log.debug("Using random hash key");
+ }
+ }
+ catch (JsonSyntaxException e) {
+ hashKey = randomExplicitHashKey();
+ log.debug("Using random hash key");
+ }
+
//This is a measure of the backpressure in the system, which should be checked before putting more records,
//to avoid exhausting system resources.
while (kinesis.getOutstandingRecordsCount() > 1e4) {
@@ -53,7 +74,7 @@ public void runOnce(String line) throws Exception {
Thread.sleep(500);
}
- UserRecord userRecord = new UserRecord(stream, " ", randomExplicitHashKey(), data);
+ UserRecord userRecord = new UserRecord(stream, " ", hashKey, data);
ListenableFuture f = kinesis.addUserRecord(userRecord);
Futures.addCallback(f, new FutureCallback() {
diff --git a/src/main/java/com/warnermedia/kplserver/MinimalKey.java b/src/main/java/com/warnermedia/kplserver/MinimalKey.java
new file mode 100644
index 0000000..29ee105
--- /dev/null
+++ b/src/main/java/com/warnermedia/kplserver/MinimalKey.java
@@ -0,0 +1,5 @@
+package com.warnermedia.kplserver;
+
+public class MinimalKey {
+ String kdsHashKey = null;
+}
diff --git a/src/main/java/com/warnermedia/kplserver/TestClientJSONKey.java b/src/main/java/com/warnermedia/kplserver/TestClientJSONKey.java
new file mode 100644
index 0000000..03ec08c
--- /dev/null
+++ b/src/main/java/com/warnermedia/kplserver/TestClientJSONKey.java
@@ -0,0 +1,39 @@
+package com.warnermedia.kplserver;
+
+import com.google.gson.Gson;
+
+import java.io.*;
+import java.net.Socket;
+
+
+public class TestClientJSONKey {
+ public static class TestWithKey {
+ String kdsHashKey;
+ String testa;
+ }
+
+ public static void main(String[] args) throws Exception, IOException, ClassNotFoundException, InterruptedException {
+
+ System.out.println("starting client");
+
+ // establish socket connection to server
+ Socket socket = new Socket("127.0.0.1", 3000);
+ OutputStreamWriter out = new OutputStreamWriter(socket.getOutputStream(), "UTF-8");
+
+ System.out.println("Sending request to Socket Server");
+
+ TestWithKey tst = new TestWithKey();
+ tst.kdsHashKey = "mykey";
+ tst.testa ="hello";
+
+ Gson gson = new Gson();
+ String jsonResult = gson.toJson(tst);
+ String jsonFinal = jsonResult + "\n";
+
+ out.write(jsonFinal);
+ out.flush();
+ Thread.sleep(100);
+
+ socket.close();
+ }
+}
diff --git a/src/main/java/com/warnermedia/kplserver/TestClientJSONNoKey.java b/src/main/java/com/warnermedia/kplserver/TestClientJSONNoKey.java
new file mode 100644
index 0000000..5629f61
--- /dev/null
+++ b/src/main/java/com/warnermedia/kplserver/TestClientJSONNoKey.java
@@ -0,0 +1,37 @@
+package com.warnermedia.kplserver;
+
+import com.google.gson.Gson;
+
+import java.io.*;
+import java.net.Socket;
+
+
+public class TestClientJSONNoKey {
+ public static class TestWithNoKey {
+ String testa;
+ }
+
+ public static void main(String[] args) throws Exception, IOException, ClassNotFoundException, InterruptedException {
+
+ System.out.println("starting client");
+
+ // establish socket connection to server
+ Socket socket = new Socket("127.0.0.1", 3000);
+ OutputStreamWriter out = new OutputStreamWriter(socket.getOutputStream(), "UTF-8");
+
+ System.out.println("Sending request to Socket Server");
+
+ TestWithNoKey tst = new TestWithNoKey();
+ tst.testa ="hello";
+
+ Gson gson = new Gson();
+ String jsonResult = gson.toJson(tst);
+ String jsonFinal = jsonResult + "\n";
+
+ out.write(jsonFinal);
+ out.flush();
+ Thread.sleep(100);
+
+ socket.close();
+ }
+}