Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use promises.New from k6 #35

Merged
merged 2 commits into from
Oct 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 41 additions & 63 deletions redis/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/redis/go-redis/v9"
"go.k6.io/k6/js/common"
"go.k6.io/k6/js/modules"
"go.k6.io/k6/js/promises"
"go.k6.io/k6/lib"
)

Expand All @@ -28,7 +29,7 @@ type Client struct {
//
// The value for `expiration` is interpreted as seconds.
func (c *Client) Set(key string, value interface{}, expiration int) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand Down Expand Up @@ -59,7 +60,7 @@ func (c *Client) Set(key string, value interface{}, expiration int) *sobek.Promi
//
// If the key does not exist, the promise is rejected with an error.
func (c *Client) Get(key string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -83,7 +84,7 @@ func (c *Client) Get(key string) *sobek.Promise {
//
// If the provided value is not a supported type, the promise is rejected with an error.
func (c *Client) GetSet(key string, value interface{}) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -110,7 +111,7 @@ func (c *Client) GetSet(key string, value interface{}) *sobek.Promise {

// Del removes the specified keys. A key is ignored if it does not exist
func (c *Client) Del(keys ...string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -134,7 +135,7 @@ func (c *Client) Del(keys ...string) *sobek.Promise {
//
// If the key does not exist, the promise is rejected with an error.
func (c *Client) GetDel(key string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -158,7 +159,7 @@ func (c *Client) GetDel(key string) *sobek.Promise {
// Note that if the same existing key is mentioned in the argument
// multiple times, it will be counted multiple times.
func (c *Client) Exists(keys ...string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -183,7 +184,7 @@ func (c *Client) Exists(keys ...string) *sobek.Promise {
// error is returned if the key contains a value of the wrong type, or
// contains a string that cannot be represented as an integer.
func (c *Client) Incr(key string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -208,7 +209,7 @@ func (c *Client) Incr(key string) *sobek.Promise {
// error is returned if the key contains a value of the wrong type, or
// contains a string that cannot be represented as an integer.
func (c *Client) IncrBy(key string, increment int64) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -233,7 +234,7 @@ func (c *Client) IncrBy(key string, increment int64) *sobek.Promise {
// error is returned if the key contains a value of the wrong type, or
// contains a string that cannot be represented as an integer.
func (c *Client) Decr(key string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -258,7 +259,7 @@ func (c *Client) Decr(key string) *sobek.Promise {
// error is returned if the key contains a value of the wrong type, or
// contains a string that cannot be represented as an integer.
func (c *Client) DecrBy(key string, decrement int64) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -282,7 +283,7 @@ func (c *Client) DecrBy(key string, decrement int64) *sobek.Promise {
//
// If the database is empty, the promise is rejected with an error.
func (c *Client) RandomKey() *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -304,7 +305,7 @@ func (c *Client) RandomKey() *sobek.Promise {

// Mget returns the values associated with the specified keys.
func (c *Client) Mget(keys ...string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -329,7 +330,7 @@ func (c *Client) Mget(keys ...string) *sobek.Promise {
// Note that calling Expire with a non-positive timeout will result in
// the key being deleted rather than expired.
func (c *Client) Expire(key string, seconds int) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -353,7 +354,7 @@ func (c *Client) Expire(key string, seconds int) *sobek.Promise {
//
//nolint:revive,stylecheck
func (c *Client) Ttl(key string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -375,7 +376,7 @@ func (c *Client) Ttl(key string) *sobek.Promise {

// Persist removes the existing timeout on key.
func (c *Client) Persist(key string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -400,7 +401,7 @@ func (c *Client) Persist(key string) *sobek.Promise {
// performing the push operations. When `key` holds a value that is not
// a list, and error is returned.
func (c *Client) Lpush(key string, values ...interface{}) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand Down Expand Up @@ -429,7 +430,7 @@ func (c *Client) Lpush(key string, values ...interface{}) *sobek.Promise {
// at `key`. If `key` does not exist, it is created as empty list before
// performing the push operations.
func (c *Client) Rpush(key string, values ...interface{}) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand Down Expand Up @@ -459,7 +460,7 @@ func (c *Client) Rpush(key string, values ...interface{}) *sobek.Promise {
// If the list does not exist, this command rejects the promise with an error.
func (c *Client) Lpop(key string) *sobek.Promise {
// TODO: redis supports indicating the amount of values to pop
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -484,7 +485,7 @@ func (c *Client) Lpop(key string) *sobek.Promise {
// If the list does not exist, this command rejects the promise with an error.
func (c *Client) Rpop(key string) *sobek.Promise {
// TODO: redis supports indicating the amount of values to pop
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -509,7 +510,7 @@ func (c *Client) Rpop(key string) *sobek.Promise {
// negative numbers, where they indicate offsets starting at the end of
// the list.
func (c *Client) Lrange(key string, start, stop int64) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -535,7 +536,7 @@ func (c *Client) Lrange(key string, start, stop int64) *sobek.Promise {
//
// If the list does not exist, this command rejects the promise with an error.
func (c *Client) Lindex(key string, index int64) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -559,7 +560,7 @@ func (c *Client) Lindex(key string, index int64) *sobek.Promise {
//
// If the list does not exist, this command rejects the promise with an error.
func (c *Client) Lset(key string, index int64, element string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -586,7 +587,7 @@ func (c *Client) Lset(key string, index int64, element string) *sobek.Promise {
//
// If the list does not exist, this command rejects the promise with an error.
func (c *Client) Lrem(key string, count int64, value string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -611,7 +612,7 @@ func (c *Client) Lrem(key string, count int64, value string) *sobek.Promise {
//
// If the list does not exist, this command rejects the promise with an error.
func (c *Client) Llen(key string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -637,7 +638,7 @@ func (c *Client) Llen(key string) *sobek.Promise {
//
// If the hash does not exist, this command rejects the promise with an error.
func (c *Client) Hset(key string, field string, value interface{}) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand Down Expand Up @@ -667,7 +668,7 @@ func (c *Client) Hset(key string, field string, value interface{}) *sobek.Promis
// holding a hash is created. If `field` already exists, this operation
// has no effect.
func (c *Client) Hsetnx(key, field, value string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -691,7 +692,7 @@ func (c *Client) Hsetnx(key, field, value string) *sobek.Promise {
//
// If the hash does not exist, this command rejects the promise with an error.
func (c *Client) Hget(key, field string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -713,7 +714,7 @@ func (c *Client) Hget(key, field string) *sobek.Promise {

// Hdel deletes the specified fields from the hash stored at `key`.
func (c *Client) Hdel(key string, fields ...string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -737,7 +738,7 @@ func (c *Client) Hdel(key string, fields ...string) *sobek.Promise {
//
// If the hash does not exist, this command rejects the promise with an error.
func (c *Client) Hgetall(key string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -761,7 +762,7 @@ func (c *Client) Hgetall(key string) *sobek.Promise {
//
// If the hash does not exist, this command rejects the promise with an error.
func (c *Client) Hkeys(key string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -785,7 +786,7 @@ func (c *Client) Hkeys(key string) *sobek.Promise {
//
// If the hash does not exist, this command rejects the promise with an error.
func (c *Client) Hvals(key string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -809,7 +810,7 @@ func (c *Client) Hvals(key string) *sobek.Promise {
//
// If the hash does not exist, this command rejects the promise with an error.
func (c *Client) Hlen(key string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -834,7 +835,7 @@ func (c *Client) Hlen(key string) *sobek.Promise {
// If `field` does not exist the value is set to 0 before the operation is
// set to 0 before the operation is performed.
func (c *Client) Hincrby(key, field string, increment int64) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -858,7 +859,7 @@ func (c *Client) Hincrby(key, field string, increment int64) *sobek.Promise {
// Specified members that are already a member of this set are ignored.
// If key does not exist, a new set is created before adding the specified members.
func (c *Client) Sadd(key string, members ...interface{}) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand Down Expand Up @@ -887,7 +888,7 @@ func (c *Client) Sadd(key string, members ...interface{}) *sobek.Promise {
// Specified members that are not a member of this set are ignored.
// If key does not exist, it is treated as an empty set and this command returns 0.
func (c *Client) Srem(key string, members ...interface{}) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -914,7 +915,7 @@ func (c *Client) Srem(key string, members ...interface{}) *sobek.Promise {

// Sismember returns if member is a member of the set stored at key.
func (c *Client) Sismember(key string, member interface{}) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -941,7 +942,7 @@ func (c *Client) Sismember(key string, member interface{}) *sobek.Promise {

// Smembers returns all members of the set stored at key.
func (c *Client) Smembers(key string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -965,7 +966,7 @@ func (c *Client) Smembers(key string) *sobek.Promise {
//
// If the set does not exist, the promise is rejected with an error.
func (c *Client) Srandmember(key string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -989,7 +990,7 @@ func (c *Client) Srandmember(key string) *sobek.Promise {
//
// If the set does not exist, the promise is rejected with an error.
func (c *Client) Spop(key string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -1015,7 +1016,7 @@ func (c *Client) SendCommand(command string, args ...interface{}) *sobek.Promise
doArgs = append(doArgs, command)
doArgs = append(doArgs, args...)

promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -1040,29 +1041,6 @@ func (c *Client) SendCommand(command string, args ...interface{}) *sobek.Promise
return promise
}

// makeHandledPromise will create a promise and return its resolve and reject methods,
// wrapped in such a way that it will block the eventloop from exiting before they are
// called even if the promise isn't resolved by the time the current script ends executing.
func (c *Client) makeHandledPromise() (*sobek.Promise, func(interface{}), func(interface{})) {
runtime := c.vu.Runtime()
callback := c.vu.RegisterCallback()
p, resolve, reject := runtime.NewPromise()

return p, func(i interface{}) {
// more stuff
callback(func() error {
resolve(i)
return nil
})
}, func(i interface{}) {
// more stuff
callback(func() error {
reject(i)
return nil
})
}
}

// connect establishes the client's connection to the target
// redis instance(s).
func (c *Client) connect() error {
Expand Down
Loading