Skip to content

Commit

Permalink
Add -bench-replay
Browse files Browse the repository at this point in the history
  • Loading branch information
rockdaboot committed Aug 15, 2024
1 parent 560ef1d commit d9fd463
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 0 deletions.
5 changes: 5 additions & 0 deletions cli_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
defaultArgSendErrorFrames = false
defaultArgBenchDataDir = ""
defaultArgBenchProtoDir = ""
defaultArgBenchReplay = false

// This is the X in 2^(n + x) where n is the default hardcoded map size value
defaultArgMapScaleFactor = 0
Expand Down Expand Up @@ -70,11 +71,13 @@ var (
sendErrorFramesHelp = "Send error frames (devfiler only, breaks Kibana)"
benchDataDirHelp = "Directory to store data for benchmarking."
benchProtoDirHelp = "Directory to store raw protobuf wire messages."
benchReplayHelp = "Replay data from -bench-data-dir directory."
)

type arguments struct {
benchDataDir string
benchProtoDir string
benchReplay bool
bpfVerifierLogLevel uint
bpfVerifierLogSize int
collAgentAddr string
Expand Down Expand Up @@ -154,6 +157,8 @@ func parseArgs() (*arguments, error) {
benchDataDirHelp)
fs.StringVar(&args.benchProtoDir, "bench-proto-dir", defaultArgBenchProtoDir,
benchProtoDirHelp)
fs.BoolVar(&args.benchReplay, "bench-replay", defaultArgBenchReplay,
benchReplayHelp)

fs.Usage = func() {
fs.PrintDefaults()
Expand Down
10 changes: 10 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,12 @@ func mainWithExitCode() exitCode {
}

if args.benchDataDir != "" {
if args.benchReplay {
if err = reporter.Replay(mainCtx, args.benchDataDir, rep); err != nil {
return failure("Failed to replay benchmark data: %v", err)
}
return exitSuccess
}
rep, err = reporter.NewBenchmarkReporter(args.benchDataDir, rep)
if err != nil {
return failure("Failed to create benchmark reporter: %v", err)
Expand Down Expand Up @@ -344,6 +350,10 @@ func sanityCheck(args *arguments) exitCode {
}
}

if args.benchReplay && args.benchDataDir == "" {
return failure("Replay requested but no data directory specified")
}

return exitSuccess
}

Expand Down
141 changes: 141 additions & 0 deletions reporter/replay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package reporter

import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"time"

log "github.com/sirupsen/logrus"

"github.com/open-telemetry/opentelemetry-ebpf-profiler/libpf"
)

type fileInfo struct {
name string
timestamp int64
id uint64
funcName string
}

// Replay replays the stored data from benchDataDir.
// The argument r is the reporter that will receive the replayed data.
func Replay(ctx context.Context, benchDataDir string, rep Reporter) error {
files, err := os.ReadDir(benchDataDir)
if err != nil {
return fmt.Errorf("failed to read directory %s: %v", benchDataDir, err)
}

fileInfos := make([]fileInfo, 0, len(files))

for _, f := range files {
if !strings.HasSuffix(f.Name(), ".json") {
continue
}

name := f.Name()
// scan name for timestamp, counter and function name
var timestamp int64
var id uint64
var funcName string
if _, err = fmt.Sscanf(name, "%d_%x_%s", &timestamp, &id, &funcName); err != nil {
log.Errorf("Failed to parse file name %s: %v", name, err)
continue
}
funcName = strings.TrimSuffix(funcName, ".json")

fileInfos = append(fileInfos, fileInfo{
name: name,
timestamp: timestamp,
id: id,
funcName: funcName,
})
}

if len(fileInfos) == 0 {
return nil
}

// Sort fileInfos ascending by ID.
sort.Slice(fileInfos, func(i, j int) bool {
return fileInfos[i].id < fileInfos[j].id
})

if fileInfos[0].funcName != "Start" {
return fmt.Errorf("first function name must be \"Start\", instead it is \"%s\"",
fileInfos[0].funcName)
}

curTS := fileInfos[0].timestamp

// Replay the stored data
for _, fi := range fileInfos[1:] {
time.Sleep(time.Duration(fi.timestamp-curTS) * time.Nanosecond)
curTS = fi.timestamp

switch fi.funcName {
case "TraceEvent":
var v traceEvent
if err = dataFromFileInfo(benchDataDir, fi, &v); err == nil {
rep.ReportTraceEvent(v.Trace, v.Meta)
}
case "CountForTrace":
var v countForTrace
if err = dataFromFileInfo(benchDataDir, fi, &v); err == nil {
rep.ReportCountForTrace(v.TraceHash, v.Count, v.Meta)
}
case "FramesForTrace":
var v libpf.Trace
if err = dataFromFileInfo[libpf.Trace](benchDataDir, fi, &v); err == nil {
rep.ReportFramesForTrace(&v)
}
case "FallbackSymbol":
var v fallbackSymbol
if err = dataFromFileInfo(benchDataDir, fi, &v); err == nil {
rep.ReportFallbackSymbol(v.FrameID, v.Symbol)
}
case "FrameMetadata":
var v frameMetadata
if err = dataFromFileInfo(benchDataDir, fi, &v); err == nil {
rep.FrameMetadata(v.FileID, v.AddressOrLine, v.LineNumber, v.FunctionOffset,
v.FunctionName, v.FilePath)
}
case "Metrics":
var v metrics
if err = dataFromFileInfo[metrics](benchDataDir, fi, &v); err == nil {
rep.ReportMetrics(v.Timestamp, v.IDs, v.Values)
}
default:
err = fmt.Errorf("unsupported function name in file %s: %s", fi.name, fi.funcName)
}

if err != nil {
log.Errorf("Failed to replay data from file %s: %v", fi.name, err)
}

if err = ctx.Err(); err != nil {
return err
}
}

return nil
}

func dataFromFileInfo[T any](dir string, fi fileInfo, data *T) error {
pathName := filepath.Join(dir, fi.name)
f, err := os.Open(pathName)
if err != nil {
return fmt.Errorf("failed to open file %s: %v", pathName, err)
}
defer f.Close()

if err = json.NewDecoder(f).Decode(data); err != nil {
return fmt.Errorf("failed to decode JSON from file %s: %v", pathName, err)
}

return nil
}

0 comments on commit d9fd463

Please sign in to comment.