Allow passing a custom reporter to the controller #207

Merged: 9 commits, Oct 31, 2024
38 changes: 38 additions & 0 deletions internal/controller/cache_size.go
@@ -0,0 +1,38 @@
package controller // import "go.opentelemetry.io/ebpf-profiler/internal/controller"

import (
"fmt"
"time"

"github.com/tklauser/numcpus"
"go.opentelemetry.io/ebpf-profiler/util"
)

// TraceCacheSize defines the maximum number of elements for the caches in tracehandler.
// The caches in tracehandler have a size-"processing overhead" trade-off: Every cache miss will
// trigger additional processing for that trace in userspace (Go). For most maps, we use
// maxElementsPerInterval as a base sizing factor. For the tracehandler caches, we also multiply
// with traceCacheIntervals. For typical/small values of maxElementsPerInterval, this can lead to
// non-optimal map sizing (reduced cache_hit:cache_miss ratio and increased processing overhead).
// Simply increasing traceCacheIntervals is problematic when maxElementsPerInterval is large
// (e.g. too many CPU cores present) as we end up using too much memory. A minimum size is
// therefore used here.
func TraceCacheSize(monitorInterval time.Duration, samplesPerSecond int) (uint32, error) {
const (
traceCacheIntervals = 6
traceCacheMinSize = 65536
)

presentCores, err := numcpus.GetPresent()
if err != nil {
return 0, fmt.Errorf("failed to read CPU file: %w", err)
}

maxElements := maxElementsPerInterval(monitorInterval, samplesPerSecond, uint16(presentCores))

size := maxElements * uint32(traceCacheIntervals)
if size < traceCacheMinSize {
size = traceCacheMinSize
}
return util.NextPowerOfTwo(size), nil
}
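
For a rough feel for the numbers, here is a minimal standalone sketch of the same sizing arithmetic; the sample rate, monitor interval and core count are illustrative assumptions rather than defaults from this PR, and nextPowerOfTwo is a stand-in for util.NextPowerOfTwo:

// Standalone sketch of the sizing arithmetic in TraceCacheSize; values below
// are illustrative assumptions only.
package main

import "fmt"

func nextPowerOfTwo(v uint32) uint32 {
	// Round v up to the next power of two (powers of two map to themselves).
	v--
	v |= v >> 1
	v |= v >> 2
	v |= v >> 4
	v |= v >> 8
	v |= v >> 16
	return v + 1
}

func main() {
	const (
		samplesPerSecond    = 20 // assumed sample rate
		intervalSeconds     = 5  // assumed MonitorInterval of 5s
		presentCores        = 16 // assumed core count
		traceCacheIntervals = 6
		traceCacheMinSize   = 65536
	)

	size := uint32(samplesPerSecond * intervalSeconds * presentCores * traceCacheIntervals)
	if size < traceCacheMinSize {
		size = traceCacheMinSize
	}
	// 20*5*16*6 = 9600 < 65536, so the minimum wins: prints 65536.
	fmt.Println(nextPowerOfTwo(size))
}

On a large host the product crosses the minimum instead: with 256 present cores, 20*5*256*6 = 153600 elements, which rounds up to 262144.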
3 changes: 3 additions & 0 deletions internal/controller/config.go
@@ -8,6 +8,7 @@ import (
"time"

log "github.com/sirupsen/logrus"
"go.opentelemetry.io/ebpf-profiler/reporter"
"go.opentelemetry.io/ebpf-profiler/tracer"
)

@@ -30,6 +31,8 @@ type Config struct {
VerboseMode bool
Version bool

Reporter reporter.Reporter

Fs *flag.FlagSet
}

75 changes: 16 additions & 59 deletions internal/controller/controller.go
@@ -6,7 +6,6 @@ import (
"time"

log "github.com/sirupsen/logrus"
"github.com/tklauser/numcpus"

"go.opentelemetry.io/ebpf-profiler/host"
"go.opentelemetry.io/ebpf-profiler/hostmetadata"
@@ -17,7 +16,6 @@ import (
"go.opentelemetry.io/ebpf-profiler/tracehandler"
"go.opentelemetry.io/ebpf-profiler/tracer"
tracertypes "go.opentelemetry.io/ebpf-profiler/tracer/types"
"go.opentelemetry.io/ebpf-profiler/util"
)

// Controller is an instance that runs, manages and stops the agent.
@@ -31,12 +29,16 @@ type Controller struct {
// The controller can set global configurations (such as the eBPF syscalls) on
// setup. So there should only ever be one running.
func New(cfg *Config) *Controller {
return &Controller{
config: cfg,
c := &Controller{
config: cfg,
reporter: cfg.Reporter,
}

return c
}

// Start starts the controller
// The controller should only be started once.
func (c *Controller) Start(ctx context.Context) error {
if err := tracer.ProbeBPFSyscall(); err != nil {
return fmt.Errorf("failed to probe eBPF syscall: %w", err)
@@ -46,14 +48,12 @@ func (c *Controller) Start(ctx context.Context) error {
return fmt.Errorf("failed to probe tracepoint: %w", err)
}

presentCores, err := numcpus.GetPresent()
traceHandlerCacheSize, err :=
TraceCacheSize(c.config.MonitorInterval, c.config.SamplesPerSecond)
if err != nil {
return fmt.Errorf("failed to read CPU file: %w", err)
return fmt.Errorf("retrieve trace cache size: %w", err)
}

traceHandlerCacheSize :=
traceCacheSize(c.config.MonitorInterval, c.config.SamplesPerSecond, uint16(presentCores))

intervals := times.New(c.config.MonitorInterval,
c.config.ReporterInterval, c.config.ProbabilisticInterval)

@@ -84,37 +84,19 @@ func (c *Controller) Start(ctx context.Context) error {
metadataCollector.AddCustomData("host.name", hostname)
metadataCollector.AddCustomData("host.ip", sourceIP)

// Network operations to CA start here
var rep reporter.Reporter
// Connect to the collection agent
rep, err = reporter.Start(ctx, &reporter.Config{
CollAgentAddr: c.config.CollAgentAddr,
DisableTLS: c.config.DisableTLS,
MaxRPCMsgSize: 32 << 20, // 32 MiB
MaxGRPCRetries: 5,
GRPCOperationTimeout: intervals.GRPCOperationTimeout(),
GRPCStartupBackoffTime: intervals.GRPCStartupBackoffTime(),
GRPCConnectionTimeout: intervals.GRPCConnectionTimeout(),
ReportInterval: intervals.ReportInterval(),
CacheSize: traceHandlerCacheSize,
SamplesPerSecond: c.config.SamplesPerSecond,
KernelVersion: kernelVersion,
HostName: hostname,
IPAddress: sourceIP,
})
err = c.reporter.Start(ctx)
if err != nil {
return fmt.Errorf("failed to start reporting: %w", err)
return fmt.Errorf("failed to start reporter: %w", err)
}
c.reporter = rep

metrics.SetReporter(rep)
metrics.SetReporter(c.reporter)

// Now that the initial host metadata is set, start a goroutine to keep sending updates regularly.
metadataCollector.StartMetadataCollection(ctx, rep)
metadataCollector.StartMetadataCollection(ctx, c.reporter)

// Load the eBPF code and map definitions
trc, err := tracer.NewTracer(ctx, &tracer.Config{
Reporter: rep,
Reporter: c.reporter,
Intervals: intervals,
IncludeTracers: includeTracers,
FilterErrorFrames: !c.config.SendErrorFrames,
@@ -162,7 +144,8 @@ func (c *Controller) Start(ctx context.Context) error {
// change this log line update also the system test.
log.Printf("Attached sched monitor")

if err := startTraceHandling(ctx, rep, intervals, trc, traceHandlerCacheSize); err != nil {
if err := startTraceHandling(ctx, c.reporter, intervals, trc,
traceHandlerCacheSize); err != nil {
return fmt.Errorf("failed to start trace handling: %w", err)
}

@@ -195,32 +178,6 @@ func startTraceHandling(ctx context.Context, rep reporter.TraceReporter,
return err
}

// traceCacheSize defines the maximum number of elements for the caches in tracehandler.
//
// The caches in tracehandler have a size-"processing overhead" trade-off: Every cache miss will
// trigger additional processing for that trace in userspace (Go). For most maps, we use
// maxElementsPerInterval as a base sizing factor. For the tracehandler caches, we also multiply
// with traceCacheIntervals. For typical/small values of maxElementsPerInterval, this can lead to
// non-optimal map sizing (reduced cache_hit:cache_miss ratio and increased processing overhead).
// Simply increasing traceCacheIntervals is problematic when maxElementsPerInterval is large
// (e.g. too many CPU cores present) as we end up using too much memory. A minimum size is
// therefore used here.
func traceCacheSize(monitorInterval time.Duration, samplesPerSecond int,
presentCPUCores uint16) uint32 {
const (
traceCacheIntervals = 6
traceCacheMinSize = 65536
)

maxElements := maxElementsPerInterval(monitorInterval, samplesPerSecond, presentCPUCores)

size := maxElements * uint32(traceCacheIntervals)
if size < traceCacheMinSize {
size = traceCacheMinSize
}
return util.NextPowerOfTwo(size)
}

func maxElementsPerInterval(monitorInterval time.Duration, samplesPerSecond int,
presentCPUCores uint16) uint32 {
return uint32(uint16(samplesPerSecond) * uint16(monitorInterval.Seconds()) * presentCPUCores)
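
Putting the pieces of this file together: the controller is now created once with a caller-supplied reporter and started once. The fragment below is a sketch of that call pattern, not code from this PR; the field values, rep, ctx and the logging are illustrative assumptions.

// Sketch only: a caller inside this module constructs the reporter, injects it
// through the config, and then drives the controller lifecycle once.
cfg := &controller.Config{
	MonitorInterval:  5 * time.Second, // illustrative value
	SamplesPerSecond: 20,              // illustrative value
	Reporter:         rep,             // any reporter.Reporter, e.g. the OTLP reporter built in main.go
	// ...remaining flag-backed fields elided...
}

ctrl := controller.New(cfg)
if err := ctrl.Start(ctx); err != nil { // ctx assumed to be the process context
	log.Errorf("failed to start controller: %v", err)
}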
46 changes: 46 additions & 0 deletions main.go
@@ -16,6 +16,9 @@ import (
"golang.org/x/sys/unix"

"go.opentelemetry.io/ebpf-profiler/internal/controller"
"go.opentelemetry.io/ebpf-profiler/internal/helpers"
"go.opentelemetry.io/ebpf-profiler/reporter"
"go.opentelemetry.io/ebpf-profiler/times"
"go.opentelemetry.io/ebpf-profiler/vc"

log "github.com/sirupsen/logrus"
@@ -96,6 +99,49 @@ func mainWithExitCode() exitCode {
}()
}

intervals := times.New(cfg.MonitorInterval,
cfg.ReporterInterval, cfg.ProbabilisticInterval)

tcs, err := controller.TraceCacheSize(cfg.MonitorInterval, cfg.SamplesPerSecond)
if err != nil {
log.Error(err)
return exitFailure
}

kernelVersion, err := helpers.GetKernelVersion()
if err != nil {
log.Error(err)
return exitFailure
}

// hostname and sourceIP will be populated from the root namespace.
hostname, sourceIP, err := helpers.GetHostnameAndSourceIP(cfg.CollAgentAddr)
if err != nil {
log.Error(err)
return exitFailure
}

rep, err := reporter.NewOTLP(&reporter.Config{
CollAgentAddr: cfg.CollAgentAddr,
DisableTLS: cfg.DisableTLS,
MaxRPCMsgSize: 32 << 20, // 32 MiB
MaxGRPCRetries: 5,
GRPCOperationTimeout: intervals.GRPCOperationTimeout(),
GRPCStartupBackoffTime: intervals.GRPCStartupBackoffTime(),
GRPCConnectionTimeout: intervals.GRPCConnectionTimeout(),
ReportInterval: intervals.ReportInterval(),
CacheSize: tcs,
SamplesPerSecond: cfg.SamplesPerSecond,
KernelVersion: kernelVersion,
HostName: hostname,
IPAddress: sourceIP,
})
if err != nil {
log.Error(err)
return exitFailure
}
cfg.Reporter = rep

log.Infof("Starting OTEL profiling agent %s (revision %s, build timestamp %s)",
vc.Version(), vc.Revision(), vc.BuildTimestamp())

7 changes: 7 additions & 0 deletions reporter/iface.go
@@ -18,6 +18,13 @@ type Reporter interface {
HostMetadataReporter
MetricsReporter

// Start starts the reporter in the background.
//
// If the reporter needs to perform a long-running starting operation then it
// is recommended that Start() returns quickly and the long-running operation
// is performed in the background.
Review comment on lines +23 to +25 (member):

Not sure if we should have this here, as in some cases (e.g. a long-running operation that can fail) it's going to create problems (a reporter that hasn't initialized properly, with the rest of the agent being oblivious and executing as normal).

I'd reword to the following or remove altogether:

Suggested change:
// If this method needs to perform a long-running operation that can NOT fail, it is
// recommended that Start returns quickly and the long-running operation is
// performed in the background.

Reply (member, PR author):

I don't know. Running an HTTP server can fail, yet that's a long-running operation that should not run synchronously here.
If folks need something long-running that can report errors, they should set up a channel, or an error handler to be able to report the error upstream.

Note that this comment is heavily inspired by the one on the collector component, which does the same thing:
https://github.com/open-telemetry/opentelemetry-collector/blob/main/component/component.go#L32

Start(context.Context) error

// Stop triggers a graceful shutdown of the reporter.
Stop()
// GetMetrics returns the reporter internal metrics.
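
The pattern the author describes in the thread above (return from Start quickly, run the failable work in the background, and surface errors through a channel) might look roughly like the following. exampleReporter is hypothetical, is not part of this PR, and omits the embedded trace, symbol, host-metadata and metrics methods of reporter.Reporter.

// Hypothetical sketch of the Start/Stop contract discussed above; not a full
// reporter.Reporter implementation.
package main

import (
	"context"
	"errors"
	"log"
	"net/http"
	"time"
)

type exampleReporter struct {
	srv  *http.Server
	errs chan error
}

// Start returns quickly; the long-running, failable work (the reviewer's
// HTTP-server example) runs in the background and reports errors upstream
// through the errs channel instead of blocking Start.
func (r *exampleReporter) Start(ctx context.Context) error {
	r.errs = make(chan error, 1)
	r.srv = &http.Server{Addr: "127.0.0.1:0"} // illustrative endpoint
	go func() {
		if err := r.srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			r.errs <- err
		}
	}()
	go func() {
		<-ctx.Done()
		r.Stop()
	}()
	return nil
}

// Stop triggers a graceful shutdown of the background work.
func (r *exampleReporter) Stop() {
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	_ = r.srv.Shutdown(shutdownCtx)
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	r := &exampleReporter{}
	if err := r.Start(ctx); err != nil {
		log.Fatal(err)
	}
	select {
	case err := <-r.errs:
		log.Printf("reporter failed in the background: %v", err)
	case <-time.After(time.Second):
		log.Print("reporter is running")
	}
	r.Stop()
}

This is only one possible shape; a reporter whose startup cannot fail can simply do its work inline and return any error from Start directly.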