
Add RequestMetrics for Produce/ConsumerFetch requests in different size buckets and acks (#404)

TICKET = LIKAFKA-47556 Establish Kafka Server SLOs
LI_DESCRIPTION =
This PR adds separate RequestMetrics for produce requests, broken out by request size bucket and by acks setting, and separate RequestMetrics for consumer fetch requests, broken out by response size bucket. With these metrics we can provide a latency SLO for the requests in each bucket. The size buckets are configurable.

EXIT_CRITERIA = N/A
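
For context, here is a minimal sketch (an illustration, not part of the diff) of the metric names the default bucket config "0,1,10,50,100" produces; the "Produce" and "FetchConsumer" prefixes are taken from ApiKeys.PRODUCE.name and RequestMetrics.consumerFetchMetricName as used in the change below.

// BucketNameSketch.scala -- illustration only
object BucketNameSketch extends App {
  val buckets = Array(0, 1, 10, 50, 100)
  // the last bucket is open-ended; the others are named "<left>To<right>Mb"
  val suffixes = for (i <- buckets.indices) yield {
    if (i == buckets.length - 1) s"${buckets(i)}MbGreater"
    else s"${buckets(i)}To${buckets(i + 1)}Mb"
  }
  // produce metrics additionally carry an Acks0/Acks1/AcksAll suffix
  println(suffixes.map(s => s"Produce${s}Acks1").mkString(", "))
  // Produce0To1MbAcks1, Produce1To10MbAcks1, ..., Produce100MbGreaterAcks1
  println(suffixes.map(s => s"FetchConsumer$s").mkString(", "))
  // FetchConsumer0To1Mb, FetchConsumer1To10Mb, ..., FetchConsumer100MbGreater
}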
hshi2022 authored Oct 25, 2022
1 parent d58aa46 commit b17e0cd
Showing 9 changed files with 296 additions and 21 deletions.
102 changes: 95 additions & 7 deletions core/src/main/scala/kafka/network/RequestChannel.scala
@@ -25,7 +25,7 @@ import com.typesafe.scalalogging.Logger
import com.yammer.metrics.core.{Histogram, Meter}
import kafka.metrics.KafkaMetricsGroup
import kafka.network
import kafka.server.{BrokerMetadataStats, KafkaConfig, Observer}
import kafka.server.{BrokerMetadataStats, Defaults, KafkaConfig, Observer}
import kafka.utils.{Logging, NotNothing, Pool}
import kafka.utils.Implicits._
import org.apache.kafka.common.config.ConfigResource
@@ -38,6 +38,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Sanitizer, Time}

import java.util
import scala.annotation.nowarn
import scala.collection.mutable
import scala.jdk.CollectionConverters._
@@ -59,24 +60,86 @@ object RequestChannel extends Logging {
val sanitizedUser: String = Sanitizer.sanitize(principal.getName)
}

class Metrics(enabledApis: Iterable[ApiKeys]) {
def this(scope: ListenerType) = {
this(ApiKeys.apisForListener(scope).asScala)
class Metrics(enabledApis: Iterable[ApiKeys], config: KafkaConfig) {
def this(scope: ListenerType, config: KafkaConfig) = {
this(ApiKeys.apisForListener(scope).asScala, config)
}

private val metricsMap = mutable.Map[String, RequestMetrics]()

// map[acks, map[requestSize, metricName]]
// this is used to create metrics for produce requests of different size and acks
val produceRequestAcksSizeMetricNameMap = getProduceRequestAcksSizeMetricNameMap()
// map[responseSize, metricName]
// this is used to create metrics for fetch requests of different response size
val consumerFetchRequestSizeMetricNameMap = getConsumerFetchRequestSizeMetricNameMap

(enabledApis.map(_.name) ++
Seq(RequestMetrics.MetadataAllTopics) ++
Seq(RequestMetrics.consumerFetchMetricName, RequestMetrics.followFetchMetricName)).foreach { name =>
metricsMap.put(name, new RequestMetrics(name))
Seq(RequestMetrics.consumerFetchMetricName, RequestMetrics.followFetchMetricName) ++
getConsumerFetchRequestSizeMetricNames ++
getProduceRequestAcksSizeMetricNames)
.foreach { name => metricsMap.put(name, new RequestMetrics(name))
}

def apply(metricName: String): RequestMetrics = metricsMap(metricName)

def close(): Unit = {
metricsMap.values.foreach(_.removeMetrics())
}

// generate map[request/responseSize, metricName] for requests of different size
def getRequestSizeMetricNameMap(requestType: String): util.TreeMap[Int, String] = {
val buckets = config.requestMetricsSizeBuckets
// get size and corresponding term to be used in metric name
// [0,1,10,50,100] => requestSizeBuckets:Seq((0, "0To1Mb"), (1, "1To10Mb"), (10, "10To50Mb"), (50, "50To100Mb"),
// (100, "100MbGreater"))
val requestSizeBuckets = for (i <- 0 until buckets.length) yield {
val size = buckets(i)
if (i == buckets.length - 1) (size, s"${size}MbGreater")
else (size, s"${size}To${buckets(i + 1)}Mb")
}
val treeMap = new util.TreeMap[Int, String]
requestSizeBuckets.foreach { case (size, suffix) => treeMap.put(size, s"${requestType}${suffix}") }
treeMap
}

// generate map[acks, map[requestSize, metricName]] for produce requests of different request size and acks
private def getProduceRequestAcksSizeMetricNameMap(): Map[Int, util.TreeMap[Int, String]] = {
val produceRequestAcks = Seq((0, "0"), (1, "1"), (-1, "All"))
val requestSizeMetricNameMap = getRequestSizeMetricNameMap(ApiKeys.PRODUCE.name)

val ackSizeMetricNames = for (ack <- produceRequestAcks) yield {
val treeMap = new util.TreeMap[Int, String]
requestSizeMetricNameMap.asScala.foreach { case (size, name) => treeMap.put(size, s"${name}Acks${ack._2}") }
(ack._1, treeMap)
}
ackSizeMetricNames.toMap
}

// generate map[responseSize, metricName] for consumer fetch requests of different response size
private def getConsumerFetchRequestSizeMetricNameMap(): util.TreeMap[Int, String] = {
getRequestSizeMetricNameMap(RequestMetrics.consumerFetchMetricName)
}

// get all the metric names for produce requests of different acks and size
def getProduceRequestAcksSizeMetricNames: Seq[String] = {
produceRequestAcksSizeMetricNameMap.values.toSeq.flatMap(_.values.asScala)
}

// get all the metric names for fetch requests of different size
def getConsumerFetchRequestSizeMetricNames: Seq[String] = {
consumerFetchRequestSizeMetricNameMap.values.asScala.toSeq
}

// get the metric name for a given request/response size
// the bucket is [left, right)
def getRequestSizeBucketMetricName(sizeMetricNameMap: util.TreeMap[Int, String], sizeBytes: Long): String = {
val sizeMb = sizeBytes / 1024 / 1024
if (sizeMb < sizeMetricNameMap.firstKey()) sizeMetricNameMap.firstEntry().getValue
else sizeMetricNameMap.floorEntry(sizeMb.toInt).getValue
}
}
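
A standalone sketch (an illustration, not part of the diff) of the same [left, right) lookup: TreeMap.floorEntry picks the bucket whose left boundary is the largest one not exceeding the size, and sizes below the first boundary are clamped into the first bucket.

import java.util

object BucketLookupSketch extends App {
  val map = new util.TreeMap[Int, String]()
  Seq(0 -> "0To1Mb", 1 -> "1To10Mb", 10 -> "10To50Mb", 50 -> "50To100Mb", 100 -> "100MbGreater")
    .foreach { case (k, v) => map.put(k, v) }

  def bucketFor(sizeBytes: Long): String = {
    val sizeMb = sizeBytes / 1024 / 1024
    if (sizeMb < map.firstKey()) map.firstEntry().getValue
    else map.floorEntry(sizeMb.toInt).getValue
  }

  println(bucketFor(512 * 1024))          // 0To1Mb (0.5 MB rounds down to 0)
  println(bucketFor(1024 * 1024))         // 1To10Mb (exactly 1 MB: left edge is inclusive)
  println(bucketFor(200L * 1024 * 1024))  // 100MbGreater
}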

class Request(val processor: Int,
@@ -207,6 +270,29 @@
math.max(apiLocalCompleteTimeNanos - requestDequeueTimeNanos, 0L)
}

def getConsumerFetchSizeBucketMetricName: Option[String] = {
if (header.apiKey != ApiKeys.FETCH)
None
else {
val isFromFollower = body[FetchRequest].isFromFollower
if (isFromFollower) None
else Some(metrics.getRequestSizeBucketMetricName(metrics.consumerFetchRequestSizeMetricNameMap, responseBytes))
}
}

def getProduceAckSizeBucketMetricName: Option[String] = {
if (header.apiKey != ApiKeys.PRODUCE)
None
else {
var acks = body[ProduceRequest].acks()
if (!metrics.produceRequestAcksSizeMetricNameMap.contains(acks)) {
error(s"metrics.produceRequestAcksSizeMetricNameMap does not contain key acks '${acks}', using -1 instead.")
acks = -1
}
Some(metrics.getRequestSizeBucketMetricName(metrics.produceRequestAcksSizeMetricNameMap(acks), sizeOfBodyInBytes))
}
}
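
// Example (illustration, not part of the diff): a 5 MB produce request with
// acks=1 is counted under "Produce1To10MbAcks1"; a 20 MB consumer fetch
// response under "FetchConsumer10To50Mb". An unrecognized acks value is
// logged and folded into the acks=-1 ("AcksAll") buckets.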

def updateRequestMetrics(networkThreadTimeNanos: Long, response: Response): Unit = {
val endTimeNanos = Time.SYSTEM.nanoseconds

@@ -240,7 +326,9 @@
if (header.apiKey() == ApiKeys.METADATA && body[MetadataRequest].isAllTopics) {
Seq(RequestMetrics.MetadataAllTopics)
} else Seq.empty
val metricNames = (fetchMetricNames ++ metadataMetricNames) :+ header.apiKey.name

val metricNames = (fetchMetricNames ++ metadataMetricNames ++ getConsumerFetchSizeBucketMetricName.toSeq ++
getProduceAckSizeBucketMetricName.toSeq) :+ header.apiKey.name
metricNames.foreach { metricName =>
val m = metrics(metricName)
m.requestRate(header.apiVersion).mark()
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/network/SocketServer.scala
@@ -106,12 +106,12 @@ class SocketServer(val config: KafkaConfig,
// data-plane
private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time, observer, apiVersionManager.newRequestMetrics)
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time, observer, apiVersionManager.newRequestMetrics(config))
// control-plane
private var controlPlaneProcessorOpt : Option[Processor] = None
private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
new RequestChannel(20, ControlPlaneMetricPrefix, time, observer, apiVersionManager.newRequestMetrics))
new RequestChannel(20, ControlPlaneMetricPrefix, time, observer, apiVersionManager.newRequestMetrics(config)))

private var nextProcessorId = 0
val connectionQuotas = new ConnectionQuotas(config, time, metrics)
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/ApiVersionManager.scala
@@ -32,7 +32,7 @@ trait ApiVersionManager {
def enabledApis: collection.Set[ApiKeys]
def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse
def isApiEnabled(apiKey: ApiKeys): Boolean = enabledApis.contains(apiKey)
def newRequestMetrics: RequestChannel.Metrics = new network.RequestChannel.Metrics(enabledApis)
def newRequestMetrics(config: KafkaConfig): RequestChannel.Metrics = new network.RequestChannel.Metrics(enabledApis, config)
}

object ApiVersionManager {
34 changes: 33 additions & 1 deletion core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -34,7 +34,7 @@ import kafka.utils.Implicits._
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.Reconfigurable
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource, SaslConfigs, SecurityConfig, SslClientAuth, SslConfigs, TopicConfig}
import org.apache.kafka.common.config.ConfigDef.{ConfigKey, ValidList}
import org.apache.kafka.common.config.ConfigDef.{ConfigKey, ValidList, Validator}
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.metrics.Sensor
@@ -265,6 +265,7 @@ object Defaults {
val MetricReporterClasses = ""
val MetricRecordingLevel = Sensor.RecordingLevel.INFO.toString()
val MetricReplaceOnDuplicate = false
val RequestMetricsSizeBuckets = "0,1,10,50,100"


/** ********* Kafka Yammer Metrics Reporter Configuration ***********/
@@ -619,6 +620,7 @@ object KafkaConfig {
val MetricReporterClassesProp: String = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG
val MetricRecordingLevelProp: String = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG
val MetricReplaceOnDuplicateProp: String = CommonClientConfigs.METRICS_REPLACE_ON_DUPLICATE_CONFIG
val RequestMetricsSizeBucketsProp: String = "request.metrics.size.buckets"

/** ********* Kafka Yammer Metrics Reporters Configuration ***********/
val KafkaMetricsReporterClassesProp = "kafka.metrics.reporters"
@@ -1064,6 +1066,7 @@ object KafkaConfig {
val MetricReporterClassesDoc = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC
val MetricRecordingLevelDoc = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC
val MetricReplaceOnDuplicateDoc = CommonClientConfigs.METRICS_REPLACE_ON_DUPLICATE_DOC
val RequestMetricsSizeBucketsDoc = "The request/response size buckets (in MB) used to group produce and consumer fetch requests, with separate RequestMetrics provided per bucket"


/** ********* Kafka Yammer Metrics Reporter Configuration ***********/
@@ -1389,6 +1392,7 @@ object KafkaConfig {
.define(MetricReporterClassesProp, LIST, Defaults.MetricReporterClasses, LOW, MetricReporterClassesDoc)
.define(MetricRecordingLevelProp, STRING, Defaults.MetricRecordingLevel, LOW, MetricRecordingLevelDoc)
.define(MetricReplaceOnDuplicateProp, BOOLEAN, Defaults.MetricReplaceOnDuplicate, LOW, MetricReplaceOnDuplicateDoc)
.define(RequestMetricsSizeBucketsProp, STRING, Defaults.RequestMetricsSizeBuckets, RequestMetricsSizeBucketsValidator, LOW, RequestMetricsSizeBucketsDoc)

/** ********* Kafka Yammer Metrics Reporter Configuration for docs ***********/
.define(KafkaMetricsReporterClassesProp, LIST, Defaults.KafkaMetricReporterClasses, LOW, KafkaMetricsReporterClassesDoc)
@@ -1480,6 +1484,29 @@ object KafkaConfig {
.define(RaftConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, Defaults.QuorumRetryBackoffMs, null, LOW, RaftConfig.QUORUM_RETRY_BACKOFF_MS_DOC)
}

object RequestMetricsSizeBucketsValidator extends Validator {
override def ensureValid(name: String, value: Any): Unit = {
getRequestMetricsSizeBuckets(value.toString)
}

override def toString: String = "a comma-separated list of at least two integers"
}
// convert requestMetricsSizeBucketsConfig from string to int array. e.g., "0,1,10,50,100" => [0,1,10,50,100]
private def getRequestMetricsSizeBuckets(requestMetricsSizeBuckets: String): Array[Int] = {
val bucketsString = requestMetricsSizeBuckets.replaceAll("\\s", "").split(',')
if (bucketsString.length < 2) {
throw new ConfigException(RequestMetricsSizeBucketsProp, requestMetricsSizeBuckets,
s"requestMetricsSizeBuckets has fewer than 2 buckets: '${requestMetricsSizeBuckets}'.")
}
try {
bucketsString.map(_.toInt)
} catch {
case _: Exception =>
throw new ConfigException(RequestMetricsSizeBucketsProp, requestMetricsSizeBuckets,
s"Failed to parse requestMetricsSizeBuckets: '${requestMetricsSizeBuckets}'. ")
}
}
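
// Examples (illustration, derived from the parsing above): "0,1,10,50,100"
// => Array(0, 1, 10, 50, 100); " 0, 5 " => Array(0, 5) (whitespace is
// stripped); "100" (fewer than two buckets) and "0,x,10" (non-integer)
// are rejected with a ConfigException.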

/** ********* Remote Log Management Configuration *********/
RemoteLogManagerConfig.CONFIG_DEF.configKeys().values().forEach(key => configDef.define(key))

@@ -1912,6 +1939,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
val metricSampleWindowMs = getLong(KafkaConfig.MetricSampleWindowMsProp)
val metricRecordingLevel = getString(KafkaConfig.MetricRecordingLevelProp)
val metricReplaceOnDuplicate = getBoolean(KafkaConfig.MetricReplaceOnDuplicateProp)
def requestMetricsSizeBuckets: Array[Int] = getRequestMetricsSizeBuckets()

/** ********* SSL/SASL Configuration **************/
// Security configs may be overridden for listeners, so it is not safe to use the base values
@@ -2046,6 +2074,10 @@
listeners.filterNot(l => controllerListenerNames.contains(l.listenerName.value()))
}

def getRequestMetricsSizeBuckets(): Array[Int] = {
KafkaConfig.getRequestMetricsSizeBuckets(getString(KafkaConfig.RequestMetricsSizeBucketsProp))
}

private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName, SecurityProtocol) = {
Option(getString(KafkaConfig.InterBrokerListenerNameProp)) match {
case Some(_) if originals.containsKey(KafkaConfig.InterBrokerSecurityProtocolProp) =>
@@ -295,7 +295,7 @@ abstract class QuotaTestClients(topic: String,
def yammerMetricValue(name: String): Double = {
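// Match the full MBean name: the size-bucket metrics added in this PR (e.g.
// FetchConsumer0To1Mb) share a prefix with the plain FetchConsumer metric,
// so a startsWith match would no longer be unambiguous.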
val allMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
val (_, metric) = allMetrics.find { case (metricName, _) =>
metricName.getMBeanName.startsWith(name)
metricName.getMBeanName == name
}.getOrElse(fail(s"Unable to find broker metric $name: allMetrics: ${allMetrics.keySet.map(_.getMBeanName)}"))
metric match {
case m: Meter => m.count.toDouble
14 changes: 12 additions & 2 deletions core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -129,7 +129,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
val producer = createProducer()
sendRecords(producer, numRecords, recordSize, tp)

val requestRateWithoutVersionMeter = yammerMeterWithPrefix("kafka.network:type=RequestMetrics,name=RequestsPerSecAcrossVersions,request=Produce")
val requestRateWithoutVersionMeter = yammerMeterWithName("kafka.network:type=RequestMetrics,name=RequestsPerSecAcrossVersions,request=Produce")
assertTrue(requestRateWithoutVersionMeter.count() > 0, "The Produce RequestsPerSecAcrossVersions metric is not recorded")
}

@@ -305,14 +305,24 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {

private def yammerMeterWithPrefix(prefix: String): Meter = {
val allMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
val (_, metric) = allMetrics.find { case (n, _) => n.getMBeanName.startsWith(prefix) }
.getOrElse(fail(s"Unable to find broker metric with prefix $prefix: allMetrics: ${allMetrics.keySet.map(_.getMBeanName)}"))
metric match {
case m: Meter => m
case m => fail(s"Unexpected broker metric of class ${m.getClass}")
}
}

private def yammerMeterWithName(name: String): Meter = {
val allMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
val (_, metric) = allMetrics.find { case (n, _) => n.getMBeanName == name }
.getOrElse(fail(s"Unable to find broker metric with name $name: allMetrics: ${allMetrics.keySet.map(_.getMBeanName)}"))
metric match {
case m: Meter => m
case m => fail(s"Unexpected broker metric of class ${m.getClass}")
}
}

private def verifyNoRequestMetrics(errorMessage: String): Unit = {
val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala.filter { case (n, _) =>
n.getMBeanName.startsWith("kafka.network:type=RequestMetrics")
