Skip to content

Commit

Permalink
Unload the newSpan probe after first call
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias committed Dec 20, 2024
1 parent 957d831 commit b25ff44
Show file tree
Hide file tree
Showing 13 changed files with 138 additions and 41 deletions.
2 changes: 1 addition & 1 deletion internal/pkg/instrumentation/bpf/database/sql/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func New(logger *slog.Logger, version string) probe.Probe {
Val: shouldIncludeDBStatement(),
},
},
Uprobes: []probe.Uprobe{
Uprobes: []*probe.Uprobe{
{
Sym: "database/sql.(*DB).queryDC",
EntryProbe: "uprobe_queryDC",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func New(logger *slog.Logger, version string) probe.Probe {
Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "ReaderConfig", "GroupID"),
},
},
Uprobes: []probe.Uprobe{
Uprobes: []*probe.Uprobe{
{
Sym: "github.com/segmentio/kafka-go.(*Reader).FetchMessage",
EntryProbe: "uprobe_FetchMessage",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func New(logger *slog.Logger, version string) probe.Probe {
Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Time"),
},
},
Uprobes: []probe.Uprobe{
Uprobes: []*probe.Uprobe{
{
Sym: "github.com/segmentio/kafka-go.(*Writer).WriteMessages",
EntryProbe: "uprobe_WriteMessages",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func New(logger *slog.Logger) probe.Probe {
),
},
},
Uprobes: []probe.Uprobe{
Uprobes: []*probe.Uprobe{
{
Sym: "go.opentelemetry.io/auto/sdk.(*tracer).start",
EntryProbe: "uprobe_Tracer_start",
Expand All @@ -86,25 +86,25 @@ type converter struct {
logger *slog.Logger
}

func (c *converter) decodeEvent(record perf.Record) (event, error) {
func (c *converter) decodeEvent(record perf.Record) (*event, error) {
reader := bytes.NewReader(record.RawSample)

var e event
err := binary.Read(reader, binary.LittleEndian, &e.Size)
if err != nil {
c.logger.Error("failed to decode size", "error", err)
return event{}, err
return nil, err
}
c.logger.Debug("decoded size", "size", e.Size)

e.SpanData = make([]byte, e.Size)
_, err = reader.Read(e.SpanData)
if err != nil {
c.logger.Error("failed to read span data", "error", err)
return event{}, err
return nil, err
}
c.logger.Debug("decoded span data", "size", e.Size)
return e, nil
return &e, nil
}

func (c *converter) processFn(e *event) ptrace.ScopeSpans {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,12 @@ typedef struct tracer_id {
char schema_url[MAX_TRACER_SCHEMA_URL_LEN];
} tracer_id_t;

struct control_t {
u32 kind; // Required to be 1.
};

struct otel_span_t {
u32 kind; // Required to be 0.
BASE_SPAN_PROPERTIES
struct span_name_t span_name;
otel_status_t status;
Expand Down Expand Up @@ -415,7 +420,10 @@ int uprobe_newStart(struct pt_regs *ctx) {
}

wrote_flag = true;
return 0;

// Signal this uprobe should be unloaded.
struct control_t ctrl = {1};
return bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, (void *)(&ctrl), sizeof(struct control_t));
}

// This instrumentation attaches uprobe to the following function:
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
package global

import (
"bytes"
"encoding/binary"
"fmt"
"log/slog"
"math"
"sync"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand All @@ -18,6 +20,7 @@ import (
"go.opentelemetry.io/auto/internal/pkg/process"
"go.opentelemetry.io/auto/internal/pkg/structfield"

"github.com/cilium/ebpf/perf"
"github.com/hashicorp/go-version"
"golang.org/x/sys/unix"

Expand All @@ -41,6 +44,17 @@ func New(logger *slog.Logger) probe.Probe {
SpanKind: trace.SpanKindClient,
InstrumentedPkg: pkg,
}

uprobeNewStart := &probe.Uprobe{
Sym: "go.opentelemetry.io/otel/internal/global.(*tracer).newSpan",
EntryProbe: "uprobe_newStart",
FailureMode: probe.FailureModeWarn,
}

c := &converter{
logger: logger,
uprobeNewStart: uprobeNewStart,
}
return &probe.TraceProducer[bpfObjects, event]{
Base: probe.Base[bpfObjects, event]{
ID: id,
Expand Down Expand Up @@ -107,12 +121,8 @@ func New(logger *slog.Logger) probe.Probe {
tracerIDContainsSchemaURL{},
tracerIDContainsScopeAttributes{},
},
Uprobes: []probe.Uprobe{
{
Sym: "go.opentelemetry.io/otel/internal/global.(*tracer).newSpan",
EntryProbe: "uprobe_newStart",
FailureMode: probe.FailureModeWarn,
},
Uprobes: []*probe.Uprobe{
uprobeNewStart,
{
Sym: "go.opentelemetry.io/otel/internal/global.(*tracer).Start",
EntryProbe: "uprobe_Start",
Expand All @@ -138,12 +148,55 @@ func New(logger *slog.Logger) probe.Probe {
FailureMode: probe.FailureModeIgnore,
},
},
SpecFn: loadBpf,
SpecFn: loadBpf,
ProcessRecord: c.decodeEvent,
},
ProcessFn: processFn,
}
}

type recordKind uint32

const (
recordKindTelemetry recordKind = iota
recordKindConrol
)

type converter struct {
logger *slog.Logger

uprobeNewStart *probe.Uprobe
uprobeNewStartMu sync.Mutex
}

func (c *converter) decodeEvent(record perf.Record) (*event, error) {
reader := bytes.NewReader(record.RawSample)

var kind recordKind
err := binary.Read(reader, binary.LittleEndian, &kind)
if err != nil {
return nil, err
}

var e *event
switch kind {
case recordKindTelemetry:
e := new(event)
reader.Reset(record.RawSample)
err = binary.Read(reader, binary.LittleEndian, e)
case recordKindConrol:
c.uprobeNewStartMu.Lock()
if c.uprobeNewStart != nil {
err = c.uprobeNewStart.Close()
c.uprobeNewStart = nil
}
c.uprobeNewStartMu.Unlock()
default:
err = fmt.Errorf("unknown record kind: %d", kind)
}
return e, err
}

// tracerIDContainsSchemaURL is a Probe Const defining whether the tracer key contains schemaURL.
type tracerIDContainsSchemaURL struct{}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func New(logger *slog.Logger, version string) probe.Probe {
MinVersion: writeStatusMinVersion,
},
},
Uprobes: []probe.Uprobe{
Uprobes: []*probe.Uprobe{
{
Sym: "google.golang.org/grpc.(*ClientConn).Invoke",
EntryProbe: "uprobe_ClientConn_Invoke",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func New(logger *slog.Logger, ver string) probe.Probe {
},
framePosConst{},
},
Uprobes: []probe.Uprobe{
Uprobes: []*probe.Uprobe{
{
Sym: "google.golang.org/grpc.(*Server).handleStream",
EntryProbe: "uprobe_server_handleStream",
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/instrumentation/bpf/net/http/client/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func New(logger *slog.Logger, version string) probe.Probe {
InstrumentedPkg: pkg,
}

uprobes := []probe.Uprobe{
uprobes := []*probe.Uprobe{
{
Sym: "net/http.(*Transport).roundTrip",
EntryProbe: "uprobe_Transport_roundTrip",
Expand All @@ -52,7 +52,7 @@ func New(logger *slog.Logger, version string) probe.Probe {
// probe which writes the data in the outgoing buffer.
if utils.SupportsContextPropagation() {
uprobes = append(uprobes,
probe.Uprobe{
&probe.Uprobe{
Sym: "net/http.Header.writeSubset",
EntryProbe: "uprobe_writeSubset",
// We mark this probe as dependent on roundTrip, so we don't accidentally
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/instrumentation/bpf/net/http/server/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func New(logger *slog.Logger, version string) probe.Probe {
},
patternPathSupportedConst{},
},
Uprobes: []probe.Uprobe{
Uprobes: []*probe.Uprobe{
{
Sym: "net/http.serverHandler.ServeHTTP",
EntryProbe: "uprobe_serverHandler_ServeHTTP",
Expand Down
Loading

0 comments on commit b25ff44

Please sign in to comment.