Skip to content

Commit

Permalink
Merge b67521e into 38bba80
Browse files Browse the repository at this point in the history
  • Loading branch information
smithatlanta authored Feb 8, 2022
2 parents 38bba80 + b67521e commit 28f7b5c
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 1 deletion.
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
<version>4.13.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.9</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-producer</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -46,14 +48,33 @@ public void runOnce(String line) throws Exception {

ByteBuffer data = ByteBuffer.wrap(finalLine.getBytes(java.nio.charset.StandardCharsets.UTF_8));

// Need to serialize this to an object to get the key.
String hashKey;

Gson gson = new Gson();
try {
MinimalKey minimal = gson.fromJson(line, MinimalKey.class);
if (minimal.kdsHashKey != null) {
hashKey = minimal.kdsHashKey;
log.debug("Using passed in hash key");
} else {
hashKey = randomExplicitHashKey();
log.debug("Using random hash key");
}
}
catch (JsonSyntaxException e) {
hashKey = randomExplicitHashKey();
log.debug("Using random hash key");
}

//This is a measure of the backpressure in the system, which should be checked before putting more records,
//to avoid exhausting system resources.
while (kinesis.getOutstandingRecordsCount() > 1e4) {
log.info("Too many outstanding records pending in the queue. Waiting for a second.");
Thread.sleep(500);
}

UserRecord userRecord = new UserRecord(stream, " ", randomExplicitHashKey(), data);
UserRecord userRecord = new UserRecord(stream, " ", hashKey, data);
ListenableFuture<UserRecordResult> f = kinesis.addUserRecord(userRecord);

Futures.addCallback(f, new FutureCallback<UserRecordResult>() {
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/com/warnermedia/kplserver/MinimalKey.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.warnermedia.kplserver;

public class MinimalKey {
String kdsHashKey = null;
}
39 changes: 39 additions & 0 deletions src/main/java/com/warnermedia/kplserver/TestClientJSONKey.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.warnermedia.kplserver;

import com.google.gson.Gson;

import java.io.*;
import java.net.Socket;


public class TestClientJSONKey {
public static class TestWithKey {
String kdsHashKey;
String testa;
}

public static void main(String[] args) throws Exception, IOException, ClassNotFoundException, InterruptedException {

System.out.println("starting client");

// establish socket connection to server
Socket socket = new Socket("127.0.0.1", 3000);
OutputStreamWriter out = new OutputStreamWriter(socket.getOutputStream(), "UTF-8");

System.out.println("Sending request to Socket Server");

TestWithKey tst = new TestWithKey();
tst.kdsHashKey = "mykey";
tst.testa ="hello";

Gson gson = new Gson();
String jsonResult = gson.toJson(tst);
String jsonFinal = jsonResult + "\n";

out.write(jsonFinal);
out.flush();
Thread.sleep(100);

socket.close();
}
}
37 changes: 37 additions & 0 deletions src/main/java/com/warnermedia/kplserver/TestClientJSONNoKey.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.warnermedia.kplserver;

import com.google.gson.Gson;

import java.io.*;
import java.net.Socket;


public class TestClientJSONNoKey {
public static class TestWithNoKey {
String testa;
}

public static void main(String[] args) throws Exception, IOException, ClassNotFoundException, InterruptedException {

System.out.println("starting client");

// establish socket connection to server
Socket socket = new Socket("127.0.0.1", 3000);
OutputStreamWriter out = new OutputStreamWriter(socket.getOutputStream(), "UTF-8");

System.out.println("Sending request to Socket Server");

TestWithNoKey tst = new TestWithNoKey();
tst.testa ="hello";

Gson gson = new Gson();
String jsonResult = gson.toJson(tst);
String jsonFinal = jsonResult + "\n";

out.write(jsonFinal);
out.flush();
Thread.sleep(100);

socket.close();
}
}

0 comments on commit 28f7b5c

Please sign in to comment.