From 3db6fd830caf4778df066a90eae5417dfa91d282 Mon Sep 17 00:00:00 2001
From: kun du
Date: Wed, 10 Jul 2019 10:30:09 -0700
Subject: [PATCH] [LI-HOTFIX] Clean up purgatory when leader replica is kicked
 out of replica list.

TICKET = KAFKA-8571
LI_DESCRIPTION = Upon receiving a StopReplicaRequest, the broker should check
its purgatory and finish any related pending produce/consume requests;
otherwise the client will wait an unnecessarily long time before getting a
timeout response.
EXIT_CRITERIA = MANUAL ["When the hotfix is pushed to apache/kafka"]
---
 .../main/scala/kafka/cluster/Partition.scala  |  2 +-
 .../scala/kafka/server/ReplicaManager.scala   |  3 ++
 .../kafka/server/ReplicaManagerTest.scala     | 46 ++++++++++++++++++-
 3 files changed, 49 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 861355c05e9b2..d1ccc78740ae7 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -526,7 +526,7 @@ class Partition(val topic: String,
   /**
    * Try to complete any pending requests. This should be called without holding the leaderIsrUpdateLock.
    */
-  private def tryCompleteDelayedRequests() {
+  def tryCompleteDelayedRequests() {
     val requestKey = new TopicPartitionOperationKey(topicPartition)
     replicaManager.tryCompleteDelayedFetch(requestKey)
     replicaManager.tryCompleteDelayedProduce(requestKey)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 5b4dcd0e319bc..e3f555f889950 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -358,6 +358,9 @@ class ReplicaManager(val config: KafkaConfig,
       }
 
       if (removedPartition != null) {
+        // If the deleted replica was the leader replica, there might be delayed requests in the purgatory that need to be cleaned up.
+        if (removedPartition.leaderReplicaIdOpt.map(_ == localBrokerId).getOrElse(false))
+          removedPartition.tryCompleteDelayedRequests()
         val topicHasPartitions = allPartitions.values.exists(partition => topicPartition.topic == partition.topic)
         if (!topicHasPartitions)
           brokerTopicStats.removeMetrics(topicPartition.topic)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 81665a24978f3..5db81a244e221 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -34,7 +34,7 @@ import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.{EpochEndOffset, IsolationLevel, LeaderAndIsrRequest}
+import org.apache.kafka.common.requests.{EpochEndOffset, IsolationLevel, LeaderAndIsrRequest, StopReplicaRequest}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
@@ -191,6 +191,50 @@ class ReplicaManagerTest {
     }
   }
 
+  @Test
+  def testClearPurgatoryOnReceivingStopReplicaRequest() {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
+    val config = KafkaConfig.fromProps(props)
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), LogConfig(new Properties()))
+    val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, "host1", 1))
+    val metadataCache = EasyMock.createMock(classOf[MetadataCache])
+    EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
+    EasyMock.replay(metadataCache)
+    val rm = new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
+      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
+      metadataCache, new LogDirFailureChannel(config.logDirs.size))
+
+    try {
+      val brokerList = Seq[Integer](0, 1).asJava
+      // Make this replica the leader.
+      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        collection.immutable.Map(new TopicPartition(topic, 0) -> new LeaderAndIsrRequest.PartitionState(0, 0, 0, brokerList, 0, brokerList, false)).asJava,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+      rm.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+
+      // Try to produce to the leader replica.
+      val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("first message".getBytes()))
+      val appendResult = appendRecords(rm, new TopicPartition(topic, 0), records).onFire { response =>
+        assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.error)
+      }
+
+      // Send StopReplicaRequest(delete = false) to move the replica to OfflineReplica state.
+      val stopReplicaRequest1 = new StopReplicaRequest.Builder(ApiKeys.STOP_REPLICA.latestVersion(), 0, 0, brokerEpoch, false,
+        Set(new TopicPartition(topic, 0)).asJava).build()
+      rm.stopReplicas(stopReplicaRequest1)
+
+      // Send StopReplicaRequest(delete = true) to move the replica to NonExistentReplica state.
+      val stopReplicaRequest2 = new StopReplicaRequest.Builder(ApiKeys.STOP_REPLICA.latestVersion(), 0, 0, brokerEpoch, true,
+        Set(new TopicPartition(topic, 0)).asJava).build()
+      rm.stopReplicas(stopReplicaRequest2)
+
+      assertTrue(appendResult.isFired)
+    } finally {
+      rm.shutdown(checkpointHW = false)
+    }
+  }
+
   @Test
   def testReceiveOutOfOrderSequenceExceptionWithLogStartOffset(): Unit = {
     val timer = new MockTimer