Skip to content

Commit

Permalink
Added code to crack open the JSON to see if it has the kdshashkey
Browse files Browse the repository at this point in the history
We need this to allow us to optimize session based KDA processing for real time.  It should have minimal impact on existing implementations
  • Loading branch information
smithatlanta committed Feb 8, 2022
1 parent 38bba80 commit b67521e
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 1 deletion.
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
<version>4.13.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.9</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-producer</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -46,14 +48,33 @@ public void runOnce(String line) throws Exception {

ByteBuffer data = ByteBuffer.wrap(finalLine.getBytes(java.nio.charset.StandardCharsets.UTF_8));

// Need to serialize this to an object to get the key.
String hashKey;

Gson gson = new Gson();
try {
MinimalKey minimal = gson.fromJson(line, MinimalKey.class);
if (minimal.kdsHashKey != null) {
hashKey = minimal.kdsHashKey;
log.debug("Using passed in hash key");
} else {
hashKey = randomExplicitHashKey();
log.debug("Using random hash key");
}
}
catch (JsonSyntaxException e) {
hashKey = randomExplicitHashKey();
log.debug("Using random hash key");
}

//This is a measure of the backpressure in the system, which should be checked before putting more records,
//to avoid exhausting system resources.
while (kinesis.getOutstandingRecordsCount() > 1e4) {
log.info("Too many outstanding records pending in the queue. Waiting for a second.");
Thread.sleep(500);
}

UserRecord userRecord = new UserRecord(stream, " ", randomExplicitHashKey(), data);
UserRecord userRecord = new UserRecord(stream, " ", hashKey, data);
ListenableFuture<UserRecordResult> f = kinesis.addUserRecord(userRecord);

Futures.addCallback(f, new FutureCallback<UserRecordResult>() {
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/com/warnermedia/kplserver/MinimalKey.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.warnermedia.kplserver;

public class MinimalKey {
String kdsHashKey = null;
}
39 changes: 39 additions & 0 deletions src/main/java/com/warnermedia/kplserver/TestClientJSONKey.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.warnermedia.kplserver;

import com.google.gson.Gson;

import java.io.*;
import java.net.Socket;


public class TestClientJSONKey {
public static class TestWithKey {
String kdsHashKey;
String testa;
}

public static void main(String[] args) throws Exception, IOException, ClassNotFoundException, InterruptedException {

System.out.println("starting client");

// establish socket connection to server
Socket socket = new Socket("127.0.0.1", 3000);
OutputStreamWriter out = new OutputStreamWriter(socket.getOutputStream(), "UTF-8");

System.out.println("Sending request to Socket Server");

TestWithKey tst = new TestWithKey();
tst.kdsHashKey = "mykey";
tst.testa ="hello";

Gson gson = new Gson();
String jsonResult = gson.toJson(tst);
String jsonFinal = jsonResult + "\n";

out.write(jsonFinal);
out.flush();
Thread.sleep(100);

socket.close();
}
}
37 changes: 37 additions & 0 deletions src/main/java/com/warnermedia/kplserver/TestClientJSONNoKey.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.warnermedia.kplserver;

import com.google.gson.Gson;

import java.io.*;
import java.net.Socket;


public class TestClientJSONNoKey {
public static class TestWithNoKey {
String testa;
}

public static void main(String[] args) throws Exception, IOException, ClassNotFoundException, InterruptedException {

System.out.println("starting client");

// establish socket connection to server
Socket socket = new Socket("127.0.0.1", 3000);
OutputStreamWriter out = new OutputStreamWriter(socket.getOutputStream(), "UTF-8");

System.out.println("Sending request to Socket Server");

TestWithNoKey tst = new TestWithNoKey();
tst.testa ="hello";

Gson gson = new Gson();
String jsonResult = gson.toJson(tst);
String jsonFinal = jsonResult + "\n";

out.write(jsonFinal);
out.flush();
Thread.sleep(100);

socket.close();
}
}

0 comments on commit b67521e

Please sign in to comment.