From b17e0cdedc6cff7c91bb4ec80279bd06d84c8237 Mon Sep 17 00:00:00 2001 From: Huilin Shi Date: Tue, 25 Oct 2022 10:57:16 -0700 Subject: [PATCH] Add RequestMetrics for Produce/ConsumerFetch requests in different size buckets and acks (#404) Add RequestMetrics for Produce/ConsumerFetch requests in different size buckets and acks (#404) TICKET = LIKAFKA-47556 Establish Kafka Server SLOs LI_DESCRIPTION = This PR is to add separate RequestMetrics for produce requests in different request size buckets and of different acks and add separate RequestMetrics for consumer fetch requests in different response size buckets. With this, we could provide latency SLO for requests in each buckets. The size buckets are configurable. EXIT_CRITERIA = N/A --- .../scala/kafka/network/RequestChannel.scala | 102 +++++++++++- .../scala/kafka/network/SocketServer.scala | 4 +- .../kafka/server/ApiVersionManager.scala | 2 +- .../main/scala/kafka/server/KafkaConfig.scala | 34 +++- .../integration/kafka/api/BaseQuotaTest.scala | 2 +- .../integration/kafka/api/MetricsTest.scala | 14 +- .../kafka/network/RequestChannelTest.scala | 149 +++++++++++++++++- .../kafka/server/ForwardingManagerTest.scala | 9 +- .../unit/kafka/server/KafkaConfigTest.scala | 1 + 9 files changed, 296 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 7349513675222..bfa412d9ff86d 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -25,7 +25,7 @@ import com.typesafe.scalalogging.Logger import com.yammer.metrics.core.{Histogram, Meter} import kafka.metrics.KafkaMetricsGroup import kafka.network -import kafka.server.{BrokerMetadataStats, KafkaConfig, Observer} +import kafka.server.{BrokerMetadataStats, Defaults, KafkaConfig, Observer} import kafka.utils.{Logging, NotNothing, Pool} import kafka.utils.Implicits._ import org.apache.kafka.common.config.ConfigResource @@ -38,6 +38,7 @@ import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.{Sanitizer, Time} +import java.util import scala.annotation.nowarn import scala.collection.mutable import scala.jdk.CollectionConverters._ @@ -59,17 +60,26 @@ object RequestChannel extends Logging { val sanitizedUser: String = Sanitizer.sanitize(principal.getName) } - class Metrics(enabledApis: Iterable[ApiKeys]) { - def this(scope: ListenerType) = { - this(ApiKeys.apisForListener(scope).asScala) + class Metrics(enabledApis: Iterable[ApiKeys], config: KafkaConfig) { + def this(scope: ListenerType, config: KafkaConfig) = { + this(ApiKeys.apisForListener(scope).asScala, config) } private val metricsMap = mutable.Map[String, RequestMetrics]() + // map[acks, map[requestSize, metricName]] + // this is used to create metrics for produce requests of different size and acks + val produceRequestAcksSizeMetricNameMap = getProduceRequestAcksSizeMetricNameMap() + // map[responseSize, metricName] + // this is used to create metrics for fetch requests of different response size + val consumerFetchRequestSizeMetricNameMap = getConsumerFetchRequestAcksSizeMetricNameMap + (enabledApis.map(_.name) ++ Seq(RequestMetrics.MetadataAllTopics) ++ - Seq(RequestMetrics.consumerFetchMetricName, RequestMetrics.followFetchMetricName)).foreach { name => - metricsMap.put(name, new RequestMetrics(name)) + Seq(RequestMetrics.consumerFetchMetricName, RequestMetrics.followFetchMetricName) ++ + getConsumerFetchRequestSizeMetricNames ++ + getProduceRequestAcksSizeMetricNames) + .foreach { name => metricsMap.put(name, new RequestMetrics(name)) } def apply(metricName: String): RequestMetrics = metricsMap(metricName) @@ -77,6 +87,59 @@ object RequestChannel extends Logging { def close(): Unit = { metricsMap.values.foreach(_.removeMetrics()) } + + // generate map[request/responseSize, metricName] for requests of different size + def getRequestSizeMetricNameMap(requestType: String):util.TreeMap[Int, String] = { + val buckets = config.requestMetricsSizeBuckets + // get size and corresponding term to be used in metric name + // [0,1,10,50,100] => requestSizeBuckets:Seq((0, "0To1Mb"), (1, "1To10Mb"), (10, "10To50Mb"), (50, "50To100Mb"), + // (100, "100MbGreater")) + val requestSizeBuckets = for (i <- 0 until buckets.length) yield { + val size = buckets(i) + if (i == buckets.length - 1) (size, s"${size}MbGreater") + else (size, s"${size}To${buckets(i + 1)}Mb") + } + val treeMap = new util.TreeMap[Int, String] + requestSizeBuckets.map(bucket => (bucket._1, s"${requestType}${bucket._2}")).toMap + .foreach{case (size, bucket) => treeMap.put(size, bucket)} + treeMap + } + + // generate map[acks, map[requestSize, metricName]] for produce requests of different request size and acks + private def getProduceRequestAcksSizeMetricNameMap():Map[Int, util.TreeMap[Int, String]] = { + val produceRequestAcks = Seq((0, "0"), (1, "1"), (-1, "All")) + val requestSizeMetricNameMap = getRequestSizeMetricNameMap(ApiKeys.PRODUCE.name) + + val ackSizeMetricNames = for(ack <- produceRequestAcks) yield { + val treeMap = new util.TreeMap[Int, String] + requestSizeMetricNameMap.asScala.map({case(size, name) => treeMap.put(size, s"${name}Acks${ack._2}")}) + (ack._1, treeMap) + } + ackSizeMetricNames.toMap + } + + // generate map[responseSize, metricName] for consumerFetch requests of different request size and acks + private def getConsumerFetchRequestAcksSizeMetricNameMap():util.TreeMap[Int, String] = { + getRequestSizeMetricNameMap(RequestMetrics.consumerFetchMetricName) + } + + // get all the metric names for produce requests of different acks and size + def getProduceRequestAcksSizeMetricNames : Seq[String] = { + produceRequestAcksSizeMetricNameMap.values.toSeq.map(a => a.values.asScala.toSeq).flatten + } + + // get all the metric names for fetch requests of different size + def getConsumerFetchRequestSizeMetricNames : Seq[String] = { + consumerFetchRequestSizeMetricNameMap.values.asScala.toSeq + } + + // get the metric name for a given request/response size + // the bucket is [left, right) + def getRequestSizeBucketMetricName(sizeMetricNameMap: util.TreeMap[Int, String], sizeBytes: Long): String = { + val sizeMb = sizeBytes / 1024 / 1024 + if(sizeMb < sizeMetricNameMap.firstKey()) sizeMetricNameMap.firstEntry().getValue + else sizeMetricNameMap.floorEntry(sizeMb.toInt).getValue + } } class Request(val processor: Int, @@ -207,6 +270,29 @@ object RequestChannel extends Logging { math.max(apiLocalCompleteTimeNanos - requestDequeueTimeNanos, 0L) } + def getConsumerFetchSizeBucketMetricName: Option[String] = { + if (header.apiKey != ApiKeys.FETCH) + None + else { + val isFromFollower = body[FetchRequest].isFromFollower + if (isFromFollower) None + else Some(metrics.getRequestSizeBucketMetricName(metrics.consumerFetchRequestSizeMetricNameMap, responseBytes)) + } + } + + def getProduceAckSizeBucketMetricName: Option[String] = { + if (header.apiKey != ApiKeys.PRODUCE) + None + else { + var acks = body[ProduceRequest].acks() + if(!metrics.produceRequestAcksSizeMetricNameMap.contains(acks)) { + error(s"metrics.produceRequestAcksSizeMetricNameMap does not contain key acks '${acks}', use -1 instead.") + acks = -1 + } + Some(metrics.getRequestSizeBucketMetricName(metrics.produceRequestAcksSizeMetricNameMap(acks), sizeOfBodyInBytes)) + } + } + def updateRequestMetrics(networkThreadTimeNanos: Long, response: Response): Unit = { val endTimeNanos = Time.SYSTEM.nanoseconds @@ -240,7 +326,9 @@ object RequestChannel extends Logging { if (header.apiKey() == ApiKeys.METADATA && body[MetadataRequest].isAllTopics) { Seq(RequestMetrics.MetadataAllTopics) } else Seq.empty - val metricNames = (fetchMetricNames ++ metadataMetricNames) :+ header.apiKey.name + + val metricNames = (fetchMetricNames ++ metadataMetricNames ++ getConsumerFetchSizeBucketMetricName.toSeq ++ + getProduceAckSizeBucketMetricName.toSeq) :+ header.apiKey.name metricNames.foreach { metricName => val m = metrics(metricName) m.requestRate(header.apiVersion).mark() diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index c8133d613b731..039f2fd124844 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -106,12 +106,12 @@ class SocketServer(val config: KafkaConfig, // data-plane private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]() private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]() - val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time, observer, apiVersionManager.newRequestMetrics) + val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time, observer, apiVersionManager.newRequestMetrics(config)) // control-plane private var controlPlaneProcessorOpt : Option[Processor] = None private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ => - new RequestChannel(20, ControlPlaneMetricPrefix, time, observer, apiVersionManager.newRequestMetrics)) + new RequestChannel(20, ControlPlaneMetricPrefix, time, observer, apiVersionManager.newRequestMetrics(config))) private var nextProcessorId = 0 val connectionQuotas = new ConnectionQuotas(config, time, metrics) diff --git a/core/src/main/scala/kafka/server/ApiVersionManager.scala b/core/src/main/scala/kafka/server/ApiVersionManager.scala index aa29ee02fa961..c3dc137213777 100644 --- a/core/src/main/scala/kafka/server/ApiVersionManager.scala +++ b/core/src/main/scala/kafka/server/ApiVersionManager.scala @@ -32,7 +32,7 @@ trait ApiVersionManager { def enabledApis: collection.Set[ApiKeys] def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse def isApiEnabled(apiKey: ApiKeys): Boolean = enabledApis.contains(apiKey) - def newRequestMetrics: RequestChannel.Metrics = new network.RequestChannel.Metrics(enabledApis) + def newRequestMetrics(config: KafkaConfig): RequestChannel.Metrics = new network.RequestChannel.Metrics(enabledApis, config) } object ApiVersionManager { diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 3601a362d3c14..bec3b0b3a6cbf 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -34,7 +34,7 @@ import kafka.utils.Implicits._ import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.common.Reconfigurable import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource, SaslConfigs, SecurityConfig, SslClientAuth, SslConfigs, TopicConfig} -import org.apache.kafka.common.config.ConfigDef.{ConfigKey, ValidList} +import org.apache.kafka.common.config.ConfigDef.{ConfigKey, ValidList, Validator} import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.config.types.Password import org.apache.kafka.common.metrics.Sensor @@ -265,6 +265,7 @@ object Defaults { val MetricReporterClasses = "" val MetricRecordingLevel = Sensor.RecordingLevel.INFO.toString() val MetricReplaceOnDuplicate = false + val RequestMetricsSizeBuckets = "0,1,10,50,100" /** ********* Kafka Yammer Metrics Reporter Configuration ***********/ @@ -619,6 +620,7 @@ object KafkaConfig { val MetricReporterClassesProp: String = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG val MetricRecordingLevelProp: String = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG val MetricReplaceOnDuplicateProp: String = CommonClientConfigs.METRICS_REPLACE_ON_DUPLICATE_CONFIG + val RequestMetricsSizeBucketsProp: String = "request.metrics.size.buckets" /** ********* Kafka Yammer Metrics Reporters Configuration ***********/ val KafkaMetricsReporterClassesProp = "kafka.metrics.reporters" @@ -1064,6 +1066,7 @@ object KafkaConfig { val MetricReporterClassesDoc = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC val MetricRecordingLevelDoc = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC val MetricReplaceOnDuplicateDoc = CommonClientConfigs.METRICS_REPLACE_ON_DUPLICATE_DOC + val RequestMetricsSizeBucketsDoc = "The size buckets used to group requests to provide RequestMetrics for each group" /** ********* Kafka Yammer Metrics Reporter Configuration ***********/ @@ -1389,6 +1392,7 @@ object KafkaConfig { .define(MetricReporterClassesProp, LIST, Defaults.MetricReporterClasses, LOW, MetricReporterClassesDoc) .define(MetricRecordingLevelProp, STRING, Defaults.MetricRecordingLevel, LOW, MetricRecordingLevelDoc) .define(MetricReplaceOnDuplicateProp, BOOLEAN, Defaults.MetricReplaceOnDuplicate, LOW, MetricReplaceOnDuplicateDoc) + .define(RequestMetricsSizeBucketsProp, STRING, Defaults.RequestMetricsSizeBuckets, RequestMetricsSizeBucketsValidator, LOW, RequestMetricsSizeBucketsDoc) /** ********* Kafka Yammer Metrics Reporter Configuration for docs ***********/ .define(KafkaMetricsReporterClassesProp, LIST, Defaults.KafkaMetricReporterClasses, LOW, KafkaMetricsReporterClassesDoc) @@ -1480,6 +1484,29 @@ object KafkaConfig { .define(RaftConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, Defaults.QuorumRetryBackoffMs, null, LOW, RaftConfig.QUORUM_RETRY_BACKOFF_MS_DOC) } + object RequestMetricsSizeBucketsValidator extends Validator { + override def ensureValid(name: String, value: Any): Unit = { + getRequestMetricsSizeBuckets(value.toString) + } + + override def toString: String = "integer list, separated by ',', at least two integers" + } + // convert requestMetricsSizeBucketsConfig from string to int array. e.g., "0,1,10,50,100" => [0,1,10,50,100] + private def getRequestMetricsSizeBuckets(requestMetricsSizeBuckets: String): Array[Int] = { + val bucketsString = requestMetricsSizeBuckets.replaceAll("\\s", "").split(',') + if(bucketsString.length < 2) { + throw new ConfigException(RequestMetricsSizeBucketsProp, requestMetricsSizeBuckets, + s"requestMetricsSizeBuckets has fewer than 2 buckets: '${requestMetricsSizeBuckets}'.") + } + try { + bucketsString.map(_.toInt) + } catch { + case _: Exception => + throw new ConfigException(RequestMetricsSizeBucketsProp, requestMetricsSizeBuckets, + s"Failed to parse requestMetricsSizeBuckets: '${requestMetricsSizeBuckets}'. ") + } + } + /** ********* Remote Log Management Configuration *********/ RemoteLogManagerConfig.CONFIG_DEF.configKeys().values().forEach(key => configDef.define(key)) @@ -1912,6 +1939,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO val metricSampleWindowMs = getLong(KafkaConfig.MetricSampleWindowMsProp) val metricRecordingLevel = getString(KafkaConfig.MetricRecordingLevelProp) val metricReplaceOnDuplicate = getBoolean(KafkaConfig.MetricReplaceOnDuplicateProp) + def requestMetricsSizeBuckets = getRequestMetricsSizeBuckets() /** ********* SSL/SASL Configuration **************/ // Security configs may be overridden for listeners, so it is not safe to use the base values @@ -2046,6 +2074,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO listeners.filterNot(l => controllerListenerNames.contains(l.listenerName.value())) } + def getRequestMetricsSizeBuckets(): Array[Int] = { + KafkaConfig.getRequestMetricsSizeBuckets(getString(KafkaConfig.RequestMetricsSizeBucketsProp)) + } + private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName, SecurityProtocol) = { Option(getString(KafkaConfig.InterBrokerListenerNameProp)) match { case Some(_) if originals.containsKey(KafkaConfig.InterBrokerSecurityProtocolProp) => diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala index fd3ca90aea341..8d1f092131355 100644 --- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala @@ -295,7 +295,7 @@ abstract class QuotaTestClients(topic: String, def yammerMetricValue(name: String): Double = { val allMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala val (_, metric) = allMetrics.find { case (metricName, _) => - metricName.getMBeanName.startsWith(name) + metricName.getMBeanName == name }.getOrElse(fail(s"Unable to find broker metric $name: allMetrics: ${allMetrics.keySet.map(_.getMBeanName)}")) metric match { case m: Meter => m.count.toDouble diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala index 05b518b0602b8..0b291afe5fe6c 100644 --- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala +++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala @@ -129,7 +129,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { val producer = createProducer() sendRecords(producer, numRecords, recordSize, tp) - val requestRateWithoutVersionMeter = yammerMeterWithPrefix("kafka.network:type=RequestMetrics,name=RequestsPerSecAcrossVersions,request=Produce") + val requestRateWithoutVersionMeter = yammerMeterWithName("kafka.network:type=RequestMetrics,name=RequestsPerSecAcrossVersions,request=Produce") assertTrue(requestRateWithoutVersionMeter.count() > 0, "The Produce RequestsPerSecAcrossVersions metric is not recorded") } @@ -305,7 +305,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { private def yammerMeterWithPrefix(prefix: String): Meter = { val allMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala - val (_, metric) = allMetrics.find { case (n, _) => n.getMBeanName.startsWith(prefix) } + val (_, metric) = allMetrics.find { case (n, _) => n.getMBeanName.startsWith(prefix) } .getOrElse(fail(s"Unable to find broker metric with prefix $prefix: allMetrics: ${allMetrics.keySet.map(_.getMBeanName)}")) metric match { case m: Meter => m @@ -313,6 +313,16 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { } } + private def yammerMeterWithName(name: String): Meter = { + val allMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala + val (_, metric) = allMetrics.find { case (n, _) => n.getMBeanName == name} + .getOrElse(fail(s"Unable to find broker metric with name $name: allMetrics: ${allMetrics.keySet.map(_.getMBeanName)}")) + metric match { + case m: Meter => m + case m => fail(s"Unexpected broker metric of class ${m.getClass}") + } + } + private def verifyNoRequestMetrics(errorMessage: String): Unit = { val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala.filter { case (n, _) => n.getMBeanName.startsWith("kafka.network:type=RequestMetrics") diff --git a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala index 881ea65d3daef..91cf42e028803 100644 --- a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala +++ b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala @@ -21,15 +21,18 @@ package kafka.network import java.io.IOException import java.net.InetAddress import java.nio.ByteBuffer -import java.util.Collections +import java.util.{Collections, Optional, Properties} import com.fasterxml.jackson.databind.ObjectMapper import kafka.network +import kafka.network.RequestChannel.{Metrics, Request} +import kafka.server.{Defaults, KafkaConfig} import kafka.utils.TestUtils import org.apache.kafka.clients.admin.AlterConfigOp.OpType +import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.config.types.Password import org.apache.kafka.common.config.{ConfigResource, SaslConfigs, SslConfigs, TopicConfig} import org.apache.kafka.common.memory.MemoryPool -import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData +import org.apache.kafka.common.message.{IncrementalAlterConfigsRequestData, ProduceRequestData} import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData._ import org.apache.kafka.common.network.{ByteBufferSend, ClientInformation, ListenerName} import org.apache.kafka.common.protocol.{ApiKeys, Errors} @@ -255,6 +258,140 @@ class RequestChannelTest { assertTrue(capturedValue.error().equals(Errors.NOT_CONTROLLER)) } + @Test + def testMetricsRequestSizeBucket(): Unit = { + val apis = Seq(ApiKeys.FETCH, ApiKeys.PRODUCE) + var props = new Properties() + props.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181") + props.put(KafkaConfig.RequestMetricsSizeBucketsProp, "0, 10, 20, 200") + var config = KafkaConfig.fromProps(props) + var metrics = new Metrics(apis, config) + val fetchMetricsNameMap = metrics.consumerFetchRequestSizeMetricNameMap + assertEquals(4, fetchMetricsNameMap.size) + assertTrue(fetchMetricsNameMap.containsKey(0)) + assertTrue(fetchMetricsNameMap.containsKey(10)) + assertTrue(fetchMetricsNameMap.containsKey(20)) + assertTrue(fetchMetricsNameMap.containsKey(200)) + val flattenFetchMetricNames = metrics.getConsumerFetchRequestSizeMetricNames + assertEquals(4, flattenFetchMetricNames.size) + assertEquals("FetchConsumer0To10Mb", fetchMetricsNameMap.get(0)) + assertTrue(flattenFetchMetricNames.contains("FetchConsumer0To10Mb")) + assertEquals("FetchConsumer10To20Mb", fetchMetricsNameMap.get(10)) + assertTrue(flattenFetchMetricNames.contains("FetchConsumer10To20Mb")) + assertEquals("FetchConsumer20To200Mb", fetchMetricsNameMap.get(20)) + assertTrue(flattenFetchMetricNames.contains("FetchConsumer20To200Mb")) + assertEquals("FetchConsumer200MbGreater", fetchMetricsNameMap.get(200)) + assertTrue(flattenFetchMetricNames.contains("FetchConsumer200MbGreater")) + + val produceMetricsNameMaps = metrics.produceRequestAcksSizeMetricNameMap + assertEquals(3, produceMetricsNameMaps.size) + + assertTrue(produceMetricsNameMaps.contains(0)) + assertTrue(produceMetricsNameMaps.contains(1)) + assertTrue(produceMetricsNameMaps.contains(-1)) + val flattenProduceMetricNames = metrics.getProduceRequestAcksSizeMetricNames + assertEquals(12, flattenProduceMetricNames.size) + + for (i <- 0 until 3) { + val ackKey = if (i == 2) -1 else i + val ackKeyString = if(i == 2) "All" else i.toString + val produceMetricsNameMap = produceMetricsNameMaps(ackKey) + assertTrue(produceMetricsNameMap.containsKey(0)) + assertTrue(produceMetricsNameMap.containsKey(10)) + assertTrue(produceMetricsNameMap.containsKey(20)) + assertTrue(produceMetricsNameMap.containsKey(200)) + var metricName = "Produce0To10MbAcks" + ackKeyString + assertEquals(metricName, produceMetricsNameMap.get(0)) + assertTrue(flattenProduceMetricNames.contains(metricName)) + metricName = "Produce10To20MbAcks" + ackKeyString + assertEquals(metricName, produceMetricsNameMap.get(10)) + assertTrue(flattenProduceMetricNames.contains(metricName)) + metricName = "Produce20To200MbAcks" + ackKeyString + assertEquals(metricName, produceMetricsNameMap.get(20)) + assertTrue(flattenProduceMetricNames.contains(metricName)) + metricName = "Produce200MbGreaterAcks" + ackKeyString + assertEquals(metricName, produceMetricsNameMap.get(200)) + assertTrue(flattenProduceMetricNames.contains(metricName)) + } + + // test get the bucket name + val metadataRequest = request(new MetadataRequest.Builder(List("topic").asJava, true).build(), metrics) + assertEquals(None, metadataRequest.getConsumerFetchSizeBucketMetricName) + assertEquals(None, metadataRequest.getProduceAckSizeBucketMetricName) + + var produceRequest = request(new ProduceRequest.Builder(0, 0, + new ProduceRequestData().setAcks(1.toShort).setTimeoutMs(1000)).build(), + metrics) + assertEquals(None, produceRequest.getConsumerFetchSizeBucketMetricName) + assertEquals(Some("Produce0To10MbAcks1"), produceRequest.getProduceAckSizeBucketMetricName) + produceRequest = request(new ProduceRequest.Builder(0, 0, + new ProduceRequestData().setAcks(-1).setTimeoutMs(1000)).build(), + metrics) + assertEquals(Some("Produce0To10MbAcksAll"), produceRequest.getProduceAckSizeBucketMetricName) + + val tp = new TopicPartition("foo", 0) + val fetchData = Map(tp -> new FetchRequest.PartitionData(0, 0, 1000, + Optional.empty())).asJava + val consumeFetchRequest = request(new FetchRequest.Builder(9, 9, -1, 100, 0, fetchData) + .build(), + metrics) + assertEquals(Some("FetchConsumer0To10Mb"), consumeFetchRequest.getConsumerFetchSizeBucketMetricName) + assertEquals(None, consumeFetchRequest.getProduceAckSizeBucketMetricName) + val followerFetchRequest = request(new FetchRequest.Builder(9, 9, 1, 100, 0, fetchData) + .build(), + metrics) + assertEquals(None, followerFetchRequest.getConsumerFetchSizeBucketMetricName) + + assertEquals("FetchConsumer0To10Mb", metrics.getRequestSizeBucketMetricName(metrics.consumerFetchRequestSizeMetricNameMap, 2*1024 *1024)) + assertEquals("Produce10To20MbAcks0", metrics.getRequestSizeBucketMetricName(metrics.produceRequestAcksSizeMetricNameMap(0), 10*1024 *1024)) + assertEquals("Produce200MbGreaterAcks1", metrics.getRequestSizeBucketMetricName(metrics.produceRequestAcksSizeMetricNameMap(1), 201*1024 *1024)) + assertEquals("Produce0To10MbAcksAll", metrics.getRequestSizeBucketMetricName(metrics.produceRequestAcksSizeMetricNameMap(-1), 0)) + assertEquals("Produce20To200MbAcksAll", metrics.getRequestSizeBucketMetricName(metrics.produceRequestAcksSizeMetricNameMap(-1), 35*1024 *1024)) + + // test default config + props = new Properties() + props.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181") + config = KafkaConfig.fromProps(props) + metrics = new Metrics(apis, config) + testMetricsRequestSizeBucketDefault(metrics) + } + + private def testMetricsRequestSizeBucketDefault(metrics: Metrics): Unit = { + //default bucket "0,1,10,50,100" + val fetchMetricsNameMap = metrics.consumerFetchRequestSizeMetricNameMap + assertEquals(5, fetchMetricsNameMap.size) + assertTrue(fetchMetricsNameMap.containsKey(0)) + assertTrue(fetchMetricsNameMap.containsKey(1)) + assertTrue(fetchMetricsNameMap.containsKey(10)) + assertTrue(fetchMetricsNameMap.containsKey(50)) + assertTrue(fetchMetricsNameMap.containsKey(100)) + assertEquals("FetchConsumer0To1Mb", fetchMetricsNameMap.get(0)) + assertEquals("FetchConsumer1To10Mb", fetchMetricsNameMap.get(1)) + assertEquals("FetchConsumer10To50Mb", fetchMetricsNameMap.get(10)) + assertEquals("FetchConsumer50To100Mb", fetchMetricsNameMap.get(50)) + assertEquals("FetchConsumer100MbGreater", fetchMetricsNameMap.get(100)) + val produceMetricsNameMaps = metrics.produceRequestAcksSizeMetricNameMap + assertEquals(3, produceMetricsNameMaps.size) + assertTrue(produceMetricsNameMaps.contains(0)) + assertTrue(produceMetricsNameMaps.contains(1)) + assertTrue(produceMetricsNameMaps.contains(-1)) + for (i <- 0 until 3) { + val ackKey = if (i == 2) -1 else i + val ackKeyString = if(i == 2) "All" else i.toString + val produceMetricsNameMap = produceMetricsNameMaps(ackKey) + assertTrue(produceMetricsNameMap.containsKey(0)) + assertTrue(produceMetricsNameMap.containsKey(1)) + assertTrue(produceMetricsNameMap.containsKey(10)) + assertTrue(produceMetricsNameMap.containsKey(50)) + assertTrue(produceMetricsNameMap.containsKey(100)) + assertEquals("Produce0To1MbAcks" + ackKeyString, produceMetricsNameMap.get(0)) + assertEquals("Produce1To10MbAcks" + ackKeyString, produceMetricsNameMap.get(1)) + assertEquals("Produce10To50MbAcks" + ackKeyString, produceMetricsNameMap.get(10)) + assertEquals("Produce50To100MbAcks" + ackKeyString, produceMetricsNameMap.get(50)) + assertEquals("Produce100MbGreaterAcks" + ackKeyString, produceMetricsNameMap.get(100)) + } + } + private def buildMetadataRequest(): AbstractRequest = { val resourceName = "topic-1" val header = new RequestHeader(ApiKeys.METADATA, ApiKeys.METADATA.latestVersion, @@ -281,7 +418,7 @@ class RequestChannelTest { } } - def request(req: AbstractRequest): RequestChannel.Request = { + def request(req: AbstractRequest, metrics: Metrics): RequestChannel.Request = { val buffer = req.serializeWithHeader(new RequestHeader(req.apiKey, req.version, "client-id", 1)) val requestContext = newRequestContext(buffer) new network.RequestChannel.Request(processor = 1, @@ -289,10 +426,14 @@ class RequestChannelTest { startTimeNanos = 0, createNiceMock(classOf[MemoryPool]), buffer, - createNiceMock(classOf[RequestChannel.Metrics]) + metrics ) } + def request(req: AbstractRequest): RequestChannel.Request = { + request(req, createNiceMock(classOf[RequestChannel.Metrics])) + } + private def newRequestContext(buffer: ByteBuffer): RequestContext = { new RequestContext( RequestHeader.parse(buffer), diff --git a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala index d0fc30fbdec21..a49c93eb400d7 100644 --- a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala @@ -18,9 +18,8 @@ package kafka.server import java.net.InetAddress import java.nio.ByteBuffer -import java.util.Optional +import java.util.{Optional, Properties} import java.util.concurrent.atomic.AtomicReference - import kafka.network import kafka.network.RequestChannel import kafka.utils.MockTime @@ -232,13 +231,17 @@ class ForwardingManagerTest { Optional.of(principalBuilder) ) + val props = new Properties() + props.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181") + props.put(KafkaConfig.RequestMetricsSizeBucketsProp, "0, 10, 20, 200") + val config = KafkaConfig.fromProps(props) new network.RequestChannel.Request( processor = 1, context = requestContext, startTimeNanos = time.nanoseconds(), memoryPool = MemoryPool.NONE, buffer = requestBuffer, - metrics = new RequestChannel.Metrics(ListenerType.CONTROLLER), + metrics = new RequestChannel.Metrics(ListenerType.CONTROLLER, config), envelope = None ) } diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index cb58cfaf57941..0d0edff63b910 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -809,6 +809,7 @@ class KafkaConfigTest { //Kafka Yammer metrics reporter configs case KafkaConfig.KafkaMetricsReporterClassesProp => // ignore case KafkaConfig.KafkaMetricsPollingIntervalSecondsProp => //ignore + case KafkaConfig.RequestMetricsSizeBucketsProp => assertPropertyInvalid(baseProperties, name, "", "1", "1, 2, 5, not_a_number, 9") // Broker-side observer configs case KafkaConfig.ObserverClassNameProp => // ignore since even if the class name is invalid, a NoOpObserver class is used instead