From 76aec3e455976d0f482a8a78c6454a9910a2e02e Mon Sep 17 00:00:00 2001 From: Lincong Li Date: Mon, 15 Apr 2019 17:23:41 -0700 Subject: [PATCH] Add logging when automatic topic creation happens in order to track which application might be relying on auto.topic.create.enable being true (#12) --- core/src/main/scala/kafka/server/KafkaApis.scala | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 08c89bd4ac01a..257b759471e59 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1001,10 +1001,11 @@ class KafkaApis(val requestChannel: RequestChannel, topicMetadata.headOption.getOrElse(createInternalTopic(topic)) } - private def getTopicMetadata(allowAutoTopicCreation: Boolean, topics: Set[String], listenerName: ListenerName, + private def getTopicMetadata(allowAutoTopicCreation: Boolean, topics: Set[String], requestContext: RequestContext, errorUnavailableEndpoints: Boolean, errorUnavailableListeners: Boolean): Seq[MetadataResponse.TopicMetadata] = { - val topicResponses = metadataCache.getTopicMetadata(topics, listenerName, + + val topicResponses = metadataCache.getTopicMetadata(topics, requestContext.listenerName, errorUnavailableEndpoints, errorUnavailableListeners) if (topics.isEmpty || topicResponses.size == topics.size) { topicResponses @@ -1018,6 +1019,12 @@ class KafkaApis(val requestChannel: RequestChannel, else topicMetadata } else if (allowAutoTopicCreation && config.autoCreateTopicsEnable) { + val msg = "Automatically creating topic: " + topic + " with " + config.numPartitions + + " partitions and replication factor " + config.defaultReplicationFactor + + " due to request from " + requestContext.principal + " at IP address " + + requestContext.clientAddress + " and header " + requestContext.header + + info(msg); createTopic(topic, config.numPartitions, config.defaultReplicationFactor) } else { new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, java.util.Collections.emptyList()) @@ -1088,7 +1095,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (authorizedTopics.isEmpty) Seq.empty[MetadataResponse.TopicMetadata] else - getTopicMetadata(metadataRequest.allowAutoTopicCreation, authorizedTopics, request.context.listenerName, + getTopicMetadata(metadataRequest.allowAutoTopicCreation, authorizedTopics, request.context, errorUnavailableEndpoints, errorUnavailableListeners) val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata