diff --git a/cmd/ocagent/main.go b/cmd/ocagent/main.go index bb9ecf4f..d8542d3a 100644 --- a/cmd/ocagent/main.go +++ b/cmd/ocagent/main.go @@ -26,13 +26,14 @@ import ( "os/signal" "time" - "google.golang.org/grpc" - agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" "github.com/census-instrumentation/opencensus-service/cmd/ocagent/exporterparser" "github.com/census-instrumentation/opencensus-service/exporter" "github.com/census-instrumentation/opencensus-service/interceptor/opencensus" + "github.com/census-instrumentation/opencensus-service/internal" "github.com/census-instrumentation/opencensus-service/spanreceiver" + "go.opencensus.io/plugin/ocgrpc" + "go.opencensus.io/stats/view" ) func main() { @@ -84,7 +85,14 @@ func runOCInterceptor(ocInterceptorPort int, sr spanreceiver.SpanReceiver) (done if err != nil { return nil, fmt.Errorf("Cannot bind to address %q: %v", addr, err) } - srv := grpc.NewServer() + srv := internal.GRPCServerWithObservabilityEnabled() + if err := view.Register(internal.AllViews...); err != nil { + return nil, fmt.Errorf("Failed to register internal.AllViews: %v", err) + } + if err := view.Register(ocgrpc.DefaultServerViews...); err != nil { + return nil, fmt.Errorf("Failed to register ocgrpc.DefaultServerViews: %v", err) + } + agenttracepb.RegisterTraceServiceServer(srv, oci) go func() { log.Printf("Running OpenCensus interceptor as a gRPC service at %q", addr) diff --git a/interceptor/opencensus/observability_test.go b/interceptor/opencensus/observability_test.go new file mode 100644 index 00000000..bb6833f3 --- /dev/null +++ b/interceptor/opencensus/observability_test.go @@ -0,0 +1,216 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ocinterceptor_test + +import ( + "fmt" + "reflect" + "strings" + "sync" + "testing" + "time" + + "contrib.go.opencensus.io/exporter/ocagent" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + "go.opencensus.io/trace" + + commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" + agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" + "github.com/census-instrumentation/opencensus-service/interceptor/opencensus" + "github.com/census-instrumentation/opencensus-service/internal" +) + +// Ensure that if we add a metrics exporter that our target metrics +// will be recorded but also with the proper tag keys and values. +// See Issue https://github.com/census-instrumentation/opencensus-service/issues/63 +// +// Note: we are intentionally skipping the ocgrpc.ServerDefaultViews as this +// test is to ensure exactness, but with the mentioned views registered, the +// output will be quite noisy. +func TestEnsureRecordedMetrics(t *testing.T) { + sappender := newSpanAppender() + + _, port, doneFn := ocInterceptorOnGRPCServer(t, sappender, ocinterceptor.WithSpanBufferPeriod(2*time.Millisecond)) + defer doneFn() + + // Now the opencensus-agent exporter. + oce, err := ocagent.NewExporter(ocagent.WithPort(uint16(port)), ocagent.WithInsecure()) + if err != nil { + t.Fatalf("Failed to create the ocagent-exporter: %v", err) + } + trace.RegisterExporter(oce) + defer func() { + oce.Stop() + trace.UnregisterExporter(oce) + }() + + // Now for the stats exporter + if err := view.Register(internal.AllViews...); err != nil { + t.Fatalf("Failed to register all views: %v", err) + } + defer view.Unregister(internal.AllViews...) + + metricsReportingPeriod := 5 * time.Millisecond + view.SetReportingPeriod(metricsReportingPeriod) + // On exit, revert the metrics reporting period. + defer view.SetReportingPeriod(60 * time.Second) + + cme := newCountMetricsExporter() + view.RegisterExporter(cme) + defer view.UnregisterExporter(cme) + + n := 20 + // Now it is time to send over some spans + // and we'll count the numbers received. + for i := 0; i < n; i++ { + now := time.Now().UTC() + oce.ExportSpan(&trace.SpanData{ + StartTime: now.Add(-10 * time.Second), + EndTime: now.Add(20 * time.Second), + SpanContext: trace.SpanContext{ + TraceID: trace.TraceID{byte(0x20 + i), 0x4E, 0x4D, 0x4C, 0x4B, 0x4A, 0x49, 0x48, 0x47, 0x46, 0x45, 0x44, 0x43, 0x42, 0x41}, + SpanID: trace.SpanID{0x7F, 0x7E, 0x7D, 0x7C, 0x7B, 0x7A, 0x79, 0x78}, + TraceOptions: trace.TraceOptions(i & 0x01), + }, + ParentSpanID: trace.SpanID{byte(0x01 + i), 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37}, + Name: fmt.Sprintf("Span-%d", i), + Status: trace.Status{Code: trace.StatusCodeInternal, Message: "Blocked by firewall"}, + }) + } + + // Give them some time to be exported. + // say n * metricsReportingPeriod + <-time.After(time.Duration(n) * metricsReportingPeriod) + oce.Flush() + + checkCountMetricsExporterResults(t, cme, n, 1) +} + +func TestEnsureRecordedMetrics_zeroLengthSpansSender(t *testing.T) { + t.Skipf("Currently disabled, enable this test when the following are fixed:\nIssue %s\nPR %s", + "https://github.com/census-instrumentation/opencensus-go/issues/862", + "https://github.com/census-instrumentation/opencensus-go/pull/922", + ) + sappender := newSpanAppender() + + _, port, doneFn := ocInterceptorOnGRPCServer(t, sappender, ocinterceptor.WithSpanBufferPeriod(2*time.Millisecond)) + defer doneFn() + + // Now the opencensus-agent exporter. + oce, err := ocagent.NewExporter(ocagent.WithPort(uint16(port)), ocagent.WithInsecure()) + if err != nil { + t.Fatalf("Failed to create the ocagent-exporter: %v", err) + } + trace.RegisterExporter(oce) + defer func() { + oce.Stop() + trace.UnregisterExporter(oce) + }() + + // Now for the stats exporter + if err := view.Register(internal.AllViews...); err != nil { + t.Fatalf("Failed to register all views: %v", err) + } + defer view.Unregister(internal.AllViews...) + + metricsReportingPeriod := 10 * time.Millisecond + view.SetReportingPeriod(metricsReportingPeriod) + // On exit, revert the metrics reporting period. + defer view.SetReportingPeriod(60 * time.Second) + + cme := newCountMetricsExporter() + view.RegisterExporter(cme) + defer view.UnregisterExporter(cme) + + n := 20 + // Now for the traceExporter that sends 0 length spans + traceSvcClient, traceSvcDoneFn, err := makeTraceServiceClient(port) + if err != nil { + t.Fatalf("Failed to create the trace service client: %v", err) + } + defer traceSvcDoneFn() + for i := 0; i <= n; i++ { + _ = traceSvcClient.Send(&agenttracepb.ExportTraceServiceRequest{Spans: nil, Node: &commonpb.Node{}}) + } + <-time.After(time.Duration(n) * metricsReportingPeriod) + checkCountMetricsExporterResults(t, cme, n, 0) +} + +func checkCountMetricsExporterResults(t *testing.T, cme *countMetricsExporter, n int, wantAllCountsToBe int64) { + cme.mu.Lock() + defer cme.mu.Unlock() + + // The only tags that we are expecting are "opencensus_interceptor": "opencensus" * n + wantTagKey, _ := tag.NewKey("opencensus_interceptor") + valuesPlusBlank := strings.Split(strings.Repeat("opencensus,opencensus,", n/2), ",") + wantValues := valuesPlusBlank[:len(valuesPlusBlank)-1] + wantTags := map[tag.Key][]string{ + wantTagKey: wantValues, + } + + gotTags := cme.tags + if !reflect.DeepEqual(gotTags, wantTags) { + t.Errorf("\nGotTags:\n\t%#v\n\nWantTags:\n\t%#v\n", gotTags, wantTags) + } + + // The only data types we are expecting are: + // * DistributionData + for key, aggregation := range cme.data { + switch agg := aggregation.(type) { + case *view.DistributionData: + if g, w := agg.Count, int64(1); g != w { + t.Errorf("Data point #%d GotCount %d Want %d", key, g, w) + } + default: + t.Errorf("Data point #%d Got %T want %T", key, agg, (*view.DistributionData)(nil)) + } + } +} + +type countMetricsExporter struct { + mu sync.Mutex + tags map[tag.Key][]string + data map[int]view.AggregationData +} + +func newCountMetricsExporter() *countMetricsExporter { + return &countMetricsExporter{ + tags: make(map[tag.Key][]string), + data: make(map[int]view.AggregationData), + } +} + +func (cme *countMetricsExporter) clear() { + cme.mu.Lock() + defer cme.mu.Unlock() + + cme.data = make(map[int]view.AggregationData) + cme.tags = make(map[tag.Key][]string) +} + +var _ view.Exporter = (*countMetricsExporter)(nil) + +func (cme *countMetricsExporter) ExportView(vd *view.Data) { + cme.mu.Lock() + defer cme.mu.Unlock() + + for _, row := range vd.Rows { + cme.data[len(cme.data)] = row.Data + for _, tag_ := range row.Tags { + cme.tags[tag_.Key] = append(cme.tags[tag_.Key], tag_.Value) + } + } +} diff --git a/interceptor/opencensus/opencensus.go b/interceptor/opencensus/opencensus.go index 3aed74d9..13fe02d3 100644 --- a/interceptor/opencensus/opencensus.go +++ b/interceptor/opencensus/opencensus.go @@ -24,6 +24,7 @@ import ( commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" + "github.com/census-instrumentation/opencensus-service/internal" "github.com/census-instrumentation/opencensus-service/spanreceiver" ) @@ -89,8 +90,20 @@ func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) err return errTraceExportProtocolViolation } - var lastNonNilNode *commonpb.Node + spansMetricsFn := internal.NewReceivedSpansRecorderStreaming(tes.Context(), "opencensus") + + processReceivedSpans := func(ni *commonpb.Node, spans []*tracepb.Span) { + // Firstly, we'll add them to the bundler. + if len(recv.Spans) > 0 { + bundlerPayload := &spansAndNode{node: ni, spans: recv.Spans} + traceBundler.Add(bundlerPayload, len(bundlerPayload.spans)) + } + // We MUST unconditionally record metrics from this reception. + spansMetricsFn(ni, recv.Spans) + } + + var lastNonNilNode *commonpb.Node // Now that we've got the first message with a Node, we can start to receive streamed up spans. for { // If a Node has been sent from downstream, save and use it. @@ -98,11 +111,7 @@ func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) err lastNonNilNode = recv.Node } - // Otherwise add them to the bundler. - if len(recv.Spans) > 0 { - bundlerPayload := &spansAndNode{node: lastNonNilNode, spans: recv.Spans} - traceBundler.Add(bundlerPayload, len(bundlerPayload.spans)) - } + processReceivedSpans(lastNonNilNode, recv.Spans) recv, err = tes.Recv() if err != nil { diff --git a/interceptor/opencensus/opencensus_test.go b/interceptor/opencensus/opencensus_test.go index 639610b3..2e08fb68 100644 --- a/interceptor/opencensus/opencensus_test.go +++ b/interceptor/opencensus/opencensus_test.go @@ -38,6 +38,7 @@ import ( agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" "github.com/census-instrumentation/opencensus-service/interceptor/opencensus" + "github.com/census-instrumentation/opencensus-service/internal" "github.com/census-instrumentation/opencensus-service/spanreceiver" "go.opencensus.io/trace" "go.opencensus.io/trace/tracestate" @@ -483,7 +484,7 @@ func ocInterceptorOnGRPCServer(t *testing.T, sr spanreceiver.SpanReceiver, opts } // Now run it as a gRPC server - srv := grpc.NewServer() + srv := internal.GRPCServerWithObservabilityEnabled() agenttracepb.RegisterTraceServiceServer(srv, oci) go func() { _ = srv.Serve(ln) diff --git a/interceptor/opencensus/trace_interceptor.go b/interceptor/opencensus/trace_interceptor.go index 0544539a..88ee7f78 100644 --- a/interceptor/opencensus/trace_interceptor.go +++ b/interceptor/opencensus/trace_interceptor.go @@ -25,6 +25,7 @@ import ( agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" "github.com/census-instrumentation/opencensus-service/interceptor" + "github.com/census-instrumentation/opencensus-service/internal" "github.com/census-instrumentation/opencensus-service/spanreceiver" ) @@ -86,9 +87,7 @@ func (ocih *ocInterceptorHandler) startInternal(ctx context.Context, sr spanrece return err } - // TODO: (@odeke-em) in the future, also add OpenCensus - // stats handlers to start this gPRC server. - srv := grpc.NewServer() + srv := internal.GRPCServerWithObservabilityEnabled() agenttracepb.RegisterTraceServiceServer(srv, oci) go func() { diff --git a/internal/observability.go b/internal/observability.go new file mode 100644 index 00000000..6a361011 --- /dev/null +++ b/internal/observability.go @@ -0,0 +1,75 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +// This file contains helpers that are useful to add observability +// with metrics and tracing using OpenCensus to the various pieces +// of the service. + +import ( + "context" + + "google.golang.org/grpc" + + "go.opencensus.io/plugin/ocgrpc" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + + commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" + tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" +) + +var tagKeyInterceptorName, _ = tag.NewKey("opencensus_interceptor") +var mReceivedSpans = stats.Int64("oc.io/interceptor/received_spans", "Counts the number of spans received by the interceptor", "1") + +var ViewReceivedSpansInterceptor = &view.View{ + Name: "oc.io/interceptor/received_spans", + Description: "The number of spans received by the interceptor", + Measure: mReceivedSpans, + Aggregation: view.Distribution( + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 14, 16, 18, 20, 25, 30, 35, 40, 45, 50, 60, 70, 80, 90, + 100, 150, 200, 250, 300, 450, 500, 600, 700, 800, 900, 1000, 1200, 1400, 1600, 1800, 2000, + ), + TagKeys: []tag.Key{tagKeyInterceptorName}, +} + +var AllViews = []*view.View{ + ViewReceivedSpansInterceptor, +} + +// NewReceivedSpansRecorderStreaming creates a function that uses a context created +// from the name of the interceptor to record the number of the spans received +// by the interceptor. +func NewReceivedSpansRecorderStreaming(lifetimeCtx context.Context, interceptorName string) func(*commonpb.Node, []*tracepb.Span) { + // We create and reuse this context because for streaming RPCs e.g. with gRPC + // the context doesn't change, so it is more useful for avoid expensively adding + // keys on each invocation. We can create the context once and then reuse it + // when recording measurements. + ctx, _ := tag.New(lifetimeCtx, tag.Upsert(tagKeyInterceptorName, interceptorName)) + + return func(ni *commonpb.Node, spans []*tracepb.Span) { + // TODO: (@odeke-em) perhaps also record information from the node? + stats.Record(ctx, mReceivedSpans.M(int64(len(spans)))) + } +} + +// GRPCServerWithObservabilityEnabled creates a gRPC server that at a bare minimum has +// the OpenCensus ocgrpc server stats handler enabled for tracing and stats. +// Use it instead of invoking grpc.NewServer directly. +func GRPCServerWithObservabilityEnabled(extraOpts ...grpc.ServerOption) *grpc.Server { + opts := append(extraOpts, grpc.StatsHandler(&ocgrpc.ServerHandler{})) + return grpc.NewServer(opts...) +}