Skip to content

Commit

Permalink
fixed further warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
ElmarZeeb authored and pmalhaire committed Oct 28, 2024
1 parent 3ed51a8 commit c71f6fe
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 11 deletions.
32 changes: 28 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/x509"
"errors"
"fmt"
"math"
"os"
"time"

Expand Down Expand Up @@ -157,7 +158,11 @@ func (m *MqttAPI) client(c sobek.ConstructorCall) *sobek.Object {
if timeoutValue == nil || sobek.IsUndefined(timeoutValue) {
common.Throw(rt, errors.New("Client requires a timeout value"))
}
clientConf.timeout = uint(timeoutValue.ToInteger())
timeoutIntValue := timeoutValue.ToInteger()
if timeoutIntValue < 0 {
common.Throw(rt, errors.New("negative timeout value is not allowed"))
}
clientConf.timeout = uint(timeoutIntValue)

// optional args
if caRootPathValue := c.Argument(6); caRootPathValue == nil || sobek.IsUndefined(caRootPathValue) {
Expand Down Expand Up @@ -202,6 +207,14 @@ func (m *MqttAPI) client(c sobek.ConstructorCall) *sobek.Object {
conf: clientConf,
obj: rt.NewObject(),
}

m.defineRuntimeMethods(client)

return client.obj
}

func (m *MqttAPI) defineRuntimeMethods(client *client) {
rt := m.vu.Runtime()
must := func(err error) {
if err != nil {
common.Throw(rt, err)
Expand All @@ -224,14 +237,18 @@ func (m *MqttAPI) client(c sobek.ConstructorCall) *sobek.Object {

must(client.obj.DefineDataProperty(
"close", rt.ToValue(client.Close), sobek.FLAG_FALSE, sobek.FLAG_FALSE, sobek.FLAG_TRUE))

return client.obj
}

// Connect create a connection to mqtt
func (c *client) Connect() error {
opts := paho.NewClientOptions()

// check timeout value
timeoutValue, err := safeUintToInt64(c.conf.timeout)
if err != nil {
panic("timeout value is too large")
}

var tlsConfig *tls.Config
// Use root CA if specified
if len(c.conf.caRootPath) > 0 {
Expand Down Expand Up @@ -285,7 +302,7 @@ func (c *client) Connect() error {
client := paho.NewClient(opts)
token := client.Connect()
rt := c.vu.Runtime()
if !token.WaitTimeout(time.Duration(c.conf.timeout) * time.Millisecond) {
if !token.WaitTimeout(time.Duration(timeoutValue) * time.Millisecond) {
common.Throw(rt, ErrTimeout)
return ErrTimeout
}
Expand Down Expand Up @@ -334,3 +351,10 @@ func (c *client) newErrorEvent(msg string) *sobek.Object {
must(o.DefineDataProperty("message", rt.ToValue(msg), sobek.FLAG_FALSE, sobek.FLAG_FALSE, sobek.FLAG_TRUE))
return o
}

func safeUintToInt64(u uint) (int64, error) {
if u > math.MaxInt64 {
return 0, errors.New("value too large to convert to int64")
}
return int64(u), nil
}
2 changes: 2 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ var (
ErrClient = errors.New("client is not connected")
// ErrTimeout operation timeout
ErrTimeout = errors.New("operation timeout")
// ErrTimeoutToLong timeout value is too large
ErrTimeoutToLong = errors.New("timeout value is too large")
// ErrSubscribe failed to subscribe to mqtt topic
ErrSubscribe = errors.New("subscribe failure")
// ErrConsumeToken consume token is invalid
Expand Down
28 changes: 24 additions & 4 deletions publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ func (c *client) Publish(
if success == nil && failure == nil {
return c.publishSync(topic, qos, message, retain, timeout)
}

// check timeout value
timeoutValue, err := safeUintToInt64(timeout)
if err != nil {
rt := c.vu.Runtime()
common.Throw(rt, ErrTimeoutToLong)
return ErrTimeoutToLong
}

// async case
callback := c.vu.RegisterCallback()
go func() {
Expand All @@ -40,7 +49,7 @@ func (c *client) Publish(
return
}
token := c.pahoClient.Publish(topic, byte(qos), retain, message)
if !token.WaitTimeout(time.Duration(timeout) * time.Millisecond) {
if !token.WaitTimeout(time.Duration(timeoutValue) * time.Millisecond) {
callback(func() error {
ev := c.newErrorEvent("publish timeout")

Expand Down Expand Up @@ -94,19 +103,30 @@ func (c *client) publishSync(
common.Throw(rt, ErrClient)
return ErrClient
}

// check timeout value
timeoutValue, err := safeUintToInt64(timeout)
if err != nil {
rt := c.vu.Runtime()
common.Throw(rt, ErrTimeoutToLong)
return ErrTimeoutToLong
}

token := c.pahoClient.Publish(topic, byte(qos), retain, message)
// sync case
if !token.WaitTimeout(time.Duration(timeout) * time.Millisecond) {
if !token.WaitTimeout(time.Duration(timeoutValue) * time.Millisecond) {
rt := c.vu.Runtime()
common.Throw(rt, ErrTimeout)
return ErrTimeout
}
if err := token.Error(); err != nil {

if err = token.Error(); err != nil {
rt := c.vu.Runtime()
common.Throw(rt, ErrPublish)
return ErrPublish
}
err := c.publishMessageMetric(float64(len(message)))

err = c.publishMessageMetric(float64(len(message)))
if err != nil {
return err
}
Expand Down
14 changes: 11 additions & 3 deletions subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,22 @@ func (c *client) Subscribe(
common.Throw(rt, ErrClient)
return ErrClient
}

// check timeout value
timeoutValue, err := safeUintToInt64(timeout)
if err != nil {
common.Throw(rt, ErrTimeoutToLong)
return ErrTimeoutToLong
}

c.messageChan = make(chan paho.Message)
messageCB := func(_ paho.Client, msg paho.Message) {
go func(msg paho.Message) {
c.messageChan <- msg
}(msg)
}
token := c.pahoClient.Subscribe(topic, byte(qos), messageCB)
if !token.WaitTimeout(time.Duration(timeout) * time.Millisecond) {
if !token.WaitTimeout(time.Duration(timeoutValue) * time.Millisecond) {
common.Throw(rt, ErrTimeout)
return ErrTimeout
}
Expand All @@ -47,7 +55,7 @@ func (c *client) Subscribe(
}
}
c.tq = taskqueue.New(registerCallback)
go c.loop(c.messageChan, timeout)
go c.loop(c.messageChan, timeoutValue)
return nil
}

Expand Down Expand Up @@ -77,7 +85,7 @@ func (c *client) receiveMessageMetric(msgLen float64) error {
}

//nolint:gocognit // todo improve this
func (c *client) loop(messageChan <-chan paho.Message, timeout uint) {
func (c *client) loop(messageChan <-chan paho.Message, timeout int64) {
ctx := c.vu.Context()
stop := make(chan struct{})
defer c.tq.Close()
Expand Down

0 comments on commit c71f6fe

Please sign in to comment.