Skip to content

Commit

Permalink
Fixed NPE in LiCreateTopicPolicy, setup test in AdminClientIntegratio…
Browse files Browse the repository at this point in the history
…nTest.
  • Loading branch information
Alex Wang authored and jonlee2 committed Mar 5, 2019
1 parent 1c906e4 commit 00c75e7
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 5 deletions.
11 changes: 6 additions & 5 deletions core/src/main/scala/kafka/server/LiCreateTopicPolicy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,18 @@ class LiCreateTopicPolicy extends CreateTopicPolicy {
override def validate(requestMetadata: CreateTopicPolicy.RequestMetadata): Unit = {
val requestTopic = requestMetadata.topic
val requestRF = requestMetadata.replicationFactor()
import collection.JavaConverters._
val requestAssignment = requestMetadata.replicasAssignments().asScala

if (requestAssignment == null && requestRF == null)
// For scala 2.11.x, a scala object converted from a null java object via asScala is NOT null. This could lead to
// NPE, thus we check the java object directly. AdminClientIntegrationTest verifies the behavior.
if (requestMetadata.replicasAssignments() == null && requestRF == null)
throw new PolicyViolationException(s"Topic [$requestTopic] is missing both replica assignment and " +
s"replication factor.")

// In createTopics() in AdminManager, replicationFactor and replicasAssignments are not both set at same time. We
// follow the same rationale here and prioritize replicasAssignments over replicationFactor
if (requestAssignment != null) {
requestAssignment.foreach { case (p, assignment) =>
if (requestMetadata.replicasAssignments() != null) {
import collection.JavaConverters._
requestMetadata.replicasAssignments().asScala.foreach { case (p, assignment) =>
if (assignment.size() < minRf)
throw new PolicyViolationException(s"Topic [$requestTopic] fails RF requirement. Received RF for " +
s"[partition-$p]: ${assignment.size()}, min required RF: $minRf.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"${listenerName.value}:${securityProtocol.name}")
config.setProperty(KafkaConfig.DeleteTopicEnableProp, "true")
config.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
// Set up CreateTopicPolicy to be included in test.
config.setProperty(KafkaConfig.DefaultReplicationFactorProp, "1");
config.setProperty(KafkaConfig.CreateTopicPolicyClassNameProp, "kafka.server.LiCreateTopicPolicy")
// We set this in order to test that we don't expose sensitive data via describe configs. This will already be
// set for subclasses with security enabled and we don't want to overwrite it.
if (!config.containsKey(KafkaConfig.SslTruststorePasswordProp))
Expand Down

0 comments on commit 00c75e7

Please sign in to comment.