From a4eefc95a043ac48eac794563ffeb510e246073b Mon Sep 17 00:00:00 2001 From: Greg Roelofs Date: Fri, 10 Feb 2023 17:54:24 -0800 Subject: [PATCH] [LI-HOTFIX] Add Zookeeper pagination support for /brokers/topics znode (#435) TICKET = LIKAFKA-49497 LI_DESCRIPTION = Switch from Apache to LI Zookeeper dependency and add a GetAllChildrenPaginated option for the /brokers/topics znode (which supports 'list topics' responses greater than 1 MB). The feature is controlled by a new li.zookeeper.pagination.enable config (default = false), with the intention that it be enabled only for critical clusters, at least until it's proven itself in battle. EXIT_CRITERIA = When this change is accepted upstream and pulled to this repo. --- build.gradle | 3 + .../main/scala/kafka/server/KafkaConfig.scala | 9 +-- .../main/scala/kafka/server/KafkaServer.scala | 3 +- .../main/scala/kafka/zk/KafkaZkClient.scala | 14 +++-- .../kafka/zookeeper/ZooKeeperClient.scala | 53 +++++++++++++--- .../unit/kafka/zk/EmbeddedZookeeper.scala | 9 +++ .../unit/kafka/zk/KafkaZkClientTest.scala | 61 ++++++++++++++++++- .../kafka/zookeeper/ZooKeeperClientTest.scala | 2 +- gradle/dependencies.gradle | 4 +- .../jmh/server/PartitionCreationBench.java | 2 +- 10 files changed, 136 insertions(+), 24 deletions(-) diff --git a/build.gradle b/build.gradle index 03803f0af4653..87737b73b1b71 100644 --- a/build.gradle +++ b/build.gradle @@ -58,6 +58,9 @@ allprojects { repositories { mavenCentral() + maven { + url "https://linkedin.jfrog.io/artifactory/zookeeper" + } } dependencyUpdates { diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 74115244e4013..43b87da67df34 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -49,7 +49,6 @@ import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.zookeeper.client.ZKClientConfig -import scala.annotation.nowarn import scala.jdk.CollectionConverters._ import scala.collection.{Map, Seq} @@ -332,6 +331,7 @@ object Defaults { val LiLogCleanerFineGrainedLockEnabled = true val LiDropCorruptedFilesEnabled = false val LiConsumerFetchSampleRatio = 0.01 + val LiZookeeperPaginationEnable = false } object KafkaConfig { @@ -451,6 +451,7 @@ object KafkaConfig { val LiLogCleanerFineGrainedLockEnableProp = "li.log.cleaner.fine.grained.lock.enable" val LiDropCorruptedFilesEnableProp = "li.drop.corrupted.files.enable" val LiConsumerFetchSampleRatioProp = "li.consumer.fetch.sample.ratio" + val LiZookeeperPaginationEnableProp = "li.zookeeper.pagination.enable" val AllowPreferredControllerFallbackProp = "allow.preferred.controller.fallback" val UnofficialClientLoggingEnableProp = "unofficial.client.logging.enable" val UnofficialClientCacheTtlProp = "unofficial.client.cache.ttl" @@ -794,6 +795,7 @@ object KafkaConfig { val LiDenyAlterIsrDoc = "Test only config, and never enable this in a real cluster. Specifies whether controller should deny the AlterISRRequest." val LiNumControllerInitThreadsDoc = "Number of threads (and Zookeeper clients + connections) to be used while recursing the topic-partitions tree in Zookeeper during controller startup/failover." 
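For reviewers wanting to exercise the new flag, here is a minimal sketch (not part of the patch) of enabling it and reading it back through KafkaConfig. The zookeeper.connect value is a placeholder, KafkaConfig.fromProps is the stock helper in core, and the accessor at the end is the one this patch adds further down.

    import java.util.Properties
    import kafka.server.KafkaConfig

    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181")     // placeholder quorum
    props.put("li.zookeeper.pagination.enable", "true")  // new flag; default is false
    val config = KafkaConfig.fromProps(props)
    assert(config.liZookeeperPaginationEnable)           // feeds paginateTopics in KafkaServer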
val LiLogCleanerFineGrainedLockEnableDoc = "Specifies whether the log cleaner should use fine grained locks when calculating the filthiest log to clean" + val LiZookeeperPaginationEnableDoc = "Specifies whether Zookeeper pagination should be used when listing the children of the /brokers/topics znode. Required when the sum of all topic-name lengths in the cluster exceeds the ZK response-size limit (1 MB by default)." // Although AllowPreferredControllerFallback is expected to be configured dynamically at per cluster level, providing a static configuration entry // here allows its value to be obtained without holding the dynamic broker configuration lock. val AllowPreferredControllerFallbackDoc = "Specifies whether a non-preferred controller node (broker) is allowed to become the controller." + @@ -1158,7 +1160,6 @@ object KafkaConfig { "zero. When closing/shutting down an observer, most time is spent on flushing the observed stats. The reasonable timeout should be close to " + "the time it takes to flush the stats." - @nowarn("cat=deprecation") private[server] val configDef = { import ConfigDef.Importance._ import ConfigDef.Range._ @@ -1240,6 +1241,7 @@ object KafkaConfig { .define(LiLogCleanerFineGrainedLockEnableProp, BOOLEAN, Defaults.LiLogCleanerFineGrainedLockEnabled, LOW, LiLogCleanerFineGrainedLockEnableDoc) .define(LiDropCorruptedFilesEnableProp, BOOLEAN, Defaults.LiDropCorruptedFilesEnabled, HIGH, LiDropCorruptedFilesEnableDoc) .define(LiConsumerFetchSampleRatioProp, DOUBLE, Defaults.LiConsumerFetchSampleRatio, between(0.0, 1.0), LOW, LiConsumerFetchSampleRatioDoc) + .define(LiZookeeperPaginationEnableProp, BOOLEAN, Defaults.LiZookeeperPaginationEnable, LOW, LiZookeeperPaginationEnableDoc) .define(AllowPreferredControllerFallbackProp, BOOLEAN, Defaults.AllowPreferredControllerFallback, HIGH, AllowPreferredControllerFallbackDoc) .define(UnofficialClientLoggingEnableProp, BOOLEAN, Defaults.UnofficialClientLoggingEnable, LOW, UnofficialClientLoggingEnableDoc) .define(UnofficialClientCacheTtlProp, LONG, Defaults.UnofficialClientCacheTtl, LOW, UnofficialClientCacheTtlDoc) @@ -1784,6 +1786,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO def liLogCleanerFineGrainedLockEnable = getBoolean(KafkaConfig.LiLogCleanerFineGrainedLockEnableProp) val liDropCorruptedFilesEnable = getBoolean(KafkaConfig.LiDropCorruptedFilesEnableProp) val liConsumerFetchSampleRatio = getDouble(KafkaConfig.LiConsumerFetchSampleRatioProp) + def liZookeeperPaginationEnable = getBoolean(KafkaConfig.LiZookeeperPaginationEnableProp) def unofficialClientLoggingEnable = getBoolean(KafkaConfig.UnofficialClientLoggingEnableProp) def unofficialClientCacheTtl = getLong(KafkaConfig.UnofficialClientCacheTtlProp) def expectedClientSoftwareNames = getList(KafkaConfig.ExpectedClientSoftwareNamesProp) @@ -1866,7 +1869,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO // We keep the user-provided String as `ApiVersion.apply` can choose a slightly different version (eg if `0.10.0` // is passed, `0.10.0-IV0` may be picked) - @nowarn("cat=deprecation") private val logMessageFormatVersionString = getString(KafkaConfig.LogMessageFormatVersionProp) /* See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for details */ @@ -2151,7 +2153,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO validateValues() - @nowarn("cat=deprecation") private def validateValues(): Unit = { if (requiresZookeeper) { if (zkConnect == null) { diff --git
a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 563eb456d82c8..dbb3e3d884235 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -563,7 +563,8 @@ class KafkaServer( // for tests with multiple brokers running and probably useful for multi-host log-grepping in prod) _zkClients += KafkaZkClient(config.zkConnect, secureAclsEnabled, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, config.zkMaxInFlightRequests, time, name = s"zk${i} Kafka server", - zkClientConfig = zkClientConfig, createChrootIfNecessary = true) + zkClientConfig = zkClientConfig, createChrootIfNecessary = true, + paginateTopics = config.liZookeeperPaginationEnable) } zkClient.createTopLevelPaths() diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 94371e3ad86ae..64e7d0dd8b54b 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -51,8 +51,10 @@ import scala.collection.{Map, Seq, mutable} * easier to migrate away from `ZkUtils` (since removed). We should revisit this. We should also consider whether a * monolithic [[kafka.zk.ZkData]] is the way to go. */ -class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: Time) extends AutoCloseable with - Logging with KafkaMetricsGroup { +class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, + isSecure: Boolean, + time: Time, + paginateTopics: Boolean) extends AutoCloseable with Logging with KafkaMetricsGroup { override def metricName(name: String, metricTags: scala.collection.Map[String, String]): MetricName = { explicitMetricName("kafka.server", "ZooKeeperClientMetrics", name, metricTags) @@ -546,7 +548,8 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo */ def getAllTopicsInCluster(registerWatch: Boolean = false): Set[String] = { val getChildrenResponse = retryRequestUntilConnected( - GetChildrenRequest(TopicsZNode.path, registerWatch)) + if (paginateTopics) GetChildrenPaginatedRequest(TopicsZNode.path, registerWatch) + else GetChildrenRequest(TopicsZNode.path, registerWatch)) getChildrenResponse.resultCode match { case Code.OK => getChildrenResponse.children.toSet case Code.NONODE => Set.empty @@ -2096,7 +2099,8 @@ object KafkaZkClient { zkClientConfig: ZKClientConfig, metricGroup: String = "kafka.server", metricType: String = "SessionExpireListener", - createChrootIfNecessary: Boolean = false + createChrootIfNecessary: Boolean = false, + paginateTopics: Boolean = false ): KafkaZkClient = { /* ZooKeeper 3.6.0 changed the default configuration for JUTE_MAXBUFFER from 4 MB to 1 MB. 
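Combined with the KafkaServer wiring above, opting in from code looks roughly like the following sketch (it mirrors the new test in KafkaZkClientTest later in this patch; the connect string, timeouts, and client name are placeholder values):

    import kafka.zk.KafkaZkClient
    import org.apache.kafka.common.utils.Time
    import org.apache.zookeeper.client.ZKClientConfig

    val client = KafkaZkClient("localhost:2181", isSecure = false,
      sessionTimeoutMs = 30000, connectionTimeoutMs = 30000,
      maxInFlightRequests = 10, Time.SYSTEM, name = "pagination-example",
      zkClientConfig = new ZKClientConfig, paginateTopics = true)
    try {
      // with paginateTopics = true, getAllTopicsInCluster issues a
      // GetChildrenPaginatedRequest, so the listing no longer has to fit
      // in a single 1 MB ZK response
      val topics: Set[String] = client.getAllTopicsInCluster()
      println(s"found ${topics.size} topics")
    } finally client.close()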
@@ -2131,7 +2135,7 @@ object KafkaZkClient { } val zooKeeperClient = new ZooKeeperClient(connectString, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests, time, metricGroup, metricType, zkClientConfig, name) - new KafkaZkClient(zooKeeperClient, isSecure, time) + new KafkaZkClient(zooKeeperClient, isSecure, time, paginateTopics) } // A helper function to transform a regular request into a MultiRequest diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala index bc634a88e33a5..163d446241a93 100755 --- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala +++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala @@ -20,6 +20,7 @@ package kafka.zookeeper import java.util.Locale import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock} import java.util.concurrent._ +import java.util.function.{Consumer, Supplier} import java.util.{List => JList} import com.yammer.metrics.core.MetricName @@ -47,12 +48,15 @@ object ZooKeeperClient { /** * A ZooKeeper client that encourages pipelined requests. * - * @param connectString comma separated host:port pairs, each corresponding to a zk server + * @param connectString comma-separated host:port pairs, each corresponding to a zk server * @param sessionTimeoutMs session timeout in milliseconds * @param connectionTimeoutMs connection timeout in milliseconds - * @param maxInFlightRequests maximum number of unacknowledged requests the client will send before blocking. - * @param name name of the client instance + * @param maxInFlightRequests maximum number of unacknowledged requests the client will send before blocking + * @param time clock for timing; can be overridden or mocked in tests + * @param metricGroup name of the group of ZK-related metrics + * @param metricType type of the ZK-related metrics * @param zkClientConfig ZooKeeper client configuration, for TLS configs if desired + * @param name name of the client instance */ class ZooKeeperClient(connectString: String, sessionTimeoutMs: Int, @@ -61,7 +65,7 @@ class ZooKeeperClient(connectString: String, time: Time, metricGroup: String, metricType: String, - private[zookeeper] val clientConfig: ZKClientConfig, + private[zookeeper] val zkClientConfig: ZKClientConfig, name: String) extends Logging with KafkaMetricsGroup { this.logIdent = s"[ZooKeeperClient $name] " @@ -75,6 +79,11 @@ class ZooKeeperClient(connectString: String, private[zookeeper] val reinitializeScheduler = new KafkaScheduler(threads = 1, s"zk-client-${threadPrefix}reinit-") private var isFirstConnectionEstablished = false + // There should never be more than one controller-failover call at a time, but there are no guarantees + // for end-user listTopics() calls, and zookeeper.max.in.flight.requests defaults to 10 (and apparently + // is not overridden), so go with 10 threads as a safe bet: + private val zkPaginationExecutor = Executors.newFixedThreadPool(10) + private val metricNames = Set[String]() // The state map has to be created before creating ZooKeeper since it's needed in the ZooKeeper callback. 
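Unlike the other ZK operations in this file, which go through ZooKeeper's callback-based async API, the paginated call used below is synchronous, which is why the dedicated pool above exists. The handler added in the next hunk follows this general shape (a standalone sketch with placeholder names, not the patch code itself):

    import java.util.concurrent.{CompletableFuture, Executors}
    import java.util.function.{Consumer, Supplier}

    val pool = Executors.newFixedThreadPool(10) // mirrors zkPaginationExecutor

    def offload[T](blockingCall: () => T)(callback: T => Unit): Unit =
      CompletableFuture.supplyAsync(new Supplier[T] {
        // runs on `pool`, keeping the blocking call off the ZK event/send threads
        override def get(): T = blockingCall()
      }, pool).thenAccept(new Consumer[T] {
        override def accept(result: T): Unit = callback(result)
      })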
@@ -98,7 +107,7 @@ class ZooKeeperClient(connectString: String, info(s"Initializing a new session to $connectString.") // Fail-fast if there's an error during construction (so don't call initialize, which retries forever) @volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher, - clientConfig) + zkClientConfig) newGauge("SessionState", () => connectionState.toString) @@ -198,6 +207,23 @@ class ZooKeeperClient(connectString: String, callback(GetChildrenResponse(Code.get(rc), path, Option(ctx), Option(children).map(_.asScala).getOrElse(Seq.empty), stat, responseMetadata(sendTimeMs))) }, ctx.orNull) + case GetChildrenPaginatedRequest(path, _, ctx) => + CompletableFuture.supplyAsync(new Supplier[GetChildrenPaginatedResponse]() { + def get(): GetChildrenPaginatedResponse = { + var topicsList: JList[String] = null + var resultCode: Code = Code.OK + try { + topicsList = zooKeeper.getAllChildrenPaginated(path, shouldWatch(request)) + } catch { + case e: KeeperException => + resultCode = e.code + } + GetChildrenPaginatedResponse(resultCode, path, Option(ctx), Option(topicsList).map(_.asScala).getOrElse(Seq.empty), + responseMetadata(sendTimeMs)) + } + }, zkPaginationExecutor).thenAccept(new Consumer[GetChildrenPaginatedResponse] { + override def accept(response: GetChildrenPaginatedResponse) = callback(response) + }) case CreateRequest(path, data, acl, createMode, ctx) => zooKeeper.create(path, data, acl.asJava, createMode, (rc, path, ctx, name) => @@ -270,6 +296,7 @@ class ZooKeeperClient(connectString: String, // may need to be updated. private def shouldWatch(request: AsyncRequest): Boolean = request match { case GetChildrenRequest(_, registerWatch, _) => registerWatch && zNodeChildChangeHandlers.contains(request.path) + case GetChildrenPaginatedRequest(_, registerWatch, _) => registerWatch && zNodeChildChangeHandlers.contains(request.path) case _: ExistsRequest | _: GetDataRequest => zNodeChangeHandlers.contains(request.path) case _ => throw new IllegalArgumentException(s"Request $request is not watchable") } @@ -371,7 +398,7 @@ class ZooKeeperClient(connectString: String, var connected = false while (!connected) { try { - zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher, clientConfig) + zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher, zkClientConfig) connected = true } catch { case e: Exception => @@ -542,6 +569,11 @@ case class GetChildrenRequest(path: String, registerWatch: Boolean, ctx: Option[ type Response = GetChildrenResponse } +// GetChildrenPaginatedResponse is identical to GetChildrenResponse except no Stat argument +case class GetChildrenPaginatedRequest(path: String, registerWatch: Boolean, ctx: Option[Any] = None) extends AsyncRequest { + type Response = GetChildrenPaginatedResponse +} + case class MultiRequest(zkOps: Seq[ZkOp], ctx: Option[Any] = None) extends AsyncRequest { type Response = MultiResponse @@ -569,6 +601,11 @@ sealed abstract class AsyncResponse { def metadata: ResponseMetadata } +sealed trait GetChildrenResponseNoStat extends AsyncResponse { + def children: Seq[String] + def metadata: ResponseMetadata +} + case class ResponseMetadata(sendTimeMs: Long, receivedTimeMs: Long) { def responseTimeMs: Long = receivedTimeMs - sendTimeMs } @@ -588,7 +625,9 @@ case class GetAclResponse(resultCode: Code, path: String, ctx: Option[Any], acl: case class SetAclResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat, metadata: ResponseMetadata) extends 
AsyncResponse case class GetChildrenResponse(resultCode: Code, path: String, ctx: Option[Any], children: Seq[String], stat: Stat, - metadata: ResponseMetadata) extends AsyncResponse + metadata: ResponseMetadata) extends GetChildrenResponseNoStat +case class GetChildrenPaginatedResponse(resultCode: Code, path: String, ctx: Option[Any], children: Seq[String], + metadata: ResponseMetadata) extends GetChildrenResponseNoStat case class MultiResponse(resultCode: Code, path: String, ctx: Option[Any], zkOpResults: Seq[ZkOpResult], metadata: ResponseMetadata) extends AsyncResponse diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala index 28b592eaf7af8..39d6ade307926 100755 --- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala +++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala @@ -24,6 +24,14 @@ import java.net.InetSocketAddress import org.apache.kafka.common.utils.Utils +object EmbeddedZookeeper { + // The ZK jute.maxbuffer config defines the maximum response size from the ZK server (anything larger throws an + // exception and disconnects the ZK client). The historical value was 4194304 bytes, but several years ago it was + // reduced to 1048575 (per https://zookeeper.apache.org/doc/r3.6.2/zookeeperAdmin.html) for performance reasons, + // so use that for more realistic tests: + final val JUTE_MAXBUFFER_VALUE: String = "1048575" +} + /** * ZooKeeperServer wrapper that starts the server with temporary directories during construction and deletes * the directories when `shutdown()` is called. @@ -41,6 +49,7 @@ class EmbeddedZookeeper() extends Logging { val tickTime = 800 // allow a maxSessionTimeout of 20 * 800ms = 16 secs System.setProperty("zookeeper.forceSync", "no") //disable fsync to ZK txn log in tests to avoid timeout + System.setProperty("jute.maxbuffer", EmbeddedZookeeper.JUTE_MAXBUFFER_VALUE) // configure realistic response-buffer size val zookeeper = new ZooKeeperServer(snapshotDir, logDir, tickTime) val factory = new NIOServerCnxnFactory() private val addr = new InetSocketAddress("127.0.0.1", TestUtils.RandomPort) diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala index 2088e5f3c097f..e31b63f4fbcb5 100644 --- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala @@ -1057,6 +1057,60 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { assertEquals(Set(topicPartition10, topicPartition11), zkClient.getAllPartitions) } + /** + * Create a large number of topic names, such that their total (aggregate) length is more than 1 MB, + * and verify that our ZK client can still read them all despite being configured for 1 MB max response + * size. + * + * NOTE THAT THIS LOOPS INDEFINITELY if paginateTopics = false, so a negative test is currently not + * possible. The problem is that only org.apache.zookeeper.ClientCnxn knows that its SessionExpiredException + * isn't actually due to session expiry but rather "Packet len 1374978 is out of range!", and it doesn't + * bother to pass that information down to our ZooKeeperClient. The fix will probably be to add a counter + * for the number of connection attempts and a new max-retry config that we can set to something small in + * the negative version of this test. 
TODO/FIXME */ + @Test + def testGetManyTopicsWithPagination(): Unit = { + // replace zkClient with one supporting pagination + zkClient.close() // ...but don't leave the existing one running, the test framework doesn't like that + zkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout, + zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, name = "ZooKeeperTestHarness", new ZKClientConfig, + paginateTopics = true) + adminZkClient = new AdminZkClient(zkClient) + zkClient.createControllerEpochRaw(1) + + assertTrue(zkClient.getAllTopicsInCluster().isEmpty) + + val superLongTopicNamePrefix = + "topic-name-of-super-excessive-length-so-as-to-minimize-total-number-of-topics-to-be-created--just-how-lengthy-is-it-I-wonder--here-comes-the-indexing-value-woohoo--" + val numTopics = 8000 + // total size of all topic names we're creating, plus their ZK overhead (i.e., what getChildren() on + // /brokers/topics should return): + var sumOfInputTopicNameLengths = 0 + (0 until numTopics).foreach { j => + if (((j+1) % 1000) == 0) info(s"creating very long topic-name #${j+1}") + val name = s"${superLongTopicNamePrefix}${j}" + sumOfInputTopicNameLengths += name.length + 4 // 4 = measured ZK overhead per znode + zkClient.createRecursive(TopicZNode.path(name)) + } + debug(s"total /brokers/topics getChildren size should be ${sumOfInputTopicNameLengths}") + + val topicsList = zkClient.getAllTopicsInCluster() + debug(s"got back list of ${topicsList.size} topics") + assertEquals(numTopics, topicsList.size) + + // the whole point of our 8000 long-named topics is to exceed EmbeddedZookeeper's (and real ZK's) 1 MB + // jute.maxbuffer size: + assertTrue(sumOfInputTopicNameLengths > Integer.valueOf(EmbeddedZookeeper.JUTE_MAXBUFFER_VALUE)) + // can't just do topicsList.map here since map of a Set is also a Set, and the _lengths_ of our + // topic names are NOT globally unique: + val sumOfOutputTopicNameLengths = topicsList.toSeq.map(_.length + 4).sum + assertEquals(sumOfInputTopicNameLengths, sumOfOutputTopicNameLengths) + // spot-check that the first and last topic names are present: + assertTrue(topicsList.contains(s"${superLongTopicNamePrefix}0")) + assertTrue(topicsList.contains(s"${superLongTopicNamePrefix}${numTopics - 1}")) + } + @Test def testCreateAndGetTopicPartitionStatesRaw(): Unit = { zkClient.createRecursive(TopicZNode.path(topic1)) @@ -1354,8 +1408,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { finally client.close() } - // default case - assertEquals("4194304", zkClient.currentZooKeeper.getClientConfig.getProperty(ZKConfig.JUTE_MAXBUFFER)) + // "default" (server-side) case + assertEquals(EmbeddedZookeeper.JUTE_MAXBUFFER_VALUE, + zkClient.currentZooKeeper.getClientConfig.getProperty(ZKConfig.JUTE_MAXBUFFER)) // Value set directly on ZKClientConfig takes precedence over system property System.setProperty(ZKConfig.JUTE_MAXBUFFER, (3000 * 1024).toString) @@ -1371,7 +1426,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { } class ExpiredKafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: Time) - extends KafkaZkClient(zooKeeperClient, isSecure, time) { + extends KafkaZkClient(zooKeeperClient, isSecure, time, false) { // Overwriting this method from the parent class to force the client to re-register the Broker.
override def shouldReCreateEphemeralZNode(ephemeralOwnerId: Long): Boolean = { true diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala index 37954e602ba53..74ec2d22ef3b5 100644 --- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala +++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala @@ -107,7 +107,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { KafkaConfig.setZooKeeperClientProperty(clientConfig, propKey, propVal) val client = newZooKeeperClient(clientConfig = clientConfig) try { - assertEquals(Some(propVal), KafkaConfig.zooKeeperClientProperty(client.clientConfig, propKey)) + assertEquals(Some(propVal), KafkaConfig.zooKeeperClientProperty(client.zkClientConfig, propKey)) // For a sanity check, make sure a bad client connection socket class name generates an exception val badClientConfig = new ZKClientConfig() KafkaConfig.setZooKeeperClientProperty(badClientConfig, propKey, propVal + "BadClassName") diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index a0c4bb9cb8726..d889a8c1bc40c 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -116,7 +116,7 @@ versions += [ snappy: "1.1.8.4", spotbugs: "4.2.2", zinc: "1.3.5", - zookeeper: "3.6.3", + zookeeper: "3.6.3-23", zstd: "1.5.0-4" ] libs += [ @@ -196,7 +196,7 @@ libs += [ slf4jApi: "org.slf4j:slf4j-api:$versions.slf4j", slf4jlog4j: "org.slf4j:slf4j-log4j12:$versions.slf4j", snappy: "org.xerial.snappy:snappy-java:$versions.snappy", - zookeeper: "org.apache.zookeeper:zookeeper:$versions.zookeeper", + zookeeper: "com.linkedin.zookeeper:zookeeper:$versions.zookeeper", jfreechart: "jfreechart:jfreechart:$versions.jfreechart", mavenArtifact: "org.apache.maven:maven-artifact:$versions.mavenArtifact", zstd: "com.github.luben:zstd-jni:$versions.zstd", diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java index 87c48dcfff9aa..593ed76ec3b52 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java @@ -151,7 +151,7 @@ public void setup() { this.metrics, this.time, ""); - KafkaZkClient zkClient = new KafkaZkClient(null, false, Time.SYSTEM) { + KafkaZkClient zkClient = new KafkaZkClient(null, false, Time.SYSTEM, false) { @Override public Properties getEntityConfigs(String rootEntityType, String sanitizedEntityName) { return new Properties();
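For context on the dependency swap in dependencies.gradle: getAllChildrenPaginated is an LI addition in the com.linkedin.zookeeper artifact (3.6.3-23 above); stock org.apache.zookeeper offers no equivalent. The call-site difference reduces to roughly the sketch below, where the paginated method's signature is inferred from its single use in ZooKeeperClient.scala and should be treated as an assumption:

    import org.apache.zookeeper.ZooKeeper
    import scala.jdk.CollectionConverters._

    def listChildren(zk: ZooKeeper, path: String, paginate: Boolean): Seq[String] =
      if (paginate)
        // fetched in pages, so the aggregate listing may exceed the
        // 1 MB jute.maxbuffer limit without disconnecting the client
        zk.getAllChildrenPaginated(path, false).asScala.toSeq
      else
        // one getChildren reply; the server drops the connection once
        // the serialized response exceeds jute.maxbuffer
        zk.getChildren(path, false).asScala.toSeq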