Skip to content
This repository has been archived by the owner on Nov 7, 2022. It is now read-only.

Commit

Permalink
StreamingContext doesn't change so reuse it
Browse files Browse the repository at this point in the history
grpc streaming RPC contexts don't change so we
can consider them lifetime contexts and thus we
can record the initial keys once and reuse the
same context when recording measurements.
  • Loading branch information
odeke-em committed Oct 7, 2018
1 parent a99691d commit 81c1aaf
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 9 deletions.
8 changes: 4 additions & 4 deletions interceptor/opencensus/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,17 @@ func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) err
return errTraceExportProtocolViolation
}

spansMetricsFn := internal.NewReceivedSpansRecorder("opencensus")
spansMetricsFn := internal.NewReceivedSpansRecorderStreaming(tes.Context(), "opencensus")

processReceivedSpans := func(ctx context.Context, ni *commonpb.Node, spans []*tracepb.Span) {
processReceivedSpans := func(ni *commonpb.Node, spans []*tracepb.Span) {
// Firstly, we'll add them to the bundler.
if len(recv.Spans) > 0 {
bundlerPayload := &spansAndNode{node: ni, spans: recv.Spans}
traceBundler.Add(bundlerPayload, len(bundlerPayload.spans))
}

// We MUST unconditionally record metrics from this reception.
spansMetricsFn(ctx, ni, recv.Spans)
spansMetricsFn(ni, recv.Spans)
}

var lastNonNilNode *commonpb.Node
Expand All @@ -115,7 +115,7 @@ func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) err
// change but we haven't examined all the corner cases for gRPC streaming
// hence we'll always pass in the context, in case the underlying implementation
// changes a detail. TODO (@odeke-em) investigate if streamer.Context() can change.
processReceivedSpans(tes.Context(), lastNonNilNode, recv.Spans)
processReceivedSpans(lastNonNilNode, recv.Spans)

recv, err = tes.Recv()
if err != nil {
Expand Down
14 changes: 9 additions & 5 deletions internal/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,18 @@ var AllViews = []*view.View{
ViewReceivedSpansInterceptor,
}

// NewReceivedSpansRecorder creates a function that uses a context created
// NewReceivedSpansRecorderStreaming creates a function that uses a context created
// from the name of the interceptor to record the number of the spans received
// by the interceptor.
func NewReceivedSpansRecorder(interceptorName string) func(context.Context, *commonpb.Node, []*tracepb.Span) {
return func(parentCtx context.Context, ni *commonpb.Node, spans []*tracepb.Span) {
ctx, _ := tag.New(parentCtx, tag.Upsert(tagKeyInterceptorName, interceptorName))
// TODO: (@odeke-em) perhaps also record information from the node?
func NewReceivedSpansRecorderStreaming(lifetimeCtx context.Context, interceptorName string) func(*commonpb.Node, []*tracepb.Span) {
// We create and reuse this context because for streaming RPCs e.g. with gRPC
// the context doesn't change, so it is more useful for avoid expensively adding
// keys on each invocation. We can create the context once and then reuse it
// when recording measurements.
ctx, _ := tag.New(lifetimeCtx, tag.Upsert(tagKeyInterceptorName, interceptorName))

return func(ni *commonpb.Node, spans []*tracepb.Span) {
// TODO: (@odeke-em) perhaps also record information from the node?
stats.Record(ctx, mReceivedSpans.M(int64(len(spans))))
}
}
Expand Down

0 comments on commit 81c1aaf

Please sign in to comment.