Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Probe API #1307

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,19 @@ import (
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"

dbSql "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/database/sql"
kafkaConsumer "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer"
kafkaProducer "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer"
autosdk "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/go.opentelemetry.io/auto/sdk"
otelTraceGlobal "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal"
grpcClient "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/google.golang.org/grpc/client"
grpcServer "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server"
httpClient "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/net/http/client"
httpServer "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/net/http/server"

"go.opentelemetry.io/auto/internal/pkg/instrumentation"
"go.opentelemetry.io/auto/internal/pkg/instrumentation/probe"
"go.opentelemetry.io/auto/internal/pkg/instrumentation/utils"
"go.opentelemetry.io/auto/internal/pkg/opentelemetry"
"go.opentelemetry.io/auto/internal/pkg/process"
)
Expand Down Expand Up @@ -94,7 +106,23 @@ func NewInstrumentation(ctx context.Context, opts ...InstrumentationOption) (*In
}

cp := convertConfigProvider(c.cp)
mngr, err := instrumentation.NewManager(c.logger, ctrl, c.globalImpl, cp, Version())

// TODO: Probes should be passed by the end developer to NewInstrumentation() when they're public
probes := []probe.BaseProbe{
grpcClient.New(c.logger, Version(), ctrl.Trace),
grpcServer.New(c.logger, Version(), ctrl.Trace),
httpServer.New(c.logger, Version(), ctrl.Trace),
httpClient.New(c.logger, Version(), ctrl.Trace, httpClient.Config{SupportsContextPropagation: utils.SupportsContextPropagation()}),
dbSql.New(c.logger, Version(), ctrl.Trace),
kafkaProducer.New(c.logger, Version(), ctrl.Trace),
kafkaConsumer.New(c.logger, Version(), ctrl.Trace),
autosdk.New(c.logger, ctrl.Trace),
}
if c.globalImpl {
probes = append(probes, otelTraceGlobal.New(c.logger, ctrl.Trace))
}

mngr, err := instrumentation.NewManager(c.logger, ctrl, probes, cp, Version())
if err != nil {
return nil, err
}
Expand All @@ -117,7 +145,7 @@ func NewInstrumentation(ctx context.Context, opts ...InstrumentationOption) (*In
"dependencies", td.Libraries,
"total_functions_found", len(td.Functions),
)
mngr.FilterUnusedProbes(td)
mngr.FilterUnusedProbesForTarget(td)

return &Instrumentation{
target: td,
Expand Down
83 changes: 49 additions & 34 deletions internal/pkg/instrumentation/bpf/database/sql/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,45 +29,34 @@ const (
IncludeDBStatementEnvVar = "OTEL_GO_AUTO_INCLUDE_DB_STATEMENT"
)

// New returns a new [probe.Probe].
func New(logger *slog.Logger, version string) probe.Probe {
type DatabaseSQLProbe struct {
*probe.TargetSpanProducingProbe[bpfObjects, event]
}

func (d *DatabaseSQLProbe) ApplyConfig(c probe.Config) error {
return nil
}

// New returns a new [probe.GoLibraryTelemetryProbe].
func New(logger *slog.Logger, version string, handler func(ptrace.ScopeSpans)) probe.GoLibraryTelemetryProbe {
id := probe.ID{
SpanKind: trace.SpanKindClient,
InstrumentedPkg: pkg,
}
return &probe.SpanProducer[bpfObjects, event]{
Base: probe.Base[bpfObjects, event]{
ID: id,
Logger: logger,
Consts: []probe.Const{
probe.RegistersABIConst{},
probe.AllocationConst{},
probe.KeyValConst{
Key: "should_include_db_statement",
Val: shouldIncludeDBStatement(),
},
},
Uprobes: []probe.Uprobe{
{
Sym: "database/sql.(*DB).queryDC",
EntryProbe: "uprobe_queryDC",
ReturnProbe: "uprobe_queryDC_Returns",
Optional: true,
},
{
Sym: "database/sql.(*DB).execDC",
EntryProbe: "uprobe_execDC",
ReturnProbe: "uprobe_execDC_Returns",
Optional: true,
},
},

SpecFn: loadBpf,
},
Version: version,
SchemaURL: semconv.SchemaURL,
ProcessFn: processFn,

p := &DatabaseSQLProbe{
TargetSpanProducingProbe: probe.NewTargetSpanProducingProbe[bpfObjects, event](),
}
p.ProbeID = id
p.Logger = logger
p.Consts = consts
p.Uprobes = uprobes
p.SpecFn = loadBpf
p.Version = version
p.SchemaURL = semconv.SchemaURL
p.ProcessFn = processFn
p.Handler = handler
return p
}

// event represents an event in an SQL database
Expand Down Expand Up @@ -112,3 +101,29 @@ func shouldIncludeDBStatement() bool {

return false
}

var (
consts = []probe.Const{
probe.RegistersABIConst{},
probe.AllocationConst{},
probe.KeyValConst{
Key: "should_include_db_statement",
Val: shouldIncludeDBStatement(),
},
}

uprobes = []probe.Uprobe{
{
Sym: "database/sql.(*DB).queryDC",
EntryProbe: "uprobe_queryDC",
ReturnProbe: "uprobe_queryDC_Returns",
Optional: true,
},
{
Sym: "database/sql.(*DB).execDC",
EntryProbe: "uprobe_execDC",
ReturnProbe: "uprobe_execDC_Returns",
Optional: true,
},
}
)
Original file line number Diff line number Diff line change
Expand Up @@ -27,61 +27,34 @@ const (
pkg = "github.com/segmentio/kafka-go"
)

// New returns a new [probe.Probe].
func New(logger *slog.Logger, version string) probe.Probe {
type KafkaConsumerProbe struct {
*probe.TargetSpanProducingProbe[bpfObjects, event]
}

func (k *KafkaConsumerProbe) ApplyConfig(c probe.Config) error {
return nil
}

// New returns a new [probe.GoLibraryTelemetryProbe].
func New(logger *slog.Logger, version string, handler func(ptrace.ScopeSpans)) probe.GoLibraryTelemetryProbe {
id := probe.ID{
SpanKind: trace.SpanKindConsumer,
InstrumentedPkg: pkg,
}
return &probe.SpanProducer[bpfObjects, event]{
Base: probe.Base[bpfObjects, event]{
ID: id,
Logger: logger,
Consts: []probe.Const{
probe.RegistersABIConst{},
probe.AllocationConst{},
probe.StructFieldConst{
Key: "message_headers_pos",
Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Headers"),
},
probe.StructFieldConst{
Key: "message_key_pos",
Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Key"),
},
probe.StructFieldConst{
Key: "message_topic_pos",
Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Topic"),
},
probe.StructFieldConst{
Key: "message_partition_pos",
Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Partition"),
},
probe.StructFieldConst{
Key: "message_offset_pos",
Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Offset"),
},
probe.StructFieldConst{
Key: "reader_config_pos",
Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Reader", "config"),
},
probe.StructFieldConst{
Key: "reader_config_group_id_pos",
Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "ReaderConfig", "GroupID"),
},
},
Uprobes: []probe.Uprobe{
{
Sym: "github.com/segmentio/kafka-go.(*Reader).FetchMessage",
EntryProbe: "uprobe_FetchMessage",
ReturnProbe: "uprobe_FetchMessage_Returns",
},
},
SpecFn: loadBpf,
},
Version: version,
SchemaURL: semconv.SchemaURL,
ProcessFn: processFn,

p := &KafkaConsumerProbe{
TargetSpanProducingProbe: probe.NewTargetSpanProducingProbe[bpfObjects, event](),
}
p.ProbeID = id
p.Logger = logger
p.Uprobes = uprobes
p.Consts = consts
p.SpecFn = loadBpf
p.Version = version
p.SchemaURL = semconv.SchemaURL
p.ProcessFn = processFn
p.Handler = handler
return p
}

// event represents a kafka message received by the consumer.
Expand Down Expand Up @@ -129,3 +102,46 @@ func processFn(e *event) ptrace.SpanSlice {
func kafkaConsumerSpanName(topic string) string {
return fmt.Sprintf("%s receive", topic)
}

var (
consts = []probe.Const{
probe.RegistersABIConst{},
probe.AllocationConst{},
probe.StructFieldConst{
Key: "message_headers_pos",
Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Headers"),
},
probe.StructFieldConst{
Key: "message_key_pos",
Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Key"),
},
probe.StructFieldConst{
Key: "message_topic_pos",
Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Topic"),
},
probe.StructFieldConst{
Key: "message_partition_pos",
Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Partition"),
},
probe.StructFieldConst{
Key: "message_offset_pos",
Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Offset"),
},
probe.StructFieldConst{
Key: "reader_config_pos",
Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Reader", "config"),
},
probe.StructFieldConst{
Key: "reader_config_group_id_pos",
Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "ReaderConfig", "GroupID"),
},
}

uprobes = []probe.Uprobe{
{
Sym: "github.com/segmentio/kafka-go.(*Reader).FetchMessage",
EntryProbe: "uprobe_FetchMessage",
ReturnProbe: "uprobe_FetchMessage_Returns",
},
}
)
Original file line number Diff line number Diff line change
Expand Up @@ -27,49 +27,33 @@ const (
pkg = "github.com/segmentio/kafka-go"
)

// New returns a new [probe.Probe].
func New(logger *slog.Logger, version string) probe.Probe {
type KafkaProducerProbe struct {
*probe.TargetSpanProducingProbe[bpfObjects, event]
}

func (k *KafkaProducerProbe) ApplyConfig(c probe.Config) error {
return nil
}

// New returns a new [probe.GoLibraryTelemetryProbe].
func New(logger *slog.Logger, version string, handler func(ptrace.ScopeSpans)) probe.GoLibraryTelemetryProbe {
id := probe.ID{
SpanKind: trace.SpanKindProducer,
InstrumentedPkg: pkg,
}
return &probe.SpanProducer[bpfObjects, event]{
Base: probe.Base[bpfObjects, event]{
ID: id,
Logger: logger,
Consts: []probe.Const{
probe.RegistersABIConst{},
probe.AllocationConst{},
probe.StructFieldConst{
Key: "writer_topic_pos",
Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Writer", "Topic"),
},
probe.StructFieldConst{
Key: "message_headers_pos",
Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Headers"),
},
probe.StructFieldConst{
Key: "message_key_pos",
Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Key"),
},
probe.StructFieldConst{
Key: "message_time_pos",
Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Time"),
},
},
Uprobes: []probe.Uprobe{
{
Sym: "github.com/segmentio/kafka-go.(*Writer).WriteMessages",
EntryProbe: "uprobe_WriteMessages",
ReturnProbe: "uprobe_WriteMessages_Returns",
},
},
SpecFn: loadBpf,
},
Version: version,
SchemaURL: semconv.SchemaURL,
ProcessFn: processFn,

p := &KafkaProducerProbe{
TargetSpanProducingProbe: probe.NewTargetSpanProducingProbe[bpfObjects, event](),
}
p.ProbeID = id
p.Logger = logger
p.Consts = consts
p.Uprobes = uprobes
p.Version = version
p.SchemaURL = semconv.SchemaURL
p.ProcessFn = processFn
p.Handler = handler
return p
}

type messageAttributes struct {
Expand Down Expand Up @@ -147,3 +131,34 @@ func processFn(e *event) ptrace.SpanSlice {
func kafkaProducerSpanName(topic string) string {
return fmt.Sprintf("%s publish", topic)
}

var (
consts = []probe.Const{
probe.RegistersABIConst{},
probe.AllocationConst{},
probe.StructFieldConst{
Key: "writer_topic_pos",
Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Writer", "Topic"),
},
probe.StructFieldConst{
Key: "message_headers_pos",
Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Headers"),
},
probe.StructFieldConst{
Key: "message_key_pos",
Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Key"),
},
probe.StructFieldConst{
Key: "message_time_pos",
Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Time"),
},
}

uprobes = []probe.Uprobe{
{
Sym: "github.com/segmentio/kafka-go.(*Writer).WriteMessages",
EntryProbe: "uprobe_WriteMessages",
ReturnProbe: "uprobe_WriteMessages_Returns",
},
}
)
Loading
Loading