diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 7349513675222..bfa412d9ff86d 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -25,7 +25,7 @@ import com.typesafe.scalalogging.Logger
 import com.yammer.metrics.core.{Histogram, Meter}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.network
-import kafka.server.{BrokerMetadataStats, KafkaConfig, Observer}
+import kafka.server.{BrokerMetadataStats, Defaults, KafkaConfig, Observer}
 import kafka.utils.{Logging, NotNothing, Pool}
 import kafka.utils.Implicits._
 import org.apache.kafka.common.config.ConfigResource
@@ -38,6 +38,7 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{Sanitizer, Time}
 
+import java.util
 import scala.annotation.nowarn
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
@@ -59,17 +60,26 @@ object RequestChannel extends Logging {
     val sanitizedUser: String = Sanitizer.sanitize(principal.getName)
   }
 
-  class Metrics(enabledApis: Iterable[ApiKeys]) {
-    def this(scope: ListenerType) = {
-      this(ApiKeys.apisForListener(scope).asScala)
+  class Metrics(enabledApis: Iterable[ApiKeys], config: KafkaConfig) {
+    def this(scope: ListenerType, config: KafkaConfig) = {
+      this(ApiKeys.apisForListener(scope).asScala, config)
     }
 
     private val metricsMap = mutable.Map[String, RequestMetrics]()
 
+    // map[acks, map[requestSize, metricName]]
+    // used to create metrics for produce requests of different sizes and acks
+    val produceRequestAcksSizeMetricNameMap = getProduceRequestAcksSizeMetricNameMap()
+    // map[responseSize, metricName]
+    // used to create metrics for fetch requests of different response sizes
+    val consumerFetchRequestSizeMetricNameMap = getConsumerFetchRequestAcksSizeMetricNameMap
+
     (enabledApis.map(_.name) ++ Seq(RequestMetrics.MetadataAllTopics) ++
-      Seq(RequestMetrics.consumerFetchMetricName, RequestMetrics.followFetchMetricName)).foreach { name =>
-      metricsMap.put(name, new RequestMetrics(name))
+      Seq(RequestMetrics.consumerFetchMetricName, RequestMetrics.followFetchMetricName) ++
+      getConsumerFetchRequestSizeMetricNames ++
+      getProduceRequestAcksSizeMetricNames)
+      .foreach { name => metricsMap.put(name, new RequestMetrics(name))
     }
 
     def apply(metricName: String): RequestMetrics = metricsMap(metricName)
@@ -77,6 +87,59 @@ object RequestChannel extends Logging {
     def close(): Unit = {
       metricsMap.values.foreach(_.removeMetrics())
     }
+
+    // generate map[request/responseSize, metricName] for requests of different sizes
+    def getRequestSizeMetricNameMap(requestType: String): util.TreeMap[Int, String] = {
+      val buckets = config.requestMetricsSizeBuckets
+      // pair each size with the term used in the metric name, e.g.
+      // [0,1,10,50,100] => Seq((0, "0To1Mb"), (1, "1To10Mb"), (10, "10To50Mb"), (50, "50To100Mb"),
+      //                        (100, "100MbGreater"))
+      val requestSizeBuckets = for (i <- 0 until buckets.length) yield {
+        val size = buckets(i)
+        if (i == buckets.length - 1) (size, s"${size}MbGreater")
+        else (size, s"${size}To${buckets(i + 1)}Mb")
+      }
+      val treeMap = new util.TreeMap[Int, String]
+      requestSizeBuckets.map(bucket => (bucket._1, s"${requestType}${bucket._2}")).toMap
+        .foreach { case (size, bucket) => treeMap.put(size, bucket) }
+      treeMap
+    }
+
+    // generate map[acks, map[requestSize, metricName]] for produce requests of different request sizes and acks
+    private def getProduceRequestAcksSizeMetricNameMap(): Map[Int, util.TreeMap[Int, String]] = {
+      val produceRequestAcks = Seq((0, "0"), (1, "1"), (-1, "All"))
+      val requestSizeMetricNameMap = getRequestSizeMetricNameMap(ApiKeys.PRODUCE.name)
+
+      val ackSizeMetricNames = for (ack <- produceRequestAcks) yield {
+        val treeMap = new util.TreeMap[Int, String]
+        requestSizeMetricNameMap.asScala.map { case (size, name) => treeMap.put(size, s"${name}Acks${ack._2}") }
+        (ack._1, treeMap)
+      }
+      ackSizeMetricNames.toMap
+    }
+
+    // generate map[responseSize, metricName] for consumer fetch requests of different response sizes
+    private def getConsumerFetchRequestAcksSizeMetricNameMap(): util.TreeMap[Int, String] = {
+      getRequestSizeMetricNameMap(RequestMetrics.consumerFetchMetricName)
+    }
+
+    // all the metric names for produce requests of different acks and sizes
+    def getProduceRequestAcksSizeMetricNames: Seq[String] = {
+      produceRequestAcksSizeMetricNameMap.values.toSeq.flatMap(_.values.asScala.toSeq)
+    }
+
+    // all the metric names for fetch requests of different sizes
+    def getConsumerFetchRequestSizeMetricNames: Seq[String] = {
+      consumerFetchRequestSizeMetricNameMap.values.asScala.toSeq
+    }
+
+    // get the metric name for a given request/response size; each bucket is [left, right)
+    def getRequestSizeBucketMetricName(sizeMetricNameMap: util.TreeMap[Int, String], sizeBytes: Long): String = {
+      val sizeMb = sizeBytes / 1024 / 1024
+      if (sizeMb < sizeMetricNameMap.firstKey()) sizeMetricNameMap.firstEntry().getValue
+      else sizeMetricNameMap.floorEntry(sizeMb.toInt).getValue
+    }
   }
 
   class Request(val processor: Int,
@@ -207,6 +270,29 @@ object RequestChannel extends Logging {
       math.max(apiLocalCompleteTimeNanos - requestDequeueTimeNanos, 0L)
     }
 
+    def getConsumerFetchSizeBucketMetricName: Option[String] = {
+      if (header.apiKey != ApiKeys.FETCH)
+        None
+      else {
+        val isFromFollower = body[FetchRequest].isFromFollower
+        if (isFromFollower) None
+        else Some(metrics.getRequestSizeBucketMetricName(metrics.consumerFetchRequestSizeMetricNameMap, responseBytes))
+      }
+    }
+
+    def getProduceAckSizeBucketMetricName: Option[String] = {
+      if (header.apiKey != ApiKeys.PRODUCE)
+        None
+      else {
+        var acks = body[ProduceRequest].acks()
+        if (!metrics.produceRequestAcksSizeMetricNameMap.contains(acks)) {
+          error(s"metrics.produceRequestAcksSizeMetricNameMap does not contain key acks '${acks}', using -1 instead.")
+          acks = -1
+        }
+        Some(metrics.getRequestSizeBucketMetricName(metrics.produceRequestAcksSizeMetricNameMap(acks), sizeOfBodyInBytes))
+      }
+    }
+
     def updateRequestMetrics(networkThreadTimeNanos: Long, response: Response): Unit = {
       val endTimeNanos = Time.SYSTEM.nanoseconds
@@ -240,7 +326,9 @@
         if (header.apiKey() == ApiKeys.METADATA && body[MetadataRequest].isAllTopics) {
           Seq(RequestMetrics.MetadataAllTopics)
         } else Seq.empty
-      val metricNames = (fetchMetricNames ++ metadataMetricNames) :+ header.apiKey.name
+
+      val metricNames = (fetchMetricNames ++ metadataMetricNames ++ getConsumerFetchSizeBucketMetricName.toSeq ++
+        getProduceAckSizeBucketMetricName.toSeq) :+ header.apiKey.name
       metricNames.foreach { metricName =>
         val m = metrics(metricName)
         m.requestRate(header.apiVersion).mark()
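[Editor's note: a minimal, standalone sketch (not part of the patch) of the bucket semantics implemented above. It assumes the default buckets "0,1,10,50,100"; BucketLookupSketch and bucketFor are hypothetical names. Sizes are floored to whole MB and resolved with TreeMap.floorEntry, so each bucket is inclusive on the left and exclusive on the right.]

  import java.util

  object BucketLookupSketch {
    def main(args: Array[String]): Unit = {
      val buckets = Array(0, 1, 10, 50, 100) // Defaults.RequestMetricsSizeBuckets
      val names = new util.TreeMap[Int, String]
      for (i <- buckets.indices) {
        // mirrors getRequestSizeMetricNameMap: label each left edge with its range
        val label =
          if (i == buckets.length - 1) s"${buckets(i)}MbGreater"
          else s"${buckets(i)}To${buckets(i + 1)}Mb"
        names.put(buckets(i), s"Produce$label")
      }

      // mirrors getRequestSizeBucketMetricName: buckets are [left, right)
      def bucketFor(sizeBytes: Long): String = {
        val sizeMb = sizeBytes / 1024 / 1024
        if (sizeMb < names.firstKey()) names.firstEntry().getValue
        else names.floorEntry(sizeMb.toInt).getValue
      }

      println(bucketFor(512 * 1024))         // Produce0To1Mb
      println(bucketFor(10L * 1024 * 1024))  // Produce10To50Mb: the left edge is inclusive
      println(bucketFor(300L * 1024 * 1024)) // Produce100MbGreater
    }
  }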
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index c8133d613b731..039f2fd124844 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -106,12 +106,12 @@ class SocketServer(val config: KafkaConfig,
   // data-plane
   private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
   private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
-  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time, observer, apiVersionManager.newRequestMetrics)
+  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time, observer, apiVersionManager.newRequestMetrics(config))
   // control-plane
   private var controlPlaneProcessorOpt : Option[Processor] = None
   private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
   val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
-    new RequestChannel(20, ControlPlaneMetricPrefix, time, observer, apiVersionManager.newRequestMetrics))
+    new RequestChannel(20, ControlPlaneMetricPrefix, time, observer, apiVersionManager.newRequestMetrics(config)))
 
   private var nextProcessorId = 0
   val connectionQuotas = new ConnectionQuotas(config, time, metrics)
diff --git a/core/src/main/scala/kafka/server/ApiVersionManager.scala b/core/src/main/scala/kafka/server/ApiVersionManager.scala
index aa29ee02fa961..c3dc137213777 100644
--- a/core/src/main/scala/kafka/server/ApiVersionManager.scala
+++ b/core/src/main/scala/kafka/server/ApiVersionManager.scala
@@ -32,7 +32,7 @@ trait ApiVersionManager {
   def enabledApis: collection.Set[ApiKeys]
   def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse
   def isApiEnabled(apiKey: ApiKeys): Boolean = enabledApis.contains(apiKey)
-  def newRequestMetrics: RequestChannel.Metrics = new network.RequestChannel.Metrics(enabledApis)
+  def newRequestMetrics(config: KafkaConfig): RequestChannel.Metrics = new network.RequestChannel.Metrics(enabledApis, config)
 }
 
 object ApiVersionManager {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 3601a362d3c14..bec3b0b3a6cbf 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -34,7 +34,7 @@ import kafka.utils.Implicits._
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.Reconfigurable
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource, SaslConfigs, SecurityConfig, SslClientAuth, SslConfigs, TopicConfig}
-import org.apache.kafka.common.config.ConfigDef.{ConfigKey, ValidList}
+import org.apache.kafka.common.config.ConfigDef.{ConfigKey, ValidList, Validator}
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.metrics.Sensor
@@ -265,6 +265,7 @@ object Defaults {
   val MetricReporterClasses = ""
   val MetricRecordingLevel = Sensor.RecordingLevel.INFO.toString()
   val MetricReplaceOnDuplicate = false
+  val RequestMetricsSizeBuckets = "0,1,10,50,100"
 
   /** ********* Kafka Yammer Metrics Reporter Configuration ***********/
@@ -619,6 +620,7 @@ object KafkaConfig {
   val MetricReporterClassesProp: String = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG
   val MetricRecordingLevelProp: String = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG
   val MetricReplaceOnDuplicateProp: String = CommonClientConfigs.METRICS_REPLACE_ON_DUPLICATE_CONFIG
+  val RequestMetricsSizeBucketsProp: String = "request.metrics.size.buckets"
 
   /** ********* Kafka Yammer Metrics Reporters Configuration ***********/
   val KafkaMetricsReporterClassesProp = "kafka.metrics.reporters"
@@ -1064,6 +1066,7 @@ object KafkaConfig {
   val MetricReporterClassesDoc = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC
   val MetricRecordingLevelDoc = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC
   val MetricReplaceOnDuplicateDoc = CommonClientConfigs.METRICS_REPLACE_ON_DUPLICATE_DOC
+  val RequestMetricsSizeBucketsDoc = "The size buckets, in MB, used to group produce and fetch requests; a separate set of RequestMetrics is reported for each bucket"
 
   /** ********* Kafka Yammer Metrics Reporter Configuration ***********/
@@ -1389,6 +1392,7 @@ object KafkaConfig {
       .define(MetricReporterClassesProp, LIST, Defaults.MetricReporterClasses, LOW, MetricReporterClassesDoc)
       .define(MetricRecordingLevelProp, STRING, Defaults.MetricRecordingLevel, LOW, MetricRecordingLevelDoc)
       .define(MetricReplaceOnDuplicateProp, BOOLEAN, Defaults.MetricReplaceOnDuplicate, LOW, MetricReplaceOnDuplicateDoc)
+      .define(RequestMetricsSizeBucketsProp, STRING, Defaults.RequestMetricsSizeBuckets, RequestMetricsSizeBucketsValidator, LOW, RequestMetricsSizeBucketsDoc)
 
       /** ********* Kafka Yammer Metrics Reporter Configuration for docs ***********/
       .define(KafkaMetricsReporterClassesProp, LIST, Defaults.KafkaMetricReporterClasses, LOW, KafkaMetricsReporterClassesDoc)
@@ -1480,6 +1484,29 @@ object KafkaConfig {
       .define(RaftConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, Defaults.QuorumRetryBackoffMs, null, LOW, RaftConfig.QUORUM_RETRY_BACKOFF_MS_DOC)
   }
 
+  object RequestMetricsSizeBucketsValidator extends Validator {
+    override def ensureValid(name: String, value: Any): Unit = {
+      getRequestMetricsSizeBuckets(value.toString)
+    }
+
+    override def toString: String = "a list of at least two integers, separated by ','"
+  }
+
+  // convert the requestMetricsSizeBuckets config from a string to an int array, e.g. "0,1,10,50,100" => [0,1,10,50,100]
+  private def getRequestMetricsSizeBuckets(requestMetricsSizeBuckets: String): Array[Int] = {
+    val bucketsString = requestMetricsSizeBuckets.replaceAll("\\s", "").split(',')
+    if (bucketsString.length < 2) {
+      throw new ConfigException(RequestMetricsSizeBucketsProp, requestMetricsSizeBuckets,
+        s"requestMetricsSizeBuckets has fewer than 2 buckets: '${requestMetricsSizeBuckets}'.")
+    }
+    try {
+      bucketsString.map(_.toInt)
+    } catch {
+      case _: Exception =>
+        throw new ConfigException(RequestMetricsSizeBucketsProp, requestMetricsSizeBuckets,
+          s"Failed to parse requestMetricsSizeBuckets: '${requestMetricsSizeBuckets}'.")
+    }
+  }
+
   /** ********* Remote Log Management Configuration *********/
   RemoteLogManagerConfig.CONFIG_DEF.configKeys().values().forEach(key => configDef.define(key))
@@ -1912,6 +1939,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   val metricSampleWindowMs = getLong(KafkaConfig.MetricSampleWindowMsProp)
   val metricRecordingLevel = getString(KafkaConfig.MetricRecordingLevelProp)
   val metricReplaceOnDuplicate = getBoolean(KafkaConfig.MetricReplaceOnDuplicateProp)
+  def requestMetricsSizeBuckets = getRequestMetricsSizeBuckets()
 
   /** ********* SSL/SASL Configuration **************/
   // Security configs may be overridden for listeners, so it is not safe to use the base values
@@ -2046,6 +2074,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     listeners.filterNot(l => controllerListenerNames.contains(l.listenerName.value()))
   }
 
+  def getRequestMetricsSizeBuckets(): Array[Int] = {
+    KafkaConfig.getRequestMetricsSizeBuckets(getString(KafkaConfig.RequestMetricsSizeBucketsProp))
+  }
+
   private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName, SecurityProtocol) = {
     Option(getString(KafkaConfig.InterBrokerListenerNameProp)) match {
       case Some(_) if originals.containsKey(KafkaConfig.InterBrokerSecurityProtocolProp) =>
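[Editor's note: a quick sketch (not part of the patch) of how request.metrics.size.buckets is parsed. parseBuckets is a hypothetical stand-in for KafkaConfig.getRequestMetricsSizeBuckets, which throws ConfigException rather than the exceptions shown here.]

  object BucketConfigParseSketch {
    // simplified mirror of KafkaConfig.getRequestMetricsSizeBuckets: strip whitespace,
    // split on ',', require at least two entries, then parse each entry as an Int
    def parseBuckets(value: String): Array[Int] = {
      val parts = value.replaceAll("\\s", "").split(',')
      require(parts.length >= 2, s"need at least two buckets, got: '$value'")
      parts.map(_.toInt) // throws NumberFormatException on a non-integer entry
    }

    def main(args: Array[String]): Unit = {
      println(parseBuckets("0,1,10,50,100").mkString(","))  // the default value
      println(parseBuckets("0, 10, 20, 200").mkString(",")) // whitespace is tolerated
      // parseBuckets("1")                would fail: fewer than two buckets
      // parseBuckets("1,2,not_a_number") would fail: non-integer entry
    }
  }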
") + } + } + /** ********* Remote Log Management Configuration *********/ RemoteLogManagerConfig.CONFIG_DEF.configKeys().values().forEach(key => configDef.define(key)) @@ -1912,6 +1939,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO val metricSampleWindowMs = getLong(KafkaConfig.MetricSampleWindowMsProp) val metricRecordingLevel = getString(KafkaConfig.MetricRecordingLevelProp) val metricReplaceOnDuplicate = getBoolean(KafkaConfig.MetricReplaceOnDuplicateProp) + def requestMetricsSizeBuckets = getRequestMetricsSizeBuckets() /** ********* SSL/SASL Configuration **************/ // Security configs may be overridden for listeners, so it is not safe to use the base values @@ -2046,6 +2074,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO listeners.filterNot(l => controllerListenerNames.contains(l.listenerName.value())) } + def getRequestMetricsSizeBuckets(): Array[Int] = { + KafkaConfig.getRequestMetricsSizeBuckets(getString(KafkaConfig.RequestMetricsSizeBucketsProp)) + } + private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName, SecurityProtocol) = { Option(getString(KafkaConfig.InterBrokerListenerNameProp)) match { case Some(_) if originals.containsKey(KafkaConfig.InterBrokerSecurityProtocolProp) => diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala index fd3ca90aea341..8d1f092131355 100644 --- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala @@ -295,7 +295,7 @@ abstract class QuotaTestClients(topic: String, def yammerMetricValue(name: String): Double = { val allMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala val (_, metric) = allMetrics.find { case (metricName, _) => - metricName.getMBeanName.startsWith(name) + metricName.getMBeanName == name }.getOrElse(fail(s"Unable to find broker metric $name: allMetrics: ${allMetrics.keySet.map(_.getMBeanName)}")) metric match { case m: Meter => m.count.toDouble diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala index 05b518b0602b8..0b291afe5fe6c 100644 --- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala +++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala @@ -129,7 +129,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { val producer = createProducer() sendRecords(producer, numRecords, recordSize, tp) - val requestRateWithoutVersionMeter = yammerMeterWithPrefix("kafka.network:type=RequestMetrics,name=RequestsPerSecAcrossVersions,request=Produce") + val requestRateWithoutVersionMeter = yammerMeterWithName("kafka.network:type=RequestMetrics,name=RequestsPerSecAcrossVersions,request=Produce") assertTrue(requestRateWithoutVersionMeter.count() > 0, "The Produce RequestsPerSecAcrossVersions metric is not recorded") } @@ -305,7 +305,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { private def yammerMeterWithPrefix(prefix: String): Meter = { val allMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala - val (_, metric) = allMetrics.find { case (n, _) => n.getMBeanName.startsWith(prefix) } + val (_, metric) = allMetrics.find { case (n, _) => n.getMBeanName.startsWith(prefix) } .getOrElse(fail(s"Unable to find broker metric with prefix $prefix: allMetrics: ${allMetrics.keySet.map(_.getMBeanName)}")) metric match { case m: Meter 
diff --git a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
index 881ea65d3daef..91cf42e028803 100644
--- a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
+++ b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
@@ -21,15 +21,18 @@ package kafka.network
 import java.io.IOException
 import java.net.InetAddress
 import java.nio.ByteBuffer
-import java.util.Collections
+import java.util.{Collections, Optional, Properties}
 import com.fasterxml.jackson.databind.ObjectMapper
 import kafka.network
+import kafka.network.RequestChannel.{Metrics, Request}
+import kafka.server.{Defaults, KafkaConfig}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.config.{ConfigResource, SaslConfigs, SslConfigs, TopicConfig}
 import org.apache.kafka.common.memory.MemoryPool
-import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData
+import org.apache.kafka.common.message.{IncrementalAlterConfigsRequestData, ProduceRequestData}
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData._
 import org.apache.kafka.common.network.{ByteBufferSend, ClientInformation, ListenerName}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -255,6 +258,140 @@ class RequestChannelTest {
     assertTrue(capturedValue.error().equals(Errors.NOT_CONTROLLER))
   }
 
+  @Test
+  def testMetricsRequestSizeBucket(): Unit = {
+    val apis = Seq(ApiKeys.FETCH, ApiKeys.PRODUCE)
+    var props = new Properties()
+    props.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
+    props.put(KafkaConfig.RequestMetricsSizeBucketsProp, "0, 10, 20, 200")
+    var config = KafkaConfig.fromProps(props)
+    var metrics = new Metrics(apis, config)
+    val fetchMetricsNameMap = metrics.consumerFetchRequestSizeMetricNameMap
+    assertEquals(4, fetchMetricsNameMap.size)
+    assertTrue(fetchMetricsNameMap.containsKey(0))
+    assertTrue(fetchMetricsNameMap.containsKey(10))
+    assertTrue(fetchMetricsNameMap.containsKey(20))
+    assertTrue(fetchMetricsNameMap.containsKey(200))
+    val flattenFetchMetricNames = metrics.getConsumerFetchRequestSizeMetricNames
+    assertEquals(4, flattenFetchMetricNames.size)
+    assertEquals("FetchConsumer0To10Mb", fetchMetricsNameMap.get(0))
+    assertTrue(flattenFetchMetricNames.contains("FetchConsumer0To10Mb"))
+    assertEquals("FetchConsumer10To20Mb", fetchMetricsNameMap.get(10))
+    assertTrue(flattenFetchMetricNames.contains("FetchConsumer10To20Mb"))
+    assertEquals("FetchConsumer20To200Mb", fetchMetricsNameMap.get(20))
+    assertTrue(flattenFetchMetricNames.contains("FetchConsumer20To200Mb"))
assertEquals("FetchConsumer200MbGreater", fetchMetricsNameMap.get(200)) + assertTrue(flattenFetchMetricNames.contains("FetchConsumer200MbGreater")) + + val produceMetricsNameMaps = metrics.produceRequestAcksSizeMetricNameMap + assertEquals(3, produceMetricsNameMaps.size) + + assertTrue(produceMetricsNameMaps.contains(0)) + assertTrue(produceMetricsNameMaps.contains(1)) + assertTrue(produceMetricsNameMaps.contains(-1)) + val flattenProduceMetricNames = metrics.getProduceRequestAcksSizeMetricNames + assertEquals(12, flattenProduceMetricNames.size) + + for (i <- 0 until 3) { + val ackKey = if (i == 2) -1 else i + val ackKeyString = if(i == 2) "All" else i.toString + val produceMetricsNameMap = produceMetricsNameMaps(ackKey) + assertTrue(produceMetricsNameMap.containsKey(0)) + assertTrue(produceMetricsNameMap.containsKey(10)) + assertTrue(produceMetricsNameMap.containsKey(20)) + assertTrue(produceMetricsNameMap.containsKey(200)) + var metricName = "Produce0To10MbAcks" + ackKeyString + assertEquals(metricName, produceMetricsNameMap.get(0)) + assertTrue(flattenProduceMetricNames.contains(metricName)) + metricName = "Produce10To20MbAcks" + ackKeyString + assertEquals(metricName, produceMetricsNameMap.get(10)) + assertTrue(flattenProduceMetricNames.contains(metricName)) + metricName = "Produce20To200MbAcks" + ackKeyString + assertEquals(metricName, produceMetricsNameMap.get(20)) + assertTrue(flattenProduceMetricNames.contains(metricName)) + metricName = "Produce200MbGreaterAcks" + ackKeyString + assertEquals(metricName, produceMetricsNameMap.get(200)) + assertTrue(flattenProduceMetricNames.contains(metricName)) + } + + // test get the bucket name + val metadataRequest = request(new MetadataRequest.Builder(List("topic").asJava, true).build(), metrics) + assertEquals(None, metadataRequest.getConsumerFetchSizeBucketMetricName) + assertEquals(None, metadataRequest.getProduceAckSizeBucketMetricName) + + var produceRequest = request(new ProduceRequest.Builder(0, 0, + new ProduceRequestData().setAcks(1.toShort).setTimeoutMs(1000)).build(), + metrics) + assertEquals(None, produceRequest.getConsumerFetchSizeBucketMetricName) + assertEquals(Some("Produce0To10MbAcks1"), produceRequest.getProduceAckSizeBucketMetricName) + produceRequest = request(new ProduceRequest.Builder(0, 0, + new ProduceRequestData().setAcks(-1).setTimeoutMs(1000)).build(), + metrics) + assertEquals(Some("Produce0To10MbAcksAll"), produceRequest.getProduceAckSizeBucketMetricName) + + val tp = new TopicPartition("foo", 0) + val fetchData = Map(tp -> new FetchRequest.PartitionData(0, 0, 1000, + Optional.empty())).asJava + val consumeFetchRequest = request(new FetchRequest.Builder(9, 9, -1, 100, 0, fetchData) + .build(), + metrics) + assertEquals(Some("FetchConsumer0To10Mb"), consumeFetchRequest.getConsumerFetchSizeBucketMetricName) + assertEquals(None, consumeFetchRequest.getProduceAckSizeBucketMetricName) + val followerFetchRequest = request(new FetchRequest.Builder(9, 9, 1, 100, 0, fetchData) + .build(), + metrics) + assertEquals(None, followerFetchRequest.getConsumerFetchSizeBucketMetricName) + + assertEquals("FetchConsumer0To10Mb", metrics.getRequestSizeBucketMetricName(metrics.consumerFetchRequestSizeMetricNameMap, 2*1024 *1024)) + assertEquals("Produce10To20MbAcks0", metrics.getRequestSizeBucketMetricName(metrics.produceRequestAcksSizeMetricNameMap(0), 10*1024 *1024)) + assertEquals("Produce200MbGreaterAcks1", metrics.getRequestSizeBucketMetricName(metrics.produceRequestAcksSizeMetricNameMap(1), 201*1024 *1024)) + 
assertEquals("Produce0To10MbAcksAll", metrics.getRequestSizeBucketMetricName(metrics.produceRequestAcksSizeMetricNameMap(-1), 0)) + assertEquals("Produce20To200MbAcksAll", metrics.getRequestSizeBucketMetricName(metrics.produceRequestAcksSizeMetricNameMap(-1), 35*1024 *1024)) + + // test default config + props = new Properties() + props.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181") + config = KafkaConfig.fromProps(props) + metrics = new Metrics(apis, config) + testMetricsRequestSizeBucketDefault(metrics) + } + + private def testMetricsRequestSizeBucketDefault(metrics: Metrics): Unit = { + //default bucket "0,1,10,50,100" + val fetchMetricsNameMap = metrics.consumerFetchRequestSizeMetricNameMap + assertEquals(5, fetchMetricsNameMap.size) + assertTrue(fetchMetricsNameMap.containsKey(0)) + assertTrue(fetchMetricsNameMap.containsKey(1)) + assertTrue(fetchMetricsNameMap.containsKey(10)) + assertTrue(fetchMetricsNameMap.containsKey(50)) + assertTrue(fetchMetricsNameMap.containsKey(100)) + assertEquals("FetchConsumer0To1Mb", fetchMetricsNameMap.get(0)) + assertEquals("FetchConsumer1To10Mb", fetchMetricsNameMap.get(1)) + assertEquals("FetchConsumer10To50Mb", fetchMetricsNameMap.get(10)) + assertEquals("FetchConsumer50To100Mb", fetchMetricsNameMap.get(50)) + assertEquals("FetchConsumer100MbGreater", fetchMetricsNameMap.get(100)) + val produceMetricsNameMaps = metrics.produceRequestAcksSizeMetricNameMap + assertEquals(3, produceMetricsNameMaps.size) + assertTrue(produceMetricsNameMaps.contains(0)) + assertTrue(produceMetricsNameMaps.contains(1)) + assertTrue(produceMetricsNameMaps.contains(-1)) + for (i <- 0 until 3) { + val ackKey = if (i == 2) -1 else i + val ackKeyString = if(i == 2) "All" else i.toString + val produceMetricsNameMap = produceMetricsNameMaps(ackKey) + assertTrue(produceMetricsNameMap.containsKey(0)) + assertTrue(produceMetricsNameMap.containsKey(1)) + assertTrue(produceMetricsNameMap.containsKey(10)) + assertTrue(produceMetricsNameMap.containsKey(50)) + assertTrue(produceMetricsNameMap.containsKey(100)) + assertEquals("Produce0To1MbAcks" + ackKeyString, produceMetricsNameMap.get(0)) + assertEquals("Produce1To10MbAcks" + ackKeyString, produceMetricsNameMap.get(1)) + assertEquals("Produce10To50MbAcks" + ackKeyString, produceMetricsNameMap.get(10)) + assertEquals("Produce50To100MbAcks" + ackKeyString, produceMetricsNameMap.get(50)) + assertEquals("Produce100MbGreaterAcks" + ackKeyString, produceMetricsNameMap.get(100)) + } + } + private def buildMetadataRequest(): AbstractRequest = { val resourceName = "topic-1" val header = new RequestHeader(ApiKeys.METADATA, ApiKeys.METADATA.latestVersion, @@ -281,7 +418,7 @@ class RequestChannelTest { } } - def request(req: AbstractRequest): RequestChannel.Request = { + def request(req: AbstractRequest, metrics: Metrics): RequestChannel.Request = { val buffer = req.serializeWithHeader(new RequestHeader(req.apiKey, req.version, "client-id", 1)) val requestContext = newRequestContext(buffer) new network.RequestChannel.Request(processor = 1, @@ -289,10 +426,14 @@ class RequestChannelTest { startTimeNanos = 0, createNiceMock(classOf[MemoryPool]), buffer, - createNiceMock(classOf[RequestChannel.Metrics]) + metrics ) } + def request(req: AbstractRequest): RequestChannel.Request = { + request(req, createNiceMock(classOf[RequestChannel.Metrics])) + } + private def newRequestContext(buffer: ByteBuffer): RequestContext = { new RequestContext( RequestHeader.parse(buffer), diff --git a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala 
index d0fc30fbdec21..a49c93eb400d7 100644
--- a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
@@ -18,9 +18,8 @@ package kafka.server
 
 import java.net.InetAddress
 import java.nio.ByteBuffer
-import java.util.Optional
+import java.util.{Optional, Properties}
 import java.util.concurrent.atomic.AtomicReference
-
 import kafka.network
 import kafka.network.RequestChannel
 import kafka.utils.MockTime
@@ -232,13 +231,17 @@ class ForwardingManagerTest {
       Optional.of(principalBuilder)
     )
 
+    val props = new Properties()
+    props.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
+    props.put(KafkaConfig.RequestMetricsSizeBucketsProp, "0, 10, 20, 200")
+    val config = KafkaConfig.fromProps(props)
     new network.RequestChannel.Request(
       processor = 1,
       context = requestContext,
       startTimeNanos = time.nanoseconds(),
       memoryPool = MemoryPool.NONE,
       buffer = requestBuffer,
-      metrics = new RequestChannel.Metrics(ListenerType.CONTROLLER),
+      metrics = new RequestChannel.Metrics(ListenerType.CONTROLLER, config),
       envelope = None
     )
   }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index cb58cfaf57941..0d0edff63b910 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -809,6 +809,7 @@ class KafkaConfigTest {
         //Kafka Yammer metrics reporter configs
         case KafkaConfig.KafkaMetricsReporterClassesProp => // ignore
         case KafkaConfig.KafkaMetricsPollingIntervalSecondsProp => // ignore
+        case KafkaConfig.RequestMetricsSizeBucketsProp => assertPropertyInvalid(baseProperties, name, "", "1", "1, 2, 5, not_a_number, 9")
 
         // Broker-side observer configs
         case KafkaConfig.ObserverClassNameProp => // ignore since even if the class name is invalid, a NoOpObserver class is used instead
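[Editor's note: an end-to-end sketch (not part of the patch) that wires the new property into RequestChannel.Metrics, mirroring the ForwardingManagerTest setup above. SizeBucketMetricsSketch is a hypothetical name and the ZooKeeper address is a placeholder.]

  import java.util.Properties
  import kafka.network.RequestChannel
  import kafka.server.KafkaConfig
  import org.apache.kafka.common.message.ApiMessageType.ListenerType

  object SizeBucketMetricsSketch {
    def main(args: Array[String]): Unit = {
      val props = new Properties()
      props.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181") // placeholder
      props.put(KafkaConfig.RequestMetricsSizeBucketsProp, "0, 10, 20, 200")
      val config = KafkaConfig.fromProps(props)

      val metrics = new RequestChannel.Metrics(ListenerType.ZK_BROKER, config)
      // prints FetchConsumer0To10Mb ... FetchConsumer200MbGreater and the
      // twelve Produce<size>Acks{0,1,All} variants registered by the patch
      metrics.getConsumerFetchRequestSizeMetricNames.foreach(println)
      metrics.getProduceRequestAcksSizeMetricNames.foreach(println)
      metrics.close()
    }
  }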