Skip to content

Commit

Permalink
[LI-HOTFIX] Support listing all partitions that are being reassigned …
Browse files Browse the repository at this point in the history
…in the kafka-topics.sh (#416)

TICKET = https://issues.apache.org/jira/browse/KAFKA-14381
LI_DESCRIPTION =
Currently, we don't have the tooling to list all of the partitions that are being reassigned in a
cluster.
This PR adds support in the kafka-topics.sh script, after which the partitions being reassigned can be shown via

./bin/kafka-topics.sh --bootstrap-server <> --command-config ./ssl_configs.properties --describe --inflight-partitions

EXIT_CRITERIA = When the change is merged upstream and pulled internally
  • Loading branch information
gitlw authored Nov 11, 2022
1 parent f9e3679 commit 5b5aef2
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions core/src/main/scala/kafka/admin/TopicCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ object TopicCommand extends Logging {
!opts.reportUnavailablePartitions &&
!opts.reportUnderReplicatedPartitions &&
!opts.reportUnderMinIsrPartitions &&
!opts.reportAtMinIsrPartitions
!opts.reportAtMinIsrPartitions &&
!opts.reportInFlightPartitions
val describePartitions = !opts.reportOverriddenConfigs

private def shouldPrintUnderReplicatedPartitions(partitionDescription: PartitionDescription): Boolean = {
Expand All @@ -181,13 +182,17 @@ object TopicCommand extends Logging {
private def shouldPrintAtMinIsrPartitions(partitionDescription: PartitionDescription): Boolean = {
opts.reportAtMinIsrPartitions && partitionDescription.isAtMinIsrPartitions
}
private def shouldPrintInFlightPartitions(partitionDescription: PartitionDescription): Boolean = {
opts.reportInFlightPartitions && partitionDescription.reassignment.isDefined
}

private def shouldPrintTopicPartition(partitionDesc: PartitionDescription): Boolean = {
describeConfigs ||
shouldPrintUnderReplicatedPartitions(partitionDesc) ||
shouldPrintUnavailablePartitions(partitionDesc) ||
shouldPrintUnderMinIsrPartitions(partitionDesc) ||
shouldPrintAtMinIsrPartitions(partitionDesc)
shouldPrintAtMinIsrPartitions(partitionDesc) ||
shouldPrintInFlightPartitions(partitionDesc)
}

def maybePrintPartitionDescription(desc: PartitionDescription): Unit = {
Expand Down Expand Up @@ -502,6 +507,8 @@ object TopicCommand extends Logging {
"if set when describing topics, only show partitions whose isr count is less than the configured minimum.")
private val reportAtMinIsrPartitionsOpt = parser.accepts("at-min-isr-partitions",
"if set when describing topics, only show partitions whose isr count is equal to the configured minimum.")
private val reportInFlightPartitionsOpt = parser.accepts("inflight-partitions",
"if set when describing topics, only show partitions who is undergoing reassignment.")
private val topicsWithOverridesOpt = parser.accepts("topics-with-overrides",
"if set when describing topics, only show topics that have overridden configs")
private val ifExistsOpt = parser.accepts("if-exists",
Expand All @@ -518,7 +525,7 @@ object TopicCommand extends Logging {

private val allTopicLevelOpts = immutable.Set[OptionSpec[_]](alterOpt, createOpt, describeOpt, listOpt, deleteOpt)

private val allReplicationReportOpts = Set(reportUnderReplicatedPartitionsOpt, reportUnderMinIsrPartitionsOpt, reportAtMinIsrPartitionsOpt, reportUnavailablePartitionsOpt)
private val allReplicationReportOpts = Set(reportUnderReplicatedPartitionsOpt, reportUnderMinIsrPartitionsOpt, reportAtMinIsrPartitionsOpt, reportInFlightPartitionsOpt, reportUnavailablePartitionsOpt)

def has(builder: OptionSpec[_]): Boolean = options.has(builder)
def valueAsOption[A](option: OptionSpec[A], defaultValue: Option[A] = None): Option[A] = if (has(option)) Some(options.valueOf(option)) else defaultValue
Expand All @@ -545,6 +552,7 @@ object TopicCommand extends Logging {
def reportUnavailablePartitions: Boolean = has(reportUnavailablePartitionsOpt)
def reportUnderMinIsrPartitions: Boolean = has(reportUnderMinIsrPartitionsOpt)
def reportAtMinIsrPartitions: Boolean = has(reportAtMinIsrPartitionsOpt)
def reportInFlightPartitions: Boolean = has(reportInFlightPartitionsOpt)
def reportOverriddenConfigs: Boolean = has(topicsWithOverridesOpt)
def ifExists: Boolean = has(ifExistsOpt)
def ifNotExists: Boolean = has(ifNotExistsOpt)
Expand Down Expand Up @@ -590,6 +598,8 @@ object TopicCommand extends Logging {
allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnderMinIsrPartitionsOpt + topicsWithOverridesOpt)
CommandLineUtils.checkInvalidArgs(parser, options, reportAtMinIsrPartitionsOpt,
allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportAtMinIsrPartitionsOpt + topicsWithOverridesOpt)
CommandLineUtils.checkInvalidArgs(parser, options, reportInFlightPartitionsOpt,
allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportInFlightPartitionsOpt + topicsWithOverridesOpt)
CommandLineUtils.checkInvalidArgs(parser, options, reportUnavailablePartitionsOpt,
allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnavailablePartitionsOpt + topicsWithOverridesOpt)
CommandLineUtils.checkInvalidArgs(parser, options, topicsWithOverridesOpt,
Expand Down

0 comments on commit 5b5aef2

Please sign in to comment.