From a95d648df16550fdbe98e0a073eb83ecf87f8ab2 Mon Sep 17 00:00:00 2001 From: Huilin Shi Date: Tue, 22 Nov 2022 11:14:38 -0800 Subject: [PATCH] Exclude the fetch requests with large fetch.max.wait.ms in SizeBucketMetrics #418 TICKET = LIKAFKA-47556 Establish Kafka Server SLOs LI_DESCRIPTION = This PR excludes the fetch requests that have fetch.max.wait.ms greater than the default setting from SizeBucketMetrics. Otherwise, the P999 metrics do not reflect the broker performance correctly, because P999 could be just maxWait when there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes for some of the time and maxWait is set to a large number (e.g., 30 seconds). EXIT_CRITERIA = N/A --- .../org/apache/kafka/clients/consumer/ConsumerConfig.java | 4 +++- core/src/main/scala/kafka/network/RequestChannel.scala | 8 +++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index f5e4aefef33f9..d7e77d1c8bf74 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -365,6 +365,8 @@ public class ConsumerConfig extends AbstractConfig { public static final String LI_CLIENT_CLUSTER_METADATA_EXPIRE_TIME_MS_CONFIG = CommonClientConfigs.LI_CLIENT_CLUSTER_METADATA_EXPIRE_TIME_MS_CONFIG; + public static final int DEFAULT_FETCH_MAX_WAIT_MS = 500; + static { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, @@ -471,7 +473,7 @@ public class ConsumerConfig extends AbstractConfig { FETCH_MAX_BYTES_DOC) .define(FETCH_MAX_WAIT_MS_CONFIG, Type.INT, - 500, + DEFAULT_FETCH_MAX_WAIT_MS, atLeast(0), Importance.LOW, FETCH_MAX_WAIT_MS_DOC) diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala 
b/core/src/main/scala/kafka/network/RequestChannel.scala index 85823d88c8580..e88395b0fd359 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -28,6 +28,7 @@ import kafka.network import kafka.server.{BrokerMetadataStats, KafkaConfig, Observer} import kafka.utils.{Logging, NotNothing, Pool} import kafka.utils.Implicits._ +import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.memory.MemoryPool import org.apache.kafka.common.message.ApiMessageType.ListenerType @@ -275,7 +276,12 @@ object RequestChannel extends Logging { None else { val isFromFollower = body[FetchRequest].isFromFollower - if (isFromFollower) None + val maxWait = body[FetchRequest].maxWait() + // exclude the fetch requests that has fetch.max.wait.ms greater than the default setting for SizeBucketMetric. + // Otherwise the P999 metrics do not reflect the broker performance correctly because P999 could be + // just maxWait in the condition that there isn't sufficient data to immediately satisfy the requirement + // given by fetch.min.bytes for some of the time and maxWait is set to a large number (e.g., 30 seconds) + if (isFromFollower || maxWait > ConsumerConfig.DEFAULT_FETCH_MAX_WAIT_MS) None else Some(metrics.getRequestSizeBucketMetricName(metrics.consumerFetchRequestSizeMetricNameMap, responseBytes)) } }