diff --git a/pkg/client/resty/resty.go b/pkg/client/resty/resty.go index 94589f7a6c..c9c214cbf6 100644 --- a/pkg/client/resty/resty.go +++ b/pkg/client/resty/resty.go @@ -90,7 +90,7 @@ func StdConfig(name string) *Config { // RawConfig 返回配置 func RawConfig(key string) *Config { - var config = DefaultConfig() + config := DefaultConfig() config.Name = key if err := conf.UnmarshalKey(key, &config, conf.TagName("toml")); err != nil { @@ -162,7 +162,6 @@ func (config *Config) Build() (*resty.Client, error) { entry.Exit(base.WithError(err)) } } - }) tracer := xtrace.NewTracer(trace.SpanKindClient) @@ -191,12 +190,10 @@ func (config *Config) Build() (*resty.Client, error) { }) client.SetPreRequestHook(func(c *resty.Client, r *http.Request) error { - return nil }) client.OnAfterResponse(func(c *resty.Client, r *resty.Response) error { - cost := r.Time() if config.EnableMetric { @@ -210,6 +207,12 @@ func (config *Config) Build() (*resty.Client, error) { span.SetAttributes( semconv.HTTPStatusCodeKey.Int64(int64(r.StatusCode())), ) + + if r.IsError() { + span.RecordError(errors.New(r.Status())) + span.SetStatus(codes.Error, r.Status()) + } + span.End() } diff --git a/pkg/client/rocketmq/interceptor.go b/pkg/client/rocketmq/interceptor.go index 7ac219d148..2db5cb8dc8 100644 --- a/pkg/client/rocketmq/interceptor.go +++ b/pkg/client/rocketmq/interceptor.go @@ -92,7 +92,6 @@ func consumerMetricInterceptor() primitive.Interceptor { xlog.String("host", host), xlog.String("result", result), xlog.Any("err", err)) - } else { xlog.Jupiter().Debug("push consumer", xlog.String("topic", topic), @@ -186,9 +185,7 @@ func producerDefaultInterceptor(producer *Producer) primitive.Interceptor { realReq := req.(*primitive.Message) realReply := reply.(*primitive.SendResult) - var ( - span trace.Span - ) + var span trace.Span if producer.EnableTrace { @@ -200,7 +197,6 @@ func producerDefaultInterceptor(producer *Producer) primitive.Interceptor { for k, v := range md { realReq.WithProperty(strings.ToLower(k), strings.Join(v, ",")) } - } err := next(ctx, realReq, realReply) @@ -217,11 +213,8 @@ func producerDefaultInterceptor(producer *Producer) primitive.Interceptor { if producer.EnableTrace { if err != nil { span := trace.SpanFromContext(ctx) - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - } - span.End() + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) } } diff --git a/pkg/core/xtrace/propagation.go b/pkg/core/xtrace/propagation.go new file mode 100644 index 0000000000..92751d84ed --- /dev/null +++ b/pkg/core/xtrace/propagation.go @@ -0,0 +1,177 @@ +package xtrace + +import ( + "context" + "errors" + "fmt" + "strconv" + "strings" + + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace" +) + +const ( + jaegerHeader = "x-dy-traceid" + separator = ":" + traceID128bitsWidth = 128 / 4 + spanIDWidth = 64 / 4 + + idPaddingChar = "0" + + flagsDebug = 0x02 + flagsSampled = 0x01 + flagsNotSampled = 0x00 + + deprecatedParentSpanID = "0" +) + +var ( + empty = trace.SpanContext{} + + errMalformedTraceContextVal = errors.New("header value of uber-trace-id should contain four different part separated by : ") + errInvalidTraceIDLength = errors.New("invalid trace id length, must be either 16 or 32") + errMalformedTraceID = errors.New("cannot decode trace id from header, should be a string of hex, lowercase trace id can't be all zero") + errInvalidSpanIDLength = errors.New("invalid span id length, must be 16") + errMalformedSpanID = errors.New("cannot decode span id from header, should be a string of hex, lowercase span id can't be all zero") + errMalformedFlag = errors.New("cannot decode flag") +) + +// Jaeger propagator serializes SpanContext to/from Jaeger Headers +// +// Jaeger format: +// +// uber-trace-id: {trace-id}:{span-id}:{parent-span-id}:{flags}. +type Jaeger struct{} + +var _ propagation.TextMapPropagator = &Jaeger{} + +// Inject injects a context to the carrier following jaeger format. +// The parent span ID is set to an dummy parent span id as the most implementations do. +func (jaeger Jaeger) Inject(ctx context.Context, carrier propagation.TextMapCarrier) { + sc := trace.SpanFromContext(ctx).SpanContext() + headers := []string{} + if !sc.TraceID().IsValid() || !sc.SpanID().IsValid() { + return + } + headers = append(headers, sc.TraceID().String(), sc.SpanID().String(), deprecatedParentSpanID) + if debugFromContext(ctx) { + headers = append(headers, fmt.Sprintf("%x", flagsDebug|flagsSampled)) + } else if sc.IsSampled() { + headers = append(headers, fmt.Sprintf("%x", flagsSampled)) + } else { + headers = append(headers, fmt.Sprintf("%x", flagsNotSampled)) + } + + carrier.Set(jaegerHeader, strings.Join(headers, separator)) +} + +// Extract extracts a context from the carrier if it contains Jaeger headers. +func (jaeger Jaeger) Extract(ctx context.Context, carrier propagation.TextMapCarrier) context.Context { + // extract tracing information + if h := carrier.Get(jaegerHeader); h != "" { + ctx, sc, err := extract(ctx, h) + if err == nil && sc.IsValid() { + return trace.ContextWithRemoteSpanContext(ctx, sc) + } + } + + return ctx +} + +func extract(ctx context.Context, headerVal string) (context.Context, trace.SpanContext, error) { + var ( + scc = trace.SpanContextConfig{} + err error + ) + + parts := strings.Split(headerVal, separator) + if len(parts) != 4 { + return ctx, empty, errMalformedTraceContextVal + } + + // extract trace ID + if parts[0] != "" { + id := parts[0] + if len(id) > traceID128bitsWidth { + return ctx, empty, errInvalidTraceIDLength + } + // padding when length is less than 32 + if len(id) < traceID128bitsWidth { + padCharCount := traceID128bitsWidth - len(id) + id = strings.Repeat(idPaddingChar, padCharCount) + id + } + scc.TraceID, err = trace.TraceIDFromHex(id) + if err != nil { + return ctx, empty, errMalformedTraceID + } + } + + // extract span ID + if parts[1] != "" { + id := parts[1] + if len(id) > spanIDWidth { + return ctx, empty, errInvalidSpanIDLength + } + // padding when length is less than 16 + if len(id) < spanIDWidth { + padCharCount := spanIDWidth - len(id) + id = strings.Repeat(idPaddingChar, padCharCount) + id + } + scc.SpanID, err = trace.SpanIDFromHex(id) + if err != nil { + return ctx, empty, errMalformedSpanID + } + } + + // skip third part as it is deprecated + + // extract flag + if parts[3] != "" { + flagStr := parts[3] + flag, err := strconv.ParseInt(flagStr, 16, 64) + if err != nil { + return ctx, empty, errMalformedFlag + } + if flag&flagsSampled == flagsSampled { + // if sample bit is set, we check if debug bit is also set + if flag&flagsDebug == flagsDebug { + scc.TraceFlags |= trace.FlagsSampled + ctx = withDebug(ctx, true) + } else { + scc.TraceFlags |= trace.FlagsSampled + } + } + // ignore other bit, including firehose since we don't have corresponding flag in trace context. + } + return ctx, trace.NewSpanContext(scc), nil +} + +// Fields returns the Jaeger header key whose value is set with Inject. +func (jaeger Jaeger) Fields() []string { + return []string{jaegerHeader} +} + +type jaegerKeyType int + +const ( + debugKey jaegerKeyType = iota +) + +// withDebug returns a copy of parent with debug set as the debug flag value . +func withDebug(parent context.Context, debug bool) context.Context { + return context.WithValue(parent, debugKey, debug) +} + +// debugFromContext returns the debug value stored in ctx. +// +// If no debug value is stored in ctx false is returned. +func debugFromContext(ctx context.Context) bool { + if ctx == nil { + return false + } + if debug, ok := ctx.Value(debugKey).(bool); ok { + return debug + } + return false +} diff --git a/pkg/core/xtrace/trace.go b/pkg/core/xtrace/trace.go index 7379dd07b2..59100fe7e9 100644 --- a/pkg/core/xtrace/trace.go +++ b/pkg/core/xtrace/trace.go @@ -29,49 +29,39 @@ import ( func SetGlobalTracer(tp trace.TracerProvider) { xlog.Jupiter().Info("set global tracer", xlog.FieldMod("trace")) + propagator := propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, Jaeger{}) + // be compatible with opentracing bridge, wrapperTracerProvider := otelOpentracing.NewTracerPair(tp.Tracer("")) + bridge.SetTextMapPropagator(propagator) opentracing.SetGlobalTracer(bridge) - otel.SetTracerProvider(wrapperTracerProvider) -} -type options struct { - propagator propagation.TextMapPropagator + otel.SetTextMapPropagator(propagator) + otel.SetTracerProvider(wrapperTracerProvider) } -// Option is tracing option. -type Option func(*options) - // Tracer is otel span tracer type Tracer struct { tracer trace.Tracer kind trace.SpanKind - opt *options } // NewTracer create tracer instance -func NewTracer(kind trace.SpanKind, opts ...Option) *Tracer { - op := options{ - propagator: propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}), - //propagator: propagation.NewCompositeTextMapPropagator(Metadata{}, propagation.Baggage{}, propagation.TraceContext{}), - } - for _, o := range opts { - o(&op) - } - return &Tracer{tracer: otel.Tracer("jupiter"), kind: kind, opt: &op} +func NewTracer(kind trace.SpanKind) *Tracer { + return &Tracer{tracer: otel.Tracer("jupiter"), kind: kind} } // Start start tracing span func (t *Tracer) Start(ctx context.Context, operation string, carrier propagation.TextMapCarrier, opts ...trace.SpanStartOption) (context.Context, trace.Span) { if (t.kind == trace.SpanKindServer || t.kind == trace.SpanKindConsumer) && carrier != nil { - ctx = t.opt.propagator.Extract(ctx, carrier) + ctx = otel.GetTextMapPropagator().Extract(ctx, carrier) } opts = append(opts, trace.WithSpanKind(t.kind)) ctx, span := t.tracer.Start(ctx, operation, opts...) if (t.kind == trace.SpanKindClient || t.kind == trace.SpanKindProducer) && carrier != nil { - t.opt.propagator.Inject(ctx, carrier) + otel.GetTextMapPropagator().Inject(ctx, carrier) } return ctx, span } diff --git a/pkg/server/xecho/middleware.go b/pkg/server/xecho/middleware.go index e1f6a6a419..5f50c68e9e 100644 --- a/pkg/server/xecho/middleware.go +++ b/pkg/server/xecho/middleware.go @@ -28,6 +28,7 @@ import ( "github.com/douyu/jupiter/pkg/xlog" "github.com/labstack/echo/v4" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/propagation" semconv "go.opentelemetry.io/otel/semconv/v1.12.0" "go.opentelemetry.io/otel/trace" @@ -42,8 +43,8 @@ func extractAID(c echo.Context) string { func recoverMiddleware(slowQueryThresholdInMilli int64) echo.MiddlewareFunc { return func(next echo.HandlerFunc) echo.HandlerFunc { return func(ctx echo.Context) (err error) { - var beg = time.Now() - var fields = make([]xlog.Field, 0, 8) + beg := time.Now() + fields := make([]xlog.Field, 0, 8) defer func() { logger := xlog.J(ctx.Request().Context()) @@ -112,7 +113,15 @@ func traceServerInterceptor() echo.MiddlewareFunc { ctx = xlog.NewContext(ctx, xlog.Jupiter(), span.SpanContext().TraceID().String()) c.SetRequest(c.Request().WithContext(ctx)) - defer span.End() + defer func() { + if err != nil { + span.SetStatus(codes.Error, err.Error()) + span.RecordError(err) + } + + span.End() + }() + return next(c) } } diff --git a/pkg/server/xgrpc/interceptor.go b/pkg/server/xgrpc/interceptor.go index ddba563eff..9b9e4d3725 100644 --- a/pkg/server/xgrpc/interceptor.go +++ b/pkg/server/xgrpc/interceptor.go @@ -155,9 +155,9 @@ func extractAID(ctx context.Context) string { func defaultStreamServerInterceptor(logger *xlog.Logger, c *Config) grpc.StreamServerInterceptor { return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) { - var beg = time.Now() - var fields = make([]xlog.Field, 0, 8) - var event = "normal" + beg := time.Now() + fields := make([]xlog.Field, 0, 8) + event := "normal" defer func() { if c.SlowQueryThresholdInMilli > 0 { if int64(time.Since(beg))/1e6 > c.SlowQueryThresholdInMilli { @@ -205,9 +205,9 @@ func defaultStreamServerInterceptor(logger *xlog.Logger, c *Config) grpc.StreamS func defaultUnaryServerInterceptor(logger *xlog.Logger, c *Config) grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { - var beg = time.Now() - var fields = make([]xlog.Field, 0, 8) - var event = "normal" + beg := time.Now() + fields := make([]xlog.Field, 0, 8) + event := "normal" defer func() { if c.SlowQueryThresholdInMilli > 0 { if int64(time.Since(beg))/1e6 > c.SlowQueryThresholdInMilli { @@ -266,7 +266,7 @@ func getClientIP(ctx context.Context) (string, error) { } func getPeer(ctx context.Context) map[string]string { - var peerMeta = make(map[string]string) + peerMeta := make(map[string]string) if md, ok := metadata.FromIncomingContext(ctx); ok { if val, ok := md["aid"]; ok { peerMeta["aid"] = strings.Join(val, ";") @@ -286,7 +286,6 @@ func getPeer(ctx context.Context) map[string]string { } } return peerMeta - } func NewSentinelUnaryServerInterceptor() grpc.UnaryServerInterceptor { diff --git a/pkg/store/gorm/interceptor.go b/pkg/store/gorm/interceptor.go index 67ae85c5e9..4b5865ad02 100644 --- a/pkg/store/gorm/interceptor.go +++ b/pkg/store/gorm/interceptor.go @@ -24,6 +24,7 @@ import ( "github.com/douyu/jupiter/pkg/core/xtrace" "github.com/douyu/jupiter/pkg/xlog" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/propagation" semconv "go.opentelemetry.io/otel/semconv/v1.12.0" "go.opentelemetry.io/otel/trace" @@ -31,8 +32,10 @@ import ( "gorm.io/gorm" ) -type Handler func(*gorm.DB) -type Interceptor func(dsn *DSN, op string, options *Config, next Handler) Handler +type ( + Handler func(*gorm.DB) + Interceptor func(dsn *DSN, op string, options *Config, next Handler) Handler +) var errSlowCommand = errors.New("mysql slow command") @@ -88,7 +91,6 @@ func traceInterceptor() Interceptor { return func(dsn *DSN, op string, options *Config, next Handler) Handler { return func(scope *gorm.DB) { - if ctx := scope.Statement.Context; ctx != nil { md := metadata.New(nil) @@ -104,11 +106,15 @@ func traceInterceptor() Interceptor { next(scope) + if scope.Error != nil { + span.RecordError(scope.Error) + span.SetStatus(codes.Error, scope.Error.Error()) + } + return } next(scope) - } } }