Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
carsonip committed Jan 17, 2025
1 parent e4647f2 commit 5552d83
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 60 deletions.
3 changes: 3 additions & 0 deletions receiver/loadgenreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ type Config struct {
Metrics MetricsConfig `mapstructure:"metrics"`
Logs LogsConfig `mapstructure:"logs"`
Traces TracesConfig `mapstructure:"traces"`

// NumWorker is the number of workers to share the load
NumWorkers int `mapstructure:"num_workers"`
}

type MetricsConfig struct {
Expand Down
1 change: 1 addition & 0 deletions receiver/loadgenreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func createDefaultReceiverConfig(logsDone, metricsDone, tracesDone chan Stats) c
Traces: TracesConfig{
doneCh: tracesDone,
},
NumWorkers: 1,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package internal // import "github.com/elastic/opentelemetry-collector-components/receiver/loadgenreceiver/internal"
package loopinglist // import "github.com/elastic/opentelemetry-collector-components/receiver/loadgenreceiver/internal/loopinglist"

type LoopingList[T any] struct {
items []T
Expand Down
6 changes: 3 additions & 3 deletions receiver/loadgenreceiver/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"

"github.com/elastic/opentelemetry-collector-components/receiver/loadgenreceiver/internal"
"github.com/elastic/opentelemetry-collector-components/receiver/loadgenreceiver/internal/loopinglist"
)

//go:embed testdata/logs.jsonl
Expand All @@ -42,7 +42,7 @@ type logsGenerator struct {
cfg *Config
logger *zap.Logger

samples internal.LoopingList[plog.Logs]
samples loopinglist.LoopingList[plog.Logs]

stats Stats

Expand Down Expand Up @@ -89,7 +89,7 @@ func createLogsReceiver(
cfg: genConfig,
logger: set.Logger,
consumer: consumer,
samples: internal.NewLoopingList(samples),
samples: loopinglist.NewLoopingList(samples),
}, nil
}

Expand Down
6 changes: 3 additions & 3 deletions receiver/loadgenreceiver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"

"github.com/elastic/opentelemetry-collector-components/receiver/loadgenreceiver/internal"
"github.com/elastic/opentelemetry-collector-components/receiver/loadgenreceiver/internal/loopinglist"
)

//go:embed testdata/metrics.jsonl
Expand All @@ -42,7 +42,7 @@ type metricsGenerator struct {
cfg *Config
logger *zap.Logger

samples internal.LoopingList[pmetric.Metrics]
samples loopinglist.LoopingList[pmetric.Metrics]

stats Stats

Expand Down Expand Up @@ -89,7 +89,7 @@ func createMetricsReceiver(
cfg: genConfig,
logger: set.Logger,
consumer: consumer,
samples: internal.NewLoopingList(samples),
samples: loopinglist.NewLoopingList(samples),
}, nil
}

Expand Down
79 changes: 48 additions & 31 deletions receiver/loadgenreceiver/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
"bytes"
"context"
_ "embed"
"errors"
"os"
"sync"
"time"

"go.opentelemetry.io/collector/component"
Expand All @@ -32,7 +34,7 @@ import (
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"

"github.com/elastic/opentelemetry-collector-components/receiver/loadgenreceiver/internal"
"github.com/elastic/opentelemetry-collector-components/receiver/loadgenreceiver/internal/loopinglist"
)

const maxScannerBufSize = 1024 * 1024
Expand All @@ -44,9 +46,10 @@ type tracesGenerator struct {
cfg *Config
logger *zap.Logger

samples internal.LoopingList[ptrace.Traces]
samples loopinglist.BoundedLoopingList[ptrace.Traces]

Check failure on line 49 in receiver/loadgenreceiver/traces.go

View workflow job for this annotation

GitHub Actions / govulncheck (all)

undefined: loopinglist.BoundedLoopingList

stats Stats
stats Stats
statsMu sync.Mutex

consumer consumer.Traces

Expand Down Expand Up @@ -91,36 +94,50 @@ func createTracesReceiver(
cfg: genConfig,
logger: set.Logger,
consumer: consumer,
samples: internal.NewLoopingList(samples),
samples: loopinglist.NewBoundedLoopingList(samples, genConfig.Traces.MaxReplay),

Check failure on line 97 in receiver/loadgenreceiver/traces.go

View workflow job for this annotation

GitHub Actions / govulncheck (all)

undefined: loopinglist.NewBoundedLoopingList
}, nil
}

func (ar *tracesGenerator) Start(ctx context.Context, _ component.Host) error {
startCtx, cancelFn := context.WithCancel(ctx)
ar.cancelFn = cancelFn

go func() {
for {
select {
case <-startCtx.Done():
return
default:
}
m := ar.nextTraces()
if err := ar.consumer.ConsumeTraces(startCtx, m); err != nil {
ar.logger.Error(err.Error())
ar.stats.FailedRequests++
ar.stats.FailedSpans += m.SpanCount()
} else {
ar.stats.Requests++
ar.stats.Spans += m.SpanCount()
}
if ar.isDone() {
if ar.cfg.Traces.doneCh != nil {
ar.cfg.Traces.doneCh <- ar.stats
wg := sync.WaitGroup{}

for i := 0; i < ar.cfg.NumWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-startCtx.Done():
return
default:
}
return
m, err := ar.nextTraces()
if errors.Is(err, loopinglist.ErrLoopLimitReached) {

Check failure on line 118 in receiver/loadgenreceiver/traces.go

View workflow job for this annotation

GitHub Actions / govulncheck (all)

undefined: loopinglist.ErrLoopLimitReached
return
}
if err := ar.consumer.ConsumeTraces(startCtx, m); err != nil {
ar.logger.Error(err.Error())
ar.statsMu.Lock()
ar.stats.FailedRequests++
ar.stats.FailedSpans += m.SpanCount()
ar.statsMu.Unlock()
} else {
ar.statsMu.Lock()
ar.stats.Requests++
ar.stats.Spans += m.SpanCount()
ar.statsMu.Unlock()
}

}
}()
}
go func() {
wg.Wait()
if ar.cfg.Traces.doneCh != nil {
ar.cfg.Traces.doneCh <- ar.stats
}
}()
return nil
Expand All @@ -133,9 +150,13 @@ func (ar *tracesGenerator) Shutdown(context.Context) error {
return nil
}

func (ar *tracesGenerator) nextTraces() ptrace.Traces {
func (ar *tracesGenerator) nextTraces() (ptrace.Traces, error) {
nextLogs := ptrace.NewTraces()
ar.samples.Next().CopyTo(nextLogs)
s, err := ar.samples.Next()
if err != nil {
return s, err
}
s.CopyTo(nextLogs)

rm := nextLogs.ResourceSpans()
for i := 0; i < rm.Len(); i++ {
Expand All @@ -151,9 +172,5 @@ func (ar *tracesGenerator) nextTraces() ptrace.Traces {
}
}

return nextLogs
}

func (ar *tracesGenerator) isDone() bool {
return ar.cfg.Traces.MaxReplay > 0 && ar.samples.LoopCount() >= ar.cfg.Traces.MaxReplay
return nextLogs, nil
}
49 changes: 27 additions & 22 deletions receiver/loadgenreceiver/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package loadgenreceiver // import "github.com/elastic/opentelemetry-collector-co
import (
"bytes"
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -32,26 +33,30 @@ import (

func TestTracesGenerator_doneCh(t *testing.T) {
const maxReplay = 2
doneCh := make(chan Stats)
sink := &consumertest.TracesSink{}
r, _ := createTracesReceiver(context.Background(), receiver.Settings{
ID: component.ID{},
TelemetrySettings: component.TelemetrySettings{
Logger: zap.NewNop(),
},
BuildInfo: component.BuildInfo{},
}, &Config{Traces: TracesConfig{
MaxReplay: maxReplay,
doneCh: doneCh,
}}, sink)
err := r.Start(context.Background(), componenttest.NewNopHost())
assert.NoError(t, err)
defer func() {
assert.NoError(t, r.Shutdown(context.Background()))
}()
stats := <-doneCh
want := maxReplay * bytes.Count(demoTraces, []byte("\n"))
assert.Equal(t, want, stats.Requests)
assert.Equal(t, want, len(sink.AllTraces()))
assert.Equal(t, sink.SpanCount(), stats.Spans)
for _, workers := range []int{1, 2} {
t.Run(fmt.Sprintf("workers=%d", workers), func(t *testing.T) {
doneCh := make(chan Stats)
sink := &consumertest.TracesSink{}
cfg := createDefaultReceiverConfig(nil, nil, doneCh)
cfg.(*Config).Traces.MaxReplay = maxReplay
cfg.(*Config).NumWorkers = workers
r, _ := createTracesReceiver(context.Background(), receiver.Settings{
ID: component.ID{},
TelemetrySettings: component.TelemetrySettings{
Logger: zap.NewNop(),
},
BuildInfo: component.BuildInfo{},
}, cfg, sink)
err := r.Start(context.Background(), componenttest.NewNopHost())
assert.NoError(t, err)
defer func() {
assert.NoError(t, r.Shutdown(context.Background()))
}()
stats := <-doneCh
want := maxReplay * bytes.Count(demoTraces, []byte("\n"))
assert.Equal(t, want, stats.Requests)
assert.Equal(t, want, len(sink.AllTraces()))
assert.Equal(t, sink.SpanCount(), stats.Spans)
})
}
}

0 comments on commit 5552d83

Please sign in to comment.