From 33165497f9c419d01dd388b1d750e13564a49c6d Mon Sep 17 00:00:00 2001 From: Mike Dame Date: Tue, 19 Nov 2024 12:02:12 -0500 Subject: [PATCH 1/7] no-op: move consts to their own file --- internal/pkg/instrumentation/probe/consts.go | 106 +++++++++++++++++++ internal/pkg/instrumentation/probe/probe.go | 91 ---------------- 2 files changed, 106 insertions(+), 91 deletions(-) create mode 100644 internal/pkg/instrumentation/probe/consts.go diff --git a/internal/pkg/instrumentation/probe/consts.go b/internal/pkg/instrumentation/probe/consts.go new file mode 100644 index 000000000..b28d3784d --- /dev/null +++ b/internal/pkg/instrumentation/probe/consts.go @@ -0,0 +1,106 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package probe provides instrumentation probe types and definitions. +package probe + +import ( + "errors" + "fmt" + + "github.com/hashicorp/go-version" + + "go.opentelemetry.io/auto/internal/pkg/inject" + "go.opentelemetry.io/auto/internal/pkg/process" + "go.opentelemetry.io/auto/internal/pkg/structfield" +) + +// Const is an constant that needs to be injected into an eBPF program. +type Const interface { + // InjectOption returns the inject.Option to run for the Const when running + // inject.Constants. + InjectOption(td *process.TargetDetails) (inject.Option, error) +} + +// StructFieldConst is a [Const] for a struct field offset. These struct field +// ID needs to be known offsets in the [inject] package. +type StructFieldConst struct { + Key string + Val structfield.ID +} + +// InjectOption returns the appropriately configured [inject.WithOffset] if the +// version of the struct field module is known. If it is not, an error is +// returned. +func (c StructFieldConst) InjectOption(td *process.TargetDetails) (inject.Option, error) { + ver, ok := td.Libraries[c.Val.ModPath] + if !ok { + return nil, fmt.Errorf("unknown module version: %s", c.Val.ModPath) + } + return inject.WithOffset(c.Key, c.Val, ver), nil +} + +// StructFieldConstMinVersion is a [Const] for a struct field offset. These struct field +// ID needs to be known offsets in the [inject] package. The offset is only +// injected if the module version is greater than or equal to the MinVersion. +type StructFieldConstMinVersion struct { + StructField StructFieldConst + MinVersion *version.Version +} + +// InjectOption returns the appropriately configured [inject.WithOffset] if the +// version of the struct field module is known and is greater than or equal to +// the MinVersion. If the module version is not known, an error is returned. +// If the module version is known but is less than the MinVersion, no offset is +// injected. +func (c StructFieldConstMinVersion) InjectOption(td *process.TargetDetails) (inject.Option, error) { + sf := c.StructField + ver, ok := td.Libraries[sf.Val.ModPath] + if !ok { + return nil, fmt.Errorf("unknown module version: %s", sf.Val.ModPath) + } + + if !ver.GreaterThanOrEqual(c.MinVersion) { + return nil, nil + } + + return inject.WithOffset(sf.Key, sf.Val, ver), nil +} + +// AllocationConst is a [Const] for all the allocation details that need to be +// injected into an eBPF program. +type AllocationConst struct{} + +// InjectOption returns the appropriately configured +// [inject.WithAllocationDetails] if the [process.AllocationDetails] within td +// are not nil. An error is returned if [process.AllocationDetails] is nil. +func (c AllocationConst) InjectOption(td *process.TargetDetails) (inject.Option, error) { + if td.AllocationDetails == nil { + return nil, errors.New("no allocation details") + } + return inject.WithAllocationDetails(*td.AllocationDetails), nil +} + +// RegistersABIConst is a [Const] for the boolean flag informing an eBPF +// program if the Go space has registered ABI. +type RegistersABIConst struct{} + +// InjectOption returns the appropriately configured [inject.WithRegistersABI]. +func (c RegistersABIConst) InjectOption(td *process.TargetDetails) (inject.Option, error) { + return inject.WithRegistersABI(td.IsRegistersABI()), nil +} + +// KeyValConst is a [Const] for a generic key-value pair. +// +// This should not be used as a replacement for any of the other provided +// [Const] implementations. Those implementations may have added significance +// and should be used instead where applicable. +type KeyValConst struct { + Key string + Val interface{} +} + +// InjectOption returns the appropriately configured [inject.WithKeyValue]. +func (c KeyValConst) InjectOption(*process.TargetDetails) (inject.Option, error) { + return inject.WithKeyValue(c.Key, c.Val), nil +} diff --git a/internal/pkg/instrumentation/probe/probe.go b/internal/pkg/instrumentation/probe/probe.go index fcce6bb7f..6b7e663c0 100644 --- a/internal/pkg/instrumentation/probe/probe.go +++ b/internal/pkg/instrumentation/probe/probe.go @@ -16,7 +16,6 @@ import ( "github.com/cilium/ebpf" "github.com/cilium/ebpf/link" "github.com/cilium/ebpf/perf" - "github.com/hashicorp/go-version" "go.opentelemetry.io/collector/pdata/ptrace" @@ -370,93 +369,3 @@ func (u *Uprobe) load(exec *link.Executable, target *process.TargetDetails, c *e return links, nil } - -// Const is an constant that needs to be injected into an eBPF program. -type Const interface { - // InjectOption returns the inject.Option to run for the Const when running - // inject.Constants. - InjectOption(td *process.TargetDetails) (inject.Option, error) -} - -// StructFieldConst is a [Const] for a struct field offset. These struct field -// ID needs to be known offsets in the [inject] package. -type StructFieldConst struct { - Key string - Val structfield.ID -} - -// InjectOption returns the appropriately configured [inject.WithOffset] if the -// version of the struct field module is known. If it is not, an error is -// returned. -func (c StructFieldConst) InjectOption(td *process.TargetDetails) (inject.Option, error) { - ver, ok := td.Libraries[c.Val.ModPath] - if !ok { - return nil, fmt.Errorf("unknown module version: %s", c.Val.ModPath) - } - return inject.WithOffset(c.Key, c.Val, ver), nil -} - -// StructFieldConstMinVersion is a [Const] for a struct field offset. These struct field -// ID needs to be known offsets in the [inject] package. The offset is only -// injected if the module version is greater than or equal to the MinVersion. -type StructFieldConstMinVersion struct { - StructField StructFieldConst - MinVersion *version.Version -} - -// InjectOption returns the appropriately configured [inject.WithOffset] if the -// version of the struct field module is known and is greater than or equal to -// the MinVersion. If the module version is not known, an error is returned. -// If the module version is known but is less than the MinVersion, no offset is -// injected. -func (c StructFieldConstMinVersion) InjectOption(td *process.TargetDetails) (inject.Option, error) { - sf := c.StructField - ver, ok := td.Libraries[sf.Val.ModPath] - if !ok { - return nil, fmt.Errorf("unknown module version: %s", sf.Val.ModPath) - } - - if !ver.GreaterThanOrEqual(c.MinVersion) { - return nil, nil - } - - return inject.WithOffset(sf.Key, sf.Val, ver), nil -} - -// AllocationConst is a [Const] for all the allocation details that need to be -// injected into an eBPF program. -type AllocationConst struct{} - -// InjectOption returns the appropriately configured -// [inject.WithAllocationDetails] if the [process.AllocationDetails] within td -// are not nil. An error is returned if [process.AllocationDetails] is nil. -func (c AllocationConst) InjectOption(td *process.TargetDetails) (inject.Option, error) { - if td.AllocationDetails == nil { - return nil, errors.New("no allocation details") - } - return inject.WithAllocationDetails(*td.AllocationDetails), nil -} - -// RegistersABIConst is a [Const] for the boolean flag informing an eBPF -// program if the Go space has registered ABI. -type RegistersABIConst struct{} - -// InjectOption returns the appropriately configured [inject.WithRegistersABI]. -func (c RegistersABIConst) InjectOption(td *process.TargetDetails) (inject.Option, error) { - return inject.WithRegistersABI(td.IsRegistersABI()), nil -} - -// KeyValConst is a [Const] for a generic key-value pair. -// -// This should not be used as a replacement for any of the other provided -// [Const] implementations. Those implementations may have added significance -// and should be used instead where applicable. -type KeyValConst struct { - Key string - Val interface{} -} - -// InjectOption returns the appropriately configured [inject.WithKeyValue]. -func (c KeyValConst) InjectOption(*process.TargetDetails) (inject.Option, error) { - return inject.WithKeyValue(c.Key, c.Val), nil -} From 85b29e816740165d3c29a67b2cca0f981bc801a5 Mon Sep 17 00:00:00 2001 From: Mike Dame Date: Fri, 22 Nov 2024 15:50:01 -0500 Subject: [PATCH 2/7] Move Uprobe type to its own file --- internal/pkg/instrumentation/probe/uprobe.go | 75 ++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 internal/pkg/instrumentation/probe/uprobe.go diff --git a/internal/pkg/instrumentation/probe/uprobe.go b/internal/pkg/instrumentation/probe/uprobe.go new file mode 100644 index 000000000..fc7dc3c25 --- /dev/null +++ b/internal/pkg/instrumentation/probe/uprobe.go @@ -0,0 +1,75 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package probe provides instrumentation probe types and definitions. +package probe + +import ( + "fmt" + + "github.com/cilium/ebpf" + "github.com/cilium/ebpf/link" + + "go.opentelemetry.io/auto/internal/pkg/process" +) + +// Uprobe is an eBPF program that is attached in the entry point and/or the return of a function. +type Uprobe struct { + // Sym is the symbol name of the function to attach the eBPF program to. + Sym string + // Optional is a boolean flag informing if the Uprobe is optional. If the + // Uprobe is optional and fails to attach, the error is logged and + // processing continues. + Optional bool + // EntryProbe is the name of the eBPF program to attach to the entry of the + // function specified by Sym. If EntryProbe is empty, no eBPF program will be attached to the entry of the function. + EntryProbe string + // ReturnProbe is the name of the eBPF program to attach to the return of the + // function specified by Sym. If ReturnProbe is empty, no eBPF program will be attached to the return of the function. + ReturnProbe string + DependsOn []string +} + +func (u *Uprobe) load(exec *link.Executable, target *process.TargetDetails, c *ebpf.Collection) ([]link.Link, error) { + offset, err := target.GetFunctionOffset(u.Sym) + if err != nil { + return nil, err + } + + var links []link.Link + + if u.EntryProbe != "" { + entryProg, ok := c.Programs[u.EntryProbe] + if !ok { + return nil, fmt.Errorf("entry probe %s not found", u.EntryProbe) + } + opts := &link.UprobeOptions{Address: offset, PID: target.PID} + l, err := exec.Uprobe("", entryProg, opts) + if err != nil { + return nil, err + } + links = append(links, l) + } + + if u.ReturnProbe != "" { + retProg, ok := c.Programs[u.ReturnProbe] + if !ok { + return nil, fmt.Errorf("return probe %s not found", u.ReturnProbe) + } + retOffsets, err := target.GetFunctionReturns(u.Sym) + if err != nil { + return nil, err + } + + for _, ret := range retOffsets { + opts := &link.UprobeOptions{Address: ret, PID: target.PID} + l, err := exec.Uprobe("", retProg, opts) + if err != nil { + return nil, err + } + links = append(links, l) + } + } + + return links, nil +} From 9de77e6018ab0dcd3b53b34e49547e5c0ddd26a0 Mon Sep 17 00:00:00 2001 From: Mike Dame Date: Fri, 22 Nov 2024 15:50:33 -0500 Subject: [PATCH 3/7] Delete old instrumentation/probe/probe.go --- internal/pkg/instrumentation/probe/probe.go | 371 -------------------- 1 file changed, 371 deletions(-) delete mode 100644 internal/pkg/instrumentation/probe/probe.go diff --git a/internal/pkg/instrumentation/probe/probe.go b/internal/pkg/instrumentation/probe/probe.go deleted file mode 100644 index 6b7e663c0..000000000 --- a/internal/pkg/instrumentation/probe/probe.go +++ /dev/null @@ -1,371 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -// Package probe provides instrumentation probe types and definitions. -package probe - -import ( - "bytes" - "encoding/binary" - "errors" - "fmt" - "io" - "log/slog" - "os" - - "github.com/cilium/ebpf" - "github.com/cilium/ebpf/link" - "github.com/cilium/ebpf/perf" - - "go.opentelemetry.io/collector/pdata/ptrace" - - "go.opentelemetry.io/auto/internal/pkg/inject" - "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpffs" - "go.opentelemetry.io/auto/internal/pkg/instrumentation/probe/sampling" - "go.opentelemetry.io/auto/internal/pkg/instrumentation/utils" - "go.opentelemetry.io/auto/internal/pkg/process" - "go.opentelemetry.io/auto/internal/pkg/structfield" -) - -// Probe is the instrument used by instrumentation for a Go package to measure -// and report on the state of that packages operation. -type Probe interface { - // Manifest returns the Probe's instrumentation Manifest. This includes all - // the information about the package the Probe instruments. - Manifest() Manifest - - // Load loads all the eBPF programs and maps required by the Probe. - // It also attaches the eBPF programs to the target process. - // TODO: currently passing Sampler as an initial configuration - this will be - // updated to a more generic configuration in the future. - Load(*link.Executable, *process.TargetDetails, *sampling.Config) error - - // Run runs the events processing loop. - Run(func(ptrace.ScopeSpans)) - - // Close stops the Probe. - Close() error -} - -// Base is a base implementation of [Probe]. -// -// This type can be returned by instrumentation directly. Instrumentation can -// also wrap this implementation with their own type if they need to override -// default behavior. -type Base[BPFObj any, BPFEvent any] struct { - // ID is a unique identifier for the probe. - ID ID - // Logger is used to log operations and errors. - Logger *slog.Logger - - // Consts are the constants that need to be injected into the eBPF program - // that is run by this Probe. - Consts []Const - // Uprobes is a the collection of eBPF programs that need to be attached to - // the target process. - Uprobes []Uprobe - - // SpecFn is a creation function for an eBPF CollectionSpec related to the - // probe. - SpecFn func() (*ebpf.CollectionSpec, error) - // ProcessRecord is an optional processing function for the probe. If nil, - // all records will be read directly into a new BPFEvent using the - // encoding/binary package. - ProcessRecord func(perf.Record) (BPFEvent, error) - - reader *perf.Reader - collection *ebpf.Collection - closers []io.Closer - samplingManager *sampling.Manager -} - -const ( - // The default size of the perf buffer in pages. - // We will need to make this configurable in the future. - PerfBufferDefaultSizeInPages = 128 - // The default name of the eBPF map used to pass events from the eBPF program - // to userspace. - DefaultBufferMapName = "events" -) - -// Manifest returns the Probe's instrumentation Manifest. -func (i *Base[BPFObj, BPFEvent]) Manifest() Manifest { - var structFieldIDs []structfield.ID - for _, cnst := range i.Consts { - if sfc, ok := cnst.(StructFieldConst); ok { - structFieldIDs = append(structFieldIDs, sfc.Val) - } - } - - symbols := make([]FunctionSymbol, 0, len(i.Uprobes)) - for _, up := range i.Uprobes { - symbols = append(symbols, FunctionSymbol{Symbol: up.Sym, DependsOn: up.DependsOn}) - } - - return NewManifest(i.ID, structFieldIDs, symbols) -} - -func (i *Base[BPFObj, BPFEvent]) Spec() (*ebpf.CollectionSpec, error) { - return i.SpecFn() -} - -// Load loads all instrumentation offsets. -func (i *Base[BPFObj, BPFEvent]) Load(exec *link.Executable, td *process.TargetDetails, sampler *sampling.Config) error { - spec, err := i.SpecFn() - if err != nil { - return err - } - - err = i.InjectConsts(td, spec) - if err != nil { - return err - } - - i.collection, err = i.buildEBPFCollection(td, spec) - if err != nil { - return err - } - - err = i.loadUprobes(exec, td) - if err != nil { - return err - } - - err = i.initReader() - if err != nil { - return err - } - - i.samplingManager, err = sampling.NewSamplingManager(i.collection, sampler) - if err != nil { - return err - } - - i.closers = append(i.closers, i.reader) - - return nil -} - -func (i *Base[BPFObj, BPFEvent]) InjectConsts(td *process.TargetDetails, spec *ebpf.CollectionSpec) error { - var err error - var opts []inject.Option - for _, cnst := range i.Consts { - o, e := cnst.InjectOption(td) - err = errors.Join(err, e) - if e == nil && o != nil { - opts = append(opts, o) - } - } - if err != nil { - return err - } - - return inject.Constants(spec, opts...) -} - -func (i *Base[BPFObj, BPFEvent]) loadUprobes(exec *link.Executable, td *process.TargetDetails) error { - for _, up := range i.Uprobes { - links, err := up.load(exec, td, i.collection) - if err != nil { - if up.Optional { - i.Logger.Debug("failed to attach optional uprobe", "probe", i.ID, "symbol", up.Sym, "error", err) - continue - } - return err - } - for _, l := range links { - i.closers = append(i.closers, l) - } - } - return nil -} - -func (i *Base[BPFObj, BPFEvent]) initReader() error { - buf, ok := i.collection.Maps[DefaultBufferMapName] - if !ok { - return fmt.Errorf("%s map not found", DefaultBufferMapName) - } - var err error - i.reader, err = perf.NewReader(buf, PerfBufferDefaultSizeInPages*os.Getpagesize()) - if err != nil { - return err - } - i.closers = append(i.closers, i.reader) - return nil -} - -func (i *Base[BPFObj, BPFEvent]) buildEBPFCollection(td *process.TargetDetails, spec *ebpf.CollectionSpec) (*ebpf.Collection, error) { - obj := new(BPFObj) - if c, ok := ((interface{})(obj)).(io.Closer); ok { - i.closers = append(i.closers, c) - } - - sOpts := &ebpf.CollectionOptions{ - Maps: ebpf.MapOptions{ - PinPath: bpffs.PathForTargetApplication(td), - }, - } - c, err := utils.InitializeEBPFCollection(spec, sOpts) - if err != nil { - return nil, err - } - - return c, nil -} - -// read reads a new BPFEvent from the perf Reader. -func (i *Base[BPFObj, BPFEvent]) read() (*BPFEvent, error) { - record, err := i.reader.Read() - if err != nil { - if !errors.Is(err, perf.ErrClosed) { - i.Logger.Error("error reading from perf reader", "error", err) - } - return nil, err - } - - if record.LostSamples != 0 { - i.Logger.Debug("perf event ring buffer full", "dropped", record.LostSamples) - return nil, err - } - - var event BPFEvent - if i.ProcessRecord != nil { - event, err = i.ProcessRecord(record) - } else { - buf := bytes.NewReader(record.RawSample) - err = binary.Read(buf, binary.LittleEndian, &event) - } - - if err != nil { - return nil, err - } - return &event, nil -} - -// Close stops the Probe. -func (i *Base[BPFObj, BPFEvent]) Close() error { - if i.collection != nil { - i.collection.Close() - } - var err error - for _, c := range i.closers { - err = errors.Join(err, c.Close()) - } - if err == nil { - i.Logger.Debug("Closed", "Probe", i.ID) - } - return err -} - -type SpanProducer[BPFObj any, BPFEvent any] struct { - Base[BPFObj, BPFEvent] - - Version string - SchemaURL string - ProcessFn func(*BPFEvent) ptrace.SpanSlice -} - -// Run runs the events processing loop. -func (i *SpanProducer[BPFObj, BPFEvent]) Run(handle func(ptrace.ScopeSpans)) { - for { - event, err := i.read() - if err != nil { - if errors.Is(err, perf.ErrClosed) { - return - } - continue - } - - ss := ptrace.NewScopeSpans() - - ss.Scope().SetName("go.opentelemetry.io/auto/" + i.ID.InstrumentedPkg) - ss.Scope().SetVersion(i.Version) - ss.SetSchemaUrl(i.SchemaURL) - - i.ProcessFn(event).CopyTo(ss.Spans()) - - handle(ss) - } -} - -type TraceProducer[BPFObj any, BPFEvent any] struct { - Base[BPFObj, BPFEvent] - - ProcessFn func(*BPFEvent) ptrace.ScopeSpans -} - -// Run runs the events processing loop. -func (i *TraceProducer[BPFObj, BPFEvent]) Run(handle func(ptrace.ScopeSpans)) { - for { - event, err := i.read() - if err != nil { - if errors.Is(err, perf.ErrClosed) { - return - } - continue - } - - handle(i.ProcessFn(event)) - } -} - -// Uprobe is an eBPF program that is attached in the entry point and/or the return of a function. -type Uprobe struct { - // Sym is the symbol name of the function to attach the eBPF program to. - Sym string - // Optional is a boolean flag informing if the Uprobe is optional. If the - // Uprobe is optional and fails to attach, the error is logged and - // processing continues. - Optional bool - // EntryProbe is the name of the eBPF program to attach to the entry of the - // function specified by Sym. If EntryProbe is empty, no eBPF program will be attached to the entry of the function. - EntryProbe string - // ReturnProbe is the name of the eBPF program to attach to the return of the - // function specified by Sym. If ReturnProbe is empty, no eBPF program will be attached to the return of the function. - ReturnProbe string - DependsOn []string -} - -func (u *Uprobe) load(exec *link.Executable, target *process.TargetDetails, c *ebpf.Collection) ([]link.Link, error) { - offset, err := target.GetFunctionOffset(u.Sym) - if err != nil { - return nil, err - } - - var links []link.Link - - if u.EntryProbe != "" { - entryProg, ok := c.Programs[u.EntryProbe] - if !ok { - return nil, fmt.Errorf("entry probe %s not found", u.EntryProbe) - } - opts := &link.UprobeOptions{Address: offset, PID: target.PID} - l, err := exec.Uprobe("", entryProg, opts) - if err != nil { - return nil, err - } - links = append(links, l) - } - - if u.ReturnProbe != "" { - retProg, ok := c.Programs[u.ReturnProbe] - if !ok { - return nil, fmt.Errorf("return probe %s not found", u.ReturnProbe) - } - retOffsets, err := target.GetFunctionReturns(u.Sym) - if err != nil { - return nil, err - } - - for _, ret := range retOffsets { - opts := &link.UprobeOptions{Address: ret, PID: target.PID} - l, err := exec.Uprobe("", retProg, opts) - if err != nil { - return nil, err - } - links = append(links, l) - } - } - - return links, nil -} From 5ce6a733998c36370a9cc27d94adf30a5126dadc Mon Sep 17 00:00:00 2001 From: Mike Dame Date: Fri, 22 Nov 2024 16:11:22 -0500 Subject: [PATCH 4/7] Add new Probe interfaces --- .../pkg/instrumentation/probe/interfaces.go | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 internal/pkg/instrumentation/probe/interfaces.go diff --git a/internal/pkg/instrumentation/probe/interfaces.go b/internal/pkg/instrumentation/probe/interfaces.go new file mode 100644 index 000000000..16bc03ca0 --- /dev/null +++ b/internal/pkg/instrumentation/probe/interfaces.go @@ -0,0 +1,73 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package probe provides instrumentation probe types and definitions. +package probe + +import ( + "log/slog" +) + +// BaseProbe is the most basic type of Probe. It must support configuration, +// loading and attaching eBPF programs, running, and closing. +// All Probes must implement this Base functionality. +type BaseProbe interface { + // ID is a unique identifier for the Probe. + // All types of Probes must provide a unique identifier for the framework. + ID() ID + + // GetLogger returns an *slog.Logger for this Probe. + GetLogger() *slog.Logger + + // Load loads the eBPF programs and maps required by the Probe into memory. + // The specific types of programs and maps are implemented by the Probe. + Load() error + + // Attach attaches the eBPF programs to trigger points in the process. + // The specific attachment points are implemented by the Probe. + Attach() error + + // ApplyConfig updates the Probe's current Config with the provided Config + // interface. It is up to the Probe to implement type conversion to any custom + // config formats it defines, to support options specific to the Probe. + ApplyConfig(Config) error +} + +// RunnableProbe is a Probe that Runs. +type RunnableProbe interface { + BaseProbe + + // Run starts the Probe. + Run() + + // Close stops the Probe. + Close() error +} + +// TracingProbe is a RunnableProbe meant specifically for trace telemetry. +type TracingProbe interface { + RunnableProbe + + // TraceConfig provides the TracingConfig for the Probe. + TraceConfig() *TracingConfig +} + +// GoLibraryTelemetryProbe is a RunnableProbe that is bound to a single library +// in a single target executable. +type GoLibraryTelemetryProbe interface { + RunnableProbe + + // Manifest returns the Probe's instrumentation Manifest. This includes all + // the information about any packages the Probe instruments. + Manifest() Manifest + + // TargetConfig returns the Probe's TargetExecutableConfig containing + // information about the process the Probe observes. + TargetConfig() *TargetExecutableConfig +} + +// Config represents the config for a Probe. +// There are currently no default options for Probes, however Probes may +// define their own config structs that the Probe must cast itself when +// implementing ApplyConfig(). +type Config interface{} From 2b8e5a71219d11081b0c891e85a647a63b2605a2 Mon Sep 17 00:00:00 2001 From: Mike Dame Date: Fri, 22 Nov 2024 16:11:51 -0500 Subject: [PATCH 5/7] Add default Probe implementations and convenience builders --- internal/pkg/instrumentation/probe/builder.go | 41 +++ internal/pkg/instrumentation/probe/types.go | 324 ++++++++++++++++++ 2 files changed, 365 insertions(+) create mode 100644 internal/pkg/instrumentation/probe/builder.go create mode 100644 internal/pkg/instrumentation/probe/types.go diff --git a/internal/pkg/instrumentation/probe/builder.go b/internal/pkg/instrumentation/probe/builder.go new file mode 100644 index 000000000..d0b337f75 --- /dev/null +++ b/internal/pkg/instrumentation/probe/builder.go @@ -0,0 +1,41 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package probe provides instrumentation probe types and definitions. +package probe + +// NewTargetSpanProducingProbe returns a fully instantiated TargetSpanProducingProbe. +func NewTargetSpanProducingProbe[BPFObj any, BPFEvent any]() *TargetSpanProducingProbe[BPFObj, BPFEvent] { + return &TargetSpanProducingProbe[BPFObj, BPFEvent]{ + TracingConfig: &TracingConfig{}, + TargetEventProducingProbe: NewTargetEventProducingProbe[BPFObj, BPFEvent](), + } +} + +// NewTargetTraceProducingProbe returns a fully instantiated TargetTraceProducingProbe. +func NewTargetTraceProducingProbe[BPFObj any, BPFEvent any]() *TargetTraceProducingProbe[BPFObj, BPFEvent] { + return &TargetTraceProducingProbe[BPFObj, BPFEvent]{ + TracingConfig: &TracingConfig{}, + TargetEventProducingProbe: NewTargetEventProducingProbe[BPFObj, BPFEvent](), + } +} + +// NewTargetEventProducingProbe returns a fully instantiated TargetEventProducingProbe. +func NewTargetEventProducingProbe[BPFObj any, BPFEvent any]() *TargetEventProducingProbe[BPFObj, BPFEvent] { + return &TargetEventProducingProbe[BPFObj, BPFEvent]{ + TargetExecutableProbe: NewTargetExecutableProbe[BPFObj](), + } +} + +// NewTargetExectuableProbe returns a fully instantiated TargetExecutableProbe. +func NewTargetExecutableProbe[BPFObj any]() *TargetExecutableProbe[BPFObj] { + return &TargetExecutableProbe[BPFObj]{ + TargetExecutableConfig: &TargetExecutableConfig{}, + BasicProbe: NewBasicProbe(), + } +} + +// NewBasicProbe returns a fully instantiated BasicProbe. +func NewBasicProbe() *BasicProbe { + return &BasicProbe{} +} diff --git a/internal/pkg/instrumentation/probe/types.go b/internal/pkg/instrumentation/probe/types.go new file mode 100644 index 000000000..b1e7c866a --- /dev/null +++ b/internal/pkg/instrumentation/probe/types.go @@ -0,0 +1,324 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package probe provides instrumentation probe types and definitions. +package probe + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "io" + "log/slog" + "os" + + "github.com/cilium/ebpf" + "github.com/cilium/ebpf/link" + "github.com/cilium/ebpf/perf" + + "go.opentelemetry.io/collector/pdata/ptrace" + + "go.opentelemetry.io/auto/internal/pkg/inject" + "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpffs" + "go.opentelemetry.io/auto/internal/pkg/instrumentation/probe/sampling" + "go.opentelemetry.io/auto/internal/pkg/instrumentation/utils" + "go.opentelemetry.io/auto/internal/pkg/process" + "go.opentelemetry.io/auto/internal/pkg/structfield" +) + +// ErrInvalidConfig is intended to be used by a Probe's implementation of +// ApplyConfig() when the provided probe.Config interface does not convert to +// the Probe's custom config struct. For example, libraries may choose to ignore +// this error when batch-updating Probes with a value that doesn't apply to all Probes. +var ErrInvalidConfig = errors.New("invalid config type for Probe") + +const ( + // The default size of the perf buffer in pages. + // We will need to make this configurable in the future. + PerfBufferDefaultSizeInPages = 128 + // The default name of the eBPF map used to pass events from the eBPF program + // to userspace. + DefaultBufferMapName = "events" +) + +// TracingConfig provides base configuration options for trace-based Probes. +type TracingConfig struct { + SamplingConfig *sampling.Config +} + +// TargetExecutableConfig provides executable and target process details for applications. +type TargetExecutableConfig struct { + // Executable defines a target executable. + Executable *link.Executable + // TargetDetails defines target process information. + TargetDetails *process.TargetDetails +} + +// BasicProbe is a provided implementation of BaseProbe. It is the base configuration +// and identification layer for any Probe. +type BasicProbe struct { + // ProbeID is a unique identifier for the Probe. + ProbeID ID + // Logger is used to log operations and errors. + Logger *slog.Logger +} + +// ID returns the ID for this Probe. +func (b *BasicProbe) ID() ID { + return b.ProbeID +} + +// GetLogger returns the Logger for this Probe. +func (b *BasicProbe) GetLogger() *slog.Logger { + return b.Logger +} + +// TargetExecutableProbe is a provided implementation of GoLibraryTelemetryProbe. +type TargetExecutableProbe[BPFObj any] struct { + *BasicProbe + *TargetExecutableConfig + + // Consts are the constants that need to be injected into the eBPF program + // that is run by this Probe. Currently only used by TargetingProbes. + Consts []Const + + // SpecFn is a creation function for an eBPF CollectionSpec related to the + // probe. + SpecFn func() (*ebpf.CollectionSpec, error) + + closers []io.Closer + collection *ebpf.Collection + reader *perf.Reader + // Uprobes is a the collection of eBPF programs that need to be attached to + // the target process. + Uprobes []Uprobe +} + +func (p *TargetExecutableProbe[BPFObj]) TargetConfig() *TargetExecutableConfig { + return p.TargetExecutableConfig +} + +// Load loads the eBPF programs for this Probe into memory. +func (p *TargetExecutableProbe[BPFObj]) Load() error { + spec, err := p.SpecFn() + if err != nil { + return err + } + + // Inject Consts into the Probe's collection spec. + var opts []inject.Option + for _, cnst := range p.Consts { + o, e := cnst.InjectOption(p.TargetDetails) + err = errors.Join(err, e) + if e == nil && o != nil { + opts = append(opts, o) + } + } + if err != nil { + return err + } + err = inject.Constants(spec, opts...) + if err != nil { + return err + } + + // Set up closers for the Probe + obj := new(BPFObj) + if c, ok := ((interface{})(obj)).(io.Closer); ok { + p.closers = append(p.closers, c) + } + + // Initialize the eBPF collection for the Probe + sOpts := &ebpf.CollectionOptions{ + Maps: ebpf.MapOptions{ + PinPath: bpffs.PathForTargetApplication(p.TargetDetails), + }, + } + c, err := utils.InitializeEBPFCollection(spec, sOpts) + if err != nil { + return err + } + p.collection = c + + return nil +} + +// Attach attaches loaded eBPF programs to trigger points and initializes the Probe. +func (p *TargetExecutableProbe[BPFObj]) Attach() error { + // Attach Uprobes + for _, up := range p.Uprobes { + links, err := up.load(p.Executable, p.TargetDetails, p.collection) + if err != nil { + if up.Optional { + p.Logger.Debug("failed to attach optional uprobe", "probe", p.ID, "symbol", up.Sym, "error", err) + continue + } + return err + } + for _, l := range links { + p.closers = append(p.closers, l) + } + } + + // Initialize reader for the Probe + buf, ok := p.collection.Maps[DefaultBufferMapName] + if !ok { + return fmt.Errorf("%s map not found", DefaultBufferMapName) + } + var err error + p.reader, err = perf.NewReader(buf, PerfBufferDefaultSizeInPages*os.Getpagesize()) + if err != nil { + return err + } + p.closers = append(p.closers, p.reader) + + return nil +} + +// Manifest returns the Probe's instrumentation Manifest. +func (p *TargetExecutableProbe[BPFObj]) Manifest() Manifest { + var structFieldIDs []structfield.ID + for _, cnst := range p.Consts { + if sfc, ok := cnst.(StructFieldConst); ok { + structFieldIDs = append(structFieldIDs, sfc.Val) + } + } + + symbols := make([]FunctionSymbol, 0, len(p.Uprobes)) + for _, up := range p.Uprobes { + symbols = append(symbols, FunctionSymbol{Symbol: up.Sym, DependsOn: up.DependsOn}) + } + + return NewManifest(p.ProbeID, structFieldIDs, symbols) +} + +func (p *TargetExecutableProbe[BPFObj]) Close() error { + if p.collection != nil { + p.collection.Close() + } + var err error + for _, c := range p.closers { + err = errors.Join(err, c.Close()) + } + if err == nil { + p.Logger.Debug("Closed", "Probe", p.ID) + } + return err +} + +// TargetEventProducingProbe is a TargetExecutableProbe that reads and processes eBPF events. +type TargetEventProducingProbe[BPFObj any, BPFEvent any] struct { + *TargetExecutableProbe[BPFObj] + + // ProcessRecord is an optional processing function for the probe. If nil, + // all records will be read directly into a new BPFEvent using the + // encoding/binary package. + ProcessRecord func(perf.Record) (BPFEvent, error) +} + +// read reads a new BPFEvent from the perf Reader. +func (e *TargetEventProducingProbe[BPFObj, BPFEvent]) read() (*BPFEvent, error) { + record, err := e.reader.Read() + if err != nil { + if !errors.Is(err, perf.ErrClosed) { + e.Logger.Error("error reading from perf reader", "error", err) + } + return nil, err + } + + if record.LostSamples != 0 { + e.Logger.Debug("perf event ring buffer full", "dropped", record.LostSamples) + return nil, err + } + + var event BPFEvent + if e.ProcessRecord != nil { + event, err = e.ProcessRecord(record) + } else { + buf := bytes.NewReader(record.RawSample) + err = binary.Read(buf, binary.LittleEndian, &event) + } + + if err != nil { + return nil, err + } + return &event, nil +} + +// TargetSpanProducingProbe is a provided implementation of GoLibraryTelemetryProbe that +// processes and handles ptrace.ScopeSpans by emitting spans. +type TargetSpanProducingProbe[BPFObj any, BPFEvent any] struct { + *TargetEventProducingProbe[BPFObj, BPFEvent] + *TracingConfig + + Version string + SchemaURL string + ProcessFn func(*BPFEvent) ptrace.SpanSlice + + Handler func(ptrace.ScopeSpans) +} + +func (s *TargetSpanProducingProbe[BPFObj, BPFEvent]) TraceConfig() *TracingConfig { + return s.TracingConfig +} + +func (s *TargetSpanProducingProbe[BPFObj, BPFEvent]) Run() { + _, err := sampling.NewSamplingManager(s.collection, s.SamplingConfig) + if err != nil { + s.Logger.Error("unable to get new sampling manager", "err", err) + return + } + + for { + event, err := s.read() + if err != nil { + if errors.Is(err, perf.ErrClosed) { + return + } + continue + } + + ss := ptrace.NewScopeSpans() + + ss.Scope().SetName("go.opentelemetry.io/auto/" + s.ProbeID.InstrumentedPkg) + ss.Scope().SetVersion(s.Version) + ss.SetSchemaUrl(s.SchemaURL) + + s.ProcessFn(event).CopyTo(ss.Spans()) + + s.Handler(ss) + } +} + +// TargetTraceProducingProbe is a provided implementation of GoLibraryTelemetryProbe that +// processes and handles ptrace.ScopeSpans by emitting traces. +type TargetTraceProducingProbe[BPFObj any, BPFEvent any] struct { + *TargetEventProducingProbe[BPFObj, BPFEvent] + *TracingConfig + + ProcessFn func(*BPFEvent) ptrace.ScopeSpans + + Handler func(ptrace.ScopeSpans) +} + +// Run runs the events processing loop. +func (i *TargetTraceProducingProbe[BPFObj, BPFEvent]) Run() { + _, err := sampling.NewSamplingManager(i.collection, i.SamplingConfig) + if err != nil { + i.Logger.Error("unable to get new sampling manager", "err", err) + return + } + + for { + event, err := i.read() + if err != nil { + if errors.Is(err, perf.ErrClosed) { + return + } + continue + } + + i.Handler(i.ProcessFn(event)) + } +} From 0555c93944adcdfd6f33b92ea4016c40c6bd23fc Mon Sep 17 00:00:00 2001 From: Mike Dame Date: Fri, 22 Nov 2024 16:12:25 -0500 Subject: [PATCH 6/7] Update Manager with new extension points (using new interfaces) --- instrumentation.go | 32 ++- internal/pkg/instrumentation/manager.go | 208 +++++++++---------- internal/pkg/instrumentation/manager_test.go | 106 +++++++--- 3 files changed, 212 insertions(+), 134 deletions(-) diff --git a/instrumentation.go b/instrumentation.go index 827cb877a..7e76fce5b 100644 --- a/instrumentation.go +++ b/instrumentation.go @@ -23,7 +23,19 @@ import ( "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.26.0" + dbSql "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/database/sql" + kafkaConsumer "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer" + kafkaProducer "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer" + autosdk "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/go.opentelemetry.io/auto/sdk" + otelTraceGlobal "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal" + grpcClient "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/google.golang.org/grpc/client" + grpcServer "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server" + httpClient "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/net/http/client" + httpServer "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/net/http/server" + "go.opentelemetry.io/auto/internal/pkg/instrumentation" + "go.opentelemetry.io/auto/internal/pkg/instrumentation/probe" + "go.opentelemetry.io/auto/internal/pkg/instrumentation/utils" "go.opentelemetry.io/auto/internal/pkg/opentelemetry" "go.opentelemetry.io/auto/internal/pkg/process" ) @@ -94,7 +106,23 @@ func NewInstrumentation(ctx context.Context, opts ...InstrumentationOption) (*In } cp := convertConfigProvider(c.cp) - mngr, err := instrumentation.NewManager(c.logger, ctrl, c.globalImpl, cp, Version()) + + // TODO: Probes should be passed by the end developer to NewInstrumentation() when they're public + probes := []probe.BaseProbe{ + grpcClient.New(c.logger, Version(), ctrl.Trace), + grpcServer.New(c.logger, Version(), ctrl.Trace), + httpServer.New(c.logger, Version(), ctrl.Trace), + httpClient.New(c.logger, Version(), ctrl.Trace, httpClient.Config{SupportsContextPropagation: utils.SupportsContextPropagation()}), + dbSql.New(c.logger, Version(), ctrl.Trace), + kafkaProducer.New(c.logger, Version(), ctrl.Trace), + kafkaConsumer.New(c.logger, Version(), ctrl.Trace), + autosdk.New(c.logger, ctrl.Trace), + } + if c.globalImpl { + probes = append(probes, otelTraceGlobal.New(c.logger, ctrl.Trace)) + } + + mngr, err := instrumentation.NewManager(c.logger, ctrl, probes, cp, Version()) if err != nil { return nil, err } @@ -117,7 +145,7 @@ func NewInstrumentation(ctx context.Context, opts ...InstrumentationOption) (*In "dependencies", td.Libraries, "total_functions_found", len(td.Functions), ) - mngr.FilterUnusedProbes(td) + mngr.FilterUnusedProbesForTarget(td) return &Instrumentation{ target: td, diff --git a/internal/pkg/instrumentation/manager.go b/internal/pkg/instrumentation/manager.go index 8fcf9e3f2..b43839fb9 100644 --- a/internal/pkg/instrumentation/manager.go +++ b/internal/pkg/instrumentation/manager.go @@ -15,15 +15,6 @@ import ( "go.opentelemetry.io/otel/trace" - dbSql "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/database/sql" - kafkaConsumer "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer" - kafkaProducer "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer" - autosdk "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/go.opentelemetry.io/auto/sdk" - otelTraceGlobal "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal" - grpcClient "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/google.golang.org/grpc/client" - grpcServer "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server" - httpClient "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/net/http/client" - httpServer "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/net/http/server" "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpffs" "go.opentelemetry.io/auto/internal/pkg/instrumentation/probe" "go.opentelemetry.io/auto/internal/pkg/opentelemetry" @@ -51,9 +42,8 @@ const ( type Manager struct { logger *slog.Logger version string - probes map[probe.ID]probe.Probe + probes map[probe.ID]probe.BaseProbe otelController *opentelemetry.Controller - globalImpl bool cp ConfigProvider exe *link.Executable td *process.TargetDetails @@ -63,22 +53,37 @@ type Manager struct { probeMu sync.Mutex state managerState stateMu sync.RWMutex + relevantFuncs map[string]interface{} } // NewManager returns a new [Manager]. -func NewManager(logger *slog.Logger, otelController *opentelemetry.Controller, globalImpl bool, cp ConfigProvider, version string) (*Manager, error) { +func NewManager(logger *slog.Logger, otelController *opentelemetry.Controller, probes []probe.BaseProbe, cp ConfigProvider, version string) (*Manager, error) { m := &Manager{ logger: logger, version: version, - probes: make(map[probe.ID]probe.Probe), + probes: make(map[probe.ID]probe.BaseProbe), otelController: otelController, - globalImpl: globalImpl, cp: cp, + relevantFuncs: make(map[string]interface{}), } - err := m.registerProbes() - if err != nil { - return nil, err + for _, p := range probes { + switch probeObj := p.(type) { + case probe.GoLibraryTelemetryProbe: + id := probeObj.Manifest().Id + if _, exists := m.probes[id]; exists { + return nil, fmt.Errorf("library %s registered twice, aborting", id) + } + + if err := m.validateProbeDependents(id, probeObj.Manifest().Symbols); err != nil { + return nil, err + } + + m.probes[id] = p + + default: + return nil, fmt.Errorf("unknown probe type") + } } return m, nil @@ -97,59 +102,52 @@ func (m *Manager) validateProbeDependents(id probe.ID, symbols []probe.FunctionS return fmt.Errorf("library %s has declared a dependent function %s for probe %s which does not exist, aborting", id, d, s.Symbol) } } - } - return nil -} - -func (m *Manager) registerProbe(p probe.Probe) error { - id := p.Manifest().Id - if _, exists := m.probes[id]; exists { - return fmt.Errorf("library %s registered twice, aborting", id) - } - - if err := m.validateProbeDependents(id, p.Manifest().Symbols); err != nil { - return err + // if no dependency issues, add the symbol to the manager's relevant funcs + m.relevantFuncs[s.Symbol] = nil } - m.probes[id] = p return nil } // GetRelevantFuncs returns the instrumented functions for all managed probes. func (m *Manager) GetRelevantFuncs() map[string]interface{} { - funcsMap := make(map[string]interface{}) - for _, i := range m.probes { - for _, s := range i.Manifest().Symbols { - funcsMap[s.Symbol] = nil - } - } - - return funcsMap + return m.relevantFuncs } -// FilterUnusedProbes filterers probes whose functions are already instrumented -// out of the Manager. -func (m *Manager) FilterUnusedProbes(target *process.TargetDetails) { +// FilterUnusedProbesForTarget filters probes whose functions are not instrumented +// out of the Manager, and updates instrumented probes with Target Details. +func (m *Manager) FilterUnusedProbesForTarget(target *process.TargetDetails) { existingFuncMap := make(map[string]interface{}) for _, f := range target.Functions { existingFuncMap[f.Name] = nil } for name, inst := range m.probes { - funcsFound := false - for _, s := range inst.Manifest().Symbols { - if len(s.DependsOn) == 0 { - if _, exists := existingFuncMap[s.Symbol]; exists { - funcsFound = true - break + switch p := inst.(type) { + case probe.GoLibraryTelemetryProbe: + // Filter Probe if unused in target process + funcsFound := false + for _, s := range p.Manifest().Symbols { + if len(s.DependsOn) == 0 { + if _, exists := existingFuncMap[s.Symbol]; exists { + funcsFound = true + break + } } } - } - if !funcsFound { - m.logger.Debug("no functions found for probe, removing", "name", name) - delete(m.probes, name) + if !funcsFound { + m.logger.Debug("no functions found for probe, removing", "name", name) + delete(m.probes, name) + continue + } + + // If Probe is used, pass target details to Probe + p.TargetConfig().TargetDetails = target + + default: + continue } } } @@ -200,34 +198,41 @@ func (m *Manager) applyConfig(c Config) error { } for id, p := range m.probes { - currentlyEnabled := isProbeEnabled(id, m.currentConfig) - newEnabled := isProbeEnabled(id, c) + if runnableProbe, ok := p.(probe.RunnableProbe); ok { + currentlyEnabled := isProbeEnabled(id, m.currentConfig) + newEnabled := isProbeEnabled(id, c) - if currentlyEnabled && !newEnabled { - m.logger.Info("Disabling probe", "id", id) - err = errors.Join(err, p.Close()) - continue - } + if currentlyEnabled && !newEnabled { + m.logger.Info("Disabling probe", "id", id) + err = errors.Join(err, runnableProbe.Close()) + continue + } - if !currentlyEnabled && newEnabled { - m.logger.Info("Enabling probe", "id", id) - err = errors.Join(err, p.Load(m.exe, m.td, c.SamplingConfig)) - if err == nil { - m.runProbe(p) + if !currentlyEnabled && newEnabled { + m.logger.Info("Enabling probe", "id", id) + + if tracingProbe, ok := p.(probe.TracingProbe); ok { + tracingProbe.TraceConfig().SamplingConfig = c.SamplingConfig + } + + err = errors.Join(err, runnableProbe.Load()) + if err != nil { + continue + } + err = errors.Join(err, runnableProbe.Attach()) + if err == nil { + m.runningProbesWG.Add(1) + go func(ap probe.RunnableProbe) { + defer m.runningProbesWG.Done() + ap.Run() + }(runnableProbe) + } + continue } - continue } } - return nil -} - -func (m *Manager) runProbe(p probe.Probe) { - m.runningProbesWG.Add(1) - go func(ap probe.Probe) { - defer m.runningProbesWG.Done() - ap.Run(m.otelController.Trace) - }(p) + return err } func (m *Manager) ConfigLoop(ctx context.Context) { @@ -289,7 +294,13 @@ func (m *Manager) runProbes(ctx context.Context) (context.Context, error) { for id, p := range m.probes { if isProbeEnabled(id, m.currentConfig) { - m.runProbe(p) + if tp, ok := p.(probe.RunnableProbe); ok { + m.runningProbesWG.Add(1) + go func(ap probe.RunnableProbe) { + defer m.runningProbesWG.Done() + ap.Run() + }(tp) + } } } @@ -372,11 +383,24 @@ func (m *Manager) loadProbes(target *process.TargetDetails) error { for name, i := range m.probes { if isProbeEnabled(name, m.currentConfig) { m.logger.Info("loading probe", "name", name) - err := i.Load(exe, target, m.currentConfig.SamplingConfig) + if p, ok := i.(probe.TracingProbe); ok { + p.TraceConfig().SamplingConfig = m.currentConfig.SamplingConfig + } + if p, ok := i.(probe.GoLibraryTelemetryProbe); ok { + p.TargetConfig().Executable = exe + } + + err := i.Load() if err != nil { m.logger.Error("error while loading probes, cleaning up", "error", err, "name", name) return errors.Join(err, m.cleanup(target)) } + + err = i.Attach() + if err != nil { + m.logger.Error("error while attaching probes, cleaning up", "error", err, "name", name) + return errors.Join(err, m.cleanup(target)) + } } } @@ -396,8 +420,12 @@ func (m *Manager) mount(target *process.TargetDetails) error { func (m *Manager) cleanup(target *process.TargetDetails) error { ctx := context.Background() err := m.cp.Shutdown(context.Background()) + + // Shut down TelemetryProbes for _, i := range m.probes { - err = errors.Join(err, i.Close()) + if p, ok := i.(probe.RunnableProbe); ok { + err = errors.Join(err, p.Close()) + } } // Wait for all probes to close so we know there is no more telemetry being @@ -409,31 +437,3 @@ func (m *Manager) cleanup(target *process.TargetDetails) error { m.logger.Debug("Cleaning bpffs") return errors.Join(err, bpffsCleanup(target)) } - -func (m *Manager) availableProbes() []probe.Probe { - p := []probe.Probe{ - grpcClient.New(m.logger, m.version), - grpcServer.New(m.logger, m.version), - httpServer.New(m.logger, m.version), - httpClient.New(m.logger, m.version), - dbSql.New(m.logger, m.version), - kafkaProducer.New(m.logger, m.version), - kafkaConsumer.New(m.logger, m.version), - autosdk.New(m.logger), - } - - if m.globalImpl { - p = append(p, otelTraceGlobal.New(m.logger)) - } - - return p -} - -func (m *Manager) registerProbes() error { - for _, p := range m.availableProbes() { - if err := m.registerProbe(p); err != nil { - return err - } - } - return nil -} diff --git a/internal/pkg/instrumentation/manager_test.go b/internal/pkg/instrumentation/manager_test.go index 9e1a58d98..e31fd93ca 100644 --- a/internal/pkg/instrumentation/manager_test.go +++ b/internal/pkg/instrumentation/manager_test.go @@ -23,16 +23,20 @@ import ( "go.opentelemetry.io/otel/trace/noop" "go.opentelemetry.io/auto/internal/pkg/instrumentation/probe" - "go.opentelemetry.io/auto/internal/pkg/instrumentation/probe/sampling" "go.opentelemetry.io/auto/internal/pkg/opentelemetry" "go.opentelemetry.io/auto/internal/pkg/process" "go.opentelemetry.io/auto/internal/pkg/process/binary" + + httpClient "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/net/http/client" + httpServer "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/net/http/server" ) func TestProbeFiltering(t *testing.T) { ver, err := version.NewVersion("1.20.0") assert.NoError(t, err) + mockExeAndBpffs(t) + t.Run("empty target details", func(t *testing.T) { m := fakeManager(t) @@ -43,7 +47,7 @@ func TestProbeFiltering(t *testing.T) { Libraries: map[string]*version.Version{}, AllocationDetails: nil, } - m.FilterUnusedProbes(&td) + m.FilterUnusedProbesForTarget(&td) assert.Equal(t, 0, len(m.probes)) }) @@ -61,7 +65,7 @@ func TestProbeFiltering(t *testing.T) { Libraries: map[string]*version.Version{}, AllocationDetails: nil, } - m.FilterUnusedProbes(&td) + m.FilterUnusedProbesForTarget(&td) assert.Equal(t, 1, len(m.probes)) // one function, single probe }) @@ -80,7 +84,7 @@ func TestProbeFiltering(t *testing.T) { Libraries: map[string]*version.Version{}, AllocationDetails: nil, } - m.FilterUnusedProbes(&td) + m.FilterUnusedProbesForTarget(&td) assert.Equal(t, 2, len(m.probes)) }) @@ -100,7 +104,7 @@ func TestProbeFiltering(t *testing.T) { Libraries: map[string]*version.Version{}, AllocationDetails: nil, } - m.FilterUnusedProbes(&td) + m.FilterUnusedProbesForTarget(&td) assert.Equal(t, 1, len(m.probes)) }) } @@ -189,7 +193,11 @@ func TestDependencyChecks(t *testing.T) { } func fakeManager(t *testing.T) *Manager { - m, err := NewManager(slog.Default(), nil, true, NewNoopConfigProvider(nil), "") + probes := []probe.BaseProbe{ + httpClient.New(nil, "", nil, httpClient.Config{}), + httpServer.New(nil, "", nil), + } + m, err := NewManager(slog.Default(), nil, probes, NewNoopConfigProvider(nil), "") assert.NoError(t, err) assert.NotNil(t, m) @@ -246,7 +254,7 @@ func TestRunStoppingByContext(t *testing.T) { m := &Manager{ otelController: ctrl, logger: slog.Default(), - probes: map[probe.ID]probe.Probe{{}: p}, + probes: map[probe.ID]probe.BaseProbe{{}: p}, cp: NewNoopConfigProvider(nil), } @@ -295,7 +303,7 @@ func TestRunStoppingByStop(t *testing.T) { m := &Manager{ otelController: ctrl, logger: slog.Default(), - probes: map[probe.ID]probe.Probe{{}: &p}, + probes: map[probe.ID]probe.BaseProbe{{}: &p}, cp: NewNoopConfigProvider(nil), } @@ -327,7 +335,7 @@ func TestRunStoppingByStop(t *testing.T) { } type slowProbe struct { - probe.Probe + probe.BaseProbe closeSignal chan struct{} stop chan struct{} @@ -340,11 +348,20 @@ func newSlowProbe(stop chan struct{}) slowProbe { } } -func (p slowProbe) Load(*link.Executable, *process.TargetDetails, *sampling.Config) error { +var ( + _ probe.BaseProbe = (*slowProbe)(nil) + _ probe.RunnableProbe = (*slowProbe)(nil) +) + +func (p slowProbe) Load() error { + return nil +} + +func (p slowProbe) Attach() error { return nil } -func (p slowProbe) Run(func(ptrace.ScopeSpans)) { +func (p slowProbe) Run() { } func (p slowProbe) Close() error { @@ -354,17 +371,37 @@ func (p slowProbe) Close() error { } type noopProbe struct { - loaded, running, closed atomic.Bool + loaded, attached, running, closed atomic.Bool } -var _ probe.Probe = (*noopProbe)(nil) +var ( + _ probe.BaseProbe = (*noopProbe)(nil) + _ probe.RunnableProbe = (*noopProbe)(nil) +) + +func (p *noopProbe) ApplyConfig(probe.Config) error { + return nil +} -func (p *noopProbe) Load(*link.Executable, *process.TargetDetails, *sampling.Config) error { +func (p *noopProbe) Load() error { p.loaded.Store(true) return nil } -func (p *noopProbe) Run(func(ptrace.ScopeSpans)) { +func (p *noopProbe) Attach() error { + p.attached.Store(true) + return nil +} + +func (p *noopProbe) GetLogger() *slog.Logger { + return nil +} + +func (p *noopProbe) ID() probe.ID { + return probe.ID{InstrumentedPkg: "noop"} +} + +func (p *noopProbe) Run() { p.running.Store(true) } @@ -419,7 +456,7 @@ func TestConfigProvider(t *testing.T) { m := &Manager{ logger: slog.Default(), - probes: map[probe.ID]probe.Probe{ + probes: map[probe.ID]probe.BaseProbe{ netHTTPClientProbeID: &noopProbe{}, netHTTPServerProbeID: &noopProbe{}, somePackageProducerProbeID: &noopProbe{}, @@ -523,24 +560,37 @@ func TestConfigProvider(t *testing.T) { }) } +var ( + _ probe.BaseProbe = (*hangingProbe)(nil) + _ probe.RunnableProbe = (*hangingProbe)(nil) +) + type hangingProbe struct { - probe.Probe + probe.BaseProbe closeReturned chan struct{} + handler func(ptrace.ScopeSpans) } -func newHangingProbe() *hangingProbe { - return &hangingProbe{closeReturned: make(chan struct{})} +func newHangingProbe(handler func(ptrace.ScopeSpans)) *hangingProbe { + return &hangingProbe{ + closeReturned: make(chan struct{}), + handler: handler, + } } -func (p *hangingProbe) Load(*link.Executable, *process.TargetDetails, *sampling.Config) error { +func (p *hangingProbe) Load() error { return nil } -func (p *hangingProbe) Run(handle func(ptrace.ScopeSpans)) { +func (p *hangingProbe) Attach() error { + return nil +} + +func (p *hangingProbe) Run() { <-p.closeReturned // Write after Close has returned. - handle(ptrace.NewScopeSpans()) + p.handler(ptrace.NewScopeSpans()) } func (p *hangingProbe) Close() error { @@ -549,17 +599,17 @@ func (p *hangingProbe) Close() error { } func TestRunStopDeadlock(t *testing.T) { - // Regression test for #1228. - p := newHangingProbe() - tp := new(shutdownTracerProvider) ctrl, err := opentelemetry.NewController(slog.Default(), tp) require.NoError(t, err) + // Regression test for #1228. + p := newHangingProbe(ctrl.Trace) + m := &Manager{ otelController: ctrl, logger: slog.Default(), - probes: map[probe.ID]probe.Probe{{}: p}, + probes: map[probe.ID]probe.BaseProbe{{}: p}, cp: NewNoopConfigProvider(nil), } @@ -607,7 +657,7 @@ func TestStopBeforeLoad(t *testing.T) { m := &Manager{ otelController: ctrl, logger: slog.Default(), - probes: map[probe.ID]probe.Probe{{}: &p}, + probes: map[probe.ID]probe.BaseProbe{{}: &p}, cp: NewNoopConfigProvider(nil), } @@ -627,7 +677,7 @@ func TestStopBeforeRun(t *testing.T) { m := &Manager{ otelController: ctrl, logger: slog.Default(), - probes: map[probe.ID]probe.Probe{{}: &p}, + probes: map[probe.ID]probe.BaseProbe{{}: &p}, cp: NewNoopConfigProvider(nil), } From 1826b468b6a117ea6a6cb4ee039aaaca65a4e813 Mon Sep 17 00:00:00 2001 From: Mike Dame Date: Fri, 22 Nov 2024 16:13:00 -0500 Subject: [PATCH 7/7] Switch current Probes to new interfaces --- .../instrumentation/bpf/database/sql/probe.go | 83 +++--- .../segmentio/kafka-go/consumer/probe.go | 116 +++++---- .../segmentio/kafka-go/producer/probe.go | 91 ++++--- .../bpf/go.opentelemetry.io/auto/sdk/probe.go | 118 +++++---- .../otel/traceglobal/probe.go | 210 ++++++++------- .../google.golang.org/grpc/client/probe.go | 153 ++++++----- .../google.golang.org/grpc/server/probe.go | 194 +++++++------- .../bpf/net/http/client/probe.go | 242 ++++++++++-------- .../bpf/net/http/server/probe.go | 176 +++++++------ 9 files changed, 766 insertions(+), 617 deletions(-) diff --git a/internal/pkg/instrumentation/bpf/database/sql/probe.go b/internal/pkg/instrumentation/bpf/database/sql/probe.go index 8bf0c06c9..288247362 100644 --- a/internal/pkg/instrumentation/bpf/database/sql/probe.go +++ b/internal/pkg/instrumentation/bpf/database/sql/probe.go @@ -29,45 +29,34 @@ const ( IncludeDBStatementEnvVar = "OTEL_GO_AUTO_INCLUDE_DB_STATEMENT" ) -// New returns a new [probe.Probe]. -func New(logger *slog.Logger, version string) probe.Probe { +type DatabaseSQLProbe struct { + *probe.TargetSpanProducingProbe[bpfObjects, event] +} + +func (d *DatabaseSQLProbe) ApplyConfig(c probe.Config) error { + return nil +} + +// New returns a new [probe.GoLibraryTelemetryProbe]. +func New(logger *slog.Logger, version string, handler func(ptrace.ScopeSpans)) probe.GoLibraryTelemetryProbe { id := probe.ID{ SpanKind: trace.SpanKindClient, InstrumentedPkg: pkg, } - return &probe.SpanProducer[bpfObjects, event]{ - Base: probe.Base[bpfObjects, event]{ - ID: id, - Logger: logger, - Consts: []probe.Const{ - probe.RegistersABIConst{}, - probe.AllocationConst{}, - probe.KeyValConst{ - Key: "should_include_db_statement", - Val: shouldIncludeDBStatement(), - }, - }, - Uprobes: []probe.Uprobe{ - { - Sym: "database/sql.(*DB).queryDC", - EntryProbe: "uprobe_queryDC", - ReturnProbe: "uprobe_queryDC_Returns", - Optional: true, - }, - { - Sym: "database/sql.(*DB).execDC", - EntryProbe: "uprobe_execDC", - ReturnProbe: "uprobe_execDC_Returns", - Optional: true, - }, - }, - - SpecFn: loadBpf, - }, - Version: version, - SchemaURL: semconv.SchemaURL, - ProcessFn: processFn, + + p := &DatabaseSQLProbe{ + TargetSpanProducingProbe: probe.NewTargetSpanProducingProbe[bpfObjects, event](), } + p.ProbeID = id + p.Logger = logger + p.Consts = consts + p.Uprobes = uprobes + p.SpecFn = loadBpf + p.Version = version + p.SchemaURL = semconv.SchemaURL + p.ProcessFn = processFn + p.Handler = handler + return p } // event represents an event in an SQL database @@ -112,3 +101,29 @@ func shouldIncludeDBStatement() bool { return false } + +var ( + consts = []probe.Const{ + probe.RegistersABIConst{}, + probe.AllocationConst{}, + probe.KeyValConst{ + Key: "should_include_db_statement", + Val: shouldIncludeDBStatement(), + }, + } + + uprobes = []probe.Uprobe{ + { + Sym: "database/sql.(*DB).queryDC", + EntryProbe: "uprobe_queryDC", + ReturnProbe: "uprobe_queryDC_Returns", + Optional: true, + }, + { + Sym: "database/sql.(*DB).execDC", + EntryProbe: "uprobe_execDC", + ReturnProbe: "uprobe_execDC_Returns", + Optional: true, + }, + } +) diff --git a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer/probe.go b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer/probe.go index d67e62717..8a47d089a 100644 --- a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer/probe.go +++ b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer/probe.go @@ -27,61 +27,34 @@ const ( pkg = "github.com/segmentio/kafka-go" ) -// New returns a new [probe.Probe]. -func New(logger *slog.Logger, version string) probe.Probe { +type KafkaConsumerProbe struct { + *probe.TargetSpanProducingProbe[bpfObjects, event] +} + +func (k *KafkaConsumerProbe) ApplyConfig(c probe.Config) error { + return nil +} + +// New returns a new [probe.GoLibraryTelemetryProbe]. +func New(logger *slog.Logger, version string, handler func(ptrace.ScopeSpans)) probe.GoLibraryTelemetryProbe { id := probe.ID{ SpanKind: trace.SpanKindConsumer, InstrumentedPkg: pkg, } - return &probe.SpanProducer[bpfObjects, event]{ - Base: probe.Base[bpfObjects, event]{ - ID: id, - Logger: logger, - Consts: []probe.Const{ - probe.RegistersABIConst{}, - probe.AllocationConst{}, - probe.StructFieldConst{ - Key: "message_headers_pos", - Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Headers"), - }, - probe.StructFieldConst{ - Key: "message_key_pos", - Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Key"), - }, - probe.StructFieldConst{ - Key: "message_topic_pos", - Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Topic"), - }, - probe.StructFieldConst{ - Key: "message_partition_pos", - Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Partition"), - }, - probe.StructFieldConst{ - Key: "message_offset_pos", - Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Offset"), - }, - probe.StructFieldConst{ - Key: "reader_config_pos", - Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Reader", "config"), - }, - probe.StructFieldConst{ - Key: "reader_config_group_id_pos", - Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "ReaderConfig", "GroupID"), - }, - }, - Uprobes: []probe.Uprobe{ - { - Sym: "github.com/segmentio/kafka-go.(*Reader).FetchMessage", - EntryProbe: "uprobe_FetchMessage", - ReturnProbe: "uprobe_FetchMessage_Returns", - }, - }, - SpecFn: loadBpf, - }, - Version: version, - SchemaURL: semconv.SchemaURL, - ProcessFn: processFn, + + p := &KafkaConsumerProbe{ + TargetSpanProducingProbe: probe.NewTargetSpanProducingProbe[bpfObjects, event](), } + p.ProbeID = id + p.Logger = logger + p.Uprobes = uprobes + p.Consts = consts + p.SpecFn = loadBpf + p.Version = version + p.SchemaURL = semconv.SchemaURL + p.ProcessFn = processFn + p.Handler = handler + return p } // event represents a kafka message received by the consumer. @@ -129,3 +102,46 @@ func processFn(e *event) ptrace.SpanSlice { func kafkaConsumerSpanName(topic string) string { return fmt.Sprintf("%s receive", topic) } + +var ( + consts = []probe.Const{ + probe.RegistersABIConst{}, + probe.AllocationConst{}, + probe.StructFieldConst{ + Key: "message_headers_pos", + Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Headers"), + }, + probe.StructFieldConst{ + Key: "message_key_pos", + Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Key"), + }, + probe.StructFieldConst{ + Key: "message_topic_pos", + Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Topic"), + }, + probe.StructFieldConst{ + Key: "message_partition_pos", + Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Partition"), + }, + probe.StructFieldConst{ + Key: "message_offset_pos", + Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Offset"), + }, + probe.StructFieldConst{ + Key: "reader_config_pos", + Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Reader", "config"), + }, + probe.StructFieldConst{ + Key: "reader_config_group_id_pos", + Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "ReaderConfig", "GroupID"), + }, + } + + uprobes = []probe.Uprobe{ + { + Sym: "github.com/segmentio/kafka-go.(*Reader).FetchMessage", + EntryProbe: "uprobe_FetchMessage", + ReturnProbe: "uprobe_FetchMessage_Returns", + }, + } +) diff --git a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe.go b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe.go index 8f2a4903a..ca29970bf 100644 --- a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe.go +++ b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe.go @@ -27,49 +27,33 @@ const ( pkg = "github.com/segmentio/kafka-go" ) -// New returns a new [probe.Probe]. -func New(logger *slog.Logger, version string) probe.Probe { +type KafkaProducerProbe struct { + *probe.TargetSpanProducingProbe[bpfObjects, event] +} + +func (k *KafkaProducerProbe) ApplyConfig(c probe.Config) error { + return nil +} + +// New returns a new [probe.GoLibraryTelemetryProbe]. +func New(logger *slog.Logger, version string, handler func(ptrace.ScopeSpans)) probe.GoLibraryTelemetryProbe { id := probe.ID{ SpanKind: trace.SpanKindProducer, InstrumentedPkg: pkg, } - return &probe.SpanProducer[bpfObjects, event]{ - Base: probe.Base[bpfObjects, event]{ - ID: id, - Logger: logger, - Consts: []probe.Const{ - probe.RegistersABIConst{}, - probe.AllocationConst{}, - probe.StructFieldConst{ - Key: "writer_topic_pos", - Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Writer", "Topic"), - }, - probe.StructFieldConst{ - Key: "message_headers_pos", - Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Headers"), - }, - probe.StructFieldConst{ - Key: "message_key_pos", - Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Key"), - }, - probe.StructFieldConst{ - Key: "message_time_pos", - Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Time"), - }, - }, - Uprobes: []probe.Uprobe{ - { - Sym: "github.com/segmentio/kafka-go.(*Writer).WriteMessages", - EntryProbe: "uprobe_WriteMessages", - ReturnProbe: "uprobe_WriteMessages_Returns", - }, - }, - SpecFn: loadBpf, - }, - Version: version, - SchemaURL: semconv.SchemaURL, - ProcessFn: processFn, + + p := &KafkaProducerProbe{ + TargetSpanProducingProbe: probe.NewTargetSpanProducingProbe[bpfObjects, event](), } + p.ProbeID = id + p.Logger = logger + p.Consts = consts + p.Uprobes = uprobes + p.Version = version + p.SchemaURL = semconv.SchemaURL + p.ProcessFn = processFn + p.Handler = handler + return p } type messageAttributes struct { @@ -147,3 +131,34 @@ func processFn(e *event) ptrace.SpanSlice { func kafkaProducerSpanName(topic string) string { return fmt.Sprintf("%s publish", topic) } + +var ( + consts = []probe.Const{ + probe.RegistersABIConst{}, + probe.AllocationConst{}, + probe.StructFieldConst{ + Key: "writer_topic_pos", + Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Writer", "Topic"), + }, + probe.StructFieldConst{ + Key: "message_headers_pos", + Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Headers"), + }, + probe.StructFieldConst{ + Key: "message_key_pos", + Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Key"), + }, + probe.StructFieldConst{ + Key: "message_time_pos", + Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Time"), + }, + } + + uprobes = []probe.Uprobe{ + { + Sym: "github.com/segmentio/kafka-go.(*Writer).WriteMessages", + EntryProbe: "uprobe_WriteMessages", + ReturnProbe: "uprobe_WriteMessages_Returns", + }, + } +) diff --git a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/auto/sdk/probe.go b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/auto/sdk/probe.go index 7c963c30f..5da902b1e 100644 --- a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/auto/sdk/probe.go +++ b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/auto/sdk/probe.go @@ -18,63 +18,34 @@ import ( //go:generate go run github.com/cilium/ebpf/cmd/bpf2go -target amd64,arm64 bpf ./bpf/probe.bpf.c -// New returns a new [probe.Probe]. -func New(logger *slog.Logger) probe.Probe { +type OtelAutoProbe struct { + *probe.TargetTraceProducingProbe[bpfObjects, event] +} + +func (o *OtelAutoProbe) ApplyConfig(c probe.Config) error { + return nil +} + +// New returns a new [probe.GoLibraryTelemetryProbe]. +func New(logger *slog.Logger, handler func(ptrace.ScopeSpans)) probe.GoLibraryTelemetryProbe { id := probe.ID{ SpanKind: trace.SpanKindClient, InstrumentedPkg: "go.opentelemetry.io/auto", } c := &converter{logger: logger} - return &probe.TraceProducer[bpfObjects, event]{ - Base: probe.Base[bpfObjects, event]{ - ID: id, - Logger: logger, - Consts: []probe.Const{ - probe.RegistersABIConst{}, - probe.AllocationConst{}, - probe.StructFieldConst{ - Key: "span_context_trace_id_pos", - Val: structfield.NewID( - "go.opentelemetry.io/otel", - "go.opentelemetry.io/otel/trace", - "SpanContext", - "traceID", - ), - }, - probe.StructFieldConst{ - Key: "span_context_span_id_pos", - Val: structfield.NewID( - "go.opentelemetry.io/otel", - "go.opentelemetry.io/otel/trace", - "SpanContext", - "spanID", - ), - }, - probe.StructFieldConst{ - Key: "span_context_trace_flags_pos", - Val: structfield.NewID( - "go.opentelemetry.io/otel", - "go.opentelemetry.io/otel/trace", - "SpanContext", - "traceFlags", - ), - }, - }, - Uprobes: []probe.Uprobe{ - { - Sym: "go.opentelemetry.io/auto/sdk.(*tracer).start", - EntryProbe: "uprobe_Tracer_start", - }, - { - Sym: "go.opentelemetry.io/auto/sdk.(*span).ended", - EntryProbe: "uprobe_Span_ended", - }, - }, - SpecFn: loadBpf, - ProcessRecord: c.decodeEvent, - }, - ProcessFn: c.processFn, + + p := &OtelAutoProbe{ + TargetTraceProducingProbe: probe.NewTargetTraceProducingProbe[bpfObjects, event](), } + p.ProbeID = id + p.Logger = logger + p.Consts = consts + p.Uprobes = uprobes + p.SpecFn = loadBpf + p.ProcessRecord = c.decodeEvent + p.ProcessFn = c.processFn + p.Handler = handler + return p } type event struct { @@ -129,3 +100,48 @@ func (c *converter) processFn(e *event) ptrace.ScopeSpans { return ss.At(0) } + +var ( + consts = []probe.Const{ + probe.RegistersABIConst{}, + probe.AllocationConst{}, + probe.StructFieldConst{ + Key: "span_context_trace_id_pos", + Val: structfield.NewID( + "go.opentelemetry.io/otel", + "go.opentelemetry.io/otel/trace", + "SpanContext", + "traceID", + ), + }, + probe.StructFieldConst{ + Key: "span_context_span_id_pos", + Val: structfield.NewID( + "go.opentelemetry.io/otel", + "go.opentelemetry.io/otel/trace", + "SpanContext", + "spanID", + ), + }, + probe.StructFieldConst{ + Key: "span_context_trace_flags_pos", + Val: structfield.NewID( + "go.opentelemetry.io/otel", + "go.opentelemetry.io/otel/trace", + "SpanContext", + "traceFlags", + ), + }, + } + + uprobes = []probe.Uprobe{ + { + Sym: "go.opentelemetry.io/auto/sdk.(*tracer).start", + EntryProbe: "uprobe_Tracer_start", + }, + { + Sym: "go.opentelemetry.io/auto/sdk.(*span).ended", + EntryProbe: "uprobe_Span_ended", + }, + } +) diff --git a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/probe.go b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/probe.go index b8d8ef1f5..81dff2f5b 100644 --- a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/probe.go +++ b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/probe.go @@ -35,108 +35,32 @@ const ( pkg = "go.opentelemetry.io/otel/internal/global" ) -// New returns a new [probe.Probe]. -func New(logger *slog.Logger) probe.Probe { +type OtelTraceGlobalProbe struct { + *probe.TargetTraceProducingProbe[bpfObjects, event] +} + +func (o *OtelTraceGlobalProbe) ApplyConfig(c probe.Config) error { + return nil +} + +// New returns a new [probe.GoLibraryTelemetryProbe]. +func New(logger *slog.Logger, handler func(ptrace.ScopeSpans)) probe.GoLibraryTelemetryProbe { id := probe.ID{ SpanKind: trace.SpanKindClient, InstrumentedPkg: pkg, } - return &probe.TraceProducer[bpfObjects, event]{ - Base: probe.Base[bpfObjects, event]{ - ID: id, - Logger: logger, - Consts: []probe.Const{ - probe.RegistersABIConst{}, - probe.AllocationConst{}, - probe.KeyValConst{ - Key: "attr_type_invalid", - Val: uint64(attribute.INVALID), - }, - probe.KeyValConst{ - Key: "attr_type_bool", - Val: uint64(attribute.BOOL), - }, - probe.KeyValConst{ - Key: "attr_type_int64", - Val: uint64(attribute.INT64), - }, - probe.KeyValConst{ - Key: "attr_type_float64", - Val: uint64(attribute.FLOAT64), - }, - probe.KeyValConst{ - Key: "attr_type_string", - Val: uint64(attribute.STRING), - }, - probe.KeyValConst{ - Key: "attr_type_boolslice", - Val: uint64(attribute.BOOLSLICE), - }, - probe.KeyValConst{ - Key: "attr_type_int64slice", - Val: uint64(attribute.INT64SLICE), - }, - probe.KeyValConst{ - Key: "attr_type_float64slice", - Val: uint64(attribute.FLOAT64SLICE), - }, - probe.KeyValConst{ - Key: "attr_type_stringslice", - Val: uint64(attribute.STRINGSLICE), - }, - probe.StructFieldConst{ - Key: "tracer_delegate_pos", - Val: structfield.NewID("go.opentelemetry.io/otel", "go.opentelemetry.io/otel/internal/global", "tracer", "delegate"), - }, - probe.StructFieldConst{ - Key: "tracer_name_pos", - Val: structfield.NewID("go.opentelemetry.io/otel", "go.opentelemetry.io/otel/internal/global", "tracer", "name"), - }, - probe.StructFieldConst{ - Key: "tracer_provider_pos", - Val: structfield.NewID("go.opentelemetry.io/otel", "go.opentelemetry.io/otel/internal/global", "tracer", "provider"), - }, - probe.StructFieldConst{ - Key: "tracer_provider_tracers_pos", - Val: structfield.NewID("go.opentelemetry.io/otel", "go.opentelemetry.io/otel/internal/global", "tracerProvider", "tracers"), - }, - probe.StructFieldConst{ - Key: "buckets_ptr_pos", - Val: structfield.NewID("std", "runtime", "hmap", "buckets"), - }, - tracerIDContainsSchemaURL{}, - tracerIDContainsScopeAttributes{}, - }, - Uprobes: []probe.Uprobe{ - { - Sym: "go.opentelemetry.io/otel/internal/global.(*tracer).Start", - EntryProbe: "uprobe_Start", - ReturnProbe: "uprobe_Start_Returns", - }, - { - Sym: "go.opentelemetry.io/otel/internal/global.(*nonRecordingSpan).End", - EntryProbe: "uprobe_End", - }, - { - Sym: "go.opentelemetry.io/otel/internal/global.(*nonRecordingSpan).SetAttributes", - EntryProbe: "uprobe_SetAttributes", - Optional: true, - }, - { - Sym: "go.opentelemetry.io/otel/internal/global.(*nonRecordingSpan).SetStatus", - EntryProbe: "uprobe_SetStatus", - Optional: true, - }, - { - Sym: "go.opentelemetry.io/otel/internal/global.(*nonRecordingSpan).SetName", - EntryProbe: "uprobe_SetName", - Optional: true, - }, - }, - SpecFn: loadBpf, - }, - ProcessFn: processFn, + + p := &OtelTraceGlobalProbe{ + TargetTraceProducingProbe: probe.NewTargetTraceProducingProbe[bpfObjects, event](), } + p.ProbeID = id + p.Logger = logger + p.Consts = consts + p.Uprobes = uprobes + p.SpecFn = loadBpf + p.ProcessFn = processFn + p.Handler = handler + return p } // tracerIDContainsSchemaURL is a Probe Const defining whether the tracer key contains schemaURL. @@ -262,3 +186,95 @@ func setAttributes(dest pcommon.Map, ab attributesBuffer) { } } } + +var ( + consts = []probe.Const{ + probe.RegistersABIConst{}, + probe.AllocationConst{}, + probe.KeyValConst{ + Key: "attr_type_invalid", + Val: uint64(attribute.INVALID), + }, + probe.KeyValConst{ + Key: "attr_type_bool", + Val: uint64(attribute.BOOL), + }, + probe.KeyValConst{ + Key: "attr_type_int64", + Val: uint64(attribute.INT64), + }, + probe.KeyValConst{ + Key: "attr_type_float64", + Val: uint64(attribute.FLOAT64), + }, + probe.KeyValConst{ + Key: "attr_type_string", + Val: uint64(attribute.STRING), + }, + probe.KeyValConst{ + Key: "attr_type_boolslice", + Val: uint64(attribute.BOOLSLICE), + }, + probe.KeyValConst{ + Key: "attr_type_int64slice", + Val: uint64(attribute.INT64SLICE), + }, + probe.KeyValConst{ + Key: "attr_type_float64slice", + Val: uint64(attribute.FLOAT64SLICE), + }, + probe.KeyValConst{ + Key: "attr_type_stringslice", + Val: uint64(attribute.STRINGSLICE), + }, + probe.StructFieldConst{ + Key: "tracer_delegate_pos", + Val: structfield.NewID("go.opentelemetry.io/otel", "go.opentelemetry.io/otel/internal/global", "tracer", "delegate"), + }, + probe.StructFieldConst{ + Key: "tracer_name_pos", + Val: structfield.NewID("go.opentelemetry.io/otel", "go.opentelemetry.io/otel/internal/global", "tracer", "name"), + }, + probe.StructFieldConst{ + Key: "tracer_provider_pos", + Val: structfield.NewID("go.opentelemetry.io/otel", "go.opentelemetry.io/otel/internal/global", "tracer", "provider"), + }, + probe.StructFieldConst{ + Key: "tracer_provider_tracers_pos", + Val: structfield.NewID("go.opentelemetry.io/otel", "go.opentelemetry.io/otel/internal/global", "tracerProvider", "tracers"), + }, + probe.StructFieldConst{ + Key: "buckets_ptr_pos", + Val: structfield.NewID("std", "runtime", "hmap", "buckets"), + }, + tracerIDContainsSchemaURL{}, + tracerIDContainsScopeAttributes{}, + } + + uprobes = []probe.Uprobe{ + { + Sym: "go.opentelemetry.io/otel/internal/global.(*tracer).Start", + EntryProbe: "uprobe_Start", + ReturnProbe: "uprobe_Start_Returns", + }, + { + Sym: "go.opentelemetry.io/otel/internal/global.(*nonRecordingSpan).End", + EntryProbe: "uprobe_End", + }, + { + Sym: "go.opentelemetry.io/otel/internal/global.(*nonRecordingSpan).SetAttributes", + EntryProbe: "uprobe_SetAttributes", + Optional: true, + }, + { + Sym: "go.opentelemetry.io/otel/internal/global.(*nonRecordingSpan).SetStatus", + EntryProbe: "uprobe_SetStatus", + Optional: true, + }, + { + Sym: "go.opentelemetry.io/otel/internal/global.(*nonRecordingSpan).SetName", + EntryProbe: "uprobe_SetName", + Optional: true, + }, + } +) diff --git a/internal/pkg/instrumentation/bpf/google.golang.org/grpc/client/probe.go b/internal/pkg/instrumentation/bpf/google.golang.org/grpc/client/probe.go index 680f30eb2..c2b626937 100644 --- a/internal/pkg/instrumentation/bpf/google.golang.org/grpc/client/probe.go +++ b/internal/pkg/instrumentation/bpf/google.golang.org/grpc/client/probe.go @@ -51,79 +51,35 @@ func (w writeStatusConst) InjectOption(td *process.TargetDetails) (inject.Option return inject.WithKeyValue("write_status_supported", writeStatus), nil } -// New returns a new [probe.Probe]. -func New(logger *slog.Logger, version string) probe.Probe { +type ClientProbe struct { + *probe.TargetSpanProducingProbe[bpfObjects, event] +} + +func (g *ClientProbe) ApplyConfig(c probe.Config) error { + return nil +} + +// New returns a new [probe.GoLibraryTelemetryProbe]. +func New(logger *slog.Logger, version string, handler func(ptrace.ScopeSpans)) probe.GoLibraryTelemetryProbe { id := probe.ID{ SpanKind: trace.SpanKindClient, InstrumentedPkg: pkg, } - return &probe.SpanProducer[bpfObjects, event]{ - Base: probe.Base[bpfObjects, event]{ - ID: id, - Logger: logger, - Consts: []probe.Const{ - probe.RegistersABIConst{}, - probe.AllocationConst{}, - writeStatusConst{}, - probe.StructFieldConst{ - Key: "clientconn_target_ptr_pos", - Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc", "ClientConn", "target"), - }, - probe.StructFieldConst{ - Key: "httpclient_nextid_pos", - Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/transport", "http2Client", "nextID"), - }, - probe.StructFieldConst{ - Key: "headerFrame_hf_pos", - Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/transport", "headerFrame", "hf"), - }, - probe.StructFieldConst{ - Key: "headerFrame_streamid_pos", - Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/transport", "headerFrame", "streamID"), - }, - probe.StructFieldConstMinVersion{ - StructField: probe.StructFieldConst{ - Key: "error_status_pos", - Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/status", "Error", "s"), - }, - MinVersion: writeStatusMinVersion, - }, - probe.StructFieldConstMinVersion{ - StructField: probe.StructFieldConst{ - Key: "status_s_pos", - Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/status", "Status", "s"), - }, - MinVersion: writeStatusMinVersion, - }, - probe.StructFieldConstMinVersion{ - StructField: probe.StructFieldConst{ - Key: "status_code_pos", - Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/genproto/googleapis/rpc/status", "Status", "Code"), - }, - MinVersion: writeStatusMinVersion, - }, - }, - Uprobes: []probe.Uprobe{ - { - Sym: "google.golang.org/grpc.(*ClientConn).Invoke", - EntryProbe: "uprobe_ClientConn_Invoke", - ReturnProbe: "uprobe_ClientConn_Invoke_Returns", - }, - { - Sym: "google.golang.org/grpc/internal/transport.(*http2Client).NewStream", - EntryProbe: "uprobe_http2Client_NewStream", - }, - { - Sym: "google.golang.org/grpc/internal/transport.(*loopyWriter).headerHandler", - EntryProbe: "uprobe_LoopyWriter_HeaderHandler", - }, - }, - SpecFn: verifyAndLoadBpf, - }, - Version: version, - SchemaURL: semconv.SchemaURL, - ProcessFn: processFn, + + p := &ClientProbe{ + TargetSpanProducingProbe: probe.NewTargetSpanProducingProbe[bpfObjects, event](), } + p.ProbeID = id + p.Logger = logger + p.Uprobes = uprobes + p.Consts = consts + p.SpecFn = verifyAndLoadBpf + p.Version = version + p.SchemaURL = semconv.SchemaURL + p.ProcessFn = processFn + p.Handler = handler + + return p } func verifyAndLoadBpf() (*ebpf.CollectionSpec, error) { @@ -188,3 +144,64 @@ func processFn(e *event) ptrace.SpanSlice { return spans } + +var ( + consts = []probe.Const{ + probe.RegistersABIConst{}, + probe.AllocationConst{}, + writeStatusConst{}, + probe.StructFieldConst{ + Key: "clientconn_target_ptr_pos", + Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc", "ClientConn", "target"), + }, + probe.StructFieldConst{ + Key: "httpclient_nextid_pos", + Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/transport", "http2Client", "nextID"), + }, + probe.StructFieldConst{ + Key: "headerFrame_hf_pos", + Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/transport", "headerFrame", "hf"), + }, + probe.StructFieldConst{ + Key: "headerFrame_streamid_pos", + Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/transport", "headerFrame", "streamID"), + }, + probe.StructFieldConstMinVersion{ + StructField: probe.StructFieldConst{ + Key: "error_status_pos", + Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/status", "Error", "s"), + }, + MinVersion: writeStatusMinVersion, + }, + probe.StructFieldConstMinVersion{ + StructField: probe.StructFieldConst{ + Key: "status_s_pos", + Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/status", "Status", "s"), + }, + MinVersion: writeStatusMinVersion, + }, + probe.StructFieldConstMinVersion{ + StructField: probe.StructFieldConst{ + Key: "status_code_pos", + Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/genproto/googleapis/rpc/status", "Status", "Code"), + }, + MinVersion: writeStatusMinVersion, + }, + } + + uprobes = []probe.Uprobe{ + { + Sym: "google.golang.org/grpc.(*ClientConn).Invoke", + EntryProbe: "uprobe_ClientConn_Invoke", + ReturnProbe: "uprobe_ClientConn_Invoke_Returns", + }, + { + Sym: "google.golang.org/grpc/internal/transport.(*http2Client).NewStream", + EntryProbe: "uprobe_http2Client_NewStream", + }, + { + Sym: "google.golang.org/grpc/internal/transport.(*loopyWriter).headerHandler", + EntryProbe: "uprobe_LoopyWriter_HeaderHandler", + }, + } +) diff --git a/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/probe.go b/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/probe.go index 6828e7f3d..1c67dee49 100644 --- a/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/probe.go +++ b/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/probe.go @@ -32,100 +32,34 @@ const ( pkg = "google.golang.org/grpc" ) -// New returns a new [probe.Probe]. -func New(logger *slog.Logger, version string) probe.Probe { +type GrpcServerProbe struct { + *probe.TargetSpanProducingProbe[bpfObjects, event] +} + +func (g *GrpcServerProbe) ApplyConfig(c probe.Config) error { + return nil +} + +// New returns a new [probe.GoLibraryTelemetryProbe]. +func New(logger *slog.Logger, version string, handler func(ptrace.ScopeSpans)) probe.GoLibraryTelemetryProbe { id := probe.ID{ SpanKind: trace.SpanKindServer, InstrumentedPkg: pkg, } - return &probe.SpanProducer[bpfObjects, event]{ - Base: probe.Base[bpfObjects, event]{ - ID: id, - Logger: logger, - Consts: []probe.Const{ - probe.RegistersABIConst{}, - probe.AllocationConst{}, - writeStatusConst{}, - serverAddrConst{}, - probe.StructFieldConst{ - Key: "stream_method_ptr_pos", - Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/transport", "Stream", "method"), - }, - probe.StructFieldConst{ - Key: "stream_id_pos", - Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/transport", "Stream", "id"), - }, - probe.StructFieldConst{ - Key: "stream_ctx_pos", - Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/transport", "Stream", "ctx"), - }, - probe.StructFieldConst{ - Key: "frame_fields_pos", - Val: structfield.NewID("golang.org/x/net", "golang.org/x/net/http2", "MetaHeadersFrame", "Fields"), - }, - probe.StructFieldConst{ - Key: "frame_stream_id_pod", - Val: structfield.NewID("golang.org/x/net", "golang.org/x/net/http2", "FrameHeader", "StreamID"), - }, - probe.StructFieldConstMinVersion{ - StructField: probe.StructFieldConst{ - Key: "status_s_pos", - Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/status", "Status", "s"), - }, - MinVersion: writeStatusMinVersion, - }, - probe.StructFieldConstMinVersion{ - StructField: probe.StructFieldConst{ - Key: "status_code_pos", - Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/genproto/googleapis/rpc/status", "Status", "Code"), - }, - MinVersion: writeStatusMinVersion, - }, - probe.StructFieldConstMinVersion{ - StructField: probe.StructFieldConst{ - Key: "http2server_peer_pos", - Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/transport", "http2Server", "peer"), - }, - MinVersion: serverAddrMinVersion, - }, - probe.StructFieldConstMinVersion{ - StructField: probe.StructFieldConst{ - Key: "peer_local_addr_pos", - Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/peer", "Peer", "LocalAddr"), - }, - MinVersion: serverAddrMinVersion, - }, - probe.StructFieldConst{ - Key: "TCPAddr_IP_offset", - Val: structfield.NewID("std", "net", "TCPAddr", "IP"), - }, - probe.StructFieldConst{ - Key: "TCPAddr_Port_offset", - Val: structfield.NewID("std", "net", "TCPAddr", "Port"), - }, - framePosConst{}, - }, - Uprobes: []probe.Uprobe{ - { - Sym: "google.golang.org/grpc.(*Server).handleStream", - EntryProbe: "uprobe_server_handleStream", - ReturnProbe: "uprobe_server_handleStream_Returns", - }, - { - Sym: "google.golang.org/grpc/internal/transport.(*http2Server).operateHeaders", - EntryProbe: "uprobe_http2Server_operateHeader", - }, - { - Sym: "google.golang.org/grpc/internal/transport.(*http2Server).WriteStatus", - EntryProbe: "uprobe_http2Server_WriteStatus", - }, - }, - SpecFn: loadBpf, - }, - Version: version, - SchemaURL: semconv.SchemaURL, - ProcessFn: processFn, + + p := &GrpcServerProbe{ + TargetSpanProducingProbe: probe.NewTargetSpanProducingProbe[bpfObjects, event](), } + p.ProbeID = id + p.Logger = logger + p.Consts = consts + p.Uprobes = uprobes + p.SpecFn = loadBpf + p.Version = version + p.SchemaURL = semconv.SchemaURL + p.ProcessFn = processFn + p.Handler = handler + return p } // framePosConst is a Probe Const defining the position of the @@ -241,3 +175,85 @@ func processFn(e *event) ptrace.SpanSlice { return spans } + +var ( + consts = []probe.Const{ + probe.RegistersABIConst{}, + probe.AllocationConst{}, + writeStatusConst{}, + serverAddrConst{}, + probe.StructFieldConst{ + Key: "stream_method_ptr_pos", + Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/transport", "Stream", "method"), + }, + probe.StructFieldConst{ + Key: "stream_id_pos", + Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/transport", "Stream", "id"), + }, + probe.StructFieldConst{ + Key: "stream_ctx_pos", + Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/transport", "Stream", "ctx"), + }, + probe.StructFieldConst{ + Key: "frame_fields_pos", + Val: structfield.NewID("golang.org/x/net", "golang.org/x/net/http2", "MetaHeadersFrame", "Fields"), + }, + probe.StructFieldConst{ + Key: "frame_stream_id_pod", + Val: structfield.NewID("golang.org/x/net", "golang.org/x/net/http2", "FrameHeader", "StreamID"), + }, + probe.StructFieldConstMinVersion{ + StructField: probe.StructFieldConst{ + Key: "status_s_pos", + Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/status", "Status", "s"), + }, + MinVersion: writeStatusMinVersion, + }, + probe.StructFieldConstMinVersion{ + StructField: probe.StructFieldConst{ + Key: "status_code_pos", + Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/genproto/googleapis/rpc/status", "Status", "Code"), + }, + MinVersion: writeStatusMinVersion, + }, + probe.StructFieldConstMinVersion{ + StructField: probe.StructFieldConst{ + Key: "http2server_peer_pos", + Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/transport", "http2Server", "peer"), + }, + MinVersion: serverAddrMinVersion, + }, + probe.StructFieldConstMinVersion{ + StructField: probe.StructFieldConst{ + Key: "peer_local_addr_pos", + Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/peer", "Peer", "LocalAddr"), + }, + MinVersion: serverAddrMinVersion, + }, + probe.StructFieldConst{ + Key: "TCPAddr_IP_offset", + Val: structfield.NewID("std", "net", "TCPAddr", "IP"), + }, + probe.StructFieldConst{ + Key: "TCPAddr_Port_offset", + Val: structfield.NewID("std", "net", "TCPAddr", "Port"), + }, + framePosConst{}, + } + + uprobes = []probe.Uprobe{ + { + Sym: "google.golang.org/grpc.(*Server).handleStream", + EntryProbe: "uprobe_server_handleStream", + ReturnProbe: "uprobe_server_handleStream_Returns", + }, + { + Sym: "google.golang.org/grpc/internal/transport.(*http2Server).operateHeaders", + EntryProbe: "uprobe_http2Server_operateHeader", + }, + { + Sym: "google.golang.org/grpc/internal/transport.(*http2Server).WriteStatus", + EntryProbe: "uprobe_http2Server_WriteStatus", + }, + } +) diff --git a/internal/pkg/instrumentation/bpf/net/http/client/probe.go b/internal/pkg/instrumentation/bpf/net/http/client/probe.go index c96eb8c32..79d6bd46a 100644 --- a/internal/pkg/instrumentation/bpf/net/http/client/probe.go +++ b/internal/pkg/instrumentation/bpf/net/http/client/probe.go @@ -33,24 +33,33 @@ const ( pkg = "net/http" ) -// New returns a new [probe.Probe]. -func New(logger *slog.Logger, version string) probe.Probe { +type Config struct { + SupportsContextPropagation bool +} + +type HTTPClientProbe struct { + config Config + *probe.TargetSpanProducingProbe[bpfObjects, event] +} + +func (h *HTTPClientProbe) ApplyConfig(c probe.Config) error { + if cfg, ok := c.(Config); ok { + h.config.SupportsContextPropagation = cfg.SupportsContextPropagation + return nil + } + return probe.ErrInvalidConfig +} + +// New returns a new [probe.GoLibraryTelemetryProbe]. +func New(logger *slog.Logger, version string, handler func(ptrace.ScopeSpans), cfg Config) probe.GoLibraryTelemetryProbe { id := probe.ID{ SpanKind: trace.SpanKindClient, InstrumentedPkg: pkg, } - uprobes := []probe.Uprobe{ - { - Sym: "net/http.(*Transport).roundTrip", - EntryProbe: "uprobe_Transport_roundTrip", - ReturnProbe: "uprobe_Transport_roundTrip_Returns", - }, - } - // If the kernel supports context propagation, we enable the // probe which writes the data in the outgoing buffer. - if utils.SupportsContextPropagation() { + if cfg.SupportsContextPropagation { uprobes = append(uprobes, probe.Uprobe{ Sym: "net/http.Header.writeSubset", @@ -63,105 +72,21 @@ func New(logger *slog.Logger, version string) probe.Probe { ) } - return &probe.SpanProducer[bpfObjects, event]{ - Base: probe.Base[bpfObjects, event]{ - ID: id, - Logger: logger, - Consts: []probe.Const{ - probe.RegistersABIConst{}, - probe.AllocationConst{}, - probe.StructFieldConst{ - Key: "method_ptr_pos", - Val: structfield.NewID("std", "net/http", "Request", "Method"), - }, - probe.StructFieldConst{ - Key: "url_ptr_pos", - Val: structfield.NewID("std", "net/http", "Request", "URL"), - }, - probe.StructFieldConst{ - Key: "path_ptr_pos", - Val: structfield.NewID("std", "net/url", "URL", "Path"), - }, - probe.StructFieldConst{ - Key: "headers_ptr_pos", - Val: structfield.NewID("std", "net/http", "Request", "Header"), - }, - probe.StructFieldConst{ - Key: "ctx_ptr_pos", - Val: structfield.NewID("std", "net/http", "Request", "ctx"), - }, - probe.StructFieldConst{ - Key: "status_code_pos", - Val: structfield.NewID("std", "net/http", "Response", "StatusCode"), - }, - probe.StructFieldConst{ - Key: "request_host_pos", - Val: structfield.NewID("std", "net/http", "Request", "Host"), - }, - probe.StructFieldConst{ - Key: "request_proto_pos", - Val: structfield.NewID("std", "net/http", "Request", "Proto"), - }, - probe.StructFieldConst{ - Key: "io_writer_buf_ptr_pos", - Val: structfield.NewID("std", "bufio", "Writer", "buf"), - }, - probe.StructFieldConst{ - Key: "io_writer_n_pos", - Val: structfield.NewID("std", "bufio", "Writer", "n"), - }, - probe.StructFieldConst{ - Key: "scheme_pos", - Val: structfield.NewID("std", "net/url", "URL", "Scheme"), - }, - probe.StructFieldConst{ - Key: "opaque_pos", - Val: structfield.NewID("std", "net/url", "URL", "Opaque"), - }, - probe.StructFieldConst{ - Key: "user_ptr_pos", - Val: structfield.NewID("std", "net/url", "URL", "User"), - }, - probe.StructFieldConst{ - Key: "raw_path_pos", - Val: structfield.NewID("std", "net/url", "URL", "RawPath"), - }, - probe.StructFieldConst{ - Key: "omit_host_pos", - Val: structfield.NewID("std", "net/url", "URL", "OmitHost"), - }, - probe.StructFieldConst{ - Key: "force_query_pos", - Val: structfield.NewID("std", "net/url", "URL", "ForceQuery"), - }, - probe.StructFieldConst{ - Key: "raw_query_pos", - Val: structfield.NewID("std", "net/url", "URL", "RawQuery"), - }, - probe.StructFieldConst{ - Key: "fragment_pos", - Val: structfield.NewID("std", "net/url", "URL", "Fragment"), - }, - probe.StructFieldConst{ - Key: "raw_fragment_pos", - Val: structfield.NewID("std", "net/url", "URL", "RawFragment"), - }, - probe.StructFieldConst{ - Key: "username_pos", - Val: structfield.NewID("std", "net/url", "Userinfo", "username"), - }, - probe.StructFieldConst{ - Key: "url_host_pos", - Val: structfield.NewID("std", "net/url", "URL", "Host"), - }, - }, - Uprobes: uprobes, - SpecFn: verifyAndLoadBpf, - }, - Version: version, - SchemaURL: semconv.SchemaURL, - ProcessFn: processFn, + p := &HTTPClientProbe{ + config: cfg, + TargetSpanProducingProbe: probe.NewTargetSpanProducingProbe[bpfObjects, event](), } + p.Version = version + p.SchemaURL = semconv.SchemaURL + p.ProcessFn = processFn + p.Handler = handler + p.Consts = consts + p.Uprobes = uprobes + p.SpecFn = verifyAndLoadBpf + p.ProbeID = id + p.Logger = logger + + return p } func verifyAndLoadBpf() (*ebpf.CollectionSpec, error) { @@ -281,3 +206,102 @@ func processFn(e *event) ptrace.SpanSlice { return spans } + +var ( + uprobes = []probe.Uprobe{ + { + Sym: "net/http.(*Transport).roundTrip", + EntryProbe: "uprobe_Transport_roundTrip", + ReturnProbe: "uprobe_Transport_roundTrip_Returns", + }, + } + + consts = []probe.Const{ + probe.RegistersABIConst{}, + probe.AllocationConst{}, + probe.StructFieldConst{ + Key: "method_ptr_pos", + Val: structfield.NewID("std", "net/http", "Request", "Method"), + }, + probe.StructFieldConst{ + Key: "url_ptr_pos", + Val: structfield.NewID("std", "net/http", "Request", "URL"), + }, + probe.StructFieldConst{ + Key: "path_ptr_pos", + Val: structfield.NewID("std", "net/url", "URL", "Path"), + }, + probe.StructFieldConst{ + Key: "headers_ptr_pos", + Val: structfield.NewID("std", "net/http", "Request", "Header"), + }, + probe.StructFieldConst{ + Key: "ctx_ptr_pos", + Val: structfield.NewID("std", "net/http", "Request", "ctx"), + }, + probe.StructFieldConst{ + Key: "status_code_pos", + Val: structfield.NewID("std", "net/http", "Response", "StatusCode"), + }, + probe.StructFieldConst{ + Key: "request_host_pos", + Val: structfield.NewID("std", "net/http", "Request", "Host"), + }, + probe.StructFieldConst{ + Key: "request_proto_pos", + Val: structfield.NewID("std", "net/http", "Request", "Proto"), + }, + probe.StructFieldConst{ + Key: "io_writer_buf_ptr_pos", + Val: structfield.NewID("std", "bufio", "Writer", "buf"), + }, + probe.StructFieldConst{ + Key: "io_writer_n_pos", + Val: structfield.NewID("std", "bufio", "Writer", "n"), + }, + probe.StructFieldConst{ + Key: "scheme_pos", + Val: structfield.NewID("std", "net/url", "URL", "Scheme"), + }, + probe.StructFieldConst{ + Key: "opaque_pos", + Val: structfield.NewID("std", "net/url", "URL", "Opaque"), + }, + probe.StructFieldConst{ + Key: "user_ptr_pos", + Val: structfield.NewID("std", "net/url", "URL", "User"), + }, + probe.StructFieldConst{ + Key: "raw_path_pos", + Val: structfield.NewID("std", "net/url", "URL", "RawPath"), + }, + probe.StructFieldConst{ + Key: "omit_host_pos", + Val: structfield.NewID("std", "net/url", "URL", "OmitHost"), + }, + probe.StructFieldConst{ + Key: "force_query_pos", + Val: structfield.NewID("std", "net/url", "URL", "ForceQuery"), + }, + probe.StructFieldConst{ + Key: "raw_query_pos", + Val: structfield.NewID("std", "net/url", "URL", "RawQuery"), + }, + probe.StructFieldConst{ + Key: "fragment_pos", + Val: structfield.NewID("std", "net/url", "URL", "Fragment"), + }, + probe.StructFieldConst{ + Key: "raw_fragment_pos", + Val: structfield.NewID("std", "net/url", "URL", "RawFragment"), + }, + probe.StructFieldConst{ + Key: "username_pos", + Val: structfield.NewID("std", "net/url", "Userinfo", "username"), + }, + probe.StructFieldConst{ + Key: "url_host_pos", + Val: structfield.NewID("std", "net/url", "URL", "Host"), + }, + } +) diff --git a/internal/pkg/instrumentation/bpf/net/http/server/probe.go b/internal/pkg/instrumentation/bpf/net/http/server/probe.go index f2b574e11..987f58df4 100644 --- a/internal/pkg/instrumentation/bpf/net/http/server/probe.go +++ b/internal/pkg/instrumentation/bpf/net/http/server/probe.go @@ -31,91 +31,32 @@ const ( pkg = "net/http" ) -// New returns a new [probe.Probe]. -func New(logger *slog.Logger, version string) probe.Probe { +type HTTPServerProbe struct { + *probe.TargetSpanProducingProbe[bpfObjects, event] +} + +func (h *HTTPServerProbe) ApplyConfig(c probe.Config) error { + return nil +} + +// New returns a new [probe.GoLibraryTelemetryProbe]. +func New(logger *slog.Logger, version string, handler func(ptrace.ScopeSpans)) probe.GoLibraryTelemetryProbe { id := probe.ID{ SpanKind: trace.SpanKindServer, InstrumentedPkg: pkg, } - return &probe.SpanProducer[bpfObjects, event]{ - Base: probe.Base[bpfObjects, event]{ - ID: id, - Logger: logger, - Consts: []probe.Const{ - probe.RegistersABIConst{}, - probe.StructFieldConst{ - Key: "method_ptr_pos", - Val: structfield.NewID("std", "net/http", "Request", "Method"), - }, - probe.StructFieldConst{ - Key: "url_ptr_pos", - Val: structfield.NewID("std", "net/http", "Request", "URL"), - }, - probe.StructFieldConst{ - Key: "ctx_ptr_pos", - Val: structfield.NewID("std", "net/http", "Request", "ctx"), - }, - probe.StructFieldConst{ - Key: "path_ptr_pos", - Val: structfield.NewID("std", "net/url", "URL", "Path"), - }, - probe.StructFieldConst{ - Key: "headers_ptr_pos", - Val: structfield.NewID("std", "net/http", "Request", "Header"), - }, - probe.StructFieldConst{ - Key: "req_ptr_pos", - Val: structfield.NewID("std", "net/http", "response", "req"), - }, - probe.StructFieldConst{ - Key: "status_code_pos", - Val: structfield.NewID("std", "net/http", "response", "status"), - }, - probe.StructFieldConst{ - Key: "buckets_ptr_pos", - Val: structfield.NewID("std", "runtime", "hmap", "buckets"), - }, - probe.StructFieldConst{ - Key: "remote_addr_pos", - Val: structfield.NewID("std", "net/http", "Request", "RemoteAddr"), - }, - probe.StructFieldConst{ - Key: "host_pos", - Val: structfield.NewID("std", "net/http", "Request", "Host"), - }, - probe.StructFieldConst{ - Key: "proto_pos", - Val: structfield.NewID("std", "net/http", "Request", "Proto"), - }, - probe.StructFieldConstMinVersion{ - StructField: probe.StructFieldConst{ - Key: "req_pat_pos", - Val: structfield.NewID("std", "net/http", "Request", "pat"), - }, - MinVersion: patternPathMinVersion, - }, - probe.StructFieldConstMinVersion{ - StructField: probe.StructFieldConst{ - Key: "pat_str_pos", - Val: structfield.NewID("std", "net/http", "pattern", "str"), - }, - MinVersion: patternPathMinVersion, - }, - patternPathSupportedConst{}, - }, - Uprobes: []probe.Uprobe{ - { - Sym: "net/http.serverHandler.ServeHTTP", - EntryProbe: "uprobe_serverHandler_ServeHTTP", - ReturnProbe: "uprobe_serverHandler_ServeHTTP_Returns", - }, - }, - SpecFn: loadBpf, - }, - Version: version, - SchemaURL: semconv.SchemaURL, - ProcessFn: processFn, - } + + p := &HTTPServerProbe{TargetSpanProducingProbe: probe.NewTargetSpanProducingProbe[bpfObjects, event]()} + p.ProbeID = id + p.Logger = logger + p.SpecFn = loadBpf + p.Version = version + p.SchemaURL = semconv.SchemaURL + p.ProcessFn = processFn + p.Handler = handler + p.Consts = consts + p.Uprobes = uprobes + return p } type patternPathSupportedConst struct{} @@ -218,3 +159,76 @@ func processFn(e *event) ptrace.SpanSlice { return spans } + +var ( + consts = []probe.Const{ + probe.RegistersABIConst{}, + probe.StructFieldConst{ + Key: "method_ptr_pos", + Val: structfield.NewID("std", "net/http", "Request", "Method"), + }, + probe.StructFieldConst{ + Key: "url_ptr_pos", + Val: structfield.NewID("std", "net/http", "Request", "URL"), + }, + probe.StructFieldConst{ + Key: "ctx_ptr_pos", + Val: structfield.NewID("std", "net/http", "Request", "ctx"), + }, + probe.StructFieldConst{ + Key: "path_ptr_pos", + Val: structfield.NewID("std", "net/url", "URL", "Path"), + }, + probe.StructFieldConst{ + Key: "headers_ptr_pos", + Val: structfield.NewID("std", "net/http", "Request", "Header"), + }, + probe.StructFieldConst{ + Key: "req_ptr_pos", + Val: structfield.NewID("std", "net/http", "response", "req"), + }, + probe.StructFieldConst{ + Key: "status_code_pos", + Val: structfield.NewID("std", "net/http", "response", "status"), + }, + probe.StructFieldConst{ + Key: "buckets_ptr_pos", + Val: structfield.NewID("std", "runtime", "hmap", "buckets"), + }, + probe.StructFieldConst{ + Key: "remote_addr_pos", + Val: structfield.NewID("std", "net/http", "Request", "RemoteAddr"), + }, + probe.StructFieldConst{ + Key: "host_pos", + Val: structfield.NewID("std", "net/http", "Request", "Host"), + }, + probe.StructFieldConst{ + Key: "proto_pos", + Val: structfield.NewID("std", "net/http", "Request", "Proto"), + }, + probe.StructFieldConstMinVersion{ + StructField: probe.StructFieldConst{ + Key: "req_pat_pos", + Val: structfield.NewID("std", "net/http", "Request", "pat"), + }, + MinVersion: patternPathMinVersion, + }, + probe.StructFieldConstMinVersion{ + StructField: probe.StructFieldConst{ + Key: "pat_str_pos", + Val: structfield.NewID("std", "net/http", "pattern", "str"), + }, + MinVersion: patternPathMinVersion, + }, + patternPathSupportedConst{}, + } + + uprobes = []probe.Uprobe{ + { + Sym: "net/http.serverHandler.ServeHTTP", + EntryProbe: "uprobe_serverHandler_ServeHTTP", + ReturnProbe: "uprobe_serverHandler_ServeHTTP_Returns", + }, + } +)