diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 06fa8d165c553..08c89bd4ac01a 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -2376,7 +2376,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (RequestChannel.isRequestLoggingEnabled) Some(response.toString(request.context.apiVersion)) else None try { - observer.observe(request, response) + observer.observe(request.context, request.body[AbstractRequest], response) } catch { case e: Exception => error(s"Observer failed to observe ${Observer.describeRequestAndResponse(request, response)}", e) } diff --git a/core/src/main/scala/kafka/server/NoOpObserver.scala b/core/src/main/scala/kafka/server/NoOpObserver.scala index 51dc204474974..072201833246d 100644 --- a/core/src/main/scala/kafka/server/NoOpObserver.scala +++ b/core/src/main/scala/kafka/server/NoOpObserver.scala @@ -19,8 +19,7 @@ package kafka.server import java.util.Map import java.util.concurrent.TimeUnit -import kafka.network.RequestChannel -import org.apache.kafka.common.requests.AbstractResponse +import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestContext} /** * An observer implementation that has no operation and serves as a place holder. @@ -32,7 +31,7 @@ class NoOpObserver extends Observer { /** * Observer the record based on the given information. */ - def observe(request: RequestChannel.Request, response: AbstractResponse): Unit = {} + def observe(requestContext: RequestContext, request: AbstractRequest, response: AbstractResponse): Unit = {} /** * Close the observer with timeout. diff --git a/core/src/main/scala/kafka/server/Observer.scala b/core/src/main/scala/kafka/server/Observer.scala index 74e54c8c075c9..060e5931c17f5 100644 --- a/core/src/main/scala/kafka/server/Observer.scala +++ b/core/src/main/scala/kafka/server/Observer.scala @@ -20,6 +20,8 @@ package kafka.server import java.util.concurrent.TimeUnit import kafka.network.RequestChannel import org.apache.kafka.common.requests.AbstractResponse +import org.apache.kafka.common.requests.AbstractRequest +import org.apache.kafka.common.requests.RequestContext import org.apache.kafka.common.Configurable /** @@ -37,10 +39,11 @@ trait Observer extends Configurable { /** * Observe the record based on the given information. * + * @param requestContext the context information about the request * @param request the request being observed for a various purpose(s) * @param response the response to the request */ - def observe(request: RequestChannel.Request, response: AbstractResponse): Unit + def observe(requestContext: RequestContext, request: AbstractRequest, response: AbstractResponse): Unit /** * Close the observer with timeout.