[LI-HOTFIX] Add metric to sample the age of consumed data (#415)
TICKET = N/A
LI_DESCRIPTION =
For the SKU tiering design, it is useful to know the age of consumed data. However, if we simply
recorded the age of every consumed batch, all consumed data would have to be loaded into user-space
memory, which defeats the purpose of the kernel TLS work (keeping the transmit path zero-copy inside the kernel).

Thus, instead of recording the age of every consumed batch, we sample the consumers' Fetch
requests (with a default sampling ratio of 0.01) and record batch ages only for the sampled requests.

EXIT_CRITERIA = N/A
gitlw authored Nov 15, 2022
1 parent 5b5aef2 commit fb7f981
Showing 3 changed files with 22 additions and 4 deletions.
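The new broker property introduced below is the operator-facing knob. A minimal server.properties sketch (the value shown is the default):

    # Fraction of consumer Fetch requests to sample; must be in [0.0, 1.0].
    # 0.0 effectively disables the sampling.
    li.consumer.fetch.sample.ratio=0.01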
core/src/main/scala/kafka/server/KafkaConfig.scala (5 additions, 1 deletion)
@@ -330,6 +330,7 @@ object Defaults {
   val LiNumControllerInitThreads = 1
   val LiLogCleanerFineGrainedLockEnabled = true
   val LiDropCorruptedFilesEnabled = false
+  val LiConsumerFetchSampleRatio = 0.01
 }
 
 object KafkaConfig {
@@ -448,6 +449,7 @@ object KafkaConfig {
   val LiNumControllerInitThreadsProp = "li.num.controller.init.threads"
   val LiLogCleanerFineGrainedLockEnableProp = "li.log.cleaner.fine.grained.lock.enable"
   val LiDropCorruptedFilesEnableProp = "li.drop.corrupted.files.enable"
+  val LiConsumerFetchSampleRatioProp = "li.consumer.fetch.sample.ratio"
   val AllowPreferredControllerFallbackProp = "allow.preferred.controller.fallback"
   val UnofficialClientLoggingEnableProp = "unofficial.client.logging.enable"
   val UnofficialClientCacheTtlProp = "unofficial.client.cache.ttl"
@@ -785,6 +787,7 @@ object KafkaConfig {
   val LiCombinedControlRequestEnableDoc = "Specifies whether the controller should use the LiCombinedControlRequest."
   val LiUpdateMetadataDelayMsDoc = "Specifies how long a UpdateMetadata request with partitions should be delayed before its processing can start. This config is purely for testing the LiCombinedControl feature and should not be enabled in a production environment."
   val LiDropCorruptedFilesEnableDoc = "Specifies whether the broker should delete corrupted files during startup."
+  val LiConsumerFetchSampleRatioDoc = "Specifies the ratio of consumer Fetch requests to sample, which must be a number in the range [0.0, 1.0]. For now, the sampling is used to derive the age of consumed data."
   val LiDropFetchFollowerEnableDoc = "Specifies whether a leader should drop Fetch requests from followers. This config is used to simulate a slow leader and test the leader initiated leadership transfer"
   val LiAlterIsrEnabledDoc = "Specifies whether the brokers should use the AlterISR request to propagate ISR changes to the controller. If set to false, brokers will propagate the updates via Zookeeper."
   val LiNumControllerInitThreadsDoc = "Number of threads (and Zookeeper clients + connections) to be used while recursing the topic-partitions tree in Zookeeper during controller startup/failover."
@@ -1233,6 +1236,7 @@ object KafkaConfig {
       .define(LiNumControllerInitThreadsProp, INT, Defaults.LiNumControllerInitThreads, atLeast(1), LOW, LiNumControllerInitThreadsDoc)
       .define(LiLogCleanerFineGrainedLockEnableProp, BOOLEAN, Defaults.LiLogCleanerFineGrainedLockEnabled, LOW, LiLogCleanerFineGrainedLockEnableDoc)
       .define(LiDropCorruptedFilesEnableProp, BOOLEAN, Defaults.LiDropCorruptedFilesEnabled, HIGH, LiDropCorruptedFilesEnableDoc)
+      .define(LiConsumerFetchSampleRatioProp, DOUBLE, Defaults.LiConsumerFetchSampleRatio, between(0.0, 1.0), LOW, LiConsumerFetchSampleRatioDoc)
       .define(AllowPreferredControllerFallbackProp, BOOLEAN, Defaults.AllowPreferredControllerFallback, HIGH, AllowPreferredControllerFallbackDoc)
       .define(UnofficialClientLoggingEnableProp, BOOLEAN, Defaults.UnofficialClientLoggingEnable, LOW, UnofficialClientLoggingEnableDoc)
       .define(UnofficialClientCacheTtlProp, LONG, Defaults.UnofficialClientCacheTtl, LOW, UnofficialClientCacheTtlDoc)
@@ -1770,7 +1774,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   def liNumControllerInitThreads = getInt(KafkaConfig.LiNumControllerInitThreadsProp)
   def liLogCleanerFineGrainedLockEnable = getBoolean(KafkaConfig.LiLogCleanerFineGrainedLockEnableProp)
   val liDropCorruptedFilesEnable = getBoolean(KafkaConfig.LiDropCorruptedFilesEnableProp)
-
+  val liConsumerFetchSampleRatio = getDouble(KafkaConfig.LiConsumerFetchSampleRatioProp)
   def unofficialClientLoggingEnable = getBoolean(KafkaConfig.UnofficialClientLoggingEnableProp)
   def unofficialClientCacheTtl = getLong(KafkaConfig.UnofficialClientCacheTtlProp)
   def expectedClientSoftwareNames = getList(KafkaConfig.ExpectedClientSoftwareNamesProp)
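Because the ratio is defined with the between(0.0, 1.0) validator, an out-of-range value fails fast at broker startup rather than being silently clamped. A minimal sketch of that behavior (the zookeeper.connect placeholder merely satisfies a required setting and is not part of this change):

    import java.util.Properties
    import kafka.server.KafkaConfig

    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181")   // placeholder for a required setting
    props.put("li.consumer.fetch.sample.ratio", "1.5") // outside [0.0, 1.0]

    // Expected to throw org.apache.kafka.common.config.ConfigException,
    // since the value violates between(0.0, 1.0).
    val config = KafkaConfig.fromProps(props)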
core/src/main/scala/kafka/server/KafkaRequestHandler.scala (4 additions, 0 deletions)
@@ -450,3 +450,7 @@ class BrokerTopicStats extends Logging {
 object BrokerMetadataStats extends KafkaMetricsGroup {
   val outgoingBytesRate: Meter = newMeter("MetadataOutgoingBytesPerSec", "bytes", TimeUnit.SECONDS)
 }
+
+object ConsumerStats extends KafkaMetricsGroup {
+  val consumedDataAgeHist = newHistogram("ConsumedDataAgeMs")
+}
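KafkaMetricsGroup.newHistogram returns a Yammer histogram, so each sampled batch age becomes one histogram sample whose percentiles can be read back later. A brief usage sketch (the age value is illustrative, and the exact MBean name is an assumption based on how KafkaMetricsGroup derives names from the enclosing object):

    // Record one sampled age in milliseconds, as ReplicaManager does below.
    ConsumerStats.consumedDataAgeHist.update(1234L)

    // Read back distribution statistics from the same histogram.
    val snapshot = ConsumerStats.consumedDataAgeHist.getSnapshot
    println(s"median consumed-data age: ${snapshot.getMedian} ms")
    // Externally this should surface as an MBean along the lines of
    // kafka.server:type=ConsumerStats,name=ConsumedDataAgeMs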
core/src/main/scala/kafka/server/ReplicaManager.scala (13 additions, 3 deletions)
@@ -21,7 +21,6 @@ import java.util.Optional
 import java.util.concurrent.{CompletableFuture, RejectedExecutionException, TimeUnit}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import java.util.concurrent.locks.Lock
-
 import com.yammer.metrics.core.Meter
 import kafka.api._
 import kafka.cluster.{BrokerEndPoint, Partition}
@@ -65,6 +64,7 @@ import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
 import scala.jdk.CollectionConverters._
 import scala.collection.{Map, Seq, Set, mutable}
 import scala.compat.java8.OptionConverters._
+import scala.util.Random

 /*
  * Result metadata of a log append operation on the log
@@ -257,7 +257,7 @@ class ReplicaManager(val config: KafkaConfig,

   private var logDirFailureHandler: LogDirFailureHandler = null
   val recompressedBatchCount: AtomicLong = new AtomicLong(0)
-
+  val random = new Random
   def getHighWatermarkCheckpoints: Map[String, OffsetCheckpointFile] = highWatermarkCheckpoints
 
   private class LogDirFailureHandler(name: String, haltBrokerOnDirFailure: Boolean) extends ShutdownableThread(name) {
@@ -1088,7 +1088,17 @@ class ReplicaManager(val config: KafkaConfig,
         quota = quota,
         clientMetadata = clientMetadata)
       if (isFromFollower) updateFollowerFetchState(replicaId, result)
-      else result
+      else {
+        // the fetch is from a consumer, sample the age of consumed data
+        if (random.nextDouble() <= config.liConsumerFetchSampleRatio) {
+          result.foreach { case (_, logReadResult) =>
+            logReadResult.info.records.batches().asScala.foreach { batch =>
+              ConsumerStats.consumedDataAgeHist.update(time.milliseconds - batch.maxTimestamp())
+            }
+          }
+        }
+        result
+      }
     }
 
     val logReadResults = readFromLog()
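For reference, the sampling scheme reduces to a per-request Bernoulli trial: each consumer Fetch is inspected with probability equal to the configured ratio, and only then are batch timestamps read. A standalone, runnable sketch of the same idea (all names here are illustrative, not broker code):

    import scala.util.Random

    object FetchAgeSampler {
      private val random = new Random
      private val ratio = 0.01 // mirrors the li.consumer.fetch.sample.ratio default

      // For a sampled fetch, turn each batch's max timestamp into an age.
      def onConsumerFetch(batchMaxTimestampsMs: Seq[Long], nowMs: Long): Unit = {
        if (random.nextDouble() <= ratio) {
          batchMaxTimestampsMs.foreach { maxTs =>
            val ageMs = nowMs - maxTs
            println(s"sampled consumed-data age: $ageMs ms") // the broker updates a histogram instead
          }
        }
      }
    }

    // Example: two batches whose newest records are 5s and 60s old.
    val now = System.currentTimeMillis
    FetchAgeSampler.onConsumerFetch(Seq(now - 5000L, now - 60000L), now)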
