diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 08c89bd4ac01a..257b759471e59 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1001,10 +1001,11 @@ class KafkaApis(val requestChannel: RequestChannel, topicMetadata.headOption.getOrElse(createInternalTopic(topic)) } - private def getTopicMetadata(allowAutoTopicCreation: Boolean, topics: Set[String], listenerName: ListenerName, + private def getTopicMetadata(allowAutoTopicCreation: Boolean, topics: Set[String], requestContext: RequestContext, errorUnavailableEndpoints: Boolean, errorUnavailableListeners: Boolean): Seq[MetadataResponse.TopicMetadata] = { - val topicResponses = metadataCache.getTopicMetadata(topics, listenerName, + + val topicResponses = metadataCache.getTopicMetadata(topics, requestContext.listenerName, errorUnavailableEndpoints, errorUnavailableListeners) if (topics.isEmpty || topicResponses.size == topics.size) { topicResponses @@ -1018,6 +1019,12 @@ class KafkaApis(val requestChannel: RequestChannel, else topicMetadata } else if (allowAutoTopicCreation && config.autoCreateTopicsEnable) { + val msg = "Automatically creating topic: " + topic + " with " + config.numPartitions + + " partitions and replication factor " + config.defaultReplicationFactor + + " due to request from " + requestContext.principal + " at IP address " + + requestContext.clientAddress + " and header " + requestContext.header + + info(msg); createTopic(topic, config.numPartitions, config.defaultReplicationFactor) } else { new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, java.util.Collections.emptyList()) @@ -1088,7 +1095,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (authorizedTopics.isEmpty) Seq.empty[MetadataResponse.TopicMetadata] else - getTopicMetadata(metadataRequest.allowAutoTopicCreation, authorizedTopics, request.context.listenerName, + getTopicMetadata(metadataRequest.allowAutoTopicCreation, authorizedTopics, request.context, errorUnavailableEndpoints, errorUnavailableListeners) val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata