Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HyperLogLog #30

Merged
merged 6 commits into from
Nov 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ golang.org/x/sys
github.com/stretchr/testify/assert
github.com/tidwall/gjson
github.com/tidwall/sjson
github.com/absolute8511/hyperloglog
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ based on this golang sdk if you want use the redis client in other language.
- [x] List
- [x] Set
- [x] Sorted Set
- [ ] Geo
- [x] GeoHash
- [x] Expires
- [ ] HyperLogLog
- [x] HyperLogLog
- [x] JSON
* Distributed system
- [x] Raft based replication
Expand Down
1 change: 1 addition & 0 deletions default.conf
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"profile_port": 6666,
"election_tick": 30,
"tick_ms": 200,
"keep_wal": 20,
"local_raft_addr":"http://0.0.0.0:12379",
"tags": {"ssd":"", "dc_info":"dc1"},
"rocksdb_opts": {
Expand Down
File renamed without changes.
File renamed without changes.
31 changes: 21 additions & 10 deletions doc/getting_started.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,25 @@

## Build

Install the compress library
<pre>
yum install snappy-devel (for CentOS)
apt-get install libsnappy1 libsnappy-dev (for Debian/Ubuntu)
brew install snappy (for Mac)
</pre>

Build the rocksdb
<pre>
git clone https://github.com/absolute8511/rocksdb.git
cd rocksdb
USE_SSE=1 make static_lib
</pre>

Install the dependency:
<pre>
go get github.com/absolute8511/c-rocksdb
go get github.com/absolute8511/gorocksdb
go get github.com/cockroachdb/c-lz4
go get github.com/cockroachdb/c-snappy
CGO_CFLAGS="-I/path/to/rocksdb/include" CGO_LDFLAGS="-L/path/to/rocksdb -lrocksdb -lstdc++ -lm -lsnappy -lrt" go get github.com/absolute8511/gorocksdb

CGO_CFLAGS="-I/path/to/rocksdb/include" CGO_LDFLAGS="-L/path/to/rocksdb -lrocksdb -lstdc++ -lm -lsnappy" go get github.com/absolute8511/gorocksdb (for MacOS)
</pre>

use the `gpm` to install other dependencies
Expand All @@ -16,12 +29,6 @@ wget https://raw.githubusercontent.com/pote/gpm/v1.4.0/bin/gpm && chmod +x gpm &
gpm get
</pre>

Install the Snappy library
<pre>
yum install snappy-devel (for CentOS)
apt-get install libsnappy-dev (for Debian/Ubuntu)
brew install snappy (for Mac)
</pre>

Build zankv and placedriver from the source (only support go version 1.7.4+, gcc 4.9+ or xcode-command-line-tools on Mac):
<pre>
Expand All @@ -42,7 +49,9 @@ If you want package the binary release run the scripts
* Deploy the zankv for data storage server `zankv -config=/path/to/config`

## API

placedriver has several HTTP APIs to manager the namespace

* list the namespace: `GET /namespaces`
* list the data nodes: `GET /datanodes`
* list the placedriver nodes: `GET /listpd`
Expand All @@ -51,12 +60,14 @@ placedriver has several HTTP APIs to manager the namespace
* delete the namespace (handle only by leader): `POST /cluster/namespace/delete?namespace=test_p16&partition=**`

storage server HTTP APIs for stats:

* namespace stats : `GET /stats`
* namespace raft internal stats : `GET /raft/stats`
* optimize the data storage : `POST /kv/optimize`
* get the raft leader of the namespace partition: `GET /cluster/leader/namespace-partition`

storage server also support the redis apis for read/write :

* KV:
* Hash Set:
* List:
Expand Down
4 changes: 2 additions & 2 deletions doc/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
- [x] List
- [x] Set
- [x] Sorted Set
- [ ] Geo
- [x] GeoHash
- [x] Expires
- [ ] HyperLogLog
- [x] HyperLogLog
- [x] JSON
* Distributed system
- [x] Raft based replication
Expand Down
2 changes: 2 additions & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ pages:
- 'Introduction': 'index.md'
- 'Getting Started' : 'getting_started.md'
- 'Design' : 'design.md'
- 'full scan' : 'design/fullscan.md'
- 'source analyse' : 'design/source_analyse.md'
- 'Examples' : 'examples.md'
2 changes: 2 additions & 0 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type MachineConfig struct {
DataRootDir string `json:"data_root_dir"`
ElectionTick int `json:"election_tick"`
TickMs int `json:"tick_ms"`
KeepWAL int `json:"keep_wal"`
RocksDBOpts rockredis.RockOptions `json:"rocksdb_opts"`
}

Expand All @@ -64,6 +65,7 @@ type RaftConfig struct {
RaftAddr string `json:"raft_addr"`
DataDir string `json:"data_dir"`
WALDir string `json:"wal_dir"`
KeepWAL int `json:"keep_wal"`
SnapDir string `json:"snap_dir"`
RaftPeers map[uint64]ReplicaInfo `json:"raft_peers"`
SnapCount int `json:"snap_count"`
Expand Down
28 changes: 28 additions & 0 deletions node/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,24 @@ func (nd *KVNode) delCommand(conn redcon.Conn, cmd redcon.Command, v interface{}
}
}

func (nd *KVNode) pfaddCommand(conn redcon.Conn, cmd redcon.Command, v interface{}) {
if rsp, ok := v.(int64); ok {
conn.WriteInt64(rsp)
} else {
conn.WriteError(errInvalidResponse.Error())
}
}

// current we restrict the pfcount to single key to avoid merge,
// since merge keys may across multi partitions on different nodes
func (nd *KVNode) pfcountCommand(conn redcon.Conn, cmd redcon.Command, v interface{}) {
if rsp, ok := v.(int64); ok {
conn.WriteInt64(rsp)
} else {
conn.WriteError(errInvalidResponse.Error())
}
}

// local write command execute only on follower or on the local commit of leader
// the return value of follower is ignored, return value of local leader will be
// return to the future response.
Expand Down Expand Up @@ -109,3 +127,13 @@ func (nd *KVNode) localDelCommand(cmd redcon.Command, ts int64) (interface{}, er
nd.store.DelKeys(cmd.Args[1:]...)
return int64(len(cmd.Args[1:])), nil
}

func (nd *KVNode) localPFCountCommand(cmd redcon.Command, ts int64) (interface{}, error) {
v, err := nd.store.PFCount(ts, cmd.Args[1:]...)
return v, err
}

func (nd *KVNode) localPFAddCommand(cmd redcon.Command, ts int64) (interface{}, error) {
v, err := nd.store.PFAdd(ts, cmd.Args[1], cmd.Args[2:]...)
return v, err
}
3 changes: 3 additions & 0 deletions node/keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func TestKVNode_kvCommand(t *testing.T) {
testKeyValue := []byte("1")
testKey2 := []byte("default:test:2")
testKey2Value := []byte("2")
testPFKey := []byte("default:test:pf1")
tests := []struct {
name string
args redcon.Command
Expand All @@ -165,6 +166,8 @@ func TestKVNode_kvCommand(t *testing.T) {
{"get", buildCommand([][]byte{[]byte("get"), testKey})},
{"mget", buildCommand([][]byte{[]byte("mget"), testKey, testKey2})},
{"exists", buildCommand([][]byte{[]byte("exists"), testKey})},
{"pfadd", buildCommand([][]byte{[]byte("pfadd"), testPFKey, testKeyValue})},
{"pfcount", buildCommand([][]byte{[]byte("pfcount"), testPFKey})},
}
defer os.RemoveAll(dataDir)
defer nd.Stop()
Expand Down
3 changes: 2 additions & 1 deletion node/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var (
ErrNamespacePartitionNotFound = errors.New("ERR_CLUSTER_CHANGED: partition of the namespace is not found")
ErrNamespaceNotLeader = errors.New("ERR_CLUSTER_CHANGED: partition of the namespace is not leader on the node")
ErrNamespaceNoLeader = errors.New("ERR_CLUSTER_CHANGED: partition of the namespace has no leader")
ErrRaftGroupNotReady = errors.New("raft group not ready")
ErrRaftGroupNotReady = errors.New("ERR_CLUSTER_CHANGED: raft group not ready")
errNamespaceConfInvalid = errors.New("namespace config is invalid")
)

Expand Down Expand Up @@ -294,6 +294,7 @@ func (nsm *NamespaceMgr) InitNamespaceNode(conf *NamespaceConfig, raftID uint64,
SnapCatchup: conf.SnapCatchup,
Replicator: conf.Replicator,
OptimizedFsync: conf.OptimizedFsync,
KeepWAL: nsm.machineConf.KeepWAL,
}
kv, err := NewKVNode(kvOpts, nsm.machineConf, raftConf, nsm.raftTransport,
join, nsm.onNamespaceDeleted(raftConf.GroupID, conf.Name),
Expand Down
4 changes: 4 additions & 0 deletions node/node_cmd_reg.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ func (nd *KVNode) registerHandler() {
nd.router.Register(true, "del", wrapWriteCommandKK(nd, nd.delCommand))
nd.router.Register(false, "plget", nd.plgetCommand)
nd.router.Register(true, "plset", nd.plsetCommand)
nd.router.Register(true, "pfadd", wrapWriteCommandKAnySubkey(nd, nd.pfaddCommand, 0))
nd.router.Register(true, "pfcount", wrapWriteCommandK(nd, nd.pfcountCommand))
// for hash
nd.router.Register(false, "hget", wrapReadCommandKSubkey(nd.hgetCommand))
nd.router.Register(false, "hgetall", wrapReadCommandK(nd.hgetallCommand))
Expand Down Expand Up @@ -116,6 +118,8 @@ func (nd *KVNode) registerHandler() {
nd.router.RegisterInternal("mset", nd.localMSetCommand)
nd.router.RegisterInternal("incr", nd.localIncrCommand)
nd.router.RegisterInternal("plset", nd.localPlsetCommand)
nd.router.RegisterInternal("pfadd", nd.localPFAddCommand)
nd.router.RegisterInternal("pfcount", nd.localPFCountCommand)
// hash
nd.router.RegisterInternal("hset", nd.localHSetCommand)
nd.router.RegisterInternal("hsetnx", nd.localHSetNXCommand)
Expand Down
9 changes: 8 additions & 1 deletion node/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -993,9 +993,16 @@ func (rc *raftNode) purgeFile(done chan struct{}, stopC chan struct{}) {
rc.Infof("purge exit")
close(done)
}()
keep := rc.config.KeepWAL
if keep == 0 {
keep = 20
}
if keep < 10 {
keep = 10
}
var serrc, werrc <-chan error
serrc = fileutil.PurgeFile(rc.config.SnapDir, "snap", 10, time.Minute*10, rc.stopc)
werrc = fileutil.PurgeFile(rc.config.WALDir, "wal", 20, time.Minute*10, rc.stopc)
werrc = fileutil.PurgeFile(rc.config.WALDir, "wal", uint(keep), time.Minute*10, rc.stopc)
select {
case e := <-werrc:
rc.Infof("failed to purge wal file %v", e)
Expand Down
2 changes: 2 additions & 0 deletions pdserver/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,9 @@ func TestClusterSchemaAddIndex(t *testing.T) {
}

func TestClusterBalanceAcrossMultiDC(t *testing.T) {
// TODO:
}

func TestClusterRemoveNode(t *testing.T) {
// TODO:
}
5 changes: 2 additions & 3 deletions rockredis/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ const (
SetType byte = 29
SSizeType byte = 30

JSONType byte = 31
HLLType byte = 32 // hyperloglog
GeoHashType byte = 33
JSONType byte = 31

ColumnType byte = 38 // used for column store for OLAP

Expand Down Expand Up @@ -74,6 +72,7 @@ var (
ZScoreType: "zscore",
SetType: "set",
SSizeType: "ssize",
JSONType: "json",
}
)

Expand Down
5 changes: 5 additions & 0 deletions rockredis/rockredis.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rockredis
import (
"errors"
"fmt"
"hash"
"io"
"math"
"os"
Expand All @@ -15,6 +16,8 @@ import (
"sync/atomic"
"time"

"github.com/spaolacci/murmur3"

"github.com/absolute8511/ZanRedisDB/common"
"github.com/absolute8511/gorocksdb"
"github.com/shirou/gopsutil/mem"
Expand Down Expand Up @@ -203,6 +206,7 @@ type RockDB struct {
indexMgr *IndexMgr
isBatching int32
checkpointDirLock sync.Mutex
hasher64 hash.Hash64
}

func OpenRockDB(cfg *RockConfig) (*RockDB, error) {
Expand Down Expand Up @@ -269,6 +273,7 @@ func OpenRockDB(cfg *RockConfig) (*RockDB, error) {
wb: gorocksdb.NewWriteBatch(),
backupC: make(chan *BackupInfo),
quit: make(chan struct{}),
hasher64: murmur3.New64(),
}
eng, err := gorocksdb.OpenDb(opts, db.GetDataDir())
if err != nil {
Expand Down
Loading