Skip to content

Commit

Permalink
fix: fixing incorrect Data in DLQ for HTTP/Prometheus Sink (#34)
Browse files Browse the repository at this point in the history
fix: fixing incorrect Data in DLQ for HTTP/Prometheus Sink #27
issue: #27

Co-authored-by: Shreyansh <[email protected]>
  • Loading branch information
Shreyansh228 and Shreyansh authored Apr 4, 2024
1 parent 7a66d9f commit 31ac7af
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 32 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ lombok {
}

group 'com.gotocompany'
version '0.9.7'
version '0.9.8'

def projName = "firehose"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.gotocompany.firehose.sink.common;


import com.gotocompany.firehose.exception.NeedToRetry;
import com.gotocompany.firehose.exception.DeserializerException;
import com.gotocompany.firehose.message.Message;
import com.gotocompany.firehose.metrics.FirehoseInstrumentation;
import com.gotocompany.firehose.metrics.Metrics;
Expand All @@ -18,6 +18,7 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand All @@ -33,6 +34,7 @@ public abstract class AbstractHttpSink extends AbstractSink {
private final Map<Integer, Boolean> retryStatusCodeRanges;
private final Map<Integer, Boolean> requestLogStatusCodeRanges;
protected static final String SUCCESS_CODE_PATTERN = "^2.*";
private List<Message> sourceMessages;

public AbstractHttpSink(FirehoseInstrumentation firehoseInstrumentation, String sinkType, HttpClient httpClient, StencilClient stencilClient, Map<Integer, Boolean> retryStatusCodeRanges, Map<Integer, Boolean> requestLogStatusCodeRanges) {
super(firehoseInstrumentation, sinkType);
Expand All @@ -45,30 +47,31 @@ public AbstractHttpSink(FirehoseInstrumentation firehoseInstrumentation, String
@Override
public List<Message> execute() throws Exception {
HttpResponse response = null;
for (HttpEntityEnclosingRequestBase httpRequest : httpRequests) {
ArrayList<Message> failedMessages = new ArrayList<>();
for (int i = 0; i < httpRequests.size(); i++) {
try {
response = httpClient.execute(httpRequest);
response = httpClient.execute(httpRequests.get(i));
List<String> contentStringList = null;
getFirehoseInstrumentation().logInfo("Response Status: {}", statusCode(response));
if (shouldLogResponse(response)) {
printResponse(response);
}
if (shouldLogRequest(response)) {
contentStringList = readContent(httpRequest);
printRequest(httpRequest, contentStringList);
contentStringList = readContent(httpRequests.get(i));
printRequest(httpRequests.get(i), contentStringList);
}
if (shouldRetry(response)) {
throw new NeedToRetry(statusCode(response));
failedMessages.add(sourceMessages.get(i));
} else if (!Pattern.compile(SUCCESS_CODE_PATTERN).matcher(String.valueOf(response.getStatusLine().getStatusCode())).matches()) {
contentStringList = contentStringList == null ? readContent(httpRequest) : contentStringList;
contentStringList = contentStringList == null ? readContent(httpRequests.get(i)) : contentStringList;
captureMessageDropCount(response, contentStringList);
}
} finally {
consumeResponse(response);
captureHttpStatusCount(response);
}
}
return new ArrayList<>();
return failedMessages;
}

@Override
Expand All @@ -78,6 +81,10 @@ public void close() throws IOException {
getStencilClient().close();
}

@Override
protected void prepare(List<Message> messages) throws DeserializerException, IOException, SQLException {
this.sourceMessages = messages;
}

private void consumeResponse(HttpResponse response) {
if (response != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.io.InputStreamReader;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -48,8 +49,9 @@ public HttpSink(FirehoseInstrumentation firehoseInstrumentation, Request request
}

@Override
protected void prepare(List<Message> messages) throws DeserializerException, IOException {
protected void prepare(List<Message> messages) throws DeserializerException, IOException, SQLException {
try {
super.prepare(messages);
setHttpRequests(request.build(messages));
} catch (URISyntaxException e) {
throw new IOException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -53,8 +54,9 @@ public PromSink(FirehoseInstrumentation firehoseInstrumentation, PromRequest req
* @throws IOException the io exception
*/
@Override
protected void prepare(List<Message> messages) throws DeserializerException, IOException {
protected void prepare(List<Message> messages) throws DeserializerException, IOException, SQLException {
try {
super.prepare(messages);
setHttpRequests(request.build(messages));
} catch (URISyntaxException e) {
throw new IOException(e);
Expand Down
35 changes: 23 additions & 12 deletions src/test/java/com/gotocompany/firehose/sink/http/HttpSinkTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

import com.gotocompany.firehose.config.converter.RangeToHashMapConverter;
import com.gotocompany.firehose.exception.DeserializerException;
import com.gotocompany.firehose.exception.NeedToRetry;
import com.gotocompany.firehose.message.Message;
import com.gotocompany.firehose.metrics.FirehoseInstrumentation;
import com.gotocompany.firehose.sink.http.request.types.Request;
Expand All @@ -27,8 +26,11 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.sql.SQLException;
import java.util.*;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.*;
import static org.mockito.MockitoAnnotations.initMocks;

Expand Down Expand Up @@ -98,8 +100,8 @@ public void shouldCallHttpClientWithProperRequest() throws Exception {
verify(httpClient, times(1)).execute(httpPost);
}

@Test(expected = NeedToRetry.class)
public void shouldThrowNeedToRetryExceptionWhenResponseCodeIsGivenRange() throws Exception {
@Test
public void shouldReturnBackFailedMessagesWhenResponseCodeIsGivenRange() throws Exception {
when(response.getStatusLine()).thenReturn(statusLine);
when(statusLine.getStatusCode()).thenReturn(500);

Expand All @@ -115,11 +117,14 @@ public void shouldThrowNeedToRetryExceptionWhenResponseCodeIsGivenRange() throws
HttpSink httpSink = new HttpSink(firehoseInstrumentation, request, httpClient, stencilClient,
new RangeToHashMapConverter().convert(null, "400-505"), requestLogStatusCodeRanges);
httpSink.prepare(messages);
httpSink.execute();
List<Message> failedMessages = httpSink.execute();

assertFalse(failedMessages.isEmpty());
assertEquals(1, failedMessages.size());
}

@Test(expected = NeedToRetry.class)
public void shouldThrowNeedToRetryExceptionWhenResponseIsNull() throws Exception {
@Test
public void shouldReturnBackFailedMessagesWhenResponseIsNull() throws Exception {

List<HttpEntityEnclosingRequestBase> httpRequests = Arrays.asList(httpPut);

Expand All @@ -133,11 +138,14 @@ public void shouldThrowNeedToRetryExceptionWhenResponseIsNull() throws Exception

HttpSink httpSink = new HttpSink(firehoseInstrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges);
httpSink.prepare(messages);
httpSink.execute();
List<Message> failedMessages = httpSink.execute();

assertFalse(failedMessages.isEmpty());
assertEquals(1, failedMessages.size());
}

@Test(expected = NeedToRetry.class)
public void shouldThrowNeedToRetryExceptionWhenResponseStatusCodeIsZero() throws Exception {
@Test
public void shouldReturnBackFailedMessagesWhenResponseStatusCodeIsZero() throws Exception {

List<HttpEntityEnclosingRequestBase> httpRequests = Arrays.asList(httpPut);

Expand All @@ -152,11 +160,14 @@ public void shouldThrowNeedToRetryExceptionWhenResponseStatusCodeIsZero() throws

HttpSink httpSink = new HttpSink(firehoseInstrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges);
httpSink.prepare(messages);
httpSink.execute();
List<Message> failedMessages = httpSink.execute();

assertFalse(failedMessages.isEmpty());
assertEquals(1, failedMessages.size());
}

@Test(expected = IOException.class)
public void shouldCatchURISyntaxExceptionAndThrowIOException() throws URISyntaxException, DeserializerException, IOException {
public void shouldCatchURISyntaxExceptionAndThrowIOException() throws URISyntaxException, DeserializerException, IOException, SQLException {
when(request.build(messages)).thenThrow(new URISyntaxException("", ""));

HttpSink httpSink = new HttpSink(firehoseInstrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges);
Expand Down Expand Up @@ -324,7 +335,7 @@ public void shouldCaptureDroppedMessagesMetricsIfNotInStatusCodeRange() throws E
verify(firehoseInstrumentation, times(1)).captureCount("firehose_sink_messages_drop_total", 2L, "cause= 500");
}

@Test(expected = NeedToRetry.class)
@Test
public void shouldNotCaptureDroppedMessagesMetricsIfInStatusCodeRange() throws Exception {
when(response.getStatusLine()).thenReturn(statusLine);
when(statusLine.getStatusCode()).thenReturn(500);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

import com.gotocompany.firehose.config.converter.RangeToHashMapConverter;
import com.gotocompany.firehose.exception.DeserializerException;
import com.gotocompany.firehose.exception.NeedToRetry;
import com.gotocompany.firehose.message.Message;
import com.gotocompany.firehose.metrics.FirehoseInstrumentation;
import com.gotocompany.firehose.sink.prometheus.request.PromRequest;
Expand All @@ -29,12 +28,14 @@
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.*;
import static org.mockito.MockitoAnnotations.initMocks;

Expand Down Expand Up @@ -99,8 +100,8 @@ public void shouldPrepareRequestDuringPreparationAndCallItDuringExecution() thro
verify(httpClient, times(1)).execute(httpPost);
}

@Test(expected = NeedToRetry.class)
public void shouldThrowNeedToRetryExceptionWhenResponseCodeIsGivenRange() throws Exception {
@Test
public void shouldReturnBackFailedMessagesWhenResponseCodeIsGivenRange() throws Exception {
when(response.getStatusLine()).thenReturn(statusLine);
when(statusLine.getStatusCode()).thenReturn(500);
when(httpPost.getURI()).thenReturn(new URI("http://dummy.com"));
Expand All @@ -110,11 +111,14 @@ public void shouldThrowNeedToRetryExceptionWhenResponseCodeIsGivenRange() throws
PromSink promSink = new PromSink(firehoseInstrumentation, request, httpClient, stencilClient,
new RangeToHashMapConverter().convert(null, "400-505"), requestLogStatusCodeRanges);
promSink.prepare(messages);
promSink.execute();
List<Message> failedMessages = promSink.execute();

assertFalse(failedMessages.isEmpty());
assertEquals(1, failedMessages.size());
}

@Test(expected = NeedToRetry.class)
public void shouldThrowNeedToRetryExceptionWhenResponseCodeIsNull() throws Exception {
@Test
public void shouldReturnBackFailedMessagesWhenResponseCodeIsNull() throws Exception {
when(httpPost.getURI()).thenReturn(new URI("http://dummy.com"));
when(httpPost.getAllHeaders()).thenReturn(new Header[]{});
when(httpPost.getEntity()).thenReturn(httpEntity);
Expand All @@ -124,11 +128,14 @@ public void shouldThrowNeedToRetryExceptionWhenResponseCodeIsNull() throws Excep

PromSink promSink = new PromSink(firehoseInstrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges);
promSink.prepare(messages);
promSink.execute();
List<Message> failedMessages = promSink.execute();

assertFalse(failedMessages.isEmpty());
assertEquals(1, failedMessages.size());
}

@Test(expected = IOException.class)
public void shouldCatchURISyntaxExceptionAndThrowIOException() throws URISyntaxException, DeserializerException, IOException {
public void shouldCatchURISyntaxExceptionAndThrowIOException() throws URISyntaxException, DeserializerException, IOException, SQLException {
when(request.build(messages)).thenThrow(new URISyntaxException("", ""));

PromSink promSink = new PromSink(firehoseInstrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges);
Expand Down Expand Up @@ -218,7 +225,7 @@ public void shouldCaptureDroppedMessagesMetricsIfNotInStatusCodeRange() throws E
verify(firehoseInstrumentation, times(1)).captureCount("firehose_sink_messages_drop_total", 1L, "cause= 500");
}

@Test(expected = NeedToRetry.class)
@Test
public void shouldNotCaptureDroppedMessagesMetricsIfInStatusCodeRange() throws Exception {
when(response.getStatusLine()).thenReturn(statusLine);
when(statusLine.getStatusCode()).thenReturn(500);
Expand Down

0 comments on commit 31ac7af

Please sign in to comment.