Skip to content

Commit

Permalink
Avoid the potential race condition for observing produce request. (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
Lincong authored Apr 19, 2019
1 parent daaf4a5 commit d2c093b
Showing 1 changed file with 8 additions and 7 deletions.
15 changes: 8 additions & 7 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,14 @@ class KafkaApis(val requestChannel: RequestChannel,
if (authorizedRequestInfo.isEmpty)
sendResponseCallback(Map.empty)
else {

try
observer.observeProduceRequest(request.context, request.body[ProduceRequest])
catch {
case e: Exception => error(s"Observer failed to observe the produce request " +
s"${Observer.describeRequestAndResponse(request, null)}", e)
}

val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId

// call the replica manager to append messages to the replicas
Expand All @@ -498,13 +506,6 @@ class KafkaApis(val requestChannel: RequestChannel,
responseCallback = sendResponseCallback,
recordConversionStatsCallback = processingStatsCallback)

try
observer.observeProduceRequest(request.context, request.body[ProduceRequest])
catch {
case e: Exception => error(s"Observer failed to observe the produce request " +
s"${Observer.describeRequestAndResponse(request, null)}", e)
}

// if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
// hence we clear its data here in order to let GC reclaim its memory since it is already appended to log
produceRequest.clearPartitionRecords()
Expand Down

0 comments on commit d2c093b

Please sign in to comment.