Skip to content

Commit

Permalink
[LI-HOTFIX] Clean up purgatory when leader replica is kicked out of r…
Browse files Browse the repository at this point in the history
…eplica list.

TICKET = KAFKA-8571

LI_DESCRIPTION = Upon receiving a StopReplicaRequest, the broker should check its purgatory and complete any related pending produce/consume requests; otherwise clients will wait an unnecessarily long time before getting a timeout response.

EXIT_CRITERIA = MANUAL ["When the hotfix is pushed to apache/kafka"]
  • Loading branch information
kun du committed Jul 10, 2019
1 parent d76a07d commit 3db6fd8
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 2 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ class Partition(val topic: String,
/**
* Try to complete any pending requests. This should be called without holding the leaderIsrUpdateLock.
*/
private def tryCompleteDelayedRequests() {
def tryCompleteDelayedRequests() {
val requestKey = new TopicPartitionOperationKey(topicPartition)
replicaManager.tryCompleteDelayedFetch(requestKey)
replicaManager.tryCompleteDelayedProduce(requestKey)
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,9 @@ class ReplicaManager(val config: KafkaConfig,
}

if (removedPartition != null) {
// If the deleted replica was the leader replica, there might be delayed requests in purgatory that need to be completed.
if (removedPartition.leaderReplicaIdOpt.map(_ == localBrokerId).getOrElse(false))
removedPartition.tryCompleteDelayedRequests()
val topicHasPartitions = allPartitions.values.exists(partition => topicPartition.topic == partition.topic)
if (!topicHasPartitions)
brokerTopicStats.removeMetrics(topicPartition.topic)
Expand Down
46 changes: 45 additions & 1 deletion core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{EpochEndOffset, IsolationLevel, LeaderAndIsrRequest}
import org.apache.kafka.common.requests.{EpochEndOffset, IsolationLevel, LeaderAndIsrRequest, StopReplicaRequest}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
Expand Down Expand Up @@ -191,6 +191,50 @@ class ReplicaManagerTest {
}
}

/**
 * Checks that a produce request issued while this broker is the leader gets its
 * callback fired once the replica is stopped via StopReplicaRequest.
 *
 * Scenario:
 *  1. Make broker 0 the leader of partition 0 through a LeaderAndIsrRequest.
 *  2. Append a record; the callback asserts the response carries UNKNOWN_TOPIC_OR_PARTITION.
 *  3. Send StopReplicaRequest with delete = false, then again with delete = true.
 *  4. Assert that the append callback has fired.
 */
@Test
def testClearPurgatoryOnReceivingStopReplicaRequest(): Unit = {
  val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
  props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
  val config = KafkaConfig.fromProps(props)
  val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), LogConfig(new Properties()))
  val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, "host1", 1))
  val metadataCache = EasyMock.createMock(classOf[MetadataCache])
  EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
  EasyMock.replay(metadataCache)
  val rm = new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
    new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
    metadataCache, new LogDirFailureChannel(config.logDirs.size))

  // The single partition exercised by every request in this test.
  val tp = new TopicPartition(topic, 0)

  try {
    val brokerList = Seq[Integer](0, 1).asJava
    // Make this replica the leader.
    // NOTE(review): the live-node host names here ("host1"/"host2") differ from the ones used for
    // aliveBrokers ("host0"/"host1"); presumably irrelevant to this test -- confirm.
    val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
      collection.immutable.Map(tp -> new LeaderAndIsrRequest.PartitionState(0, 0, 0, brokerList, 0, brokerList, false)).asJava,
      Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
    rm.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())

    // Try to produce to the leader replica. The callback is expected to run only after the
    // replica has been stopped (presumably the request waits in the produce purgatory until
    // then -- confirm against the appendRecords helper).
    val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("first message".getBytes()))
    val appendResult = appendRecords(rm, tp, records).onFire { response =>
      assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.error)
    }

    // Send StopReplicaRequest(delete = false) to move replica to OfflineReplica state.
    val stopReplicaRequest1 = new StopReplicaRequest.Builder(ApiKeys.STOP_REPLICA.latestVersion(), 0, 0, brokerEpoch, false,
      Set(tp).asJava).build()
    rm.stopReplicas(stopReplicaRequest1)

    // Send StopReplicaRequest(delete = true) to move replica to NonExistentReplica state.
    val stopReplicaRequest2 = new StopReplicaRequest.Builder(ApiKeys.STOP_REPLICA.latestVersion(), 0, 0, brokerEpoch, true,
      Set(tp).asJava).build()
    rm.stopReplicas(stopReplicaRequest2)

    // The pending produce must have been completed by the stop-replica handling.
    assertTrue(appendResult.isFired)
  } finally {
    // Always release the ReplicaManager's resources, even when an assertion fails.
    rm.shutdown(checkpointHW = false)
  }
}

@Test
def testReceiveOutOfOrderSequenceExceptionWithLogStartOffset(): Unit = {
val timer = new MockTimer
Expand Down

0 comments on commit 3db6fd8

Please sign in to comment.