diff --git a/pom.xml b/pom.xml index 7cfbd37..97d7975 100644 --- a/pom.xml +++ b/pom.xml @@ -18,6 +18,11 @@ 4.13.1 test + + com.google.code.gson + gson + 2.8.9 + com.amazonaws amazon-kinesis-producer diff --git a/src/main/java/com/warnermedia/kplserver/KinesisEventPublisher.java b/src/main/java/com/warnermedia/kplserver/KinesisEventPublisher.java index 5131bd2..bbb5a37 100644 --- a/src/main/java/com/warnermedia/kplserver/KinesisEventPublisher.java +++ b/src/main/java/com/warnermedia/kplserver/KinesisEventPublisher.java @@ -8,6 +8,8 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.gson.Gson; +import com.google.gson.JsonSyntaxException; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -46,6 +48,25 @@ public void runOnce(String line) throws Exception { ByteBuffer data = ByteBuffer.wrap(finalLine.getBytes(java.nio.charset.StandardCharsets.UTF_8)); + // Need to serialize this to an object to get the key. + String hashKey; + + Gson gson = new Gson(); + try { + MinimalKey minimal = gson.fromJson(line, MinimalKey.class); + if (minimal.kdsHashKey != null) { + hashKey = minimal.kdsHashKey; + log.debug("Using passed in hash key"); + } else { + hashKey = randomExplicitHashKey(); + log.debug("Using random hash key"); + } + } + catch (JsonSyntaxException e) { + hashKey = randomExplicitHashKey(); + log.debug("Using random hash key"); + } + //This is a measure of the backpressure in the system, which should be checked before putting more records, //to avoid exhausting system resources. while (kinesis.getOutstandingRecordsCount() > 1e4) { @@ -53,7 +74,7 @@ public void runOnce(String line) throws Exception { Thread.sleep(500); } - UserRecord userRecord = new UserRecord(stream, " ", randomExplicitHashKey(), data); + UserRecord userRecord = new UserRecord(stream, " ", hashKey, data); ListenableFuture f = kinesis.addUserRecord(userRecord); Futures.addCallback(f, new FutureCallback() { diff --git a/src/main/java/com/warnermedia/kplserver/MinimalKey.java b/src/main/java/com/warnermedia/kplserver/MinimalKey.java new file mode 100644 index 0000000..29ee105 --- /dev/null +++ b/src/main/java/com/warnermedia/kplserver/MinimalKey.java @@ -0,0 +1,5 @@ +package com.warnermedia.kplserver; + +public class MinimalKey { + String kdsHashKey = null; +} diff --git a/src/main/java/com/warnermedia/kplserver/TestClientJSONKey.java b/src/main/java/com/warnermedia/kplserver/TestClientJSONKey.java new file mode 100644 index 0000000..03ec08c --- /dev/null +++ b/src/main/java/com/warnermedia/kplserver/TestClientJSONKey.java @@ -0,0 +1,39 @@ +package com.warnermedia.kplserver; + +import com.google.gson.Gson; + +import java.io.*; +import java.net.Socket; + + +public class TestClientJSONKey { + public static class TestWithKey { + String kdsHashKey; + String testa; + } + + public static void main(String[] args) throws Exception, IOException, ClassNotFoundException, InterruptedException { + + System.out.println("starting client"); + + // establish socket connection to server + Socket socket = new Socket("127.0.0.1", 3000); + OutputStreamWriter out = new OutputStreamWriter(socket.getOutputStream(), "UTF-8"); + + System.out.println("Sending request to Socket Server"); + + TestWithKey tst = new TestWithKey(); + tst.kdsHashKey = "mykey"; + tst.testa ="hello"; + + Gson gson = new Gson(); + String jsonResult = gson.toJson(tst); + String jsonFinal = jsonResult + "\n"; + + out.write(jsonFinal); + out.flush(); + Thread.sleep(100); + + socket.close(); + } +} diff --git a/src/main/java/com/warnermedia/kplserver/TestClientJSONNoKey.java b/src/main/java/com/warnermedia/kplserver/TestClientJSONNoKey.java new file mode 100644 index 0000000..5629f61 --- /dev/null +++ b/src/main/java/com/warnermedia/kplserver/TestClientJSONNoKey.java @@ -0,0 +1,37 @@ +package com.warnermedia.kplserver; + +import com.google.gson.Gson; + +import java.io.*; +import java.net.Socket; + + +public class TestClientJSONNoKey { + public static class TestWithNoKey { + String testa; + } + + public static void main(String[] args) throws Exception, IOException, ClassNotFoundException, InterruptedException { + + System.out.println("starting client"); + + // establish socket connection to server + Socket socket = new Socket("127.0.0.1", 3000); + OutputStreamWriter out = new OutputStreamWriter(socket.getOutputStream(), "UTF-8"); + + System.out.println("Sending request to Socket Server"); + + TestWithNoKey tst = new TestWithNoKey(); + tst.testa ="hello"; + + Gson gson = new Gson(); + String jsonResult = gson.toJson(tst); + String jsonFinal = jsonResult + "\n"; + + out.write(jsonFinal); + out.flush(); + Thread.sleep(100); + + socket.close(); + } +}