diff --git a/internal/pkg/instrumentation/bpf/database/sql/probe.go b/internal/pkg/instrumentation/bpf/database/sql/probe.go index 8c25111de..bd94ab8a7 100644 --- a/internal/pkg/instrumentation/bpf/database/sql/probe.go +++ b/internal/pkg/instrumentation/bpf/database/sql/probe.go @@ -53,7 +53,7 @@ func New(logger *slog.Logger, version string) probe.Probe { Val: shouldIncludeDBStatement(), }, }, - Uprobes: []probe.Uprobe{ + Uprobes: []*probe.Uprobe{ { Sym: "database/sql.(*DB).queryDC", EntryProbe: "uprobe_queryDC", diff --git a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer/probe.go b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer/probe.go index d67e62717..dfed8bb54 100644 --- a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer/probe.go +++ b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer/probe.go @@ -69,7 +69,7 @@ func New(logger *slog.Logger, version string) probe.Probe { Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "ReaderConfig", "GroupID"), }, }, - Uprobes: []probe.Uprobe{ + Uprobes: []*probe.Uprobe{ { Sym: "github.com/segmentio/kafka-go.(*Reader).FetchMessage", EntryProbe: "uprobe_FetchMessage", diff --git a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe.go b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe.go index 8f2a4903a..7d69def0a 100644 --- a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe.go +++ b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe.go @@ -57,7 +57,7 @@ func New(logger *slog.Logger, version string) probe.Probe { Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Time"), }, }, - Uprobes: []probe.Uprobe{ + Uprobes: []*probe.Uprobe{ { Sym: "github.com/segmentio/kafka-go.(*Writer).WriteMessages", EntryProbe: "uprobe_WriteMessages", diff --git a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/auto/sdk/probe.go b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/auto/sdk/probe.go index 7c963c30f..c356558f0 100644 --- a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/auto/sdk/probe.go +++ b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/auto/sdk/probe.go @@ -60,7 +60,7 @@ func New(logger *slog.Logger) probe.Probe { ), }, }, - Uprobes: []probe.Uprobe{ + Uprobes: []*probe.Uprobe{ { Sym: "go.opentelemetry.io/auto/sdk.(*tracer).start", EntryProbe: "uprobe_Tracer_start", @@ -86,14 +86,14 @@ type converter struct { logger *slog.Logger } -func (c *converter) decodeEvent(record perf.Record) (event, error) { +func (c *converter) decodeEvent(record perf.Record) (*event, error) { reader := bytes.NewReader(record.RawSample) var e event err := binary.Read(reader, binary.LittleEndian, &e.Size) if err != nil { c.logger.Error("failed to decode size", "error", err) - return event{}, err + return nil, err } c.logger.Debug("decoded size", "size", e.Size) @@ -101,10 +101,10 @@ func (c *converter) decodeEvent(record perf.Record) (event, error) { _, err = reader.Read(e.SpanData) if err != nil { c.logger.Error("failed to read span data", "error", err) - return event{}, err + return nil, err } c.logger.Debug("decoded span data", "size", e.Size) - return e, nil + return &e, nil } func (c *converter) processFn(e *event) ptrace.ScopeSpans { diff --git a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/bpf/probe.bpf.c b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/bpf/probe.bpf.c index 0d719521f..3f7d07d43 100644 --- a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/bpf/probe.bpf.c +++ b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/bpf/probe.bpf.c @@ -45,7 +45,12 @@ typedef struct tracer_id { char schema_url[MAX_TRACER_SCHEMA_URL_LEN]; } tracer_id_t; +struct control_t { + u32 kind; // Required to be 1. +}; + struct otel_span_t { + u32 kind; // Required to be 0. BASE_SPAN_PROPERTIES struct span_name_t span_name; otel_status_t status; @@ -415,7 +420,10 @@ int uprobe_newStart(struct pt_regs *ctx) { } wrote_flag = true; - return 0; + + // Signal this uprobe should be unloaded. + struct control_t ctrl = {1}; + return bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, (void *)(&ctrl), sizeof(struct control_t)); } // This instrumentation attaches uprobe to the following function: diff --git a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/bpf_arm64_bpfel.go b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/bpf_arm64_bpfel.go index 84da4b64b..fb0cf0b0d 100644 --- a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/bpf_arm64_bpfel.go +++ b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/bpf_arm64_bpfel.go @@ -13,6 +13,8 @@ import ( ) type bpfOtelSpanT struct { + Kind uint32 + _ [4]byte StartTime uint64 EndTime uint64 Sc bpfSpanContext diff --git a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/bpf_x86_bpfel.go b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/bpf_x86_bpfel.go index 7a14c80cf..32a3f0149 100644 --- a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/bpf_x86_bpfel.go +++ b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/bpf_x86_bpfel.go @@ -13,6 +13,8 @@ import ( ) type bpfOtelSpanT struct { + Kind uint32 + _ [4]byte StartTime uint64 EndTime uint64 Sc bpfSpanContext diff --git a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/probe.go b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/probe.go index 7f1fb783a..98b8c2803 100644 --- a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/probe.go +++ b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/probe.go @@ -4,10 +4,12 @@ package global import ( + "bytes" "encoding/binary" "fmt" "log/slog" "math" + "sync" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" @@ -18,6 +20,7 @@ import ( "go.opentelemetry.io/auto/internal/pkg/process" "go.opentelemetry.io/auto/internal/pkg/structfield" + "github.com/cilium/ebpf/perf" "github.com/hashicorp/go-version" "golang.org/x/sys/unix" @@ -41,6 +44,17 @@ func New(logger *slog.Logger) probe.Probe { SpanKind: trace.SpanKindClient, InstrumentedPkg: pkg, } + + uprobeNewStart := &probe.Uprobe{ + Sym: "go.opentelemetry.io/otel/internal/global.(*tracer).newSpan", + EntryProbe: "uprobe_newStart", + FailureMode: probe.FailureModeWarn, + } + + c := &converter{ + logger: logger, + uprobeNewStart: uprobeNewStart, + } return &probe.TraceProducer[bpfObjects, event]{ Base: probe.Base[bpfObjects, event]{ ID: id, @@ -107,12 +121,8 @@ func New(logger *slog.Logger) probe.Probe { tracerIDContainsSchemaURL{}, tracerIDContainsScopeAttributes{}, }, - Uprobes: []probe.Uprobe{ - { - Sym: "go.opentelemetry.io/otel/internal/global.(*tracer).newSpan", - EntryProbe: "uprobe_newStart", - FailureMode: probe.FailureModeWarn, - }, + Uprobes: []*probe.Uprobe{ + uprobeNewStart, { Sym: "go.opentelemetry.io/otel/internal/global.(*tracer).Start", EntryProbe: "uprobe_Start", @@ -138,12 +148,55 @@ func New(logger *slog.Logger) probe.Probe { FailureMode: probe.FailureModeIgnore, }, }, - SpecFn: loadBpf, + SpecFn: loadBpf, + ProcessRecord: c.decodeEvent, }, ProcessFn: processFn, } } +type recordKind uint32 + +const ( + recordKindTelemetry recordKind = iota + recordKindConrol +) + +type converter struct { + logger *slog.Logger + + uprobeNewStart *probe.Uprobe + uprobeNewStartMu sync.Mutex +} + +func (c *converter) decodeEvent(record perf.Record) (*event, error) { + reader := bytes.NewReader(record.RawSample) + + var kind recordKind + err := binary.Read(reader, binary.LittleEndian, &kind) + if err != nil { + return nil, err + } + + var e *event + switch kind { + case recordKindTelemetry: + e := new(event) + reader.Reset(record.RawSample) + err = binary.Read(reader, binary.LittleEndian, e) + case recordKindConrol: + c.uprobeNewStartMu.Lock() + if c.uprobeNewStart != nil { + err = c.uprobeNewStart.Close() + c.uprobeNewStart = nil + } + c.uprobeNewStartMu.Unlock() + default: + err = fmt.Errorf("unknown record kind: %d", kind) + } + return e, err +} + // tracerIDContainsSchemaURL is a Probe Const defining whether the tracer key contains schemaURL. type tracerIDContainsSchemaURL struct{} diff --git a/internal/pkg/instrumentation/bpf/google.golang.org/grpc/client/probe.go b/internal/pkg/instrumentation/bpf/google.golang.org/grpc/client/probe.go index 680f30eb2..864b93bc5 100644 --- a/internal/pkg/instrumentation/bpf/google.golang.org/grpc/client/probe.go +++ b/internal/pkg/instrumentation/bpf/google.golang.org/grpc/client/probe.go @@ -103,7 +103,7 @@ func New(logger *slog.Logger, version string) probe.Probe { MinVersion: writeStatusMinVersion, }, }, - Uprobes: []probe.Uprobe{ + Uprobes: []*probe.Uprobe{ { Sym: "google.golang.org/grpc.(*ClientConn).Invoke", EntryProbe: "uprobe_ClientConn_Invoke", diff --git a/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/probe.go b/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/probe.go index fb568e628..30be91916 100644 --- a/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/probe.go +++ b/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/probe.go @@ -126,7 +126,7 @@ func New(logger *slog.Logger, ver string) probe.Probe { }, framePosConst{}, }, - Uprobes: []probe.Uprobe{ + Uprobes: []*probe.Uprobe{ { Sym: "google.golang.org/grpc.(*Server).handleStream", EntryProbe: "uprobe_server_handleStream", diff --git a/internal/pkg/instrumentation/bpf/net/http/client/probe.go b/internal/pkg/instrumentation/bpf/net/http/client/probe.go index c96eb8c32..e0715c6ec 100644 --- a/internal/pkg/instrumentation/bpf/net/http/client/probe.go +++ b/internal/pkg/instrumentation/bpf/net/http/client/probe.go @@ -40,7 +40,7 @@ func New(logger *slog.Logger, version string) probe.Probe { InstrumentedPkg: pkg, } - uprobes := []probe.Uprobe{ + uprobes := []*probe.Uprobe{ { Sym: "net/http.(*Transport).roundTrip", EntryProbe: "uprobe_Transport_roundTrip", @@ -52,7 +52,7 @@ func New(logger *slog.Logger, version string) probe.Probe { // probe which writes the data in the outgoing buffer. if utils.SupportsContextPropagation() { uprobes = append(uprobes, - probe.Uprobe{ + &probe.Uprobe{ Sym: "net/http.Header.writeSubset", EntryProbe: "uprobe_writeSubset", // We mark this probe as dependent on roundTrip, so we don't accidentally diff --git a/internal/pkg/instrumentation/bpf/net/http/server/probe.go b/internal/pkg/instrumentation/bpf/net/http/server/probe.go index f2b574e11..d885b26a9 100644 --- a/internal/pkg/instrumentation/bpf/net/http/server/probe.go +++ b/internal/pkg/instrumentation/bpf/net/http/server/probe.go @@ -103,7 +103,7 @@ func New(logger *slog.Logger, version string) probe.Probe { }, patternPathSupportedConst{}, }, - Uprobes: []probe.Uprobe{ + Uprobes: []*probe.Uprobe{ { Sym: "net/http.serverHandler.ServeHTTP", EntryProbe: "uprobe_serverHandler_ServeHTTP", diff --git a/internal/pkg/instrumentation/probe/probe.go b/internal/pkg/instrumentation/probe/probe.go index 6bd9daa7f..66b0b4536 100644 --- a/internal/pkg/instrumentation/probe/probe.go +++ b/internal/pkg/instrumentation/probe/probe.go @@ -12,6 +12,7 @@ import ( "io" "log/slog" "os" + "sync/atomic" "github.com/cilium/ebpf" "github.com/cilium/ebpf/link" @@ -64,7 +65,7 @@ type Base[BPFObj any, BPFEvent any] struct { Consts []Const // Uprobes is a the collection of eBPF programs that need to be attached to // the target process. - Uprobes []Uprobe + Uprobes []*Uprobe // SpecFn is a creation function for an eBPF CollectionSpec related to the // probe. @@ -72,7 +73,7 @@ type Base[BPFObj any, BPFEvent any] struct { // ProcessRecord is an optional processing function for the probe. If nil, // all records will be read directly into a new BPFEvent using the // encoding/binary package. - ProcessRecord func(perf.Record) (BPFEvent, error) + ProcessRecord func(perf.Record) (*BPFEvent, error) reader *perf.Reader collection *ebpf.Collection @@ -198,7 +199,7 @@ func (i *Base[BPFObj, BPFEvent]) loadUprobes(exec *link.Executable, td *process. continue } - links, err := up.load(exec, td, i.collection) + err := up.load(exec, td, i.collection) if err != nil { var logFn func(string, ...any) switch up.FailureMode { @@ -213,9 +214,7 @@ func (i *Base[BPFObj, BPFEvent]) loadUprobes(exec *link.Executable, td *process. logFn("failed to load uprobe", "probe", i.ID, "symbol", up.Sym, "error", err) continue } - for _, l := range links { - i.closers = append(i.closers, l) - } + i.closers = append(i.closers, up) } return nil } @@ -268,18 +267,19 @@ func (i *Base[BPFObj, BPFEvent]) read() (*BPFEvent, error) { return nil, err } - var event BPFEvent + var event *BPFEvent if i.ProcessRecord != nil { event, err = i.ProcessRecord(record) } else { + event = new(BPFEvent) buf := bytes.NewReader(record.RawSample) - err = binary.Read(buf, binary.LittleEndian, &event) + err = binary.Read(buf, binary.LittleEndian, event) } if err != nil { return nil, err } - return &event, nil + return event, nil } // Close stops the Probe. @@ -315,6 +315,9 @@ func (i *SpanProducer[BPFObj, BPFEvent]) Run(handle func(ptrace.ScopeSpans)) { } continue } + if event == nil { + continue + } ss := ptrace.NewScopeSpans() @@ -344,6 +347,9 @@ func (i *TraceProducer[BPFObj, BPFEvent]) Run(handle func(ptrace.ScopeSpans)) { } continue } + if event == nil { + continue + } handle(i.ProcessFn(event)) } @@ -365,50 +371,76 @@ type Uprobe struct { // function specified by Sym. If ReturnProbe is empty, no eBPF program will be attached to the return of the function. ReturnProbe string DependsOn []string + + closers atomic.Pointer[[]io.Closer] } -func (u *Uprobe) load(exec *link.Executable, target *process.TargetDetails, c *ebpf.Collection) ([]link.Link, error) { +func (u *Uprobe) load(exec *link.Executable, target *process.TargetDetails, c *ebpf.Collection) error { offset, err := target.GetFunctionOffset(u.Sym) if err != nil { - return nil, err + return err } - var links []link.Link + var closers []io.Closer if u.EntryProbe != "" { entryProg, ok := c.Programs[u.EntryProbe] if !ok { - return nil, fmt.Errorf("entry probe %s not found", u.EntryProbe) + return fmt.Errorf("entry probe %s not found", u.EntryProbe) } opts := &link.UprobeOptions{Address: offset, PID: target.PID} l, err := exec.Uprobe("", entryProg, opts) if err != nil { - return nil, err + return err } - links = append(links, l) + closers = append(closers, l) } if u.ReturnProbe != "" { retProg, ok := c.Programs[u.ReturnProbe] if !ok { - return nil, fmt.Errorf("return probe %s not found", u.ReturnProbe) + return fmt.Errorf("return probe %s not found", u.ReturnProbe) } retOffsets, err := target.GetFunctionReturns(u.Sym) if err != nil { - return nil, err + return err } for _, ret := range retOffsets { opts := &link.UprobeOptions{Address: ret, PID: target.PID} l, err := exec.Uprobe("", retProg, opts) if err != nil { - return nil, err + return err } - links = append(links, l) + closers = append(closers, l) } } - return links, nil + old := u.closers.Swap(&closers) + if old != nil { + // load called twice without calling Close. Try and handle gracefully. + var err error + for _, closer := range *old { + err = errors.Join(err, closer.Close()) + } + return err + } + + return nil +} + +func (u *Uprobe) Close() error { + closersPtr := u.closers.Swap(nil) + if closersPtr == nil { + // No closers. + return nil + } + + var err error + for _, closer := range *closersPtr { + err = errors.Join(err, closer.Close()) + } + return err } // Const is an constant that needs to be injected into an eBPF program.