From 703aa5b9b7968ca3e1f9546d68851be3b9c3eed0 Mon Sep 17 00:00:00 2001 From: Ke Hu Date: Wed, 27 Mar 2019 10:16:22 -0700 Subject: [PATCH] Disable delete snapshot file during topic deletion and fix bug in grouping stop replica request when callback is null --- .../main/scala/kafka/controller/ReplicaStateMachine.scala | 5 ++++- core/src/main/scala/kafka/log/LogManager.scala | 4 +++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 433ab5668379e..bdcdfc97e3def 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -202,9 +202,12 @@ class ReplicaStateMachine(config: KafkaConfig, replicaState.put(replica, OnlineReplica) } case OfflineReplica => + // callback needs to be null so that later when controller sends STOP_REPLICA request to destination + // broker to change replica from online to offline state, it will batch the requests and send one request + // for the topic instead of one request for each topic partition validReplicas.foreach { replica => controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), replica.topicPartition, - deletePartition = false, (_, _) => ()) + deletePartition = false, null) } val (replicasWithLeadershipInfo, replicasWithoutLeadershipInfo) = validReplicas.partition { replica => controllerContext.partitionLeadershipInfo.contains(replica.topicPartition) diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index a522d43dec7aa..1cc3f9c362299 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -807,7 +807,9 @@ class LogManager(logDirs: Seq[File], // Now that replica in source log directory has been successfully renamed for deletion. // Close the log, update checkpoint files, and enqueue this log to be deleted. sourceLog.close() - checkpointLogRecoveryOffsetsInDir(sourceLog.dir.getParentFile) + // Set deleteSnapshotFiles to false to speed up topic deletion, since snapshot file is only used + // for transaction, which is not used anywhere in LinkedIn + checkpointLogRecoveryOffsetsInDir(sourceLog.dir.getParentFile, false) checkpointLogStartOffsetsInDir(sourceLog.dir.getParentFile) addLogToBeDeleted(sourceLog) } catch {