Skip to content

Commit

Permalink
feat: fully compatible with opentracing (#1170)
Browse files Browse the repository at this point in the history
* feat: record error for otel

* add jaeger propagation

* add SetTextMapPropagator

* fix opentracing compatible

* simplify tracer
  • Loading branch information
sysulq authored Jan 17, 2025
1 parent 9dfd719 commit bf199f9
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 48 deletions.
11 changes: 7 additions & 4 deletions pkg/client/resty/resty.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func StdConfig(name string) *Config {

// RawConfig 返回配置
func RawConfig(key string) *Config {
var config = DefaultConfig()
config := DefaultConfig()
config.Name = key

if err := conf.UnmarshalKey(key, &config, conf.TagName("toml")); err != nil {
Expand Down Expand Up @@ -162,7 +162,6 @@ func (config *Config) Build() (*resty.Client, error) {
entry.Exit(base.WithError(err))
}
}

})

tracer := xtrace.NewTracer(trace.SpanKindClient)
Expand Down Expand Up @@ -191,12 +190,10 @@ func (config *Config) Build() (*resty.Client, error) {
})

client.SetPreRequestHook(func(c *resty.Client, r *http.Request) error {

return nil
})

client.OnAfterResponse(func(c *resty.Client, r *resty.Response) error {

cost := r.Time()

if config.EnableMetric {
Expand All @@ -210,6 +207,12 @@ func (config *Config) Build() (*resty.Client, error) {
span.SetAttributes(
semconv.HTTPStatusCodeKey.Int64(int64(r.StatusCode())),
)

if r.IsError() {
span.RecordError(errors.New(r.Status()))
span.SetStatus(codes.Error, r.Status())
}

span.End()
}

Expand Down
13 changes: 3 additions & 10 deletions pkg/client/rocketmq/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ func consumerMetricInterceptor() primitive.Interceptor {
xlog.String("host", host),
xlog.String("result", result),
xlog.Any("err", err))

} else {
xlog.Jupiter().Debug("push consumer",
xlog.String("topic", topic),
Expand Down Expand Up @@ -186,9 +185,7 @@ func producerDefaultInterceptor(producer *Producer) primitive.Interceptor {
realReq := req.(*primitive.Message)
realReply := reply.(*primitive.SendResult)

var (
span trace.Span
)
var span trace.Span

if producer.EnableTrace {

Expand All @@ -200,7 +197,6 @@ func producerDefaultInterceptor(producer *Producer) primitive.Interceptor {
for k, v := range md {
realReq.WithProperty(strings.ToLower(k), strings.Join(v, ","))
}

}

err := next(ctx, realReq, realReply)
Expand All @@ -217,11 +213,8 @@ func producerDefaultInterceptor(producer *Producer) primitive.Interceptor {
if producer.EnableTrace {
if err != nil {
span := trace.SpanFromContext(ctx)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}
span.End()
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}
}

Expand Down
177 changes: 177 additions & 0 deletions pkg/core/xtrace/propagation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package xtrace

import (
"context"
"errors"
"fmt"
"strconv"
"strings"

"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)

const (
jaegerHeader = "x-dy-traceid"
separator = ":"
traceID128bitsWidth = 128 / 4
spanIDWidth = 64 / 4

idPaddingChar = "0"

flagsDebug = 0x02
flagsSampled = 0x01
flagsNotSampled = 0x00

deprecatedParentSpanID = "0"
)

var (
empty = trace.SpanContext{}

errMalformedTraceContextVal = errors.New("header value of uber-trace-id should contain four different part separated by : ")
errInvalidTraceIDLength = errors.New("invalid trace id length, must be either 16 or 32")
errMalformedTraceID = errors.New("cannot decode trace id from header, should be a string of hex, lowercase trace id can't be all zero")
errInvalidSpanIDLength = errors.New("invalid span id length, must be 16")
errMalformedSpanID = errors.New("cannot decode span id from header, should be a string of hex, lowercase span id can't be all zero")
errMalformedFlag = errors.New("cannot decode flag")
)

// Jaeger propagator serializes SpanContext to/from Jaeger Headers
//
// Jaeger format:
//
// uber-trace-id: {trace-id}:{span-id}:{parent-span-id}:{flags}.
type Jaeger struct{}

var _ propagation.TextMapPropagator = &Jaeger{}

// Inject injects a context to the carrier following jaeger format.
// The parent span ID is set to an dummy parent span id as the most implementations do.
func (jaeger Jaeger) Inject(ctx context.Context, carrier propagation.TextMapCarrier) {
sc := trace.SpanFromContext(ctx).SpanContext()
headers := []string{}
if !sc.TraceID().IsValid() || !sc.SpanID().IsValid() {
return
}
headers = append(headers, sc.TraceID().String(), sc.SpanID().String(), deprecatedParentSpanID)
if debugFromContext(ctx) {
headers = append(headers, fmt.Sprintf("%x", flagsDebug|flagsSampled))
} else if sc.IsSampled() {
headers = append(headers, fmt.Sprintf("%x", flagsSampled))
} else {
headers = append(headers, fmt.Sprintf("%x", flagsNotSampled))
}

carrier.Set(jaegerHeader, strings.Join(headers, separator))
}

// Extract extracts a context from the carrier if it contains Jaeger headers.
func (jaeger Jaeger) Extract(ctx context.Context, carrier propagation.TextMapCarrier) context.Context {
// extract tracing information
if h := carrier.Get(jaegerHeader); h != "" {
ctx, sc, err := extract(ctx, h)
if err == nil && sc.IsValid() {
return trace.ContextWithRemoteSpanContext(ctx, sc)
}
}

return ctx
}

func extract(ctx context.Context, headerVal string) (context.Context, trace.SpanContext, error) {
var (
scc = trace.SpanContextConfig{}
err error
)

parts := strings.Split(headerVal, separator)
if len(parts) != 4 {
return ctx, empty, errMalformedTraceContextVal
}

// extract trace ID
if parts[0] != "" {
id := parts[0]
if len(id) > traceID128bitsWidth {
return ctx, empty, errInvalidTraceIDLength
}
// padding when length is less than 32
if len(id) < traceID128bitsWidth {
padCharCount := traceID128bitsWidth - len(id)
id = strings.Repeat(idPaddingChar, padCharCount) + id
}
scc.TraceID, err = trace.TraceIDFromHex(id)
if err != nil {
return ctx, empty, errMalformedTraceID
}
}

// extract span ID
if parts[1] != "" {
id := parts[1]
if len(id) > spanIDWidth {
return ctx, empty, errInvalidSpanIDLength
}
// padding when length is less than 16
if len(id) < spanIDWidth {
padCharCount := spanIDWidth - len(id)
id = strings.Repeat(idPaddingChar, padCharCount) + id
}
scc.SpanID, err = trace.SpanIDFromHex(id)
if err != nil {
return ctx, empty, errMalformedSpanID
}
}

// skip third part as it is deprecated

// extract flag
if parts[3] != "" {
flagStr := parts[3]
flag, err := strconv.ParseInt(flagStr, 16, 64)
if err != nil {
return ctx, empty, errMalformedFlag
}
if flag&flagsSampled == flagsSampled {
// if sample bit is set, we check if debug bit is also set
if flag&flagsDebug == flagsDebug {
scc.TraceFlags |= trace.FlagsSampled
ctx = withDebug(ctx, true)
} else {
scc.TraceFlags |= trace.FlagsSampled
}
}
// ignore other bit, including firehose since we don't have corresponding flag in trace context.
}
return ctx, trace.NewSpanContext(scc), nil
}

// Fields returns the Jaeger header key whose value is set with Inject.
func (jaeger Jaeger) Fields() []string {
return []string{jaegerHeader}
}

type jaegerKeyType int

const (
debugKey jaegerKeyType = iota
)

// withDebug returns a copy of parent with debug set as the debug flag value .
func withDebug(parent context.Context, debug bool) context.Context {
return context.WithValue(parent, debugKey, debug)
}

// debugFromContext returns the debug value stored in ctx.
//
// If no debug value is stored in ctx false is returned.
func debugFromContext(ctx context.Context) bool {
if ctx == nil {
return false
}
if debug, ok := ctx.Value(debugKey).(bool); ok {
return debug
}
return false
}
28 changes: 9 additions & 19 deletions pkg/core/xtrace/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,49 +29,39 @@ import (
func SetGlobalTracer(tp trace.TracerProvider) {
xlog.Jupiter().Info("set global tracer", xlog.FieldMod("trace"))

propagator := propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, Jaeger{})

// be compatible with opentracing
bridge, wrapperTracerProvider := otelOpentracing.NewTracerPair(tp.Tracer(""))
bridge.SetTextMapPropagator(propagator)
opentracing.SetGlobalTracer(bridge)
otel.SetTracerProvider(wrapperTracerProvider)
}

type options struct {
propagator propagation.TextMapPropagator
otel.SetTextMapPropagator(propagator)
otel.SetTracerProvider(wrapperTracerProvider)
}

// Option is tracing option.
type Option func(*options)

// Tracer is otel span tracer
type Tracer struct {
tracer trace.Tracer
kind trace.SpanKind
opt *options
}

// NewTracer create tracer instance
func NewTracer(kind trace.SpanKind, opts ...Option) *Tracer {
op := options{
propagator: propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}),
//propagator: propagation.NewCompositeTextMapPropagator(Metadata{}, propagation.Baggage{}, propagation.TraceContext{}),
}
for _, o := range opts {
o(&op)
}
return &Tracer{tracer: otel.Tracer("jupiter"), kind: kind, opt: &op}
func NewTracer(kind trace.SpanKind) *Tracer {
return &Tracer{tracer: otel.Tracer("jupiter"), kind: kind}
}

// Start start tracing span
func (t *Tracer) Start(ctx context.Context, operation string, carrier propagation.TextMapCarrier, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
if (t.kind == trace.SpanKindServer || t.kind == trace.SpanKindConsumer) && carrier != nil {
ctx = t.opt.propagator.Extract(ctx, carrier)
ctx = otel.GetTextMapPropagator().Extract(ctx, carrier)
}
opts = append(opts, trace.WithSpanKind(t.kind))

ctx, span := t.tracer.Start(ctx, operation, opts...)

if (t.kind == trace.SpanKindClient || t.kind == trace.SpanKindProducer) && carrier != nil {
t.opt.propagator.Inject(ctx, carrier)
otel.GetTextMapPropagator().Inject(ctx, carrier)
}
return ctx, span
}
15 changes: 12 additions & 3 deletions pkg/server/xecho/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/douyu/jupiter/pkg/xlog"
"github.com/labstack/echo/v4"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
"go.opentelemetry.io/otel/trace"
Expand All @@ -42,8 +43,8 @@ func extractAID(c echo.Context) string {
func recoverMiddleware(slowQueryThresholdInMilli int64) echo.MiddlewareFunc {
return func(next echo.HandlerFunc) echo.HandlerFunc {
return func(ctx echo.Context) (err error) {
var beg = time.Now()
var fields = make([]xlog.Field, 0, 8)
beg := time.Now()
fields := make([]xlog.Field, 0, 8)

defer func() {
logger := xlog.J(ctx.Request().Context())
Expand Down Expand Up @@ -112,7 +113,15 @@ func traceServerInterceptor() echo.MiddlewareFunc {
ctx = xlog.NewContext(ctx, xlog.Jupiter(), span.SpanContext().TraceID().String())

c.SetRequest(c.Request().WithContext(ctx))
defer span.End()
defer func() {
if err != nil {
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
}

span.End()
}()

return next(c)
}
}
Expand Down
15 changes: 7 additions & 8 deletions pkg/server/xgrpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,9 @@ func extractAID(ctx context.Context) string {

func defaultStreamServerInterceptor(logger *xlog.Logger, c *Config) grpc.StreamServerInterceptor {
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
var beg = time.Now()
var fields = make([]xlog.Field, 0, 8)
var event = "normal"
beg := time.Now()
fields := make([]xlog.Field, 0, 8)
event := "normal"
defer func() {
if c.SlowQueryThresholdInMilli > 0 {
if int64(time.Since(beg))/1e6 > c.SlowQueryThresholdInMilli {
Expand Down Expand Up @@ -205,9 +205,9 @@ func defaultStreamServerInterceptor(logger *xlog.Logger, c *Config) grpc.StreamS

func defaultUnaryServerInterceptor(logger *xlog.Logger, c *Config) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
var beg = time.Now()
var fields = make([]xlog.Field, 0, 8)
var event = "normal"
beg := time.Now()
fields := make([]xlog.Field, 0, 8)
event := "normal"
defer func() {
if c.SlowQueryThresholdInMilli > 0 {
if int64(time.Since(beg))/1e6 > c.SlowQueryThresholdInMilli {
Expand Down Expand Up @@ -266,7 +266,7 @@ func getClientIP(ctx context.Context) (string, error) {
}

func getPeer(ctx context.Context) map[string]string {
var peerMeta = make(map[string]string)
peerMeta := make(map[string]string)
if md, ok := metadata.FromIncomingContext(ctx); ok {
if val, ok := md["aid"]; ok {
peerMeta["aid"] = strings.Join(val, ";")
Expand All @@ -286,7 +286,6 @@ func getPeer(ctx context.Context) map[string]string {
}
}
return peerMeta

}

func NewSentinelUnaryServerInterceptor() grpc.UnaryServerInterceptor {
Expand Down
Loading

0 comments on commit bf199f9

Please sign in to comment.