Skip to content

Commit

Permalink
Add logging when automatic topic creation happens in order to track w…
Browse files Browse the repository at this point in the history
…hich application might be relying on auto.topic.create.enable being true (#12)
  • Loading branch information
Lincong authored Apr 16, 2019
1 parent 48db990 commit 76aec3e
Showing 1 changed file with 10 additions and 3 deletions.
13 changes: 10 additions & 3 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1001,10 +1001,11 @@ class KafkaApis(val requestChannel: RequestChannel,
topicMetadata.headOption.getOrElse(createInternalTopic(topic))
}

private def getTopicMetadata(allowAutoTopicCreation: Boolean, topics: Set[String], listenerName: ListenerName,
private def getTopicMetadata(allowAutoTopicCreation: Boolean, topics: Set[String], requestContext: RequestContext,
errorUnavailableEndpoints: Boolean,
errorUnavailableListeners: Boolean): Seq[MetadataResponse.TopicMetadata] = {
val topicResponses = metadataCache.getTopicMetadata(topics, listenerName,

val topicResponses = metadataCache.getTopicMetadata(topics, requestContext.listenerName,
errorUnavailableEndpoints, errorUnavailableListeners)
if (topics.isEmpty || topicResponses.size == topics.size) {
topicResponses
Expand All @@ -1018,6 +1019,12 @@ class KafkaApis(val requestChannel: RequestChannel,
else
topicMetadata
} else if (allowAutoTopicCreation && config.autoCreateTopicsEnable) {
val msg = "Automatically creating topic: " + topic + " with " + config.numPartitions +
" partitions and replication factor " + config.defaultReplicationFactor +
" due to request from " + requestContext.principal + " at IP address " +
requestContext.clientAddress + " and header " + requestContext.header

info(msg);
createTopic(topic, config.numPartitions, config.defaultReplicationFactor)
} else {
new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, java.util.Collections.emptyList())
Expand Down Expand Up @@ -1088,7 +1095,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (authorizedTopics.isEmpty)
Seq.empty[MetadataResponse.TopicMetadata]
else
getTopicMetadata(metadataRequest.allowAutoTopicCreation, authorizedTopics, request.context.listenerName,
getTopicMetadata(metadataRequest.allowAutoTopicCreation, authorizedTopics, request.context,
errorUnavailableEndpoints, errorUnavailableListeners)

val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata
Expand Down

0 comments on commit 76aec3e

Please sign in to comment.