Skip to content

Commit

Permalink
[LI-CHERRY-PICK] [31d191f] KAFKA-7904; Add AtMinIsr partition metric …
Browse files Browse the repository at this point in the history
…and TopicCommand option (KIP-427) (#30)

TICKET = KAFKA-7904
LI_DESCRIPTION =
This patch includes broker metrics for AtMinIsr and modified unit tests. However, it doesn't include topiccommand related change from the original cherry-picked commit because of extra dependency.

EXIT_CRITERIA = HASH [31d191f]
ORIGINAL_DESCRIPTION =

- Add `AtMinIsrPartitionCount` metric to `ReplicaManager`
- Add `AtMinIsr` metric to `Partition`
- Add `--at-min-isr-partitions` describe `TopicCommand` option

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103089398

Author: Kevin Lu <[email protected]>
Author: [email protected] <[email protected]>

Reviewers: Gwen Shapira

Closes apache#6421 from KevinLiLu/KAFKA-7904

(cherry picked from commit 31d191f)
  • Loading branch information
xiowu0 authored Jul 16, 2019
1 parent 3db6fd8 commit c3b03ea
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 1 deletion.
19 changes: 19 additions & 0 deletions core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ class Partition(val topic: String,
tags
)

newGauge("AtMinIsr",
new Gauge[Int] {
def value = {
if (isAtMinIsr) 1 else 0
}
},
tags
)

newGauge("ReplicasCount",
new Gauge[Int] {
def value = {
Expand Down Expand Up @@ -141,6 +150,15 @@ class Partition(val topic: String,
}
}

def isAtMinIsr: Boolean = {
leaderReplicaIfLocal match {
case Some(leaderReplica) =>
inSyncReplicas.size == leaderReplica.log.get.config.minInSyncReplicas
case None =>
false
}
}

/**
* Create the future replica if 1) the current replica is not in the given log directory and 2) the future replica
* does not exist. This method assumes that the current replica has already been created.
Expand Down Expand Up @@ -763,6 +781,7 @@ class Partition(val topic: String,
removeMetric("InSyncReplicasCount", tags)
removeMetric("ReplicasCount", tags)
removeMetric("LastStableOffsetLag", tags)
removeMetric("AtMinIsr", tags)
}

override def equals(that: Any): Boolean = that match {
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,12 @@ class ReplicaManager(val config: KafkaConfig,
def value = leaderPartitionsIterator.count(_.isUnderMinIsr)
}
)
val atMinIsrPartitionCount = newGauge(
"AtMinIsrPartitionCount",
new Gauge[Int] {
def value = leaderPartitionsIterator.count(_.isAtMinIsr)
}
)

val recompressionCount = newGauge(
"recompressionCount",
Expand Down Expand Up @@ -1479,6 +1485,7 @@ class ReplicaManager(val config: KafkaConfig,
removeMetric("UnderReplicatedPartitions")
removeMetric("UnderMinIsrPartitionCount")
removeMetric("recompressedBatch")
removeMetric("AtMinIsrPartitionCount")
}

// High watermark do not need to be checkpointed only when under unit tests
Expand Down
31 changes: 30 additions & 1 deletion core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ import kafka.common.UnexpectedAppendOffsetException
import kafka.log.{CleanerConfig, LogConfig, LogManager}
import kafka.server._
import kafka.utils.{MockScheduler, MockTime, TestUtils}
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.ReplicaNotAvailableException
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.LeaderAndIsrRequest
import org.easymock.EasyMock
import org.junit.{After, Before, Test}
import org.junit.Assert._
import org.scalatest.Assertions.assertThrows
Expand Down Expand Up @@ -69,13 +71,19 @@ class PartitionTest {
logDirs = Seq(logDir1, logDir2), defaultConfig = logConfig, CleanerConfig(enableCleaner = false), time)
logManager.startup()

val kafkaZkClient: KafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient])
val brokerProps = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
brokerProps.put(KafkaConfig.LogDirsProp, Seq(logDir1, logDir2).map(_.getAbsolutePath).mkString(","))
val brokerConfig = KafkaConfig.fromProps(brokerProps)
replicaManager = new ReplicaManager(
config = brokerConfig, metrics, time, zkClient = null, new MockScheduler(time),
config = brokerConfig, metrics, time, zkClient = kafkaZkClient, new MockScheduler(time),
logManager, new AtomicBoolean(false), QuotaFactory.instantiate(brokerConfig, metrics, time, ""),
brokerTopicStats, new MetadataCache(brokerId), new LogDirFailureChannel(brokerConfig.logDirs.size))

EasyMock.expect(kafkaZkClient.getEntityConfigs(EasyMock.anyString(), EasyMock.anyString())).andReturn(logProps).anyTimes()
EasyMock.expect(kafkaZkClient.conditionalUpdatePath(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn((true, 0)).anyTimes()
EasyMock.replay(kafkaZkClient)
}

@After
Expand Down Expand Up @@ -284,4 +292,25 @@ class PartitionTest {
builder.build()
}

/**
* Test for AtMinIsr partition state. We set the partition replica set size as 3, but only set one replica as an ISR.
* As the default minIsr configuration is 1, then the partition should be at min ISR (isAtMinIsr = true).
*/
@Test
def testAtMinIsr(): Unit = {
val controllerEpoch = 3
val leader = brokerId
val follower1 = brokerId + 1
val follower2 = brokerId + 2
val controllerId = brokerId + 3
val replicas = List[Integer](leader, follower1, follower2).asJava
val isr = List[Integer](leader).asJava
val leaderEpoch = 8

val partition = new Partition(topicPartition.topic, topicPartition.partition, time, replicaManager)
assertFalse(partition.isAtMinIsr)
// Make isr set to only have leader to trigger AtMinIsr (default min isr config is 1)
partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 1, replicas, true), 0)
assertTrue(partition.isAtMinIsr)
}
}
5 changes: 5 additions & 0 deletions docs/ops.html
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,11 @@ <h3><a id="monitoring" href="#monitoring">6.6 Monitoring</a></h3>
<td>kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount</td>
<td>0</td>
</tr>
<tr>
<td># of at minIsr partitions (|ISR| = min.insync.replicas)</td>
<td>kafka.server:type=ReplicaManager,name=AtMinIsrPartitionCount</td>
<td>0</td>
</tr>
<tr>
<td># of offline log directories</td>
<td>kafka.log:type=LogManager,name=OfflineLogDirectoryCount</td>
Expand Down

0 comments on commit c3b03ea

Please sign in to comment.