Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add custom metrics names #23

Merged
merged 2 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: '1.20'
go-version: '1.21'

- name: Install xk6
run: go install go.k6.io/xk6/cmd/xk6@latest
Expand Down Expand Up @@ -47,7 +47,7 @@ jobs:
steps:
- uses: actions/setup-go@v3
with:
go-version: '1.20'
go-version: '1.21'
- uses: actions/checkout@v3
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
51 changes: 49 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,49 @@ type conf struct {
clientCertKeyPath string
}

const (
sentBytesLabel = "mqtt_sent_bytes"
receivedBytesLabel = "mqtt_received_bytes"
sentMessagesCountLabel = "mqtt_sent_messages_count"
receivedMessagesCountLabel = "mqtt_received_messages_count"
)

func getLabels(labelsArg goja.Value, rt *goja.Runtime) mqttMetricsLabels {
labels := mqttMetricsLabels{}
metricsLabels := labelsArg
if metricsLabels == nil || goja.IsUndefined(metricsLabels) {
// set default values
labels.SentBytesLabel = sentBytesLabel
labels.ReceivedBytesLabel = receivedBytesLabel
labels.SentMessagesCountLabel = sentMessagesCountLabel
labels.ReceivedMessagesCountLabel = receivedMessagesCountLabel
return labels
}

labelsJS, ok := metricsLabels.Export().(map[string]any)
if !ok {
common.Throw(rt, fmt.Errorf("invalid metricsLabels %#v", metricsLabels.Export()))
}
labels.SentBytesLabel, ok = labelsJS["sentBytesLabel"].(string)
if !ok {
common.Throw(rt, fmt.Errorf("invalid metricsLabels sentBytesLabel %#v", metricsLabels.Export()))
}
labels.ReceivedBytesLabel, ok = labelsJS["receivedBytesLabel"].(string)
if !ok {
common.Throw(rt, fmt.Errorf("invalid metricsLabels receivedBytesLabel %#v", metricsLabels.Export()))
}
labels.SentMessagesCountLabel, ok = labelsJS["sentMessagesCountLabel"].(string)
if !ok {
common.Throw(rt, fmt.Errorf("invalid metricsLabels sentMessagesCountLabel %#v", metricsLabels.Export()))
}
labels.ReceivedMessagesCountLabel, ok = labelsJS["receivedMessagesCountLabel"].(string)
if !ok {
common.Throw(rt, fmt.Errorf("invalid metricsLabels receivedMessagesCountLabel %#v", metricsLabels.Export()))
}

return labels
}

//nolint:nosnakecase // their choice not mine
func (m *MqttAPI) client(c goja.ConstructorCall) *goja.Object {
serversArray := c.Argument(0)
Expand Down Expand Up @@ -113,10 +156,14 @@ func (m *MqttAPI) client(c goja.ConstructorCall) *goja.Object {
} else {
clientConf.clientCertKeyPath = clientCertKeyPathValue.String()
}

labels := getLabels(c.Argument(9), rt)
metrics, err := registerMetrics(m.vu, labels)
if err != nil {
common.Throw(m.vu.Runtime(), err)
}
client := &client{
vu: m.vu,
metrics: &m.metrics,
metrics: &metrics,
conf: clientConf,
obj: rt.NewObject(),
}
Expand Down
220 changes: 220 additions & 0 deletions examples/test_custom_labels.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
/*

This is a k6 test script that imports the xk6-mqtt and
tests Mqtt with a 100 messages per connection.

*/

import {
check
} from 'k6';

const mqtt = require('k6/x/mqtt');

const rnd_count = 2000;

// create random number to create a new topic at each run
let rnd = Math.random() * rnd_count;

// conection timeout (ms)
let connectTimeout = 2000

// publish timeout (ms)
let publishTimeout = 2000

// subscribe timeout (ms)
let subscribeTimeout = 2000

// connection close timeout (ms)
let closeTimeout = 2000

// Mqtt topic one per VU
const k6Topic = `test-k6-plugin-topic ${rnd} ${__VU}`;
// Connect IDs one connection per VU
const k6SubId = `k6-sub-${rnd}-${__VU}`;
const k6PubId = `k6-pub-${rnd}-${__VU}`;

// number of message pusblished and receives at each iteration
const messageCount = 3;

const host = "localhost";
const port = "1883";

const pub_labels = {
"sentBytesLabel": "pub_mqtt_sent_bytes",
"receivedBytesLabel": "pub_mqtt_received_bytes",
"sentMessagesCountLabel": "pub_mqtt_sent_messages_count",
"receivedMessagesCountLabel": "pub_mqtt_received_messages_count"
}

// create publisher client
let publisher = new mqtt.Client(
// The list of URL of MQTT server to connect to
[host + ":" + port],
// A username to authenticate to the MQTT server
"",
// Password to match username
"",
// clean session setting
false,
// Client id for reader
k6PubId,
// timeout in ms
connectTimeout,
// caRoot
"",
// client certificate path
"",
// client certificate key path
"",
pub_labels
)
let err;

try {
publisher.connect()
}
catch (error) {
err = error
}

if (err != undefined) {
console.error("publish connect error:", err)
// you may want to use fail here if you want only to test successfull connection only
// fail("fatal could not connect to broker for publish")
}

const sub_labels = {
"sentBytesLabel": "sub_mqtt_sent_bytes",
"receivedBytesLabel": "sub_mqtt_received_bytes",
"sentMessagesCountLabel": "sub_mqtt_sent_messages_count",
"receivedMessagesCountLabel": "sub_mqtt_received_messages_count"
}

// create subscriber client
let subscriber = new mqtt.Client(
// The list of URL of MQTT server to connect to
[host + ":" + port],
// A username to authenticate to the MQTT server
"",
// Password to match username
"",
// clean session setting
false,
// Client id for reader
k6SubId,
// timeout in ms
connectTimeout,
// caRoot
"",
// client certificate path
"",
// client certificate key path
"",
sub_labels
)


try {
subscriber.connect()
}
catch (error) {
err = error
}

if (err != undefined) {
console.error("subscribe connect error:", err)
// you may want to use fail here if you want only to test successfull connection only
// fail("fatal could not connect to broker for subscribe")
}

export default function () {
// Message content one per ITER
const k6Message = `k6-message-content-${rnd} ${__VU}:${__ITER}`;
check(publisher, {
"is publisher connected": publisher => publisher.isConnected()
});
check(subscriber, {
"is subcriber connected": subscriber => subscriber.isConnected()
});

// subscribe first
try {
subscriber.subscribe(
// topic to be used
k6Topic,
// The QoS of messages
1,
// timeout in ms
subscribeTimeout,
)
} catch (error) {
err = error
}

if (err != undefined) {
console.error("subscribe error:", err)
// you may want to use fail here if you want only to test successfull connection only
// fail("fatal could not connect to broker for subscribe")
}
let count = messageCount;
subscriber.addEventListener("message", (obj) => {
// closing as we received one message
let message = obj.message
check(message, {
"message received": msg => msg != undefined
});
check(message, {
"is content correct": msg => msg == k6Message
});

if (--count > 0) {
// tell the subscriber that you want to wait for more than one message
// if you don't call subContinue you'll receive only one message per subscribe
subscriber.subContinue();
}
})
subscriber.addEventListener("error", (err) => {
check(null, {
"message received": false
});
})
for (let i = 0; i < messageCount; i++) {
// publish count messages
let err_publish;
try {
publisher.publish(
// topic to be used
k6Topic,
// The QoS of messages
1,
// Message to be sent
k6Message,
// retain policy on message
false,
// timeout in ms
publishTimeout,
// async publish handlers if needed
// (obj) => { // success
// console.log(obj.type) // publish
// console.log(obj.topic) // published topic
// },
// (err) => { // failure
// console.log(err.type) // error
// console.log(err.message)
// }
);
} catch (error) {
err_publish = error
}
check(err_publish, {
"is sent": err => err == undefined
});
}
}

export function teardown() {
// closing both connections at VU close
publisher.close(closeTimeout);
subscriber.close(closeTimeout);
}
40 changes: 26 additions & 14 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,34 +1,46 @@
module github.com/pmalhaire/xk6-mqtt

go 1.18
go 1.21

require (
github.com/dop251/goja v0.0.0-20230707174833-636fdf960de1
github.com/dop251/goja v0.0.0-20231027120936-b396bb4c349d
github.com/eclipse/paho.mqtt.golang v1.4.3
github.com/mstoykov/k6-taskqueue-lib v0.1.0
go.k6.io/k6 v0.45.1
github.com/stretchr/testify v1.8.4
go.k6.io/k6 v0.47.0
gopkg.in/guregu/null.v3 v3.5.0
)

require (
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dlclark/regexp2 v1.10.0 // indirect
github.com/fatih/color v1.15.0 // indirect
github.com/fatih/color v1.16.0 // indirect
github.com/go-sourcemap/sourcemap v2.1.4-0.20211119122758-180fcef48034+incompatible // indirect
github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/pprof v0.0.0-20231101202521-4ca4178f5c7a // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mccutchen/go-httpbin v1.1.2-0.20190116014521-c5cb2f4802fa // indirect
github.com/mstoykov/atlas v0.0.0-20220811071828-388f114305dd // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/onsi/gomega v1.18.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/serenize/snaker v0.0.0-20201027110005-a7ad2135616e // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/afero v1.9.5 // indirect
golang.org/x/net v0.12.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/text v0.11.0 // indirect
golang.org/x/time v0.3.0 // indirect
gopkg.in/guregu/null.v3 v3.5.0 // indirect
github.com/spf13/afero v1.10.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.4.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect
google.golang.org/grpc v1.57.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading
Loading