diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index f5e4aefef33f9..d7e77d1c8bf74 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -365,6 +365,8 @@ public class ConsumerConfig extends AbstractConfig { public static final String LI_CLIENT_CLUSTER_METADATA_EXPIRE_TIME_MS_CONFIG = CommonClientConfigs.LI_CLIENT_CLUSTER_METADATA_EXPIRE_TIME_MS_CONFIG; + public static final int DEFAULT_FETCH_MAX_WAIT_MS = 500; + static { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, @@ -471,7 +473,7 @@ public class ConsumerConfig extends AbstractConfig { FETCH_MAX_BYTES_DOC) .define(FETCH_MAX_WAIT_MS_CONFIG, Type.INT, - 500, + DEFAULT_FETCH_MAX_WAIT_MS, atLeast(0), Importance.LOW, FETCH_MAX_WAIT_MS_DOC) diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 85823d88c8580..e88395b0fd359 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -28,6 +28,7 @@ import kafka.network import kafka.server.{BrokerMetadataStats, KafkaConfig, Observer} import kafka.utils.{Logging, NotNothing, Pool} import kafka.utils.Implicits._ +import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.memory.MemoryPool import org.apache.kafka.common.message.ApiMessageType.ListenerType @@ -275,7 +276,12 @@ object RequestChannel extends Logging { None else { val isFromFollower = body[FetchRequest].isFromFollower - if (isFromFollower) None + val maxWait = body[FetchRequest].maxWait() + // exclude the fetch requests that has fetch.max.wait.ms greater than the default setting for SizeBucketMetric. + // Otherwise the P999 metrics do not reflect the broker performance correctly because P999 could be + // just maxWait in the condition that there isn't sufficient data to immediately satisfy the requirement + // given by fetch.min.bytes for some of the time and maxWait is set to a large number (e.g., 30 seconds) + if (isFromFollower || maxWait > ConsumerConfig.DEFAULT_FETCH_MAX_WAIT_MS) None else Some(metrics.getRequestSizeBucketMetricName(metrics.consumerFetchRequestSizeMetricNameMap, responseBytes)) } }