Skip to content

Commit

Permalink
Allow passing a custom reporter to the controller (#207)
Browse files Browse the repository at this point in the history
  • Loading branch information
dmathieu authored Oct 31, 2024
1 parent c2e8af7 commit a720d06
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 86 deletions.
3 changes: 3 additions & 0 deletions internal/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

log "github.com/sirupsen/logrus"
"go.opentelemetry.io/ebpf-profiler/reporter"
"go.opentelemetry.io/ebpf-profiler/tracer"
)

Expand All @@ -30,6 +31,8 @@ type Config struct {
VerboseMode bool
Version bool

Reporter reporter.Reporter

Fs *flag.FlagSet
}

Expand Down
42 changes: 13 additions & 29 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,16 @@ type Controller struct {
// The controller can set global configurations (such as the eBPF syscalls) on
// setup. So there should only ever be one running.
func New(cfg *Config) *Controller {
return &Controller{
config: cfg,
c := &Controller{
config: cfg,
reporter: cfg.Reporter,
}

return c
}

// Start starts the controller
// The controller should only be started once.
func (c *Controller) Start(ctx context.Context) error {
if err := tracer.ProbeBPFSyscall(); err != nil {
return fmt.Errorf("failed to probe eBPF syscall: %w", err)
Expand Down Expand Up @@ -86,40 +90,19 @@ func (c *Controller) Start(ctx context.Context) error {
metadataCollector.AddCustomData("host.name", hostname)
metadataCollector.AddCustomData("host.ip", sourceIP)

// Network operations to CA start here
var rep reporter.Reporter
// Connect to the collection agent
rep, err = reporter.Start(ctx, &reporter.Config{
CollAgentAddr: c.config.CollAgentAddr,
DisableTLS: c.config.DisableTLS,
MaxRPCMsgSize: 32 * MiB,
MaxGRPCRetries: 5,
GRPCOperationTimeout: intervals.GRPCOperationTimeout(),
GRPCStartupBackoffTime: intervals.GRPCStartupBackoffTime(),
GRPCConnectionTimeout: intervals.GRPCConnectionTimeout(),
ReportInterval: intervals.ReportInterval(),
ExecutablesCacheElements: 4096,
// Next step: Calculate FramesCacheElements from numCores and samplingRate.
FramesCacheElements: 65536,
CGroupCacheElements: 1024,
SamplesPerSecond: c.config.SamplesPerSecond,
KernelVersion: kernelVersion,
HostName: hostname,
IPAddress: sourceIP,
})
err = c.reporter.Start(ctx)
if err != nil {
return fmt.Errorf("failed to start reporting: %w", err)
return fmt.Errorf("failed to start reporter: %w", err)
}
c.reporter = rep

metrics.SetReporter(rep)
metrics.SetReporter(c.reporter)

// Now that we have set the initial host metadata, start a goroutine to keep sending updates regularly.
metadataCollector.StartMetadataCollection(ctx, rep)
metadataCollector.StartMetadataCollection(ctx, c.reporter)

// Load the eBPF code and map definitions
trc, err := tracer.NewTracer(ctx, &tracer.Config{
Reporter: rep,
Reporter: c.reporter,
Intervals: intervals,
IncludeTracers: includeTracers,
FilterErrorFrames: !c.config.SendErrorFrames,
Expand Down Expand Up @@ -167,7 +150,8 @@ func (c *Controller) Start(ctx context.Context) error {
// change this log line update also the system test.
log.Printf("Attached sched monitor")

if err := startTraceHandling(ctx, rep, intervals, trc, traceHandlerCacheSize); err != nil {
if err := startTraceHandling(ctx, c.reporter, intervals, trc,
traceHandlerCacheSize); err != nil {
return fmt.Errorf("failed to start trace handling: %w", err)
}

Expand Down
43 changes: 43 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import (
"golang.org/x/sys/unix"

"go.opentelemetry.io/ebpf-profiler/internal/controller"
"go.opentelemetry.io/ebpf-profiler/internal/helpers"
"go.opentelemetry.io/ebpf-profiler/reporter"
"go.opentelemetry.io/ebpf-profiler/times"
"go.opentelemetry.io/ebpf-profiler/vc"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -96,6 +99,46 @@ func mainWithExitCode() exitCode {
}()
}

intervals := times.New(cfg.MonitorInterval,
cfg.ReporterInterval, cfg.ProbabilisticInterval)

kernelVersion, err := helpers.GetKernelVersion()
if err != nil {
log.Error(err)
return exitFailure
}

// hostname and sourceIP will be populated from the root namespace.
hostname, sourceIP, err := helpers.GetHostnameAndSourceIP(cfg.CollAgentAddr)
if err != nil {
log.Error(err)
return exitFailure
}

rep, err := reporter.NewOTLP(&reporter.Config{
CollAgentAddr: cfg.CollAgentAddr,
DisableTLS: cfg.DisableTLS,
MaxRPCMsgSize: 32 << 20, // 32 MiB
MaxGRPCRetries: 5,
GRPCOperationTimeout: intervals.GRPCOperationTimeout(),
GRPCStartupBackoffTime: intervals.GRPCStartupBackoffTime(),
GRPCConnectionTimeout: intervals.GRPCConnectionTimeout(),
ReportInterval: intervals.ReportInterval(),
ExecutablesCacheElements: 4096,
// Next step: Calculate FramesCacheElements from numCores and samplingRate.
FramesCacheElements: 65536,
CGroupCacheElements: 1024,
SamplesPerSecond: cfg.SamplesPerSecond,
KernelVersion: kernelVersion,
HostName: hostname,
IPAddress: sourceIP,
})
if err != nil {
log.Error(err)
return exitFailure
}
cfg.Reporter = rep

log.Infof("Starting OTEL profiling agent %s (revision %s, build timestamp %s)",
vc.Version(), vc.Revision(), vc.BuildTimestamp())

Expand Down
7 changes: 7 additions & 0 deletions reporter/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ type Reporter interface {
HostMetadataReporter
MetricsReporter

// Start starts the reporter in the background.
//
// If the reporter needs to perform a long-running starting operation then it
// is recommended that Start() returns quickly and the long-running operation
// is performed in the background.
Start(context.Context) error

// Stop triggers a graceful shutdown of the reporter.
Stop()
// GetMetrics returns the reporter internal metrics.
Expand Down
119 changes: 62 additions & 57 deletions reporter/otlp_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type attrKeyValue struct {

// OTLPReporter receives and transforms information to be OTLP/profiles compliant.
type OTLPReporter struct {
config *Config
// name is the ScopeProfile's name.
name string

Expand Down Expand Up @@ -144,6 +145,60 @@ type OTLPReporter struct {
ipAddress string
}

// NewOTLP builds an OTLPReporter from cfg.
//
// It allocates the LRU caches the reporter relies on (executables, frames,
// cgroup v2 IDs and host metadata) and copies the identifying fields out of
// cfg. The gRPC client is left nil by this constructor. The first error
// returned while creating a cache is passed back unchanged, together with a
// nil reporter.
func NewOTLP(cfg *Config) (*OTLPReporter, error) {
	// Executable information, keyed by file ID.
	execCache, err := lru.NewSynced[libpf.FileID, execInfo](
		cfg.ExecutablesCacheElements, libpf.FileID.Hash32)
	if err != nil {
		return nil, err
	}
	// Give entries a lifetime so stale items can be cleaned up.
	execCache.SetLifetime(time.Hour)

	// Per-file source information, each map guarded by its own RWMutex.
	frameCache, err := lru.NewSynced[
		libpf.FileID,
		*xsync.RWMutex[map[libpf.AddressOrLineno]sourceInfo],
	](cfg.FramesCacheElements, libpf.FileID.Hash32)
	if err != nil {
		return nil, err
	}
	// Give entries a lifetime so stale items can be cleaned up.
	frameCache.SetLifetime(time.Hour)

	// The PID itself serves as the hash for the cgroup cache.
	pidHash := func(pid libpf.PID) uint32 { return uint32(pid) }
	cgroupCache, err := lru.NewSynced[libpf.PID, string](cfg.CGroupCacheElements, pidHash)
	if err != nil {
		return nil, err
	}
	// A short lifetime reduces the risk of invalid data in case of PID reuse.
	cgroupCache.SetLifetime(90 * time.Second)

	// Next step: Dynamically configure the size of this LRU.
	// Currently, we use the length of the JSON array in
	// hostmetadata/hostmetadata.json.
	hostMetaCache, err := lru.NewSynced[string, string](115, hashString)
	if err != nil {
		return nil, err
	}

	rep := &OTLPReporter{
		config: cfg,

		// Identity of the profiler and the host it runs on.
		name:             cfg.Name,
		version:          cfg.Version,
		kernelVersion:    cfg.KernelVersion,
		hostName:         cfg.HostName,
		ipAddress:        cfg.IPAddress,
		hostID:           strconv.FormatUint(cfg.HostID, 10),
		samplesPerSecond: cfg.SamplesPerSecond,

		// Connection-related state; no client exists yet.
		client:                  nil,
		rpcStats:                NewStatsHandler(),
		pkgGRPCOperationTimeout: cfg.GRPCOperationTimeout,
		stopSignal:              make(chan libpf.Void),

		// Caches and buffered trace events.
		executables:  execCache,
		frames:       frameCache,
		cgroupv2ID:   cgroupCache,
		hostmetadata: hostMetaCache,
		traceEvents:  xsync.NewRWMutex(map[traceAndMetaKey]*traceEvents{}),
	}
	return rep, nil
}

// hashString is a helper function for LRUs that use string as a key.
// Xxh3 turned out to be the fastest hash function for strings in the FreeLRU benchmarks.
// It was only outperformed by the AES hash function, which is implemented in Plan9 assembly.
Expand Down Expand Up @@ -303,73 +358,23 @@ func (r *OTLPReporter) GetMetrics() Metrics {
}

// Start sets up and manages the reporting connection to an OTLP backend.
func Start(mainCtx context.Context, cfg *Config) (Reporter, error) {
executables, err :=
lru.NewSynced[libpf.FileID, execInfo](cfg.ExecutablesCacheElements, libpf.FileID.Hash32)
if err != nil {
return nil, err
}
executables.SetLifetime(1 * time.Hour) // Allow GC to clean stale items.

frames, err := lru.NewSynced[libpf.FileID,
*xsync.RWMutex[map[libpf.AddressOrLineno]sourceInfo]](
cfg.FramesCacheElements, libpf.FileID.Hash32)
if err != nil {
return nil, err
}
frames.SetLifetime(1 * time.Hour) // Allow GC to clean stale items.

cgroupv2ID, err := lru.NewSynced[libpf.PID, string](cfg.CGroupCacheElements,
func(pid libpf.PID) uint32 { return uint32(pid) })
if err != nil {
return nil, err
}
// Set a lifetime to reduce risk of invalid data in case of PID reuse.
cgroupv2ID.SetLifetime(90 * time.Second)

// Next step: Dynamically configure the size of this LRU.
// Currently, we use the length of the JSON array in
// hostmetadata/hostmetadata.json.
hostmetadata, err := lru.NewSynced[string, string](115, hashString)
if err != nil {
return nil, err
}

r := &OTLPReporter{
name: cfg.Name,
version: cfg.Version,
kernelVersion: cfg.KernelVersion,
hostName: cfg.HostName,
ipAddress: cfg.IPAddress,
samplesPerSecond: cfg.SamplesPerSecond,
hostID: strconv.FormatUint(cfg.HostID, 10),
stopSignal: make(chan libpf.Void),
pkgGRPCOperationTimeout: cfg.GRPCOperationTimeout,
client: nil,
rpcStats: NewStatsHandler(),
executables: executables,
frames: frames,
hostmetadata: hostmetadata,
traceEvents: xsync.NewRWMutex(map[traceAndMetaKey]*traceEvents{}),
cgroupv2ID: cgroupv2ID,
}

func (r *OTLPReporter) Start(ctx context.Context) error {
// Create a child context for reporting features
ctx, cancelReporting := context.WithCancel(mainCtx)
ctx, cancelReporting := context.WithCancel(ctx)

// Establish the gRPC connection before going on, waiting for a response
// from the collectionAgent endpoint.
// Use grpc.WithBlock() in setupGrpcConnection() for this to work.
otlpGrpcConn, err := waitGrpcEndpoint(ctx, cfg, r.rpcStats)
otlpGrpcConn, err := waitGrpcEndpoint(ctx, r.config, r.rpcStats)
if err != nil {
cancelReporting()
close(r.stopSignal)
return nil, err
return err
}
r.client = otlpcollector.NewProfilesServiceClient(otlpGrpcConn)

go func() {
tick := time.NewTicker(cfg.ReportInterval)
tick := time.NewTicker(r.config.ReportInterval)
defer tick.Stop()
purgeTick := time.NewTicker(5 * time.Minute)
defer purgeTick.Stop()
Expand All @@ -383,7 +388,7 @@ func Start(mainCtx context.Context, cfg *Config) (Reporter, error) {
if err := r.reportOTLPProfile(ctx); err != nil {
log.Errorf("Request failed: %v", err)
}
tick.Reset(libpf.AddJitter(cfg.ReportInterval, 0.2))
tick.Reset(libpf.AddJitter(r.config.ReportInterval, 0.2))
case <-purgeTick.C:
// Allow the GC to purge expired entries to avoid memory leaks.
r.executables.PurgeExpired()
Expand All @@ -404,7 +409,7 @@ func Start(mainCtx context.Context, cfg *Config) (Reporter, error) {
}
}()

return r, nil
return nil
}

// reportOTLPProfile creates and sends out an OTLP profile.
Expand Down

0 comments on commit a720d06

Please sign in to comment.