Skip to content

Commit

Permalink
Adjust the observer interface to make handling produce request a spec…
Browse files Browse the repository at this point in the history
…ial case. (#16)
  • Loading branch information
Lincong authored Apr 19, 2019
1 parent cf18791 commit daaf4a5
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 11 deletions.
21 changes: 16 additions & 5 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,13 @@ class KafkaApis(val requestChannel: RequestChannel,
responseCallback = sendResponseCallback,
recordConversionStatsCallback = processingStatsCallback)

try
observer.observeProduceRequest(request.context, request.body[ProduceRequest])
catch {
case e: Exception => error(s"Observer failed to observe the produce request " +
s"${Observer.describeRequestAndResponse(request, null)}", e)
}

// if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
// hence we clear its data here in order to let GC reclaim its memory since it is already appended to log
produceRequest.clearPartitionRecords()
Expand Down Expand Up @@ -2382,14 +2389,11 @@ class KafkaApis(val requestChannel: RequestChannel,
val responseString =
if (RequestChannel.isRequestLoggingEnabled) Some(response.toString(request.context.apiVersion))
else None
try {
observer.observe(request.context, request.body[AbstractRequest], response)
} catch {
case e: Exception => error(s"Observer failed to observe ${Observer.describeRequestAndResponse(request, response)}", e)
}
observeRequestResponse(request, response)

new RequestChannel.SendResponse(request, responseSend, responseString, onComplete)
case None =>
observeRequestResponse(request, null)
new RequestChannel.NoOpResponse(request)
}
sendResponse(response)
Expand All @@ -2411,4 +2415,11 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}

private def observeRequestResponse(request: RequestChannel.Request, response: AbstractResponse): Unit = {
try {
observer.observe(request.context, request.body[AbstractRequest], response)
} catch {
case e: Exception => error(s"Observer failed to observe ${Observer.describeRequestAndResponse(request, response)}", e)
}
}
}
9 changes: 7 additions & 2 deletions core/src/main/scala/kafka/server/NoOpObserver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package kafka.server

import java.util.Map
import java.util.concurrent.TimeUnit
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestContext}
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ProduceRequest, RequestContext}

/**
* An observer implementation that has no operation and serves as a place holder.
Expand All @@ -29,10 +29,15 @@ class NoOpObserver extends Observer {
def configure(configs: Map[String, _]): Unit = {}

/**
* Observer the record based on the given information.
* Observe a request and its corresponding response.
*/
def observe(requestContext: RequestContext, request: AbstractRequest, response: AbstractResponse): Unit = {}

/**
* Observe a produce request
*/
def observeProduceRequest(requestContext: RequestContext, produceRequest: ProduceRequest): Unit = {}

/**
* Close the observer with timeout.
*/
Expand Down
20 changes: 16 additions & 4 deletions core/src/main/scala/kafka/server/Observer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ package kafka.server

import java.util.concurrent.TimeUnit
import kafka.network.RequestChannel
import org.apache.kafka.common.requests.AbstractResponse
import org.apache.kafka.common.requests.AbstractRequest
import org.apache.kafka.common.requests.RequestContext
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ProduceRequest, RequestContext}
import org.apache.kafka.common.Configurable

/**
Expand All @@ -37,14 +35,28 @@ import org.apache.kafka.common.Configurable
trait Observer extends Configurable {

/**
* Observe the record based on the given information.
* Observe a request and its corresponding response
*
* @param requestContext the context information about the request
* @param request the request being observed for a various purpose(s)
* @param response the response to the request
*/
def observe(requestContext: RequestContext, request: AbstractRequest, response: AbstractResponse): Unit

/**
* Observe a produce request. This method handles only the produce request since produce request is special in
* two ways. Firstly, if ACK is set to be 0, there is no produce response associated with the produce request.
* Secondly, the lifecycle of some inner fields in a ProduceRequest is shorter than the lifecycle of the produce
* request itself. That means in some situations, when <code>observe</code> is called on a produce request and
* response pair, some fields in the produce request has been null-ed already so that the produce request and
* response is not observable (or no useful information). Therefore this method exists for the purpose of allowing
* users to observe on the produce request before its corresponding response is created.
*
* @param requestContext the context information about the request
* @param produceRequest the produce request being observed for a various purpose(s)
*/
def observeProduceRequest(requestContext: RequestContext, produceRequest: ProduceRequest): Unit

/**
* Close the observer with timeout.
*
Expand Down

0 comments on commit daaf4a5

Please sign in to comment.