Skip to content

Commit

Permalink
Adjust broker-side observer interface so that the interface only take…
Browse files Browse the repository at this point in the history
…s Java objects as arguments. (#10)

Reviewers: Radai Rosenblatt, Ke Hu
  • Loading branch information
Lincong authored Mar 30, 2019
1 parent 768baa4 commit e8d2de1
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 5 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2376,7 +2376,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (RequestChannel.isRequestLoggingEnabled) Some(response.toString(request.context.apiVersion))
else None
try {
observer.observe(request, response)
observer.observe(request.context, request.body[AbstractRequest], response)
} catch {
case e: Exception => error(s"Observer failed to observe ${Observer.describeRequestAndResponse(request, response)}", e)
}
Expand Down
5 changes: 2 additions & 3 deletions core/src/main/scala/kafka/server/NoOpObserver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ package kafka.server

import java.util.Map
import java.util.concurrent.TimeUnit
import kafka.network.RequestChannel
import org.apache.kafka.common.requests.AbstractResponse
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestContext}

/**
* An observer implementation that has no operation and serves as a place holder.
Expand All @@ -32,7 +31,7 @@ class NoOpObserver extends Observer {
/**
* Observer the record based on the given information.
*/
def observe(request: RequestChannel.Request, response: AbstractResponse): Unit = {}
def observe(requestContext: RequestContext, request: AbstractRequest, response: AbstractResponse): Unit = {}

/**
* Close the observer with timeout.
Expand Down
5 changes: 4 additions & 1 deletion core/src/main/scala/kafka/server/Observer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package kafka.server
import java.util.concurrent.TimeUnit
import kafka.network.RequestChannel
import org.apache.kafka.common.requests.AbstractResponse
import org.apache.kafka.common.requests.AbstractRequest
import org.apache.kafka.common.requests.RequestContext
import org.apache.kafka.common.Configurable

/**
Expand All @@ -37,10 +39,11 @@ trait Observer extends Configurable {
/**
* Observe the record based on the given information.
*
* @param requestContext the context information about the request
* @param request the request being observed for a various purpose(s)
* @param response the response to the request
*/
def observe(request: RequestChannel.Request, response: AbstractResponse): Unit
def observe(requestContext: RequestContext, request: AbstractRequest, response: AbstractResponse): Unit

/**
* Close the observer with timeout.
Expand Down

0 comments on commit e8d2de1

Please sign in to comment.