Skip to content

Commit

Permalink
Exclude the fetch requests with large fetch.max.wait.ms in SizeBucket…
Browse files Browse the repository at this point in the history
…Metrics #418

TICKET = LIKAFKA-47556 Establish Kafka Server SLOs
LI_DESCRIPTION =
This PR is to exclude the fetch requests that has fetch.max.wait.ms greater than the default setting for SizeBucketMetrics, otherwise the P999 metrics do not reflect the broker performance correctly because P999 could be just maxWait in the condition that there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes for some of the time and maxWait is set to a large number (e.g., 30 seconds).

EXIT_CRITERIA = N/A
  • Loading branch information
hshi2022 authored Nov 22, 2022
1 parent 494cb74 commit a95d648
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ public class ConsumerConfig extends AbstractConfig {

public static final String LI_CLIENT_CLUSTER_METADATA_EXPIRE_TIME_MS_CONFIG = CommonClientConfigs.LI_CLIENT_CLUSTER_METADATA_EXPIRE_TIME_MS_CONFIG;

public static final int DEFAULT_FETCH_MAX_WAIT_MS = 500;

static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
Type.LIST,
Expand Down Expand Up @@ -471,7 +473,7 @@ public class ConsumerConfig extends AbstractConfig {
FETCH_MAX_BYTES_DOC)
.define(FETCH_MAX_WAIT_MS_CONFIG,
Type.INT,
500,
DEFAULT_FETCH_MAX_WAIT_MS,
atLeast(0),
Importance.LOW,
FETCH_MAX_WAIT_MS_DOC)
Expand Down
8 changes: 7 additions & 1 deletion core/src/main/scala/kafka/network/RequestChannel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import kafka.network
import kafka.server.{BrokerMetadataStats, KafkaConfig, Observer}
import kafka.utils.{Logging, NotNothing, Pool}
import kafka.utils.Implicits._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.message.ApiMessageType.ListenerType
Expand Down Expand Up @@ -275,7 +276,12 @@ object RequestChannel extends Logging {
None
else {
val isFromFollower = body[FetchRequest].isFromFollower
if (isFromFollower) None
val maxWait = body[FetchRequest].maxWait()
// exclude the fetch requests that has fetch.max.wait.ms greater than the default setting for SizeBucketMetric.
// Otherwise the P999 metrics do not reflect the broker performance correctly because P999 could be
// just maxWait in the condition that there isn't sufficient data to immediately satisfy the requirement
// given by fetch.min.bytes for some of the time and maxWait is set to a large number (e.g., 30 seconds)
if (isFromFollower || maxWait > ConsumerConfig.DEFAULT_FETCH_MAX_WAIT_MS) None
else Some(metrics.getRequestSizeBucketMetricName(metrics.consumerFetchRequestSizeMetricNameMap, responseBytes))
}
}
Expand Down

0 comments on commit a95d648

Please sign in to comment.