diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 257b759471e59..73c464b2cfacc 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -498,6 +498,13 @@ class KafkaApis(val requestChannel: RequestChannel, responseCallback = sendResponseCallback, recordConversionStatsCallback = processingStatsCallback) + try + observer.observeProduceRequest(request.context, request.body[ProduceRequest]) + catch { + case e: Exception => error(s"Observer failed to observe the produce request " + + s"${Observer.describeRequestAndResponse(request, null)}", e) + } + // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected; // hence we clear its data here in order to let GC reclaim its memory since it is already appended to log produceRequest.clearPartitionRecords() @@ -2382,14 +2389,11 @@ class KafkaApis(val requestChannel: RequestChannel, val responseString = if (RequestChannel.isRequestLoggingEnabled) Some(response.toString(request.context.apiVersion)) else None - try { - observer.observe(request.context, request.body[AbstractRequest], response) - } catch { - case e: Exception => error(s"Observer failed to observe ${Observer.describeRequestAndResponse(request, response)}", e) - } + observeRequestResponse(request, response) new RequestChannel.SendResponse(request, responseSend, responseString, onComplete) case None => + observeRequestResponse(request, null) new RequestChannel.NoOpResponse(request) } sendResponse(response) @@ -2411,4 +2415,11 @@ class KafkaApis(val requestChannel: RequestChannel, } } + private def observeRequestResponse(request: RequestChannel.Request, response: AbstractResponse): Unit = { + try { + observer.observe(request.context, request.body[AbstractRequest], response) + } catch { + case e: Exception => error(s"Observer failed to observe ${Observer.describeRequestAndResponse(request, response)}", e) + } + } } diff --git a/core/src/main/scala/kafka/server/NoOpObserver.scala b/core/src/main/scala/kafka/server/NoOpObserver.scala index 072201833246d..3fea54a7c8528 100644 --- a/core/src/main/scala/kafka/server/NoOpObserver.scala +++ b/core/src/main/scala/kafka/server/NoOpObserver.scala @@ -19,7 +19,7 @@ package kafka.server import java.util.Map import java.util.concurrent.TimeUnit -import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestContext} +import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ProduceRequest, RequestContext} /** * An observer implementation that has no operation and serves as a place holder. @@ -29,10 +29,15 @@ class NoOpObserver extends Observer { def configure(configs: Map[String, _]): Unit = {} /** - * Observer the record based on the given information. + * Observe a request and its corresponding response. */ def observe(requestContext: RequestContext, request: AbstractRequest, response: AbstractResponse): Unit = {} + /** + * Observe a produce request + */ + def observeProduceRequest(requestContext: RequestContext, produceRequest: ProduceRequest): Unit = {} + /** * Close the observer with timeout. */ diff --git a/core/src/main/scala/kafka/server/Observer.scala b/core/src/main/scala/kafka/server/Observer.scala index 060e5931c17f5..624f609fbdd47 100644 --- a/core/src/main/scala/kafka/server/Observer.scala +++ b/core/src/main/scala/kafka/server/Observer.scala @@ -19,9 +19,7 @@ package kafka.server import java.util.concurrent.TimeUnit import kafka.network.RequestChannel -import org.apache.kafka.common.requests.AbstractResponse -import org.apache.kafka.common.requests.AbstractRequest -import org.apache.kafka.common.requests.RequestContext +import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ProduceRequest, RequestContext} import org.apache.kafka.common.Configurable /** @@ -37,7 +35,7 @@ import org.apache.kafka.common.Configurable trait Observer extends Configurable { /** - * Observe the record based on the given information. + * Observe a request and its corresponding response * * @param requestContext the context information about the request * @param request the request being observed for a various purpose(s) @@ -45,6 +43,20 @@ trait Observer extends Configurable { */ def observe(requestContext: RequestContext, request: AbstractRequest, response: AbstractResponse): Unit + /** + * Observe a produce request. This method handles only the produce request since produce request is special in + * two ways. Firstly, if ACK is set to be 0, there is no produce response associated with the produce request. + * Secondly, the lifecycle of some inner fields in a ProduceRequest is shorter than the lifecycle of the produce + * request itself. That means in some situations, when observe is called on a produce request and + * response pair, some fields in the produce request has been null-ed already so that the produce request and + * response is not observable (or no useful information). Therefore this method exists for the purpose of allowing + * users to observe on the produce request before its corresponding response is created. + * + * @param requestContext the context information about the request + * @param produceRequest the produce request being observed for a various purpose(s) + */ + def observeProduceRequest(requestContext: RequestContext, produceRequest: ProduceRequest): Unit + /** * Close the observer with timeout. *