Skip to content

Commit

Permalink
Exclude topics being deleted from the offlinePartitionCount metric an…
Browse files Browse the repository at this point in the history
…d clean up partitionState in PartitionStateMachine after topic deletion is done (#13)

Exclude topics being deleted from the offlinePartitionCount metric and clean up partitionState in PartitionStateMachine after topic deletion is done

Currently the offlinePartitionCount metric also counts the partitions of topics
that have already been queued for deletion, which creates noise for the alerting
system, especially for clusters with frequent topic deletion operations. This
patch adds a mechanism to exclude partitions of topics already queued for deletion from
the offlinePartitionCount metric, and also removes the in-memory topicsWithDeletionStarted
set in TopicDeletionManager since it is no longer used to update the metric.

This patch also addresses a potential memory pressure issue: the in-memory partition states
in PartitionStateMachine were not cleaned up even after the topic had been deleted.
  • Loading branch information
hzxa21 authored Apr 18, 2019
1 parent 76aec3e commit cf18791
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 47 deletions.
15 changes: 13 additions & 2 deletions core/src/main/scala/kafka/controller/PartitionStateMachine.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class PartitionStateMachine(config: KafkaConfig,
stateChangeLogger: StateChangeLogger,
controllerContext: ControllerContext,
zkClient: KafkaZkClient,
partitionState: mutable.Map[TopicPartition, PartitionState],
val partitionState: mutable.Map[TopicPartition, PartitionState],
controllerBrokerRequestBatch: ControllerBrokerRequestBatch) extends Logging {
private val controllerId = config.brokerId

Expand Down Expand Up @@ -137,13 +137,24 @@ class PartitionStateMachine(config: KafkaConfig,
partitionState.filter { case (_, s) => s == state }.keySet.toSet
}

/**
 * Removes the offline partitions of a topic from the offlinePartitionCount metric once the
 * topic has been queued up for deletion.
 *
 * If topicDeletionManager does not report the topic as queued up for deletion, this is a no-op.
 * Otherwise offlinePartitionCount is decreased by the number of the topic's partitions whose
 * recorded state is OfflinePartition.
 *
 * @param topic name of the topic whose offline partitions should stop being counted
 */
def excludeDeletingTopicFromOfflinePartitionCount(topic: String): Unit = {
  if (topicDeletionManager.isTopicQueuedUpForDeletion(topic)) {
    // Look the state up as an Option: a partition known to controllerContext but without an
    // entry in partitionState is simply not offline, and must not raise NoSuchElementException.
    val offlinePartitionsOfTopic = controllerContext.partitionsForTopic(topic)
      .count(partition => partitionState.get(partition).contains(OfflinePartition))
    offlinePartitionCount = offlinePartitionCount - offlinePartitionsOfTopic
  }
}

/**
 * Drops the in-memory state of every partition that controllerContext lists for the topic.
 *
 * @param topic name of the topic whose partition states are removed from partitionState
 */
def removePartitionStatesForTopic(topic: String): Unit = {
  val partitionsOfTopic = controllerContext.partitionsForTopic(topic)
  for (partition <- partitionsOfTopic) {
    partitionState.remove(partition)
  }
}

/**
 * Records the new state of a partition and then lets the controller metrics react to the
 * transition.
 *
 * @param partition    the partition whose state changes
 * @param currentState the state the partition is leaving
 * @param targetState  the state stored for the partition
 */
private def changeStateTo(partition: TopicPartition, currentState: PartitionState, targetState: PartitionState): Unit = {
  // Store the new state first; the metrics update only receives the two states as arguments.
  partitionState(partition) = targetState
  updateControllerMetrics(partition, currentState, targetState)
}

private def updateControllerMetrics(partition: TopicPartition, currentState: PartitionState, targetState: PartitionState) : Unit = {
if (!topicDeletionManager.isTopicWithDeletionStarted(partition.topic)) {
if (!topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)) {
if (currentState != OfflinePartition && targetState == OfflinePartition) {
offlinePartitionCount = offlinePartitionCount + 1
} else if (currentState == OfflinePartition && targetState != OfflinePartition) {
Expand Down
50 changes: 8 additions & 42 deletions core/src/main/scala/kafka/controller/TopicDeletionManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,27 +63,6 @@ class TopicDeletionManager(controller: KafkaController,
val controllerContext = controller.controllerContext
var isDeleteTopicEnabled = controller.config.deleteTopicEnable
val topicsToBeDeleted = mutable.Set.empty[String]
/** The following topicsWithDeletionStarted variable is used to properly update the offlinePartitionCount metric.
* When a topic is going through deletion, we don't want to keep track of its partition state
* changes in the offlinePartitionCount metric, see the PartitionStateMachine#updateControllerMetrics
* for detailed logic. This goal means if some partitions of a topic are already
* in OfflinePartition state when deletion starts, we need to change the corresponding partition
* states to NonExistentPartition first before starting the deletion.
*
* However we can NOT change partition states to NonExistentPartition at the time of enqueuing topics
* for deletion. The reason is that when a topic is enqueued for deletion, it may be ineligible for
* deletion due to ongoing partition reassignments. Hence there might be a delay between enqueuing
* a topic for deletion and the actual start of deletion. In this delayed interval, partitions may still
* transition to or out of the OfflinePartition state.
*
* Hence we decide to change partition states to NonExistentPartition only when the actual deletion have started.
* For topics whose deletion have actually started, we keep track of them in the following topicsWithDeletionStarted
* variable. And once a topic is in the topicsWithDeletionStarted set, we are sure there will no longer
* be partition reassignments to any of its partitions, and only then it's safe to move its partitions to
* NonExistentPartition state. Once a topic is in the topicsWithDeletionStarted set, we will stop monitoring
* its partition state changes in the offlinePartitionCount metric
*/
val topicsWithDeletionStarted = mutable.Set.empty[String]
val topicsIneligibleForDeletion = mutable.Set.empty[String]

// Try to create the znode for delete topic flag
Expand Down Expand Up @@ -118,7 +97,6 @@ class TopicDeletionManager(controller: KafkaController,
def reset() {
if (isDeleteTopicEnabled) {
topicsToBeDeleted.clear()
topicsWithDeletionStarted.clear()
topicsIneligibleForDeletion.clear()
}
}
Expand All @@ -131,7 +109,9 @@ class TopicDeletionManager(controller: KafkaController,
*/
def enqueueTopicsForDeletion(topics: Set[String]) {
if (isDeleteTopicEnabled) {
topicsToBeDeleted ++= topics
val newTopicsToBeDeleted = topics -- topicsToBeDeleted
topicsToBeDeleted ++= newTopicsToBeDeleted
newTopicsToBeDeleted.foreach(controller.partitionStateMachine.excludeDeletingTopicFromOfflinePartitionCount)
resumeDeletions()
}
}
Expand Down Expand Up @@ -201,13 +181,6 @@ class TopicDeletionManager(controller: KafkaController,
false
}

def isTopicWithDeletionStarted(topic: String) = {
if (isDeleteTopicEnabled) {
topicsWithDeletionStarted.contains(topic)
} else
false
}

def isTopicQueuedUpForDeletion(topic: String): Boolean = {
if (isDeleteTopicEnabled) {
topicsToBeDeleted.contains(topic)
Expand Down Expand Up @@ -260,10 +233,10 @@ class TopicDeletionManager(controller: KafkaController,
// controller will remove this replica from the state machine as well as its partition assignment cache
controller.replicaStateMachine.handleStateChanges(replicasForDeletedTopic.toSeq, NonExistentReplica)
topicsToBeDeleted -= topic
topicsWithDeletionStarted -= topic
zkClient.deleteTopicZNode(topic, controllerContext.epochZkVersion)
zkClient.deleteTopicConfigs(Seq(topic), controllerContext.epochZkVersion)
zkClient.deleteTopicDeletions(Seq(topic), controllerContext.epochZkVersion)
controller.partitionStateMachine.removePartitionStatesForTopic(topic)
controllerContext.removeTopic(topic)
}

Expand All @@ -277,18 +250,11 @@ class TopicDeletionManager(controller: KafkaController,
private def onTopicDeletion(topics: Set[String]) {
info(s"Topic deletion callback for ${topics.mkString(",")}")
// send update metadata so that brokers stop serving data for topics to be deleted
val partitions = topics.flatMap(controllerContext.partitionsForTopic)
val unseenTopicsForDeletion = topics -- topicsWithDeletionStarted
if (unseenTopicsForDeletion.nonEmpty) {
val unseenPartitionsForDeletion = unseenTopicsForDeletion.flatMap(controllerContext.partitionsForTopic)
controller.partitionStateMachine.handleStateChanges(unseenPartitionsForDeletion.toSeq, OfflinePartition)
controller.partitionStateMachine.handleStateChanges(unseenPartitionsForDeletion.toSeq, NonExistentPartition)
// adding of unseenTopicsForDeletion to topicsBeingDeleted must be done after the partition state changes
// to make sure the offlinePartitionCount metric is properly updated
topicsWithDeletionStarted ++= unseenTopicsForDeletion
}
val partitionsForDeletion = topics.flatMap(controllerContext.partitionsForTopic)
controller.partitionStateMachine.handleStateChanges(partitionsForDeletion.toSeq, OfflinePartition)
controller.partitionStateMachine.handleStateChanges(partitionsForDeletion.toSeq, NonExistentPartition)

controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitionsForDeletion)
topics.foreach { topic =>
onPartitionDeletion(controllerContext.partitionsForTopic(topic))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,24 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
"failed to get expected partition state upon broker startup")
}

/**
 * Verifies that the controller's PartitionStateMachine forgets the partition states of a topic
 * once the deletion of that topic has completed.
 */
@Test
def testTopicDeletionCleanUpPartitionState(): Unit = {
  servers = makeServers(3, enableDeleteTopic = true)
  val controllerId = TestUtils.waitUntilControllerElected(zkClient)
  val controller = servers.find(broker => broker.config.brokerId == controllerId).get.kafkaController
  val tp = new TopicPartition("t", 0)
  val assignment = Map(tp.partition -> Seq(1, 0, 2))
  TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)

  // Make sure the partition state has been populated
  assertTrue(controller.partitionStateMachine.partitionState.contains(tp))
  adminZkClient.deleteTopic(tp.topic())
  TestUtils.verifyTopicDeletion(zkClient, tp.topic(), 1, servers)

  // Make sure the partition state has been removed. The controller drops the in-memory state
  // only after it has deleted the topic's znodes, so poll instead of asserting right away to
  // avoid racing with the controller.
  TestUtils.waitUntilTrue(() => !controller.partitionStateMachine.partitionState.contains(tp),
    "failed to clean up partition state after topic deletion")
}

@Test
def testLeaderAndIsrWhenEntireIsrOfflineAndUncleanLeaderElectionDisabled(): Unit = {
servers = makeServers(2)
Expand Down Expand Up @@ -624,12 +642,14 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
private def makeServers(numConfigs: Int,
autoLeaderRebalanceEnable: Boolean = false,
uncleanLeaderElectionEnable: Boolean = false,
enableControlledShutdown: Boolean = true) = {
enableControlledShutdown: Boolean = true,
enableDeleteTopic: Boolean = false) = {
val configs = TestUtils.createBrokerConfigs(numConfigs, zkConnect, enableControlledShutdown = enableControlledShutdown)
configs.foreach { config =>
config.setProperty(KafkaConfig.AutoLeaderRebalanceEnableProp, autoLeaderRebalanceEnable.toString)
config.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString)
config.setProperty(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp, "1")
config.setProperty(KafkaConfig.DeleteTopicEnableProp, enableDeleteTopic.toString)
}
configs.map(config => TestUtils.createServer(KafkaConfig.fromProps(config)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ class PartitionStateMachineTest extends JUnitSuite {
controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
}

EasyMock.expect(mockTopicDeletionManager.isTopicWithDeletionStarted(topic)).andReturn(false)
EasyMock.expect(mockTopicDeletionManager.isTopicQueuedUpForDeletion(topic)).andReturn(false)
EasyMock.expectLastCall().anyTimes()
prepareMockToElectLeaderForPartitions(partitions)
EasyMock.replay(mockZkClient, mockTopicDeletionManager)
Expand All @@ -404,7 +404,7 @@ class PartitionStateMachineTest extends JUnitSuite {
val topic = "test"
val partitions = partitionIds.map(new TopicPartition("test", _))

EasyMock.expect(mockTopicDeletionManager.isTopicWithDeletionStarted(topic)).andReturn(true)
EasyMock.expect(mockTopicDeletionManager.isTopicQueuedUpForDeletion(topic)).andReturn(true)
EasyMock.expectLastCall().anyTimes()
EasyMock.replay(mockTopicDeletionManager)

Expand Down

0 comments on commit cf18791

Please sign in to comment.