[LI-HOTFIX] Add Zookeeper pagination support for /brokers/topics znode (#435)

TICKET = LIKAFKA-49497

LI_DESCRIPTION =
Switch from the Apache to the LI ZooKeeper dependency and add a GetAllChildrenPaginated option for the /brokers/topics znode, which supports 'list topics' responses greater than 1 MB. The feature is controlled by a new li.zookeeper.pagination.enable config (default = false), with the intention that it be enabled only for critical clusters, at least until it has proven itself in battle.
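
As an illustration (not part of this commit), a minimal sketch of enabling the flag and reading it back through KafkaConfig; the connect string is a placeholder, and KafkaConfig.fromProps is the stock helper for building a config from Properties:

import java.util.Properties
import kafka.server.KafkaConfig

val props = new Properties()
props.put(KafkaConfig.ZkConnectProp, "localhost:2181") // placeholder ensemble
// Opt in to paginated listing of /brokers/topics (defaults to false):
props.put(KafkaConfig.LiZookeeperPaginationEnableProp, "true")
val config = KafkaConfig.fromProps(props)
assert(config.liZookeeperPaginationEnable) // accessor added in this change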

EXIT_CRITERIA = When this change is accepted upstream and pulled to this repo.
groelofs authored Feb 11, 2023
1 parent e15ba9d commit a4eefc9
Showing 10 changed files with 136 additions and 24 deletions.
3 changes: 3 additions & 0 deletions build.gradle
@@ -58,6 +58,9 @@ allprojects {

repositories {
mavenCentral()
maven {
url "https://linkedin.jfrog.io/artifactory/zookeeper"
}
}

dependencyUpdates {
9 changes: 5 additions & 4 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -49,7 +49,6 @@ import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.zookeeper.client.ZKClientConfig

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.{Map, Seq}

@@ -332,6 +331,7 @@ object Defaults {
val LiLogCleanerFineGrainedLockEnabled = true
val LiDropCorruptedFilesEnabled = false
val LiConsumerFetchSampleRatio = 0.01
val LiZookeeperPaginationEnable = false
}

object KafkaConfig {
@@ -451,6 +451,7 @@ object KafkaConfig {
val LiLogCleanerFineGrainedLockEnableProp = "li.log.cleaner.fine.grained.lock.enable"
val LiDropCorruptedFilesEnableProp = "li.drop.corrupted.files.enable"
val LiConsumerFetchSampleRatioProp = "li.consumer.fetch.sample.ratio"
val LiZookeeperPaginationEnableProp = "li.zookeeper.pagination.enable"
val AllowPreferredControllerFallbackProp = "allow.preferred.controller.fallback"
val UnofficialClientLoggingEnableProp = "unofficial.client.logging.enable"
val UnofficialClientCacheTtlProp = "unofficial.client.cache.ttl"
@@ -794,6 +795,7 @@ object KafkaConfig {
val LiDenyAlterIsrDoc = "Test-only config; never enable this in a real cluster. Specifies whether the controller should deny the AlterISRRequest."
val LiNumControllerInitThreadsDoc = "Number of threads (and Zookeeper clients + connections) to be used while recursing the topic-partitions tree in Zookeeper during controller startup/failover."
val LiLogCleanerFineGrainedLockEnableDoc = "Specifies whether the log cleaner should use fine grained locks when calculating the filthiest log to clean"
val LiZookeeperPaginationEnableDoc = "Specifies whether Zookeeper pagination should be used when listing the /brokers/topics znode. Required when the sum of all topic-name lengths in the cluster exceeds the ZK response-size limit (1 MB by default)."
// Although AllowPreferredControllerFallback is expected to be configured dynamically at per cluster level, providing a static configuration entry
// here allows its value to be obtained without holding the dynamic broker configuration lock.
val AllowPreferredControllerFallbackDoc = "Specifies whether a non-preferred controller node (broker) is allowed to become the controller." +
@@ -1158,7 +1160,6 @@ object KafkaConfig {
"zero. When closing/shutting down an observer, most time is spent on flushing the observed stats. The reasonable timeout should be close to " +
"the time it takes to flush the stats."

@nowarn("cat=deprecation")
private[server] val configDef = {
import ConfigDef.Importance._
import ConfigDef.Range._
@@ -1240,6 +1241,7 @@ object KafkaConfig {
.define(LiLogCleanerFineGrainedLockEnableProp, BOOLEAN, Defaults.LiLogCleanerFineGrainedLockEnabled, LOW, LiLogCleanerFineGrainedLockEnableDoc)
.define(LiDropCorruptedFilesEnableProp, BOOLEAN, Defaults.LiDropCorruptedFilesEnabled, HIGH, LiDropCorruptedFilesEnableDoc)
.define(LiConsumerFetchSampleRatioProp, DOUBLE, Defaults.LiConsumerFetchSampleRatio, between(0.0, 1.0), LOW, LiConsumerFetchSampleRatioDoc)
.define(LiZookeeperPaginationEnableProp, BOOLEAN, Defaults.LiZookeeperPaginationEnable, LOW, LiZookeeperPaginationEnableDoc)
.define(AllowPreferredControllerFallbackProp, BOOLEAN, Defaults.AllowPreferredControllerFallback, HIGH, AllowPreferredControllerFallbackDoc)
.define(UnofficialClientLoggingEnableProp, BOOLEAN, Defaults.UnofficialClientLoggingEnable, LOW, UnofficialClientLoggingEnableDoc)
.define(UnofficialClientCacheTtlProp, LONG, Defaults.UnofficialClientCacheTtl, LOW, UnofficialClientCacheTtlDoc)
@@ -1784,6 +1786,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
def liLogCleanerFineGrainedLockEnable = getBoolean(KafkaConfig.LiLogCleanerFineGrainedLockEnableProp)
val liDropCorruptedFilesEnable = getBoolean(KafkaConfig.LiDropCorruptedFilesEnableProp)
val liConsumerFetchSampleRatio = getDouble(KafkaConfig.LiConsumerFetchSampleRatioProp)
def liZookeeperPaginationEnable = getBoolean(KafkaConfig.LiZookeeperPaginationEnableProp)
def unofficialClientLoggingEnable = getBoolean(KafkaConfig.UnofficialClientLoggingEnableProp)
def unofficialClientCacheTtl = getLong(KafkaConfig.UnofficialClientCacheTtlProp)
def expectedClientSoftwareNames = getList(KafkaConfig.ExpectedClientSoftwareNamesProp)
@@ -1866,7 +1869,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO

// We keep the user-provided String as `ApiVersion.apply` can choose a slightly different version (eg if `0.10.0`
// is passed, `0.10.0-IV0` may be picked)
@nowarn("cat=deprecation")
private val logMessageFormatVersionString = getString(KafkaConfig.LogMessageFormatVersionProp)

/* See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for details */
@@ -2151,7 +2153,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO

validateValues()

@nowarn("cat=deprecation")
private def validateValues(): Unit = {
if (requiresZookeeper) {
if (zkConnect == null) {
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/server/KafkaServer.scala
@@ -563,7 +563,8 @@ class KafkaServer(
// for tests with multiple brokers running and probably useful for multi-host log-grepping in prod)
_zkClients += KafkaZkClient(config.zkConnect, secureAclsEnabled, config.zkSessionTimeoutMs,
config.zkConnectionTimeoutMs, config.zkMaxInFlightRequests, time, name = s"zk${i} Kafka server",
zkClientConfig = zkClientConfig, createChrootIfNecessary = true)
zkClientConfig = zkClientConfig, createChrootIfNecessary = true,
paginateTopics = config.liZookeeperPaginationEnable)
}

zkClient.createTopLevelPaths()
14 changes: 9 additions & 5 deletions core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -51,8 +51,10 @@ import scala.collection.{Map, Seq, mutable}
* easier to migrate away from `ZkUtils` (since removed). We should revisit this. We should also consider whether a
* monolithic [[kafka.zk.ZkData]] is the way to go.
*/
class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: Time) extends AutoCloseable with
Logging with KafkaMetricsGroup {
class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient,
isSecure: Boolean,
time: Time,
paginateTopics: Boolean) extends AutoCloseable with Logging with KafkaMetricsGroup {

override def metricName(name: String, metricTags: scala.collection.Map[String, String]): MetricName = {
explicitMetricName("kafka.server", "ZooKeeperClientMetrics", name, metricTags)
@@ -546,7 +548,8 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
*/
def getAllTopicsInCluster(registerWatch: Boolean = false): Set[String] = {
val getChildrenResponse = retryRequestUntilConnected(
GetChildrenRequest(TopicsZNode.path, registerWatch))
if (paginateTopics) GetChildrenPaginatedRequest(TopicsZNode.path, registerWatch)
else GetChildrenRequest(TopicsZNode.path, registerWatch))
getChildrenResponse.resultCode match {
case Code.OK => getChildrenResponse.children.toSet
case Code.NONODE => Set.empty
@@ -2096,7 +2099,8 @@ object KafkaZkClient {
zkClientConfig: ZKClientConfig,
metricGroup: String = "kafka.server",
metricType: String = "SessionExpireListener",
createChrootIfNecessary: Boolean = false
createChrootIfNecessary: Boolean = false,
paginateTopics: Boolean = false
): KafkaZkClient = {

/* ZooKeeper 3.6.0 changed the default configuration for JUTE_MAXBUFFER from 4 MB to 1 MB.
@@ -2131,7 +2135,7 @@ object KafkaZkClient {
}
val zooKeeperClient = new ZooKeeperClient(connectString, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests,
time, metricGroup, metricType, zkClientConfig, name)
new KafkaZkClient(zooKeeperClient, isSecure, time)
new KafkaZkClient(zooKeeperClient, isSecure, time, paginateTopics)
}

// A helper function to transform a regular request into a MultiRequest
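
For reference, a usage sketch of the extended factory (the connection values are placeholders; only the paginateTopics argument is new in this commit):

import kafka.zk.KafkaZkClient
import org.apache.kafka.common.utils.Time
import org.apache.zookeeper.client.ZKClientConfig

val zkClient = KafkaZkClient(
  connectString = "localhost:2181", // placeholder ensemble
  isSecure = false,
  sessionTimeoutMs = 30000,
  connectionTimeoutMs = 30000,
  maxInFlightRequests = 10,
  time = Time.SYSTEM,
  name = "example-client",
  zkClientConfig = new ZKClientConfig(),
  paginateTopics = true) // route /brokers/topics listings through pagination
try {
  // With pagination enabled this succeeds even when the combined child
  // names exceed the 1 MB jute.maxbuffer limit:
  val topics = zkClient.getAllTopicsInCluster()
  println(s"${topics.size} topics")
} finally zkClient.close()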
53 changes: 46 additions & 7 deletions core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -20,6 +20,7 @@ package kafka.zookeeper
import java.util.Locale
import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
import java.util.concurrent._
import java.util.function.{Consumer, Supplier}
import java.util.{List => JList}

import com.yammer.metrics.core.MetricName
@@ -47,12 +48,15 @@ object ZooKeeperClient {
/**
* A ZooKeeper client that encourages pipelined requests.
*
* @param connectString comma separated host:port pairs, each corresponding to a zk server
* @param connectString comma-separated host:port pairs, each corresponding to a zk server
* @param sessionTimeoutMs session timeout in milliseconds
* @param connectionTimeoutMs connection timeout in milliseconds
* @param maxInFlightRequests maximum number of unacknowledged requests the client will send before blocking.
* @param name name of the client instance
* @param maxInFlightRequests maximum number of unacknowledged requests the client will send before blocking
* @param time clock for timing; can be overridden or mocked in tests
* @param metricGroup name of the group of ZK-related metrics
* @param metricType type of the ZK-related metrics
* @param zkClientConfig ZooKeeper client configuration, for TLS configs if desired
* @param name name of the client instance
*/
class ZooKeeperClient(connectString: String,
sessionTimeoutMs: Int,
@@ -61,7 +65,7 @@ class ZooKeeperClient(connectString: String,
time: Time,
metricGroup: String,
metricType: String,
private[zookeeper] val clientConfig: ZKClientConfig,
private[zookeeper] val zkClientConfig: ZKClientConfig,
name: String) extends Logging with KafkaMetricsGroup {

this.logIdent = s"[ZooKeeperClient $name] "
@@ -75,6 +79,11 @@ class ZooKeeperClient(connectString: String,
private[zookeeper] val reinitializeScheduler = new KafkaScheduler(threads = 1, s"zk-client-${threadPrefix}reinit-")
private var isFirstConnectionEstablished = false

// There should never be more than one controller-failover call at a time, but there are no guarantees
// for end-user listTopics() calls, and zookeeper.max.in.flight.requests defaults to 10 (and apparently
// is not overridden), so go with 10 threads as a safe bet:
private val zkPaginationExecutor = Executors.newFixedThreadPool(10)

private val metricNames = Set[String]()

// The state map has to be created before creating ZooKeeper since it's needed in the ZooKeeper callback.
@@ -98,7 +107,7 @@ class ZooKeeperClient(connectString: String,
info(s"Initializing a new session to $connectString.")
// Fail-fast if there's an error during construction (so don't call initialize, which retries forever)
@volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher,
clientConfig)
zkClientConfig)

newGauge("SessionState", () => connectionState.toString)

@@ -198,6 +207,23 @@ class ZooKeeperClient(connectString: String,
callback(GetChildrenResponse(Code.get(rc), path, Option(ctx), Option(children).map(_.asScala).getOrElse(Seq.empty),
stat, responseMetadata(sendTimeMs)))
}, ctx.orNull)
case GetChildrenPaginatedRequest(path, _, ctx) =>
CompletableFuture.supplyAsync(new Supplier[GetChildrenPaginatedResponse]() {
def get(): GetChildrenPaginatedResponse = {
var topicsList: JList[String] = null
var resultCode: Code = Code.OK
try {
topicsList = zooKeeper.getAllChildrenPaginated(path, shouldWatch(request))
} catch {
case e: KeeperException =>
resultCode = e.code
}
GetChildrenPaginatedResponse(resultCode, path, Option(ctx), Option(topicsList).map(_.asScala).getOrElse(Seq.empty),
responseMetadata(sendTimeMs))
}
}, zkPaginationExecutor).thenAccept(new Consumer[GetChildrenPaginatedResponse] {
override def accept(response: GetChildrenPaginatedResponse) = callback(response)
})
case CreateRequest(path, data, acl, createMode, ctx) =>
zooKeeper.create(path, data, acl.asJava, createMode,
(rc, path, ctx, name) =>
@@ -270,6 +296,7 @@ class ZooKeeperClient(connectString: String,
// may need to be updated.
private def shouldWatch(request: AsyncRequest): Boolean = request match {
case GetChildrenRequest(_, registerWatch, _) => registerWatch && zNodeChildChangeHandlers.contains(request.path)
case GetChildrenPaginatedRequest(_, registerWatch, _) => registerWatch && zNodeChildChangeHandlers.contains(request.path)
case _: ExistsRequest | _: GetDataRequest => zNodeChangeHandlers.contains(request.path)
case _ => throw new IllegalArgumentException(s"Request $request is not watchable")
}
@@ -371,7 +398,7 @@ class ZooKeeperClient(connectString: String,
var connected = false
while (!connected) {
try {
zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher, clientConfig)
zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher, zkClientConfig)
connected = true
} catch {
case e: Exception =>
@@ -542,6 +569,11 @@ case class GetChildrenRequest(path: String, registerWatch: Boolean, ctx: Option[
type Response = GetChildrenResponse
}

// GetChildrenPaginatedResponse is identical to GetChildrenResponse except that it carries no Stat field
case class GetChildrenPaginatedRequest(path: String, registerWatch: Boolean, ctx: Option[Any] = None) extends AsyncRequest {
type Response = GetChildrenPaginatedResponse
}

case class MultiRequest(zkOps: Seq[ZkOp], ctx: Option[Any] = None) extends AsyncRequest {
type Response = MultiResponse

@@ -569,6 +601,11 @@ sealed abstract class AsyncResponse {
def metadata: ResponseMetadata
}

sealed trait GetChildrenResponseNoStat extends AsyncResponse {
def children: Seq[String]
def metadata: ResponseMetadata
}

case class ResponseMetadata(sendTimeMs: Long, receivedTimeMs: Long) {
def responseTimeMs: Long = receivedTimeMs - sendTimeMs
}
@@ -588,7 +625,9 @@ case class GetAclResponse(resultCode: Code, path: String, ctx: Option[Any], acl:
case class SetAclResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat,
metadata: ResponseMetadata) extends AsyncResponse
case class GetChildrenResponse(resultCode: Code, path: String, ctx: Option[Any], children: Seq[String], stat: Stat,
metadata: ResponseMetadata) extends AsyncResponse
metadata: ResponseMetadata) extends GetChildrenResponseNoStat
case class GetChildrenPaginatedResponse(resultCode: Code, path: String, ctx: Option[Any], children: Seq[String],
metadata: ResponseMetadata) extends GetChildrenResponseNoStat
case class MultiResponse(resultCode: Code, path: String, ctx: Option[Any], zkOpResults: Seq[ZkOpResult],
metadata: ResponseMetadata) extends AsyncResponse

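
A rough sketch of exercising the new request type directly, and of writing code against the shared no-Stat trait (assumes a connected client named zooKeeperClient; the request/response names are from this diff):

import kafka.zookeeper.{GetChildrenPaginatedRequest, GetChildrenResponseNoStat}
import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.Code

// Callers that only need the child list can accept either response type:
def childrenOrEmpty(response: GetChildrenResponseNoStat): Set[String] =
  response.resultCode match {
    case Code.OK => response.children.toSet
    case Code.NONODE => Set.empty
    case other => throw KeeperException.create(other)
  }

val response = zooKeeperClient.handleRequest(
  GetChildrenPaginatedRequest("/brokers/topics", registerWatch = false))
println(s"${childrenOrEmpty(response).size} topics")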
9 changes: 9 additions & 0 deletions core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
@@ -24,6 +24,14 @@ import java.net.InetSocketAddress

import org.apache.kafka.common.utils.Utils

object EmbeddedZookeeper {
// The ZK jute.maxbuffer config defines the maximum response size from the ZK server (anything larger throws an
// exception and disconnects the ZK client). The historical value was 4194304 bytes, but several years ago it was
// reduced to 1048575 (per https://zookeeper.apache.org/doc/r3.6.2/zookeeperAdmin.html) for performance reasons,
// so use that for more realistic tests:
final val JUTE_MAXBUFFER_VALUE: String = "1048575"
}

/**
* ZooKeeperServer wrapper that starts the server with temporary directories during construction and deletes
* the directories when `shutdown()` is called.
@@ -41,6 +49,7 @@ class EmbeddedZookeeper() extends Logging {
val tickTime = 800 // allow a maxSessionTimeout of 20 * 800ms = 16 secs

System.setProperty("zookeeper.forceSync", "no") //disable fsync to ZK txn log in tests to avoid timeout
System.setProperty("jute.maxbuffer", EmbeddedZookeeper.JUTE_MAXBUFFER_VALUE) // configure realistic response-buffer size
val zookeeper = new ZooKeeperServer(snapshotDir, logDir, tickTime)
val factory = new NIOServerCnxnFactory()
private val addr = new InetSocketAddress("127.0.0.1", TestUtils.RandomPort)
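
To make the limit concrete, a back-of-envelope sketch (the per-child overhead approximates the jute string encoding, which length-prefixes each child name; the cluster sizes are hypothetical):

val topicCount = 20000 // hypothetical cluster
val avgTopicNameLen = 60 // bytes, assuming ASCII topic names
val perChildOverhead = 4 // approx. 4-byte length prefix per child
val approxResponseBytes = topicCount * (avgTopicNameLen + perChildOverhead)
// 20000 * 64 = 1,280,000 bytes > 1,048,575 (jute.maxbuffer), so a plain
// GetChildren response would be rejected by the server and the client
// disconnected; li.zookeeper.pagination.enable=true pages through instead.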
(Diffs for the remaining 4 changed files are not shown.)
