Skip to content

Commit

Permalink
Add otelbench
Browse files Browse the repository at this point in the history
  • Loading branch information
carsonip committed Jan 14, 2025
1 parent 6848318 commit 96b1115
Show file tree
Hide file tree
Showing 10 changed files with 1,110 additions and 3 deletions.
1 change: 1 addition & 0 deletions loadgen/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../Makefile.Common
72 changes: 72 additions & 0 deletions loadgen/cmd/otelbench/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package main

import (
"context"
"flag"
"fmt"
"testing"
"time"

"github.com/elastic/opentelemetry-collector-components/loadgen"
)

func main() {
loadgen.Init()
flag.CommandLine = loadgen.FlagSet
testing.Init()
flag.Parse()

result := testing.Benchmark(func(b *testing.B) {
stop := make(chan bool)

go func() {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-stop:
case <-ticker.C:
logs, metricPoints, spans, err := loadgen.GetTelemetrySent()
if err != nil {
b.Logf("error getting internal telemetry: %s", err)
continue
}
total := logs + metricPoints + spans
if total > int64(b.N) {
b.StopTimer()
close(stop)
b.ReportMetric(float64(logs)/b.Elapsed().Seconds(), "logs/s")
b.ReportMetric(float64(metricPoints)/b.Elapsed().Seconds(), "metric_points/s")
b.ReportMetric(float64(spans)/b.Elapsed().Seconds(), "spans/s")
b.ReportMetric(float64(total)/b.Elapsed().Seconds(), "total/s")
return
}
}
}
}()

err := loadgen.Run(context.Background(), stop)
if err != nil {
fmt.Println(err)
b.Log(err)
}
})
fmt.Println(result.String())
}
53 changes: 53 additions & 0 deletions loadgen/cmd/otelsoak/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package main

import (
"fmt"
"log"

"github.com/elastic/opentelemetry-collector-components/loadgen"
"github.com/spf13/cobra"
"go.opentelemetry.io/collector/otelcol"
)

func main() {
loadgen.Init()

settings, err := loadgen.NewCollectorSettings(nil)
if err != nil {
log.Fatalf("collector new settings error: %v", err)
}

cmd := otelcol.NewCommand(settings)
cmd.Flags().AddGoFlagSet(loadgen.FlagSet)
runE := cmd.RunE
cmd.RunE = func(cmd *cobra.Command, args []string) error {
// This is to pass in parsed loadgen flags e.g. api-key, secret-token to otel collector
sets := loadgen.CollectorSetFromConfig()
for _, set := range sets {
if err := cmd.Flags().Set("set", set); err != nil {
return fmt.Errorf("error passing --set to collector: %w", err)
}
}
return runE(cmd, args)
}
if err := cmd.Execute(); err != nil {
log.Fatalf("collector execute error: %v", err)
}
}
89 changes: 89 additions & 0 deletions loadgen/collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package loadgen // import "github.com/elastic/opentelemetry-collector-components/loadgen"

import (
"context"
"os"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/confmap/provider/envprovider"
"go.opentelemetry.io/collector/confmap/provider/fileprovider"
"go.opentelemetry.io/collector/confmap/provider/httpprovider"
"go.opentelemetry.io/collector/confmap/provider/httpsprovider"
"go.opentelemetry.io/collector/confmap/provider/yamlprovider"
"go.opentelemetry.io/collector/otelcol"
)

const (
buildDescription = "loadgen distribution"
buildVersion = "0.1.0"
)

func RunCollector(ctx context.Context, stop chan bool, configFiles []string) error {
settings, err := NewCollectorSettings(configFiles)
if err != nil {
return err
}

svc, err := otelcol.NewCollector(settings)
if err != nil {
return err
}

// cancel context on stop from event manager
cancelCtx, cancel := context.WithCancel(ctx)
go func() {
<-stop
cancel()
}()
defer cancel()

return svc.Run(cancelCtx)
}

func NewCollectorSettings(configPaths []string) (otelcol.CollectorSettings, error) {
buildInfo := component.BuildInfo{
Command: os.Args[0],
Description: buildDescription,
Version: buildVersion,
}
configProviderSettings := otelcol.ConfigProviderSettings{
ResolverSettings: confmap.ResolverSettings{
URIs: configPaths,
ProviderFactories: []confmap.ProviderFactory{
fileprovider.NewFactory(),
envprovider.NewFactory(),
yamlprovider.NewFactory(),
httpprovider.NewFactory(),
httpsprovider.NewFactory(),
},
ConverterFactories: []confmap.ConverterFactory{},
},
}

return otelcol.CollectorSettings{
Factories: components,
BuildInfo: buildInfo,
ConfigProviderSettings: configProviderSettings,
// we're handling DisableGracefulShutdown via the cancelCtx being passed
// to the collector's Run method in the Run function
DisableGracefulShutdown: true,
}, nil
}
75 changes: 75 additions & 0 deletions loadgen/components.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package loadgen // import "github.com/elastic/opentelemetry-collector-components/loadgen"

import (
"github.com/elastic/opentelemetry-collector-components/processor/ratelimitprocessor"
"github.com/elastic/opentelemetry-collector-components/receiver/loadgenreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/debugexporter"
"go.opentelemetry.io/collector/exporter/otlpexporter"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/otelcol"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/receiver"
)

func components() (otelcol.Factories, error) {
var err error
factories := otelcol.Factories{}

// Receivers
factories.Receivers, err = receiver.MakeFactoryMap(
loadgenreceiver.NewFactory(),
)
if err != nil {
return otelcol.Factories{}, err
}

// Processors
factories.Processors, err = processor.MakeFactoryMap(
ratelimitprocessor.NewFactory(),
transformprocessor.NewFactory(),
)
if err != nil {
return otelcol.Factories{}, err
}

// Exporters
factories.Exporters, err = exporter.MakeFactoryMap(
otlpexporter.NewFactory(),
debugexporter.NewFactory(),
)
if err != nil {
return otelcol.Factories{}, err
}

factories.Connectors, err = connector.MakeFactoryMap()
if err != nil {
return otelcol.Factories{}, err
}

factories.Extensions, err = extension.MakeFactoryMap()
if err != nil {
return otelcol.Factories{}, err
}

return factories, err
}
11 changes: 8 additions & 3 deletions loadgen/config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ service:
receivers: [loadgen]
processors: [ratelimit, transform/rewrite]
exporters: [otlp, debug]
# telemetry:
# logs:
# level: debug
telemetry:
metrics:
readers:
- pull:
exporter:
prometheus:
host: '127.0.0.1'
port: 8888
Loading

0 comments on commit 96b1115

Please sign in to comment.