From 31ac7afe1912820f43084522a2c70282e1c9067d Mon Sep 17 00:00:00 2001 From: Shreyansh Sangolli Date: Thu, 4 Apr 2024 19:38:50 +0530 Subject: [PATCH] fix: fixing incorrect Data in DLQ for HTTP/Prometheus Sink (#34) fix: fixing incorrect Data in DLQ for HTTP/Prometheus Sink #27 issue: https://github.com/goto/firehose/issues/27 Co-authored-by: Shreyansh --- build.gradle | 2 +- .../sink/common/AbstractHttpSink.java | 23 +++++++----- .../firehose/sink/http/HttpSink.java | 4 ++- .../firehose/sink/prometheus/PromSink.java | 4 ++- .../firehose/sink/http/HttpSinkTest.java | 35 ++++++++++++------- .../sink/prometheus/PromSinkTest.java | 25 ++++++++----- 6 files changed, 61 insertions(+), 32 deletions(-) diff --git a/build.gradle b/build.gradle index d5bc72bc6..3c26e828c 100644 --- a/build.gradle +++ b/build.gradle @@ -33,7 +33,7 @@ lombok { } group 'com.gotocompany' -version '0.9.7' +version '0.9.8' def projName = "firehose" diff --git a/src/main/java/com/gotocompany/firehose/sink/common/AbstractHttpSink.java b/src/main/java/com/gotocompany/firehose/sink/common/AbstractHttpSink.java index 2a11b6118..39388e3e1 100644 --- a/src/main/java/com/gotocompany/firehose/sink/common/AbstractHttpSink.java +++ b/src/main/java/com/gotocompany/firehose/sink/common/AbstractHttpSink.java @@ -1,7 +1,7 @@ package com.gotocompany.firehose.sink.common; -import com.gotocompany.firehose.exception.NeedToRetry; +import com.gotocompany.firehose.exception.DeserializerException; import com.gotocompany.firehose.message.Message; import com.gotocompany.firehose.metrics.FirehoseInstrumentation; import com.gotocompany.firehose.metrics.Metrics; @@ -18,6 +18,7 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; +import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -33,6 +34,7 @@ public abstract class AbstractHttpSink extends AbstractSink { private final Map retryStatusCodeRanges; private final Map requestLogStatusCodeRanges; protected static final String SUCCESS_CODE_PATTERN = "^2.*"; + private List sourceMessages; public AbstractHttpSink(FirehoseInstrumentation firehoseInstrumentation, String sinkType, HttpClient httpClient, StencilClient stencilClient, Map retryStatusCodeRanges, Map requestLogStatusCodeRanges) { super(firehoseInstrumentation, sinkType); @@ -45,22 +47,23 @@ public AbstractHttpSink(FirehoseInstrumentation firehoseInstrumentation, String @Override public List execute() throws Exception { HttpResponse response = null; - for (HttpEntityEnclosingRequestBase httpRequest : httpRequests) { + ArrayList failedMessages = new ArrayList<>(); + for (int i = 0; i < httpRequests.size(); i++) { try { - response = httpClient.execute(httpRequest); + response = httpClient.execute(httpRequests.get(i)); List contentStringList = null; getFirehoseInstrumentation().logInfo("Response Status: {}", statusCode(response)); if (shouldLogResponse(response)) { printResponse(response); } if (shouldLogRequest(response)) { - contentStringList = readContent(httpRequest); - printRequest(httpRequest, contentStringList); + contentStringList = readContent(httpRequests.get(i)); + printRequest(httpRequests.get(i), contentStringList); } if (shouldRetry(response)) { - throw new NeedToRetry(statusCode(response)); + failedMessages.add(sourceMessages.get(i)); } else if (!Pattern.compile(SUCCESS_CODE_PATTERN).matcher(String.valueOf(response.getStatusLine().getStatusCode())).matches()) { - contentStringList = contentStringList == null ? readContent(httpRequest) : contentStringList; + contentStringList = contentStringList == null ? readContent(httpRequests.get(i)) : contentStringList; captureMessageDropCount(response, contentStringList); } } finally { @@ -68,7 +71,7 @@ public List execute() throws Exception { captureHttpStatusCount(response); } } - return new ArrayList<>(); + return failedMessages; } @Override @@ -78,6 +81,10 @@ public void close() throws IOException { getStencilClient().close(); } + @Override + protected void prepare(List messages) throws DeserializerException, IOException, SQLException { + this.sourceMessages = messages; + } private void consumeResponse(HttpResponse response) { if (response != null) { diff --git a/src/main/java/com/gotocompany/firehose/sink/http/HttpSink.java b/src/main/java/com/gotocompany/firehose/sink/http/HttpSink.java index 4313b959f..7b8bc55bf 100644 --- a/src/main/java/com/gotocompany/firehose/sink/http/HttpSink.java +++ b/src/main/java/com/gotocompany/firehose/sink/http/HttpSink.java @@ -18,6 +18,7 @@ import java.io.InputStreamReader; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; +import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -48,8 +49,9 @@ public HttpSink(FirehoseInstrumentation firehoseInstrumentation, Request request } @Override - protected void prepare(List messages) throws DeserializerException, IOException { + protected void prepare(List messages) throws DeserializerException, IOException, SQLException { try { + super.prepare(messages); setHttpRequests(request.build(messages)); } catch (URISyntaxException e) { throw new IOException(e); diff --git a/src/main/java/com/gotocompany/firehose/sink/prometheus/PromSink.java b/src/main/java/com/gotocompany/firehose/sink/prometheus/PromSink.java index 6186d3366..4dfb2b9f4 100644 --- a/src/main/java/com/gotocompany/firehose/sink/prometheus/PromSink.java +++ b/src/main/java/com/gotocompany/firehose/sink/prometheus/PromSink.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.URISyntaxException; +import java.sql.SQLException; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -53,8 +54,9 @@ public PromSink(FirehoseInstrumentation firehoseInstrumentation, PromRequest req * @throws IOException the io exception */ @Override - protected void prepare(List messages) throws DeserializerException, IOException { + protected void prepare(List messages) throws DeserializerException, IOException, SQLException { try { + super.prepare(messages); setHttpRequests(request.build(messages)); } catch (URISyntaxException e) { throw new IOException(e); diff --git a/src/test/java/com/gotocompany/firehose/sink/http/HttpSinkTest.java b/src/test/java/com/gotocompany/firehose/sink/http/HttpSinkTest.java index a2d068baa..13ca165e9 100644 --- a/src/test/java/com/gotocompany/firehose/sink/http/HttpSinkTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/http/HttpSinkTest.java @@ -3,7 +3,6 @@ import com.gotocompany.firehose.config.converter.RangeToHashMapConverter; import com.gotocompany.firehose.exception.DeserializerException; -import com.gotocompany.firehose.exception.NeedToRetry; import com.gotocompany.firehose.message.Message; import com.gotocompany.firehose.metrics.FirehoseInstrumentation; import com.gotocompany.firehose.sink.http.request.types.Request; @@ -27,8 +26,11 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.sql.SQLException; import java.util.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.mockito.Mockito.*; import static org.mockito.MockitoAnnotations.initMocks; @@ -98,8 +100,8 @@ public void shouldCallHttpClientWithProperRequest() throws Exception { verify(httpClient, times(1)).execute(httpPost); } - @Test(expected = NeedToRetry.class) - public void shouldThrowNeedToRetryExceptionWhenResponseCodeIsGivenRange() throws Exception { + @Test + public void shouldReturnBackFailedMessagesWhenResponseCodeIsGivenRange() throws Exception { when(response.getStatusLine()).thenReturn(statusLine); when(statusLine.getStatusCode()).thenReturn(500); @@ -115,11 +117,14 @@ public void shouldThrowNeedToRetryExceptionWhenResponseCodeIsGivenRange() throws HttpSink httpSink = new HttpSink(firehoseInstrumentation, request, httpClient, stencilClient, new RangeToHashMapConverter().convert(null, "400-505"), requestLogStatusCodeRanges); httpSink.prepare(messages); - httpSink.execute(); + List failedMessages = httpSink.execute(); + + assertFalse(failedMessages.isEmpty()); + assertEquals(1, failedMessages.size()); } - @Test(expected = NeedToRetry.class) - public void shouldThrowNeedToRetryExceptionWhenResponseIsNull() throws Exception { + @Test + public void shouldReturnBackFailedMessagesWhenResponseIsNull() throws Exception { List httpRequests = Arrays.asList(httpPut); @@ -133,11 +138,14 @@ public void shouldThrowNeedToRetryExceptionWhenResponseIsNull() throws Exception HttpSink httpSink = new HttpSink(firehoseInstrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges); httpSink.prepare(messages); - httpSink.execute(); + List failedMessages = httpSink.execute(); + + assertFalse(failedMessages.isEmpty()); + assertEquals(1, failedMessages.size()); } - @Test(expected = NeedToRetry.class) - public void shouldThrowNeedToRetryExceptionWhenResponseStatusCodeIsZero() throws Exception { + @Test + public void shouldReturnBackFailedMessagesWhenResponseStatusCodeIsZero() throws Exception { List httpRequests = Arrays.asList(httpPut); @@ -152,11 +160,14 @@ public void shouldThrowNeedToRetryExceptionWhenResponseStatusCodeIsZero() throws HttpSink httpSink = new HttpSink(firehoseInstrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges); httpSink.prepare(messages); - httpSink.execute(); + List failedMessages = httpSink.execute(); + + assertFalse(failedMessages.isEmpty()); + assertEquals(1, failedMessages.size()); } @Test(expected = IOException.class) - public void shouldCatchURISyntaxExceptionAndThrowIOException() throws URISyntaxException, DeserializerException, IOException { + public void shouldCatchURISyntaxExceptionAndThrowIOException() throws URISyntaxException, DeserializerException, IOException, SQLException { when(request.build(messages)).thenThrow(new URISyntaxException("", "")); HttpSink httpSink = new HttpSink(firehoseInstrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges); @@ -324,7 +335,7 @@ public void shouldCaptureDroppedMessagesMetricsIfNotInStatusCodeRange() throws E verify(firehoseInstrumentation, times(1)).captureCount("firehose_sink_messages_drop_total", 2L, "cause= 500"); } - @Test(expected = NeedToRetry.class) + @Test public void shouldNotCaptureDroppedMessagesMetricsIfInStatusCodeRange() throws Exception { when(response.getStatusLine()).thenReturn(statusLine); when(statusLine.getStatusCode()).thenReturn(500); diff --git a/src/test/java/com/gotocompany/firehose/sink/prometheus/PromSinkTest.java b/src/test/java/com/gotocompany/firehose/sink/prometheus/PromSinkTest.java index 75f2380bd..222fdae72 100644 --- a/src/test/java/com/gotocompany/firehose/sink/prometheus/PromSinkTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/prometheus/PromSinkTest.java @@ -3,7 +3,6 @@ import com.gotocompany.firehose.config.converter.RangeToHashMapConverter; import com.gotocompany.firehose.exception.DeserializerException; -import com.gotocompany.firehose.exception.NeedToRetry; import com.gotocompany.firehose.message.Message; import com.gotocompany.firehose.metrics.FirehoseInstrumentation; import com.gotocompany.firehose.sink.prometheus.request.PromRequest; @@ -29,12 +28,14 @@ import java.io.InputStream; import java.net.URI; import java.net.URISyntaxException; +import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.mockito.Mockito.*; import static org.mockito.MockitoAnnotations.initMocks; @@ -99,8 +100,8 @@ public void shouldPrepareRequestDuringPreparationAndCallItDuringExecution() thro verify(httpClient, times(1)).execute(httpPost); } - @Test(expected = NeedToRetry.class) - public void shouldThrowNeedToRetryExceptionWhenResponseCodeIsGivenRange() throws Exception { + @Test + public void shouldReturnBackFailedMessagesWhenResponseCodeIsGivenRange() throws Exception { when(response.getStatusLine()).thenReturn(statusLine); when(statusLine.getStatusCode()).thenReturn(500); when(httpPost.getURI()).thenReturn(new URI("http://dummy.com")); @@ -110,11 +111,14 @@ public void shouldThrowNeedToRetryExceptionWhenResponseCodeIsGivenRange() throws PromSink promSink = new PromSink(firehoseInstrumentation, request, httpClient, stencilClient, new RangeToHashMapConverter().convert(null, "400-505"), requestLogStatusCodeRanges); promSink.prepare(messages); - promSink.execute(); + List failedMessages = promSink.execute(); + + assertFalse(failedMessages.isEmpty()); + assertEquals(1, failedMessages.size()); } - @Test(expected = NeedToRetry.class) - public void shouldThrowNeedToRetryExceptionWhenResponseCodeIsNull() throws Exception { + @Test + public void shouldReturnBackFailedMessagesWhenResponseCodeIsNull() throws Exception { when(httpPost.getURI()).thenReturn(new URI("http://dummy.com")); when(httpPost.getAllHeaders()).thenReturn(new Header[]{}); when(httpPost.getEntity()).thenReturn(httpEntity); @@ -124,11 +128,14 @@ public void shouldThrowNeedToRetryExceptionWhenResponseCodeIsNull() throws Excep PromSink promSink = new PromSink(firehoseInstrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges); promSink.prepare(messages); - promSink.execute(); + List failedMessages = promSink.execute(); + + assertFalse(failedMessages.isEmpty()); + assertEquals(1, failedMessages.size()); } @Test(expected = IOException.class) - public void shouldCatchURISyntaxExceptionAndThrowIOException() throws URISyntaxException, DeserializerException, IOException { + public void shouldCatchURISyntaxExceptionAndThrowIOException() throws URISyntaxException, DeserializerException, IOException, SQLException { when(request.build(messages)).thenThrow(new URISyntaxException("", "")); PromSink promSink = new PromSink(firehoseInstrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges); @@ -218,7 +225,7 @@ public void shouldCaptureDroppedMessagesMetricsIfNotInStatusCodeRange() throws E verify(firehoseInstrumentation, times(1)).captureCount("firehose_sink_messages_drop_total", 1L, "cause= 500"); } - @Test(expected = NeedToRetry.class) + @Test public void shouldNotCaptureDroppedMessagesMetricsIfInStatusCodeRange() throws Exception { when(response.getStatusLine()).thenReturn(statusLine); when(statusLine.getStatusCode()).thenReturn(500);