diff --git a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java index a2098e126a3b1..297970ea9a3cf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java @@ -300,11 +300,11 @@ static Set findMissing(Set toFind, Set response) { + String verifyFullFetchResponsePartitions(FetchResponse response) { StringBuilder bld = new StringBuilder(); - Set omitted = - findMissing(response.responseData().keySet(), sessionPartitions.keySet()); Set extra = + findMissing(response.responseData().keySet(), sessionPartitions.keySet()); + Set omitted = findMissing(sessionPartitions.keySet(), response.responseData().keySet()); if (!omitted.isEmpty()) { bld.append("omitted=(").append(Utils.join(omitted, ", ")).append(", "); @@ -313,7 +313,7 @@ private String verifyFullFetchResponsePartitions(FetchResponse response) { bld.append("extra=(").append(Utils.join(extra, ", ")).append(", "); } if ((!omitted.isEmpty()) || (!extra.isEmpty())) { - bld.append("response=(").append(Utils.join(response.responseData().keySet(), ", ")); + bld.append("response=(").append(Utils.join(response.responseData().keySet(), ", ")).append(")"); return bld.toString(); } return null; @@ -325,7 +325,7 @@ private String verifyFullFetchResponsePartitions(FetchResponse response) { * @param response The response. * @return True if the incremental fetch response partitions are valid. */ - private String verifyIncrementalFetchResponsePartitions(FetchResponse response) { + String verifyIncrementalFetchResponsePartitions(FetchResponse response) { Set extra = findMissing(response.responseData().keySet(), sessionPartitions.keySet()); if (!extra.isEmpty()) { @@ -394,7 +394,22 @@ public boolean handleResponse(FetchResponse response) { nextMetadata = nextMetadata.nextCloseExisting(); } return false; - } else if (nextMetadata.isFull()) { + } + if (nextMetadata.isFull()) { + if (response.responseData().isEmpty() && response.throttleTimeMs() > 0) { + // Normally, an empty full fetch response would be invalid. However, KIP-219 + // specifies that if the broker wants to throttle the client, it will respond + // to a full fetch request with an empty response and a throttleTimeMs + // value set. We don't want to log this with a warning, since it's not an error. + // However, the empty full fetch response can't be processed, so it's still appropriate + // to return false here. + if (log.isDebugEnabled()) { + log.debug("Node {} sent a empty full fetch response to indicate that this " + + "client should be throttled for {} ms.", node, response.throttleTimeMs()); + } + nextMetadata = FetchMetadata.INITIAL; + return false; + } String problem = verifyFullFetchResponsePartitions(response); if (problem != null) { log.info("Node {} sent an invalid full fetch response with {}", node, problem); @@ -428,9 +443,12 @@ public boolean handleResponse(FetchResponse response) { return true; } else { // The incremental fetch session was continued by the server. + // We don't have to do anything special here to support KIP-219, since an empty incremental + // fetch request is perfectly valid. if (log.isDebugEnabled()) - log.debug("Node {} sent an incremental fetch response for session {}{}", - node, response.sessionId(), responseDataToLogString(response)); + log.debug("Node {} sent an incremental fetch response with throttleTimeMs = {} " + + "for session {}{}", response.throttleTimeMs(), node, response.sessionId(), + responseDataToLogString(response)); nextMetadata = nextMetadata.nextIncremental(); return true; } diff --git a/clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java index ec1b0624169af..810c4ecb6faeb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java @@ -353,4 +353,36 @@ public void testIncrementalPartitionRemoval() { assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)), data3.sessionPartitions(), data3.toSend()); } + + @Test + public void testVerifyFullFetchResponsePartitions() throws Exception { + FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); + String issue = handler.verifyFullFetchResponsePartitions(new FetchResponse<>(Errors.NONE, + respMap(new RespEntry("foo", 0, 10, 20), + new RespEntry("foo", 1, 10, 20), + new RespEntry("bar", 0, 10, 20)), + 0, INVALID_SESSION_ID)); + assertTrue(issue.contains("extra")); + assertFalse(issue.contains("omitted")); + FetchSessionHandler.Builder builder = handler.newBuilder(); + builder.add(new TopicPartition("foo", 0), + new FetchRequest.PartitionData(0, 100, 200, Optional.empty())); + builder.add(new TopicPartition("foo", 1), + new FetchRequest.PartitionData(10, 110, 210, Optional.empty())); + builder.add(new TopicPartition("bar", 0), + new FetchRequest.PartitionData(20, 120, 220, Optional.empty())); + builder.build(); + String issue2 = handler.verifyFullFetchResponsePartitions(new FetchResponse<>(Errors.NONE, + respMap(new RespEntry("foo", 0, 10, 20), + new RespEntry("foo", 1, 10, 20), + new RespEntry("bar", 0, 10, 20)), + 0, INVALID_SESSION_ID)); + assertTrue(issue2 == null); + String issue3 = handler.verifyFullFetchResponsePartitions(new FetchResponse<>(Errors.NONE, + respMap(new RespEntry("foo", 0, 10, 20), + new RespEntry("foo", 1, 10, 20)), + 0, INVALID_SESSION_ID)); + assertFalse(issue3.contains("extra")); + assertTrue(issue3.contains("omitted")); + } }