Skip to content

Commit

Permalink
Merge pull request #49 from absolute8511/support-spop
Browse files Browse the repository at this point in the history
Support spop
  • Loading branch information
absolute8511 authored May 15, 2018
2 parents b85f64a + ae9f56a commit 84b12a6
Show file tree
Hide file tree
Showing 8 changed files with 181 additions and 4 deletions.
3 changes: 3 additions & 0 deletions node/node_cmd_reg.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (kvsm *kvStoreSM) registerHandlers() {
kvsm.router.RegisterInternal("srem", kvsm.localSrem)
kvsm.router.RegisterInternal("sclear", kvsm.localSclear)
kvsm.router.RegisterInternal("smclear", kvsm.localSmclear)
kvsm.router.RegisterInternal("spop", kvsm.localSpop)
// expire
kvsm.router.RegisterInternal("setex", kvsm.localSetexCommand)
kvsm.router.RegisterInternal("expire", kvsm.localExpireCommand)
Expand Down Expand Up @@ -141,6 +142,7 @@ func (nd *KVNode) registerHandler() {
nd.router.Register(false, "scard", wrapReadCommandK(nd.scardCommand))
nd.router.Register(false, "sismember", wrapReadCommandKSubkey(nd.sismemberCommand))
nd.router.Register(false, "smembers", wrapReadCommandK(nd.smembersCommand))
nd.router.Register(true, "spop", nd.spopCommand)
nd.router.Register(true, "sadd", wrapWriteCommandKSubkeySubkey(nd, nd.saddCommand))
nd.router.Register(true, "srem", wrapWriteCommandKSubkeySubkey(nd, nd.sremCommand))
nd.router.Register(true, "sclear", wrapWriteCommandK(nd, nd.sclearCommand))
Expand Down Expand Up @@ -224,6 +226,7 @@ func (kvsm *kvStoreSM) registerConflictHandlers() {
// set
kvsm.cRouter.Register("sadd", kvsm.checkSetConflict)
kvsm.cRouter.Register("srem", kvsm.checkSetConflict)
kvsm.cRouter.Register("spop", kvsm.checkSetConflict)
// expire
kvsm.cRouter.Register("setex", kvsm.checkKVConflict)
kvsm.cRouter.Register("expire", kvsm.checkKVConflict)
Expand Down
69 changes: 69 additions & 0 deletions node/set.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package node

import (
"strconv"

"github.com/absolute8511/redcon"
)

Expand Down Expand Up @@ -44,6 +46,55 @@ func (nd *KVNode) saddCommand(conn redcon.Conn, cmd redcon.Command, v interface{
}
}

func (nd *KVNode) spopCommand(conn redcon.Conn, cmd redcon.Command) {
if len(cmd.Args) != 2 && len(cmd.Args) != 3 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
hasCount := len(cmd.Args) == 3
if hasCount {
cnt, err := strconv.Atoi(string(cmd.Args[2]))
if err != nil {
conn.WriteError(err.Error())
return
}
if cnt < 1 {
conn.WriteError("Invalid count")
return
}
}
_, v, ok := rebuildFirstKeyAndPropose(nd, conn, cmd)
if !ok {
return
}

// without the count argument, it is bulk string
if !hasCount {
if v == nil {
conn.WriteNull()
return
}
if rsp, ok := v.(string); ok {
conn.WriteBulkString(string(rsp[0]))
return
}
if !ok {
conn.WriteError("Invalid response type")
return
}
} else {
rsp, ok := v.([][]byte)
if !ok {
conn.WriteError("Invalid response type")
return
}
conn.WriteArray(len(rsp))
for _, d := range rsp {
conn.WriteBulk(d)
}
}
}

func (nd *KVNode) sremCommand(conn redcon.Conn, cmd redcon.Command, v interface{}) {
if rsp, ok := v.(int64); ok {
conn.WriteInt64(rsp)
Expand Down Expand Up @@ -76,6 +127,24 @@ func (kvsm *kvStoreSM) localSrem(cmd redcon.Command, ts int64) (interface{}, err
return kvsm.store.SRem(ts, cmd.Args[1], cmd.Args[2:]...)
}

func (kvsm *kvStoreSM) localSpop(cmd redcon.Command, ts int64) (interface{}, error) {
cnt := 1
if len(cmd.Args) == 3 {
cnt, _ = strconv.Atoi(string(cmd.Args[2]))
}
vals, err := kvsm.store.SPop(ts, cmd.Args[1], cnt)
if err != nil {
return nil, err
}
if len(cmd.Args) == 3 {
return vals, nil
}
if len(vals) > 0 {
return string(vals[0]), nil
}
return nil, nil
}

func (kvsm *kvStoreSM) localSclear(cmd redcon.Command, ts int64) (interface{}, error) {
return kvsm.store.SClear(cmd.Args[1])
}
Expand Down
2 changes: 1 addition & 1 deletion node/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestKVNode_setCommand(t *testing.T) {
{"sismember", buildCommand([][]byte{[]byte("sismember"), testKey, testMember})},
{"smembers", buildCommand([][]byte{[]byte("smembers"), testKey})},
{"scard", buildCommand([][]byte{[]byte("scard"), testKey})},
{"srem", buildCommand([][]byte{[]byte("srem"), testKey, testMember})},
{"spop", buildCommand([][]byte{[]byte("spop"), testKey})},
{"srem", buildCommand([][]byte{[]byte("srem"), testKey, testMember})},
{"sclear", buildCommand([][]byte{[]byte("sclear"), testKey})},
}
Expand Down
9 changes: 8 additions & 1 deletion rockredis/rockredis.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type RockOptions struct {
UseSharedCache bool `json:"use_shared_cache,omitempty"`
UseSharedRateLimiter bool `json:"use_shared_rate_limiter,omitempty"`
DisableWAL bool `json:"disable_wal,omitempty"`
DisableMergeCounter bool `json:"disable_merge_counter,omitempty"`
}

func FillDefaultOptions(opts *RockOptions) {
Expand Down Expand Up @@ -362,7 +363,13 @@ func OpenRockDB(cfg *RockConfig) (*RockDB, error) {
opts.SetMaxManifestFileSize(cfg.MaxMainifestFileSize)
// https://github.com/facebook/mysql-5.6/wiki/my.cnf-tuning
// rate limiter need to reduce the compaction io
opts.SetUint64AddMergeOperator()
if !cfg.DisableMergeCounter {
if cfg.EnableTableCounter {
opts.SetUint64AddMergeOperator()
}
} else {
cfg.EnableTableCounter = false
}

db := &RockDB{
cfg: cfg,
Expand Down
1 change: 1 addition & 0 deletions rockredis/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ func (db *RockDB) hScanGeneric(key []byte, cursor []byte, count int, match strin
if err != nil {
return nil, err
}
it.NoTimestamp(HashType)
defer it.Close()

for i := 0; it.Valid() && i < count; it.Next() {
Expand Down
21 changes: 19 additions & 2 deletions rockredis/t_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,11 @@ func (db *RockDB) SMembers(key []byte) ([][]byte, error) {
if err != nil {
return nil, err
}

return db.sMembersN(key, int(num))
}

func (db *RockDB) sMembersN(key []byte, num int) ([][]byte, error) {
if num > MAX_BATCH_NUM {
return nil, errTooMuchBatchSize
}
Expand All @@ -344,7 +349,7 @@ func (db *RockDB) SMembers(key []byte) ([][]byte, error) {
start := sEncodeStartKey(table, rk)
stop := sEncodeStopKey(table, rk)

v := make([][]byte, 0, 16)
v := make([][]byte, 0, num)

it, err := NewDBRangeIterator(db.eng, start, stop, common.RangeROpen, false)
if err != nil {
Expand All @@ -357,11 +362,23 @@ func (db *RockDB) SMembers(key []byte) ([][]byte, error) {
return nil, err
}
v = append(v, m)
if len(v) >= num {
break
}
}

return v, nil
}

func (db *RockDB) SPop(ts int64, key []byte, count int) ([][]byte, error) {
vals, err := db.sMembersN(key, count)
if err != nil {
return nil, err
}

_, err = db.SRem(ts, key, vals...)
return vals, err
}

func (db *RockDB) SRem(ts int64, key []byte, args ...[]byte) (int64, error) {
table, rk, _ := extractTableFromRedisKey(key)
if len(table) == 0 {
Expand Down
35 changes: 35 additions & 0 deletions rockredis/t_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,38 @@ func TestSKeyExists(t *testing.T) {
}

}

func TestDBSPop(t *testing.T) {
db := getTestDB(t)
defer os.RemoveAll(db.cfg.DataDir)
defer db.Close()
key := []byte("test:spop_test")
if vals, err := db.SPop(0, key, 1); err != nil {
t.Fatal(err.Error())
} else if len(vals) != 0 {
t.Fatal("invalid value ", vals)
}

db.SAdd(0, key, []byte("hello"), []byte("world"), []byte("hello2"))

if vals, err := db.SPop(0, key, 1); err != nil {
t.Fatal(err.Error())
} else if len(vals) != 1 {
t.Fatal("invalid value ", vals)
}

if vals, err := db.SPop(0, key, 3); err != nil {
t.Fatal(err.Error())
} else if len(vals) != 2 {
t.Fatal("invalid value ", vals)
}

if vals, err := db.SPop(0, key, 1); err != nil {
t.Fatal(err.Error())
} else if len(vals) != 0 {
t.Fatal("invalid value ", vals)
}
if vals, _ := db.SMembers(key); len(vals) != 0 {
t.Errorf("should empty set")
}
}
45 changes: 45 additions & 0 deletions server/redis_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1472,6 +1472,44 @@ func TestSet(t *testing.T) {
} else if len(n) != 4 {
t.Fatal(n)
}
if val, err := goredis.String(c.Do("spop", key2)); err != nil {
t.Fatal(err)
} else if val != "0" {
t.Fatal(val)
}
if val, err := goredis.String(c.Do("spop", key2)); err != nil {
t.Fatal(err)
} else if val != "1" {
t.Fatal(val)
}
if val, err := goredis.Values(c.Do("spop", key2, 4)); err != nil {
t.Fatal(err)
} else if len(val) != 2 {
t.Fatal(val)
}
if n, err := goredis.Values(c.Do("smembers", key2)); err != nil {
t.Fatal(err)
} else if len(n) != 0 {
t.Fatal(n)
}
// empty spop single will return nil, but spop multi will return empty array
if val, err := c.Do("spop", key2); err != nil {
t.Fatal(err)
} else if val != nil {
t.Fatal(val)
}
if val, err := goredis.Values(c.Do("spop", key2, 2)); err != nil {
t.Fatal(err)
} else if val == nil {
t.Fatal(val)
} else if len(val) != 0 {
t.Fatal(val)
}
if n, err := goredis.Int(c.Do("sadd", key2, 0, 1, 2, 3)); err != nil {
t.Fatal(err)
} else if n != 4 {
t.Fatal(n)
}

if n, err := goredis.Int(c.Do("sclear", key2)); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1513,6 +1551,13 @@ func TestSetErrorParams(t *testing.T) {
t.Fatalf("invalid err of %v", err)
}

if _, err := c.Do("spop"); err == nil {
t.Fatalf("invalid err of %v", err)
}
if _, err := c.Do("spop", key, "0"); err == nil {
t.Fatalf("invalid err of %v", err)
}

if _, err := c.Do("srem"); err == nil {
t.Fatalf("invalid err of %v", err)
}
Expand Down

0 comments on commit 84b12a6

Please sign in to comment.