Skip to content

Commit

Permalink
Refactor code to use the new metrics registry in k6
Browse files Browse the repository at this point in the history
  • Loading branch information
mostafa committed May 5, 2022
1 parent 7192e1e commit 467ca40
Show file tree
Hide file tree
Showing 4 changed files with 318 additions and 93 deletions.
44 changes: 22 additions & 22 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"time"

kafkago "github.com/segmentio/kafka-go"
"go.k6.io/k6/stats"
"go.k6.io/k6/metrics"
)

func (*Kafka) Reader(
Expand Down Expand Up @@ -155,52 +155,52 @@ func (k *Kafka) reportReaderStats(currentStats kafkago.ReaderStats) error {

now := time.Now()

stats.PushIfNotDone(ctx, state.Samples, stats.Sample{
metrics.PushIfNotDone(ctx, state.Samples, metrics.Sample{
Time: now,
Metric: ReaderDials,
Tags: stats.IntoSampleTags(&tags),
Metric: k.metrics.ReaderDials,
Tags: metrics.IntoSampleTags(&tags),
Value: float64(currentStats.Dials),
})

stats.PushIfNotDone(ctx, state.Samples, stats.Sample{
metrics.PushIfNotDone(ctx, state.Samples, metrics.Sample{
Time: now,
Metric: ReaderFetches,
Tags: stats.IntoSampleTags(&tags),
Metric: k.metrics.ReaderFetches,
Tags: metrics.IntoSampleTags(&tags),
Value: float64(currentStats.Fetches),
})

stats.PushIfNotDone(ctx, state.Samples, stats.Sample{
metrics.PushIfNotDone(ctx, state.Samples, metrics.Sample{
Time: now,
Metric: ReaderMessages,
Tags: stats.IntoSampleTags(&tags),
Metric: k.metrics.ReaderMessages,
Tags: metrics.IntoSampleTags(&tags),
Value: float64(currentStats.Messages),
})

stats.PushIfNotDone(ctx, state.Samples, stats.Sample{
metrics.PushIfNotDone(ctx, state.Samples, metrics.Sample{
Time: now,
Metric: ReaderBytes,
Tags: stats.IntoSampleTags(&tags),
Metric: k.metrics.ReaderBytes,
Tags: metrics.IntoSampleTags(&tags),
Value: float64(currentStats.Bytes),
})

stats.PushIfNotDone(ctx, state.Samples, stats.Sample{
metrics.PushIfNotDone(ctx, state.Samples, metrics.Sample{
Time: now,
Metric: ReaderRebalances,
Tags: stats.IntoSampleTags(&tags),
Metric: k.metrics.ReaderRebalances,
Tags: metrics.IntoSampleTags(&tags),
Value: float64(currentStats.Rebalances),
})

stats.PushIfNotDone(ctx, state.Samples, stats.Sample{
metrics.PushIfNotDone(ctx, state.Samples, metrics.Sample{
Time: now,
Metric: ReaderTimeouts,
Tags: stats.IntoSampleTags(&tags),
Metric: k.metrics.ReaderTimeouts,
Tags: metrics.IntoSampleTags(&tags),
Value: float64(currentStats.Timeouts),
})

stats.PushIfNotDone(ctx, state.Samples, stats.Sample{
metrics.PushIfNotDone(ctx, state.Samples, metrics.Sample{
Time: now,
Metric: ReaderErrors,
Tags: stats.IntoSampleTags(&tags),
Metric: k.metrics.ReaderErrors,
Tags: metrics.IntoSampleTags(&tags),
Value: float64(currentStats.Errors),
})

Expand Down
15 changes: 12 additions & 3 deletions module.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package kafka

import "go.k6.io/k6/js/modules"
import (
"go.k6.io/k6/js/common"
"go.k6.io/k6/js/modules"
)

func init() {
modules.Register("k6/x/kafka", New())
}

type (
Kafka struct {
vu modules.VU
vu modules.VU
metrics kafkaMetrics
}
RootModule struct{}
KafkaModule struct {
Expand All @@ -26,7 +30,12 @@ func New() *RootModule {
}

func (*RootModule) NewModuleInstance(vu modules.VU) modules.Instance {
return &KafkaModule{Kafka: &Kafka{vu: vu}}
m, err := registerMetrics(vu)
if err != nil {
common.Throw(vu.Runtime(), err)
}

return &KafkaModule{Kafka: &Kafka{vu: vu, metrics: m}}
}

func (c *KafkaModule) Exports() modules.Exports {
Expand Down
38 changes: 19 additions & 19 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

kafkago "github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/compress"
"go.k6.io/k6/stats"
"go.k6.io/k6/metrics"
)

var (
Expand Down Expand Up @@ -160,45 +160,45 @@ func (k *Kafka) reportWriterStats(currentStats kafkago.WriterStats) error {

now := time.Now()

stats.PushIfNotDone(ctx, state.Samples, stats.Sample{
metrics.PushIfNotDone(ctx, state.Samples, metrics.Sample{
Time: now,
Metric: WriterDials,
Tags: stats.IntoSampleTags(&tags),
Metric: k.metrics.WriterDials,
Tags: metrics.IntoSampleTags(&tags),
Value: float64(currentStats.Dials),
})

stats.PushIfNotDone(ctx, state.Samples, stats.Sample{
metrics.PushIfNotDone(ctx, state.Samples, metrics.Sample{
Time: now,
Metric: WriterWrites,
Tags: stats.IntoSampleTags(&tags),
Metric: k.metrics.WriterWrites,
Tags: metrics.IntoSampleTags(&tags),
Value: float64(currentStats.Writes),
})

stats.PushIfNotDone(ctx, state.Samples, stats.Sample{
metrics.PushIfNotDone(ctx, state.Samples, metrics.Sample{
Time: now,
Metric: WriterMessages,
Tags: stats.IntoSampleTags(&tags),
Metric: k.metrics.WriterMessages,
Tags: metrics.IntoSampleTags(&tags),
Value: float64(currentStats.Messages),
})

stats.PushIfNotDone(ctx, state.Samples, stats.Sample{
metrics.PushIfNotDone(ctx, state.Samples, metrics.Sample{
Time: now,
Metric: WriterBytes,
Tags: stats.IntoSampleTags(&tags),
Metric: k.metrics.WriterBytes,
Tags: metrics.IntoSampleTags(&tags),
Value: float64(currentStats.Bytes),
})

stats.PushIfNotDone(ctx, state.Samples, stats.Sample{
metrics.PushIfNotDone(ctx, state.Samples, metrics.Sample{
Time: now,
Metric: WriterRebalances,
Tags: stats.IntoSampleTags(&tags),
Metric: k.metrics.WriterRebalances,
Tags: metrics.IntoSampleTags(&tags),
Value: float64(currentStats.Rebalances),
})

stats.PushIfNotDone(ctx, state.Samples, stats.Sample{
metrics.PushIfNotDone(ctx, state.Samples, metrics.Sample{
Time: now,
Metric: WriterErrors,
Tags: stats.IntoSampleTags(&tags),
Metric: k.metrics.WriterErrors,
Tags: metrics.IntoSampleTags(&tags),
Value: float64(currentStats.Errors),
})

Expand Down
Loading

0 comments on commit 467ca40

Please sign in to comment.