Skip to content

Commit

Permalink
[LI-HOTFIX] Add a hook for transforming rack ID used by rack-aware replica assignment (#438)
Browse files Browse the repository at this point in the history

TICKET = LIKAFKA-49537, LIKAFKA-50843
EXIT_CRITERIA = N/A, but will need significant porting for KRaft
LI_DESCRIPTION = This is the first step for enabling fault-domain-aware replica assignment.
The full implementation is to encode the fault domain into rack ID (so that we don't break the open-source CC interface) and let CC take that encoded fault domain into consideration when performing rebalance.

In order to prevent assignment logic flaws during the rollout phase of encoded rack IDs, we need the broker to be able to handle the intermediate state in which encoded and unencoded rack IDs are mixed in the cluster.
To achieve this, we start by adding a hook interface, RackAwareReplicaAssignmentRackIdMapper, so that the LI internal `kafka-server` can inject the logic for processing encoded rack IDs.

Since the logic is pretty critical, an integration test is also added to verify that a custom (forged) mapper class can be incorporated into the replica assignment flow.
  • Loading branch information
lmr3796 authored Feb 23, 2023
1 parent d8e93c3 commit 27cc188
Show file tree
Hide file tree
Showing 10 changed files with 264 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.admin;

/**
 * A transformer for mapping a broker's rack ID to a different value.
 *
 * This is used to customize rack ID interpretation when extra information is encoded into the
 * rack ID (per the commit description, e.g. a fault domain encoded into the rack string), and
 * is intended specifically for the broker side's partition assignment code path.
 * For other code paths, such as {@link TopicCommand} invoked from kafka-*.sh, extra effort
 * would be required to inject this mapper.
 *
 * Being a {@code @FunctionalInterface}, implementations may be supplied as a lambda or a
 * method reference (e.g. {@code identity} when no transformation is desired).
 */
@FunctionalInterface
public interface RackAwareReplicaAssignmentRackIdMapper {
/**
 * Transforms a raw rack ID into the value the assignment logic should use.
 *
 * @param rackId the raw rack ID as registered by the broker; callers only pass non-null
 *               values (brokers without rack information are filtered out upstream)
 * @return the transformed rack ID derived from the raw {@code rackId}
 */
String apply(String rackId);
}
10 changes: 6 additions & 4 deletions core/src/main/scala/kafka/admin/AdminUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ object AdminUtils extends Logging {
nPartitions: Int,
replicationFactor: Int,
fixedStartIndex: Int = -1,
startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
startPartitionId: Int = -1,
rackIdMapperForBrokerAssignment: RackAwareReplicaAssignmentRackIdMapper = identity): Map[Int, Seq[Int]] = {
if (nPartitions <= 0)
throw new InvalidPartitionsException("Number of partitions must be larger than 0.")
if (replicationFactor <= 0)
Expand All @@ -118,7 +119,7 @@ object AdminUtils extends Logging {
if (brokerMetadatas.exists(_.rack.isEmpty))
throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment.")
assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex,
startPartitionId)
startPartitionId, rackIdMapperForBrokerAssignment)
}
}

Expand Down Expand Up @@ -149,9 +150,10 @@ object AdminUtils extends Logging {
replicationFactor: Int,
brokerMetadatas: Iterable[BrokerMetadata],
fixedStartIndex: Int,
startPartitionId: Int): Map[Int, Seq[Int]] = {
startPartitionId: Int,
rackIdMapperForBrokerAssignment: RackAwareReplicaAssignmentRackIdMapper): Map[Int, Seq[Int]] = {
val brokerRackMap = brokerMetadatas.collect { case BrokerMetadata(id, Some(rack)) =>
id -> rack
id -> rackIdMapperForBrokerAssignment.apply(rack)
}.toMap
val numRacks = brokerRackMap.values.toSet.size
val arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,10 @@ object ReassignPartitionsCommand extends Logging {
val proposedAssignments = mutable.Map[TopicPartition, Seq[Int]]()
groupedByTopic.forKeyValue { (topic, assignment) =>
val (_, replicas) = assignment.head
// TODO:
// This is for `./kafka-reassign-partitions.sh --generate`, an "external operation" code path that we don't use in LI.
// If, in the future, we want to use it and make it compliant with our new rackIdMapperForBrokerAssignment,
// this code will need to be modified and repackaged into a new kafka-utils package to support passing the mapper in.
val assignedReplicas = AdminUtils.
assignReplicasToBrokers(brokerMetadatas, assignment.size, replicas.size)
proposedAssignments ++= assignedReplicas.map { case (partition, replicas) =>
Expand Down
10 changes: 7 additions & 3 deletions core/src/main/scala/kafka/controller/KafkaController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1268,7 +1268,9 @@ class KafkaController(val config: KafkaConfig,
val numReplica = partitionMap.head._2.replicas.size
val brokers = controllerContext.liveOrShuttingDownBrokers.map { b => kafka.admin.BrokerMetadata(b.id, b.rack) }.toSeq

val replicaAssignment = adminZkClient.assignReplicasToAvailableBrokers(brokers, noNewPartitionBrokerIds.toSet, numPartitions, numReplica)
val replicaAssignment =
adminZkClient.assignReplicasToAvailableBrokers(brokers, noNewPartitionBrokerIds.toSet, numPartitions, numReplica,
rackIdMapperForRackAwareReplicaAssignment = config.rackIdMapperForRackAwareReplicaAssignment)
adminZkClient.writeTopicPartitionAssignment(topic, replicaAssignment.mapValues(ReplicaAssignment(_)).toMap, true)
info(s"Rearrange partition and replica assignment for topic [$topic]")
}
Expand Down Expand Up @@ -3001,8 +3003,10 @@ class KafkaController(val config: KafkaConfig,
val topicId = topicsIdReplicaAssignment.topicId
val numPartitions = topicsIdReplicaAssignment.assignment.size
val assignment =
adminZkClient.assignReplicasToAvailableBrokers(brokers, noNewPartitionBrokerIds, numPartitions, replicationFactor)
.map { case(partition, replicas) => (new TopicPartition(topic, partition), ReplicaAssignment(replicas))}
adminZkClient
.assignReplicasToAvailableBrokers(brokers, noNewPartitionBrokerIds, numPartitions, replicationFactor,
rackIdMapperForRackAwareReplicaAssignment = config.rackIdMapperForRackAwareReplicaAssignment)
.map { case(partition, replicas) => (new TopicPartition(topic, partition), ReplicaAssignment(replicas)) }
zkClient.setTopicAssignment(topic, topicId, assignment, controllerContext.epochZkVersion)
info(s"Updated topic [$topic] with $assignment for replica assignment")
}
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package kafka.server

import kafka.admin.RackAwareReplicaAssignmentRackIdMapper

import java.io.File
import java.util
import java.util.{Collections, Locale, Properties}
Expand Down Expand Up @@ -332,6 +334,7 @@ object Defaults {
val LiDropCorruptedFilesEnabled = false
val LiConsumerFetchSampleRatio = 0.01
val LiZookeeperPaginationEnable = false
val LiRackIdMapperClassNameForRackAwareReplicaAssignment: String = null
}

object KafkaConfig {
Expand Down Expand Up @@ -452,6 +455,7 @@ object KafkaConfig {
val LiDropCorruptedFilesEnableProp = "li.drop.corrupted.files.enable"
val LiConsumerFetchSampleRatioProp = "li.consumer.fetch.sample.ratio"
val LiZookeeperPaginationEnableProp = "li.zookeeper.pagination.enable"
val LiRackIdMapperClassNameForRackAwareReplicaAssignmentProp = "li.rack.aware.assignment.rack.id.mapper.class"
val AllowPreferredControllerFallbackProp = "allow.preferred.controller.fallback"
val UnofficialClientLoggingEnableProp = "unofficial.client.logging.enable"
val UnofficialClientCacheTtlProp = "unofficial.client.cache.ttl"
Expand Down Expand Up @@ -796,6 +800,7 @@ object KafkaConfig {
val LiNumControllerInitThreadsDoc = "Number of threads (and Zookeeper clients + connections) to be used while recursing the topic-partitions tree in Zookeeper during controller startup/failover."
val LiLogCleanerFineGrainedLockEnableDoc = "Specifies whether the log cleaner should use fine grained locks when calculating the filthiest log to clean"
val LiZookeeperPaginationEnableDoc = "Specifies whether Zookeeper pagination should be used when listing the /brokers/topics znode. Required when sum of all topic-name lengths in the cluster exceeds ZK response-size limit (1 MB by default)."
val LiRackIdMapperClassNameForRackAwareReplicaAssignmentDoc = "The mapper class name to translate rack ID for the use of assigning replicas to brokers in a rack-aware manner. The class should implement kafka.admin.RackAwareReplicaAssignmentRackIdMapper."
// Although AllowPreferredControllerFallback is expected to be configured dynamically at per cluster level, providing a static configuration entry
// here allows its value to be obtained without holding the dynamic broker configuration lock.
val AllowPreferredControllerFallbackDoc = "Specifies whether a non-preferred controller node (broker) is allowed to become the controller." +
Expand Down Expand Up @@ -1242,6 +1247,7 @@ object KafkaConfig {
.define(LiDropCorruptedFilesEnableProp, BOOLEAN, Defaults.LiDropCorruptedFilesEnabled, HIGH, LiDropCorruptedFilesEnableDoc)
.define(LiConsumerFetchSampleRatioProp, DOUBLE, Defaults.LiConsumerFetchSampleRatio, between(0.0, 1.0), LOW, LiConsumerFetchSampleRatioDoc)
.define(LiZookeeperPaginationEnableProp, BOOLEAN, Defaults.LiZookeeperPaginationEnable, LOW, LiZookeeperPaginationEnableDoc)
.define(LiRackIdMapperClassNameForRackAwareReplicaAssignmentProp, STRING, Defaults.LiRackIdMapperClassNameForRackAwareReplicaAssignment, LOW, LiRackIdMapperClassNameForRackAwareReplicaAssignmentDoc)
.define(AllowPreferredControllerFallbackProp, BOOLEAN, Defaults.AllowPreferredControllerFallback, HIGH, AllowPreferredControllerFallbackDoc)
.define(UnofficialClientLoggingEnableProp, BOOLEAN, Defaults.UnofficialClientLoggingEnable, LOW, UnofficialClientLoggingEnableDoc)
.define(UnofficialClientCacheTtlProp, LONG, Defaults.UnofficialClientCacheTtl, LOW, UnofficialClientCacheTtlDoc)
Expand Down Expand Up @@ -1831,6 +1837,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
/***************** rack configuration **************/
val rack = Option(getString(KafkaConfig.RackProp))
val replicaSelectorClassName = Option(getString(KafkaConfig.ReplicaSelectorClassProp))
val rackIdMapperForRackAwareReplicaAssignment: RackAwareReplicaAssignmentRackIdMapper =
Option(getString(KafkaConfig.LiRackIdMapperClassNameForRackAwareReplicaAssignmentProp))
.map(className => CoreUtils.createObject[RackAwareReplicaAssignmentRackIdMapper](className))
.getOrElse((rackId: String) => rackId)

/** ********* Log Configuration ***********/
def autoCreateTopicsEnable: java.lang.Boolean = getBoolean(KafkaConfig.AutoCreateTopicsEnableProp)
Expand Down
7 changes: 5 additions & 2 deletions core/src/main/scala/kafka/server/ZkAdminManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ class ZkAdminManager(val config: KafkaConfig,
val assignments = if (topic.assignments.isEmpty) {
adminZkClient.assignReplicasToAvailableBrokers(
brokers, controller.partitionUnassignableBrokerIds.toSet,
resolvedNumPartitions, resolvedReplicationFactor
resolvedNumPartitions, resolvedReplicationFactor,
rackIdMapperForRackAwareReplicaAssignment = config.rackIdMapperForRackAwareReplicaAssignment
)
} else {
val assignments = new mutable.HashMap[Int, Seq[Int]]
Expand Down Expand Up @@ -352,7 +353,9 @@ class ZkAdminManager(val config: KafkaConfig,

val assignmentForNewPartitions = adminZkClient.createNewPartitionsAssignment(
topic, existingAssignment, allBrokers, newPartition.count, newPartitionsAssignment,
noNewPartitionBrokerIds = controller.partitionUnassignableBrokerIds.toSet)
noNewPartitionBrokerIds = controller.partitionUnassignableBrokerIds.toSet,
config.rackIdMapperForRackAwareReplicaAssignment
)

if (validateOnly) {
CreatePartitionsMetadata(topic, (existingAssignment ++ assignmentForNewPartitions).keySet)
Expand Down
47 changes: 31 additions & 16 deletions core/src/main/scala/kafka/zk/AdminZkClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
package kafka.zk

import java.util.Properties

import kafka.admin.{AdminOperationException, AdminUtils, BrokerMetadata, RackAwareMode}
import kafka.admin.{AdminOperationException, AdminUtils, BrokerMetadata, RackAwareMode, RackAwareReplicaAssignmentRackIdMapper}
import kafka.common.TopicAlreadyMarkedForDeletionException
import kafka.controller.ReplicaAssignment
import kafka.log.LogConfig
Expand Down Expand Up @@ -48,16 +47,19 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
* @param topicConfig topic configs
* @param rackAwareMode rack aware mode for replica assignment
* @param usesTopicId Boolean indicating whether the topic ID will be created
* @param rackIdMapperForRackAwareReplicaAssignment A transformer for mapping rack ID to different values. This is for customized interpretation of rack ID.
*/
def createTopic(topic: String,
partitions: Int,
replicationFactor: Int,
topicConfig: Properties = new Properties,
rackAwareMode: RackAwareMode = RackAwareMode.Enforced,
usesTopicId: Boolean = false): Unit = {
usesTopicId: Boolean = false,
rackIdMapperForRackAwareReplicaAssignment: RackAwareReplicaAssignmentRackIdMapper = identity): Unit = {
val brokerMetadatas = getBrokerMetadatas(rackAwareMode)
val noNewPartitionBrokerIds = getMaintenanceBrokerList()
val replicaAssignment = assignReplicasToAvailableBrokers(brokerMetadatas, noNewPartitionBrokerIds.toSet, partitions, replicationFactor)
val replicaAssignment = assignReplicasToAvailableBrokers(brokerMetadatas, noNewPartitionBrokerIds.toSet, partitions, replicationFactor,
rackIdMapperForRackAwareReplicaAssignment = rackIdMapperForRackAwareReplicaAssignment)
createTopicWithAssignment(topic, topicConfig, replicaAssignment, usesTopicId = usesTopicId)
}

Expand Down Expand Up @@ -216,27 +218,33 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
* @param replicationFactor
* @param fixedStartIndex
* @param startPartitionId
* @param rackIdMapperForRackAwareReplicaAssignment A transformer for mapping rack ID to different values.
* This is to be used to customize rack Id interpretation for extra encoding,
* specifically should be used on broker side's partition assignment code path.
* @return
*/
def assignReplicasToAvailableBrokers(brokerMetadatas: Iterable[BrokerMetadata],
noNewPartitionBrokerIds: Set[Int],
nPartitions: Int,
replicationFactor: Int,
fixedStartIndex: Int = -1,
startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
startPartitionId: Int = -1,
rackIdMapperForRackAwareReplicaAssignment: RackAwareReplicaAssignmentRackIdMapper): Map[Int, Seq[Int]] = {

val availableBrokerMetadata = brokerMetadatas.filter {
brokerMetadata =>
if (noNewPartitionBrokerIds.contains(brokerMetadata.id)) false
else true
brokerMetadata => !noNewPartitionBrokerIds.contains(brokerMetadata.id)
}

if (replicationFactor > availableBrokerMetadata.size) {
val shouldOnlyUseAvailableBrokers: Boolean = replicationFactor <= availableBrokerMetadata.size
if (!shouldOnlyUseAvailableBrokers) {
info(s"Using all brokers for replica assignment since replicationFactor[$replicationFactor] " +
s"is larger than the number of nonMaintenanceBroker[${availableBrokerMetadata.size}]")
AdminUtils.assignReplicasToBrokers(brokerMetadatas, nPartitions, replicationFactor, fixedStartIndex, startPartitionId)
} else
AdminUtils.assignReplicasToBrokers(availableBrokerMetadata, nPartitions, replicationFactor, fixedStartIndex, startPartitionId)
}
AdminUtils.assignReplicasToBrokers(
if (shouldOnlyUseAvailableBrokers) availableBrokerMetadata else brokerMetadatas,
nPartitions, replicationFactor, fixedStartIndex, startPartitionId,
rackIdMapperForRackAwareReplicaAssignment
)
}

/**
Expand All @@ -249,22 +257,25 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
* @param numPartitions Number of partitions to be set
* @param replicaAssignment Manual replica assignment, or none
* @param validateOnly If true, validate the parameters without actually adding the partitions
* @param rackIdMapperForRackAwareReplicaAssignment A transformer for mapping rack ID to different values. This is for customized interpretation of rack ID.
* @return the updated replica assignment
*/
def addPartitions(topic: String,
existingAssignment: Map[Int, ReplicaAssignment],
allBrokers: Seq[BrokerMetadata],
numPartitions: Int = 1,
replicaAssignment: Option[Map[Int, Seq[Int]]] = None,
validateOnly: Boolean = false): Map[Int, Seq[Int]] = {
validateOnly: Boolean = false,
rackIdMapperForRackAwareReplicaAssignment: RackAwareReplicaAssignmentRackIdMapper = identity): Map[Int, Seq[Int]] = {

val proposedAssignmentForNewPartitions = createNewPartitionsAssignment(
topic,
existingAssignment,
allBrokers,
numPartitions,
replicaAssignment,
getMaintenanceBrokerList().toSet
getMaintenanceBrokerList().toSet,
rackIdMapperForRackAwareReplicaAssignment
)

if (validateOnly) {
Expand All @@ -286,14 +297,17 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
* @param numPartitions Number of partitions to be set
* @param replicaAssignment Manual replica assignment, or none
* @param noNewPartitionBrokerIds Brokers that do not take new partitions
* @param rackIdMapperForRackAwareReplicaAssignment A transformer for mapping rack ID to different values. This is for customized interpretation of rack ID.
*
* @return the assignment for the new partitions
*/
def createNewPartitionsAssignment(topic: String,
existingAssignment: Map[Int, ReplicaAssignment],
allBrokers: Seq[BrokerMetadata],
numPartitions: Int = 1,
replicaAssignment: Option[Map[Int, Seq[Int]]] = None,
noNewPartitionBrokerIds: Set[Int] = Set.empty[Int]): Map[Int, ReplicaAssignment] = {
noNewPartitionBrokerIds: Set[Int] = Set.empty[Int],
rackIdMapperForRackAwareReplicaAssignment: RackAwareReplicaAssignmentRackIdMapper): Map[Int, ReplicaAssignment] = {
val existingAssignmentPartition0 = existingAssignment.getOrElse(0,
throw new AdminOperationException(
s"Unexpected existing replica assignment for topic '$topic', partition id 0 is missing. " +
Expand All @@ -314,7 +328,8 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
val proposedAssignmentForNewPartitions = replicaAssignment.getOrElse {
val startIndex = math.max(0, allBrokers.indexWhere(_.id >= existingAssignmentPartition0.head))
assignReplicasToAvailableBrokers(allBrokers, noNewPartitionBrokerIds, partitionsToAdd,
existingAssignmentPartition0.size, startIndex, existingAssignment.size)
existingAssignmentPartition0.size, startIndex, existingAssignment.size,
rackIdMapperForRackAwareReplicaAssignment = rackIdMapperForRackAwareReplicaAssignment)
}

proposedAssignmentForNewPartitions.map { case (tp, replicas) =>
Expand Down
Loading

0 comments on commit 27cc188

Please sign in to comment.