diff --git a/CHANGELOG.md b/CHANGELOG.md index 0b1a598a6..ad97f6b1b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ OpenTelemetry Go Automatic Instrumentation adheres to [Semantic Versioning](http ### Added - Update `httpPlusdb` demo with adding `OTEL_GO_AUTO_PARSE_DB_STATEMENT` env variable ([#1523](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/1523)) - Support `SELECT`, `INSERT`, `UPDATE`, and `DELETE` for database span names and `db.operation.name` attribute. ([#1253](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/1253)) +- Support the full tracing API with the `otelglobal` probe. ([#1405](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/1405)) - Support `go.opentelemetry.io/otel@v1.33.0`. ([#1417](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/1417)) - Support `google.golang.org/grpc` `1.69.0`. ([#1417](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/1417)) - Update `google.golang.org/grpc` probe to work with versions `>= 1.69.0`. ([#1431](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/1431)) diff --git a/internal/pkg/instrumentation/bpf/database/sql/probe.go b/internal/pkg/instrumentation/bpf/database/sql/probe.go index 8c25111de..bd94ab8a7 100644 --- a/internal/pkg/instrumentation/bpf/database/sql/probe.go +++ b/internal/pkg/instrumentation/bpf/database/sql/probe.go @@ -53,7 +53,7 @@ func New(logger *slog.Logger, version string) probe.Probe { Val: shouldIncludeDBStatement(), }, }, - Uprobes: []probe.Uprobe{ + Uprobes: []*probe.Uprobe{ { Sym: "database/sql.(*DB).queryDC", EntryProbe: "uprobe_queryDC", diff --git a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer/probe.go b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer/probe.go index d67e62717..dfed8bb54 100644 --- a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer/probe.go +++ b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer/probe.go @@ -69,7 +69,7 @@ func New(logger *slog.Logger, version string) probe.Probe { Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "ReaderConfig", "GroupID"), }, }, - Uprobes: []probe.Uprobe{ + Uprobes: []*probe.Uprobe{ { Sym: "github.com/segmentio/kafka-go.(*Reader).FetchMessage", EntryProbe: "uprobe_FetchMessage", diff --git a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe.go b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe.go index 8f2a4903a..7d69def0a 100644 --- a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe.go +++ b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe.go @@ -57,7 +57,7 @@ func New(logger *slog.Logger, version string) probe.Probe { Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Time"), }, }, - Uprobes: []probe.Uprobe{ + Uprobes: []*probe.Uprobe{ { Sym: "github.com/segmentio/kafka-go.(*Writer).WriteMessages", EntryProbe: "uprobe_WriteMessages", diff --git a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/auto/sdk/probe.go b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/auto/sdk/probe.go index 7c963c30f..c356558f0 100644 --- a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/auto/sdk/probe.go +++ b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/auto/sdk/probe.go @@ -60,7 +60,7 @@ func New(logger *slog.Logger) probe.Probe { ), }, }, - Uprobes: []probe.Uprobe{ + Uprobes: []*probe.Uprobe{ { Sym: "go.opentelemetry.io/auto/sdk.(*tracer).start", EntryProbe: "uprobe_Tracer_start", @@ -86,14 +86,14 @@ type converter struct { logger *slog.Logger } -func (c *converter) decodeEvent(record perf.Record) (event, error) { +func (c *converter) decodeEvent(record perf.Record) (*event, error) { reader := bytes.NewReader(record.RawSample) var e event err := binary.Read(reader, binary.LittleEndian, &e.Size) if err != nil { c.logger.Error("failed to decode size", "error", err) - return event{}, err + return nil, err } c.logger.Debug("decoded size", "size", e.Size) @@ -101,10 +101,10 @@ func (c *converter) decodeEvent(record perf.Record) (event, error) { _, err = reader.Read(e.SpanData) if err != nil { c.logger.Error("failed to read span data", "error", err) - return event{}, err + return nil, err } c.logger.Debug("decoded span data", "size", e.Size) - return e, nil + return &e, nil } func (c *converter) processFn(e *event) ptrace.ScopeSpans { diff --git a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/bpf/probe.bpf.c b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/bpf/probe.bpf.c index 5b0178f98..3f7d07d43 100644 --- a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/bpf/probe.bpf.c +++ b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/bpf/probe.bpf.c @@ -23,6 +23,9 @@ char __license[] SEC("license") = "Dual MIT/GPL"; #define MAX_BUCKETS 8 #define MAX_TRACERS 64 +// Records state of our write to auto-instrumentation flag. +bool wrote_flag = false; + struct span_description_t { char buf[MAX_STATUS_DESCRIPTION_LEN]; }; @@ -42,7 +45,12 @@ typedef struct tracer_id { char schema_url[MAX_TRACER_SCHEMA_URL_LEN]; } tracer_id_t; +struct control_t { + u32 kind; // Required to be 1. +}; + struct otel_span_t { + u32 kind; // Required to be 0. BASE_SPAN_PROPERTIES struct span_name_t span_name; otel_status_t status; @@ -388,6 +396,36 @@ static __always_inline long fill_tracer_id(tracer_id_t *tracer_id, go_tracer_ptr return 0; } +// This instrumentation attaches uprobe to the following function: +// func (t *tracer) newSpan(ctx context.Context, autoSpan *bool, name string, opts []trace.SpanStartOption) (context.Context, trace.Span) { +// https://github.com/open-telemetry/opentelemetry-go/blob/ac386f383cdfc14f546b4e55e8726a0a45e8a409/internal/global/trace.go#L161 +SEC("uprobe/newSpan") +int uprobe_newStart(struct pt_regs *ctx) { + if (wrote_flag) { + // Already wrote flag value. + return 0; + } + + void *flag_ptr = get_argument(ctx, 4); + if (flag_ptr == NULL) { + bpf_printk("invalid flag_ptr: NULL"); + return -1; + } + + bool true_value = true; + long res = bpf_probe_write_user(flag_ptr, &true_value, sizeof(bool)); + if (res != 0) { + bpf_printk("failed to write bool flag value: %ld", res); + return -2; + } + + wrote_flag = true; + + // Signal this uprobe should be unloaded. + struct control_t ctrl = {1}; + return bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, (void *)(&ctrl), sizeof(struct control_t)); +} + // This instrumentation attaches uprobe to the following function: // func (t *tracer) Start(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) // https://github.com/open-telemetry/opentelemetry-go/blob/98b32a6c3a87fbee5d34c063b9096f416b250897/internal/global/trace.go#L149 diff --git a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/bpf_arm64_bpfel.go b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/bpf_arm64_bpfel.go index 56e54b800..fb0cf0b0d 100644 --- a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/bpf_arm64_bpfel.go +++ b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/bpf_arm64_bpfel.go @@ -13,6 +13,8 @@ import ( ) type bpfOtelSpanT struct { + Kind uint32 + _ [4]byte StartTime uint64 EndTime uint64 Sc bpfSpanContext @@ -101,6 +103,7 @@ type bpfProgramSpecs struct { UprobeSetStatus *ebpf.ProgramSpec `ebpf:"uprobe_SetStatus"` UprobeStart *ebpf.ProgramSpec `ebpf:"uprobe_Start"` UprobeStartReturns *ebpf.ProgramSpec `ebpf:"uprobe_Start_Returns"` + UprobeNewStart *ebpf.ProgramSpec `ebpf:"uprobe_newStart"` } // bpfMapSpecs contains maps before they are loaded into the kernel. @@ -148,6 +151,7 @@ type bpfVariableSpecs struct { TracerNamePos *ebpf.VariableSpec `ebpf:"tracer_name_pos"` TracerProviderPos *ebpf.VariableSpec `ebpf:"tracer_provider_pos"` TracerProviderTracersPos *ebpf.VariableSpec `ebpf:"tracer_provider_tracers_pos"` + WroteFlag *ebpf.VariableSpec `ebpf:"wrote_flag"` } // bpfObjects contains all objects after they have been loaded into the kernel. @@ -230,6 +234,7 @@ type bpfVariables struct { TracerNamePos *ebpf.Variable `ebpf:"tracer_name_pos"` TracerProviderPos *ebpf.Variable `ebpf:"tracer_provider_pos"` TracerProviderTracersPos *ebpf.Variable `ebpf:"tracer_provider_tracers_pos"` + WroteFlag *ebpf.Variable `ebpf:"wrote_flag"` } // bpfPrograms contains all programs after they have been loaded into the kernel. @@ -242,6 +247,7 @@ type bpfPrograms struct { UprobeSetStatus *ebpf.Program `ebpf:"uprobe_SetStatus"` UprobeStart *ebpf.Program `ebpf:"uprobe_Start"` UprobeStartReturns *ebpf.Program `ebpf:"uprobe_Start_Returns"` + UprobeNewStart *ebpf.Program `ebpf:"uprobe_newStart"` } func (p *bpfPrograms) Close() error { @@ -252,6 +258,7 @@ func (p *bpfPrograms) Close() error { p.UprobeSetStatus, p.UprobeStart, p.UprobeStartReturns, + p.UprobeNewStart, ) } diff --git a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/bpf_x86_bpfel.go b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/bpf_x86_bpfel.go index 1703d7e42..32a3f0149 100644 --- a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/bpf_x86_bpfel.go +++ b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/bpf_x86_bpfel.go @@ -13,6 +13,8 @@ import ( ) type bpfOtelSpanT struct { + Kind uint32 + _ [4]byte StartTime uint64 EndTime uint64 Sc bpfSpanContext @@ -101,6 +103,7 @@ type bpfProgramSpecs struct { UprobeSetStatus *ebpf.ProgramSpec `ebpf:"uprobe_SetStatus"` UprobeStart *ebpf.ProgramSpec `ebpf:"uprobe_Start"` UprobeStartReturns *ebpf.ProgramSpec `ebpf:"uprobe_Start_Returns"` + UprobeNewStart *ebpf.ProgramSpec `ebpf:"uprobe_newStart"` } // bpfMapSpecs contains maps before they are loaded into the kernel. @@ -148,6 +151,7 @@ type bpfVariableSpecs struct { TracerNamePos *ebpf.VariableSpec `ebpf:"tracer_name_pos"` TracerProviderPos *ebpf.VariableSpec `ebpf:"tracer_provider_pos"` TracerProviderTracersPos *ebpf.VariableSpec `ebpf:"tracer_provider_tracers_pos"` + WroteFlag *ebpf.VariableSpec `ebpf:"wrote_flag"` } // bpfObjects contains all objects after they have been loaded into the kernel. @@ -230,6 +234,7 @@ type bpfVariables struct { TracerNamePos *ebpf.Variable `ebpf:"tracer_name_pos"` TracerProviderPos *ebpf.Variable `ebpf:"tracer_provider_pos"` TracerProviderTracersPos *ebpf.Variable `ebpf:"tracer_provider_tracers_pos"` + WroteFlag *ebpf.Variable `ebpf:"wrote_flag"` } // bpfPrograms contains all programs after they have been loaded into the kernel. @@ -242,6 +247,7 @@ type bpfPrograms struct { UprobeSetStatus *ebpf.Program `ebpf:"uprobe_SetStatus"` UprobeStart *ebpf.Program `ebpf:"uprobe_Start"` UprobeStartReturns *ebpf.Program `ebpf:"uprobe_Start_Returns"` + UprobeNewStart *ebpf.Program `ebpf:"uprobe_newStart"` } func (p *bpfPrograms) Close() error { @@ -252,6 +258,7 @@ func (p *bpfPrograms) Close() error { p.UprobeSetStatus, p.UprobeStart, p.UprobeStartReturns, + p.UprobeNewStart, ) } diff --git a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/probe.go b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/probe.go index 9d07429a5..e8da60e6a 100644 --- a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/probe.go +++ b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/probe.go @@ -4,6 +4,7 @@ package global import ( + "bytes" "encoding/binary" "fmt" "log/slog" @@ -18,6 +19,7 @@ import ( "go.opentelemetry.io/auto/internal/pkg/process" "go.opentelemetry.io/auto/internal/pkg/structfield" + "github.com/cilium/ebpf/perf" "github.com/hashicorp/go-version" "golang.org/x/sys/unix" @@ -33,6 +35,27 @@ import ( const ( // pkg is the package being instrumented. pkg = "go.opentelemetry.io/otel/internal/global" + + // Minimum version of go.opentelemetry.io/otel that supports using the + // go.opentelemetry.io/auto/sdk in the global API. + minAutoSDK = "1.33.0" +) + +var ( + otelWithAutoSDK = probe.PackageConstrainst{ + Package: "go.opentelemetry.io/otel", + Constraints: version.MustConstraints( + version.NewConstraint(fmt.Sprintf(">= %s", minAutoSDK)), + ), + FailureMode: probe.FailureModeIgnore, + } + otelWithoutAutoSDK = probe.PackageConstrainst{ + Package: "go.opentelemetry.io/otel", + Constraints: version.MustConstraints( + version.NewConstraint(fmt.Sprintf("< %s", minAutoSDK)), + ), + FailureMode: probe.FailureModeIgnore, + } ) // New returns a new [probe.Probe]. @@ -41,6 +64,19 @@ func New(logger *slog.Logger) probe.Probe { SpanKind: trace.SpanKindClient, InstrumentedPkg: pkg, } + + uprobeNewStart := &probe.Uprobe{ + Sym: "go.opentelemetry.io/otel/internal/global.(*tracer).newSpan", + EntryProbe: "uprobe_newStart", + PackageConstrainsts: []probe.PackageConstrainst{ + otelWithAutoSDK, + }, + } + + c := &converter{ + logger: logger, + uprobeNewStart: uprobeNewStart, + } return &probe.TraceProducer[bpfObjects, event]{ Base: probe.Base[bpfObjects, event]{ ID: id, @@ -107,38 +143,94 @@ func New(logger *slog.Logger) probe.Probe { tracerIDContainsSchemaURL{}, tracerIDContainsScopeAttributes{}, }, - Uprobes: []probe.Uprobe{ + Uprobes: []*probe.Uprobe{ + uprobeNewStart, { Sym: "go.opentelemetry.io/otel/internal/global.(*tracer).Start", EntryProbe: "uprobe_Start", ReturnProbe: "uprobe_Start_Returns", + PackageConstrainsts: []probe.PackageConstrainst{ + otelWithoutAutoSDK, + }, }, { Sym: "go.opentelemetry.io/otel/internal/global.(*nonRecordingSpan).End", EntryProbe: "uprobe_End", + PackageConstrainsts: []probe.PackageConstrainst{ + otelWithoutAutoSDK, + }, }, { Sym: "go.opentelemetry.io/otel/internal/global.(*nonRecordingSpan).SetAttributes", EntryProbe: "uprobe_SetAttributes", FailureMode: probe.FailureModeIgnore, + PackageConstrainsts: []probe.PackageConstrainst{ + otelWithoutAutoSDK, + }, }, { Sym: "go.opentelemetry.io/otel/internal/global.(*nonRecordingSpan).SetStatus", EntryProbe: "uprobe_SetStatus", FailureMode: probe.FailureModeIgnore, + PackageConstrainsts: []probe.PackageConstrainst{ + otelWithoutAutoSDK, + }, }, { Sym: "go.opentelemetry.io/otel/internal/global.(*nonRecordingSpan).SetName", EntryProbe: "uprobe_SetName", FailureMode: probe.FailureModeIgnore, + PackageConstrainsts: []probe.PackageConstrainst{ + otelWithoutAutoSDK, + }, }, }, - SpecFn: loadBpf, + SpecFn: loadBpf, + ProcessRecord: c.decodeEvent, }, ProcessFn: processFn, } } +type recordKind uint32 + +const ( + recordKindTelemetry recordKind = iota + recordKindConrol +) + +type converter struct { + logger *slog.Logger + + uprobeNewStart *probe.Uprobe +} + +func (c *converter) decodeEvent(record perf.Record) (*event, error) { + reader := bytes.NewReader(record.RawSample) + + var kind recordKind + err := binary.Read(reader, binary.LittleEndian, &kind) + if err != nil { + return nil, err + } + + var e *event + switch kind { + case recordKindTelemetry: + e = new(event) + reader.Reset(record.RawSample) + err = binary.Read(reader, binary.LittleEndian, e) + case recordKindConrol: + if c.uprobeNewStart != nil { + err = c.uprobeNewStart.Close() + c.uprobeNewStart = nil + } + default: + err = fmt.Errorf("unknown record kind: %d", kind) + } + return e, err +} + // tracerIDContainsSchemaURL is a Probe Const defining whether the tracer key contains schemaURL. type tracerIDContainsSchemaURL struct{} diff --git a/internal/pkg/instrumentation/bpf/google.golang.org/grpc/client/probe.go b/internal/pkg/instrumentation/bpf/google.golang.org/grpc/client/probe.go index 90bbeb7bb..6dfe547b6 100644 --- a/internal/pkg/instrumentation/bpf/google.golang.org/grpc/client/probe.go +++ b/internal/pkg/instrumentation/bpf/google.golang.org/grpc/client/probe.go @@ -110,7 +110,7 @@ func New(logger *slog.Logger, version string) probe.Probe { MinVersion: writeStatusMinVersion, }, }, - Uprobes: []probe.Uprobe{ + Uprobes: []*probe.Uprobe{ { Sym: "google.golang.org/grpc.(*ClientConn).Invoke", EntryProbe: "uprobe_ClientConn_Invoke", diff --git a/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/probe.go b/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/probe.go index fb568e628..30be91916 100644 --- a/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/probe.go +++ b/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/probe.go @@ -126,7 +126,7 @@ func New(logger *slog.Logger, ver string) probe.Probe { }, framePosConst{}, }, - Uprobes: []probe.Uprobe{ + Uprobes: []*probe.Uprobe{ { Sym: "google.golang.org/grpc.(*Server).handleStream", EntryProbe: "uprobe_server_handleStream", diff --git a/internal/pkg/instrumentation/bpf/net/http/client/probe.go b/internal/pkg/instrumentation/bpf/net/http/client/probe.go index c96eb8c32..e0715c6ec 100644 --- a/internal/pkg/instrumentation/bpf/net/http/client/probe.go +++ b/internal/pkg/instrumentation/bpf/net/http/client/probe.go @@ -40,7 +40,7 @@ func New(logger *slog.Logger, version string) probe.Probe { InstrumentedPkg: pkg, } - uprobes := []probe.Uprobe{ + uprobes := []*probe.Uprobe{ { Sym: "net/http.(*Transport).roundTrip", EntryProbe: "uprobe_Transport_roundTrip", @@ -52,7 +52,7 @@ func New(logger *slog.Logger, version string) probe.Probe { // probe which writes the data in the outgoing buffer. if utils.SupportsContextPropagation() { uprobes = append(uprobes, - probe.Uprobe{ + &probe.Uprobe{ Sym: "net/http.Header.writeSubset", EntryProbe: "uprobe_writeSubset", // We mark this probe as dependent on roundTrip, so we don't accidentally diff --git a/internal/pkg/instrumentation/bpf/net/http/server/probe.go b/internal/pkg/instrumentation/bpf/net/http/server/probe.go index f2b574e11..d885b26a9 100644 --- a/internal/pkg/instrumentation/bpf/net/http/server/probe.go +++ b/internal/pkg/instrumentation/bpf/net/http/server/probe.go @@ -103,7 +103,7 @@ func New(logger *slog.Logger, version string) probe.Probe { }, patternPathSupportedConst{}, }, - Uprobes: []probe.Uprobe{ + Uprobes: []*probe.Uprobe{ { Sym: "net/http.serverHandler.ServeHTTP", EntryProbe: "uprobe_serverHandler_ServeHTTP", diff --git a/internal/pkg/instrumentation/probe/probe.go b/internal/pkg/instrumentation/probe/probe.go index 6bd9daa7f..66b0b4536 100644 --- a/internal/pkg/instrumentation/probe/probe.go +++ b/internal/pkg/instrumentation/probe/probe.go @@ -12,6 +12,7 @@ import ( "io" "log/slog" "os" + "sync/atomic" "github.com/cilium/ebpf" "github.com/cilium/ebpf/link" @@ -64,7 +65,7 @@ type Base[BPFObj any, BPFEvent any] struct { Consts []Const // Uprobes is a the collection of eBPF programs that need to be attached to // the target process. - Uprobes []Uprobe + Uprobes []*Uprobe // SpecFn is a creation function for an eBPF CollectionSpec related to the // probe. @@ -72,7 +73,7 @@ type Base[BPFObj any, BPFEvent any] struct { // ProcessRecord is an optional processing function for the probe. If nil, // all records will be read directly into a new BPFEvent using the // encoding/binary package. - ProcessRecord func(perf.Record) (BPFEvent, error) + ProcessRecord func(perf.Record) (*BPFEvent, error) reader *perf.Reader collection *ebpf.Collection @@ -198,7 +199,7 @@ func (i *Base[BPFObj, BPFEvent]) loadUprobes(exec *link.Executable, td *process. continue } - links, err := up.load(exec, td, i.collection) + err := up.load(exec, td, i.collection) if err != nil { var logFn func(string, ...any) switch up.FailureMode { @@ -213,9 +214,7 @@ func (i *Base[BPFObj, BPFEvent]) loadUprobes(exec *link.Executable, td *process. logFn("failed to load uprobe", "probe", i.ID, "symbol", up.Sym, "error", err) continue } - for _, l := range links { - i.closers = append(i.closers, l) - } + i.closers = append(i.closers, up) } return nil } @@ -268,18 +267,19 @@ func (i *Base[BPFObj, BPFEvent]) read() (*BPFEvent, error) { return nil, err } - var event BPFEvent + var event *BPFEvent if i.ProcessRecord != nil { event, err = i.ProcessRecord(record) } else { + event = new(BPFEvent) buf := bytes.NewReader(record.RawSample) - err = binary.Read(buf, binary.LittleEndian, &event) + err = binary.Read(buf, binary.LittleEndian, event) } if err != nil { return nil, err } - return &event, nil + return event, nil } // Close stops the Probe. @@ -315,6 +315,9 @@ func (i *SpanProducer[BPFObj, BPFEvent]) Run(handle func(ptrace.ScopeSpans)) { } continue } + if event == nil { + continue + } ss := ptrace.NewScopeSpans() @@ -344,6 +347,9 @@ func (i *TraceProducer[BPFObj, BPFEvent]) Run(handle func(ptrace.ScopeSpans)) { } continue } + if event == nil { + continue + } handle(i.ProcessFn(event)) } @@ -365,50 +371,76 @@ type Uprobe struct { // function specified by Sym. If ReturnProbe is empty, no eBPF program will be attached to the return of the function. ReturnProbe string DependsOn []string + + closers atomic.Pointer[[]io.Closer] } -func (u *Uprobe) load(exec *link.Executable, target *process.TargetDetails, c *ebpf.Collection) ([]link.Link, error) { +func (u *Uprobe) load(exec *link.Executable, target *process.TargetDetails, c *ebpf.Collection) error { offset, err := target.GetFunctionOffset(u.Sym) if err != nil { - return nil, err + return err } - var links []link.Link + var closers []io.Closer if u.EntryProbe != "" { entryProg, ok := c.Programs[u.EntryProbe] if !ok { - return nil, fmt.Errorf("entry probe %s not found", u.EntryProbe) + return fmt.Errorf("entry probe %s not found", u.EntryProbe) } opts := &link.UprobeOptions{Address: offset, PID: target.PID} l, err := exec.Uprobe("", entryProg, opts) if err != nil { - return nil, err + return err } - links = append(links, l) + closers = append(closers, l) } if u.ReturnProbe != "" { retProg, ok := c.Programs[u.ReturnProbe] if !ok { - return nil, fmt.Errorf("return probe %s not found", u.ReturnProbe) + return fmt.Errorf("return probe %s not found", u.ReturnProbe) } retOffsets, err := target.GetFunctionReturns(u.Sym) if err != nil { - return nil, err + return err } for _, ret := range retOffsets { opts := &link.UprobeOptions{Address: ret, PID: target.PID} l, err := exec.Uprobe("", retProg, opts) if err != nil { - return nil, err + return err } - links = append(links, l) + closers = append(closers, l) } } - return links, nil + old := u.closers.Swap(&closers) + if old != nil { + // load called twice without calling Close. Try and handle gracefully. + var err error + for _, closer := range *old { + err = errors.Join(err, closer.Close()) + } + return err + } + + return nil +} + +func (u *Uprobe) Close() error { + closersPtr := u.closers.Swap(nil) + if closersPtr == nil { + // No closers. + return nil + } + + var err error + for _, closer := range *closersPtr { + err = errors.Join(err, closer.Close()) + } + return err } // Const is an constant that needs to be injected into an eBPF program. diff --git a/internal/test/e2e/grpc/traces.json b/internal/test/e2e/grpc/traces.json index 82bdb6e71..aa906ac6a 100644 --- a/internal/test/e2e/grpc/traces.json +++ b/internal/test/e2e/grpc/traces.json @@ -304,7 +304,7 @@ } ], "flags": 256, - "kind": 3, + "kind": 1, "name": "SayHello", "parentSpanId": "xxxxx", "spanId": "xxxxx", @@ -321,7 +321,7 @@ } ], "flags": 256, - "kind": 3, + "kind": 1, "name": "SayHello", "parentSpanId": "xxxxx", "spanId": "xxxxx", diff --git a/internal/test/e2e/otelglobal/main.go b/internal/test/e2e/otelglobal/main.go index c151005cc..5e0732b7a 100644 --- a/internal/test/e2e/otelglobal/main.go +++ b/internal/test/e2e/otelglobal/main.go @@ -5,6 +5,7 @@ package main import ( "context" + "errors" "fmt" "time" @@ -14,10 +15,16 @@ import ( "go.opentelemetry.io/otel/trace" ) +const ( + name = "trace-example" + version = "v1.23.42" + schema = "https://some_schema" +) + var tracer = otel.Tracer( - "trace-example", - trace.WithInstrumentationVersion("v1.23.42"), - trace.WithSchemaURL("https://some_schema"), + name, + trace.WithInstrumentationVersion(version), + trace.WithSchemaURL(schema), ) func setUnusedTracers() { @@ -27,17 +34,34 @@ func setUnusedTracers() { } func innerFunction(ctx context.Context) { - _, span := tracer.Start(ctx, "child") + t := trace.SpanFromContext(ctx).TracerProvider().Tracer( + name, + trace.WithInstrumentationVersion(version), + trace.WithSchemaURL(schema), + ) + + _, span := t.Start(ctx, "child") defer span.End() span.SetAttributes(attribute.String("inner.key", "inner.value")) span.SetAttributes(attribute.Bool("cat.on_keyboard", true)) span.SetName("child override") - span.SetStatus(codes.Error, "i deleted the prod db sry") + + err := errors.New("i deleted the prod db sry") + span.SetStatus(codes.Error, err.Error()) + span.RecordError(err) + + span.AddLink(trace.Link{ + SpanContext: trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: trace.TraceID{0x2}, + SpanID: trace.SpanID{0x1}, + TraceFlags: trace.FlagsSampled, + }), + }) } func createMainSpan(ctx context.Context) { - ctx, span := tracer.Start(ctx, "parent") + ctx, span := tracer.Start(ctx, "parent", trace.WithSpanKind(trace.SpanKindServer)) defer span.End() innerFunction(ctx) diff --git a/internal/test/e2e/otelglobal/traces.json b/internal/test/e2e/otelglobal/traces.json index a72fcf9eb..b89f76975 100644 --- a/internal/test/e2e/otelglobal/traces.json +++ b/internal/test/e2e/otelglobal/traces.json @@ -71,8 +71,34 @@ } } ], + "events": [ + { + "attributes": [ + { + "key": "exception.type", + "value": { + "stringValue": "*errors.errorString" + } + }, + { + "key": "exception.message", + "value": { + "stringValue": "i deleted the prod db sry" + } + } + ], + "name": "exception" + } + ], "flags": 256, - "kind": 3, + "kind": 1, + "links": [ + { + "flags": 256, + "spanId": "xxxxx", + "traceId": "xxxxx" + } + ], "name": "child override", "parentSpanId": "xxxxx", "spanId": "xxxxx", @@ -110,7 +136,7 @@ } ], "flags": 256, - "kind": 3, + "kind": 2, "name": "parent", "parentSpanId": "", "spanId": "xxxxx",