Skip to content

Commit

Permalink
Print cfg and intervals
Browse files Browse the repository at this point in the history
  • Loading branch information
rockdaboot committed Jul 18, 2024
1 parent 0cf8254 commit fc2c399
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 6 deletions.
3 changes: 2 additions & 1 deletion hostmetadata/host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ var (
// default.
func AddMetadata(result map[string]string) error {
caEndpoint := agent.GetCollectionAgentAddr()
// Extract the host part of the endpoint
log.Debugf("coll agent endpoint: %s", caEndpoint)

// Remove the port from the endpoint in case it is present
host, _, err := net.SplitHostPort(caEndpoint)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion otel/builder-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ receivers:
- gomod:
go.opentelemetry.io/collector/receiver/otlpreceiver v0.104.0
- gomod:
github.com/elastic/otel-profiling-agent v0.0.0-20240717140952-d386d2702214
github.com/elastic/otel-profiling-agent v0.0.0-20240718145131-96917d0e213a
import: github.com/elastic/otel-profiling-agent/otel/receiver
name: profilingreceiver
1 change: 0 additions & 1 deletion otel/receiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ type Config struct {
MonitorInterval time.Duration `mapstructure:"monitor-interval"`
SamplesPerSecond int `mapstructure:"samples-per-second"`
SendErrorFrames bool `mapstructure:"send-error-frames"`
OTLPReporter bool `mapstructure:"otlp-reporter"`

// Written in createDefaultConfig()
PresentCPUCores uint16
Expand Down
29 changes: 26 additions & 3 deletions otel/receiver/trace-receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import (
"time"

"github.com/elastic/otel-profiling-agent/containermetadata"
"github.com/elastic/otel-profiling-agent/host"
hostinfo "github.com/elastic/otel-profiling-agent/host"
"github.com/elastic/otel-profiling-agent/hostmetadata"
agentmeta "github.com/elastic/otel-profiling-agent/hostmetadata/agent"
hostmeta "github.com/elastic/otel-profiling-agent/hostmetadata/host"
"github.com/elastic/otel-profiling-agent/platform"
"github.com/elastic/otel-profiling-agent/reporter"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/elastic/otel-profiling-agent/tracer"
tracertypes "github.com/elastic/otel-profiling-agent/tracer/types"
"github.com/elastic/otel-profiling-agent/util"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.uber.org/zap"
Expand All @@ -36,6 +38,14 @@ var (

// Start starts the receiver.
func (otelRcvr *otelReceiver) Start(ctx context.Context, host component.Host) error {
cfg := otelRcvr.config

if cfg.Verbose {
// logrus is used in the agent code, so we need to set the log level here.
// todo: make the logger configurable
logrus.SetLevel(logrus.DebugLevel)
}

log := otelRcvr.logger.Sugar()

otelRcvr.host = host
Expand All @@ -44,7 +54,13 @@ func (otelRcvr *otelReceiver) Start(ctx context.Context, host component.Host) er
}
ctx, otelRcvr.cancel = context.WithCancel(ctx)

cfg := otelRcvr.config
if err := tracer.ProbeBPFSyscall(); err != nil {
return fmt.Errorf("failed to probe eBPF syscall: %v", err)
}

if err := tracer.ProbeTracepoint(); err != nil {
return fmt.Errorf("failed to probe tracepoint: %v", err)
}

log.Debug("Determining tracers to include")
includeTracers, err := tracertypes.Parse(cfg.Tracers)
Expand All @@ -66,6 +82,11 @@ func (otelRcvr *otelReceiver) Start(ctx context.Context, host component.Host) er
metadataCollector := hostmetadata.NewCollector(cfg.CollectionAgent, environment)
hostMetadataMap := metadataCollector.GetHostMetadata()

log.Debugf("Collection agent: %s", cfg.CollectionAgent)
agentmeta.SetAgentData(&agentmeta.Config{
CollectionAgentAddr: cfg.CollectionAgent,
})

// Connect to the collection agent and start reporting.
rep, err := reporter.Start(ctx, &reporter.Config{
CollAgentAddr: cfg.CollectionAgent,
Expand Down Expand Up @@ -137,6 +158,8 @@ func (otelRcvr *otelReceiver) Start(ctx context.Context, host component.Host) er
// This log line is used in our system tests to verify that the agent has started.
// So if you change this log line, update also the system test.
log.Info("Attached sched monitor")
log.Infof("cfg: %+v", cfg)
log.Info("intervals: ", intervals)

if err := startTraceHandling(ctx, rep, intervals, trc, traceHandlerCacheSize); err != nil {
return fmt.Errorf("failed to start trace handling: %v", err)
Expand Down Expand Up @@ -165,7 +188,7 @@ func (otelRcvr *otelReceiver) Shutdown(_ context.Context) error {
func startTraceHandling(ctx context.Context, rep reporter.TraceReporter,
intervals *times.Times, trc *tracer.Tracer, cacheSize uint32) error {
// Spawn monitors for the various result maps
traceCh := make(chan *host.Trace)
traceCh := make(chan *hostinfo.Trace)

if err := trc.StartMapMonitors(ctx, traceCh); err != nil {
return fmt.Errorf("failed to start map monitors: %v", err)
Expand Down
2 changes: 2 additions & 0 deletions tracehandler/tracehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ func (m *traceHandler) HandleTrace(bpfTrace *host.Trace) {
log.Warnf("Failed to determine container info for trace: %v", err)
}

log.Infof("HandleTrace: %v %s", bpfTrace.Hash, bpfTrace.Comm)

if !m.reporter.SupportsReportTraceEvent() {
// Fast path: if the trace is already known remotely, we just send a counter update.
postConvHash, traceKnown := m.bpfTraceCache.Get(bpfTrace.Hash)
Expand Down
3 changes: 3 additions & 0 deletions tracer/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,18 @@ func startPollingPerfEventMonitor(ctx context.Context, perfEventMap *ebpf.Map,
break PollLoop
}

log.Debug("Polling perf event buffer")
// Eagerly read events until the buffer is exhausted.
for {
if err = eventReader.ReadInto(&data); err != nil {
if !errors.Is(err, os.ErrDeadlineExceeded) {
readErrorCount.Add(1)
}
log.Debugf(" ... %v", err)

break
}
log.Debugf(" ... %v", data)
if data.LostSamples != 0 {
lostEventsCount.Add(data.LostSamples)
continue
Expand Down
1 change: 1 addition & 0 deletions tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,7 @@ func (t *Tracer) loadBpfTrace(raw []byte) *host.Trace {
func (t *Tracer) StartMapMonitors(ctx context.Context, traceOutChan chan *host.Trace) error {
eventMetricCollector := t.startEventMonitor(ctx)

log.Debugf("trace_poll_interval: %v", t.intervals.TracePollInterval())
startPollingPerfEventMonitor(ctx, t.ebpfMaps["trace_events"], t.intervals.TracePollInterval(),
t.samplesPerSecond*int(unsafe.Sizeof(C.Trace{})), func(rawTrace []byte) {
traceOutChan <- t.loadBpfTrace(rawTrace)
Expand Down

0 comments on commit fc2c399

Please sign in to comment.