Merge pull request #43 from absolute8511/rocksdb-optimize
Support configuring the RocksDB optimization options.
absolute8511 authored Apr 24, 2018
2 parents 061a624 + a7f7e16 commit ea0efcd
Showing 13 changed files with 286 additions and 33 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -19,7 +19,7 @@ install:
   - export CXX="g++-4.9" CC="gcc-4.9"
   - sudo apt-get install libsnappy1 libsnappy-dev
   - git clone https://github.com/absolute8511/rocksdb.git /tmp/rocksdb
-  - pushd /tmp/rocksdb && USE_SSE=1 make static_lib && popd
+  - pushd /tmp/rocksdb && git checkout v5.8.8-share-rate-limiter && USE_SSE=1 make static_lib && popd
 script:
   - CGO_CFLAGS="-I/tmp/rocksdb/include" CGO_LDFLAGS="-L/tmp/rocksdb -lrocksdb -lstdc++ -lm -lsnappy -lrt" go get github.com/absolute8511/gorocksdb
   - CGO_CFLAGS="-I/tmp/rocksdb/include" CGO_LDFLAGS="-L/tmp/rocksdb -lrocksdb -lstdc++ -lm -lsnappy -lrt" go install -race github.com/absolute8511/gorocksdb
5 changes: 4 additions & 1 deletion default.conf
@@ -17,6 +17,7 @@
 "verify_read_checksum": false,
 "block_size": 0,
 "block_cache":0,
+"use_shared_cache": true,
 "cache_index_and_filter_blocks": false,
 "write_buffer_size": 0,
 "max_write_buffer_number": 0,
@@ -28,7 +29,9 @@
 "max_background_compactions":0,
 "min_level_to_compress":0,
 "max_manifest_file_size":0,
-"rate_bytes_per_sec":0
+"adjust_thread_pool": true,
+"use_shared_rate_limiter": true,
+"rate_bytes_per_sec":30000000
 }
 }
 }
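Note on the new options above: use_shared_cache, adjust_thread_pool, use_shared_rate_limiter and rate_bytes_per_sec are meant to let all namespaces on one node share a single block cache and a single rate limiter instead of creating one per RocksDB instance. The sketch below only illustrates that idea against the public gorocksdb API; the actual wiring lives inside the rockredis package and is not shown in this diff, and the cache size, refill period and fairness values are made-up examples.

package dbshared

import "github.com/absolute8511/gorocksdb"

// Illustrative only: one cache and one rate limiter created per process and
// attached to every DB's options, instead of one per namespace.
var (
	sharedCache   = gorocksdb.NewLRUCache(1024 * 1024 * 1024)        // cache size is a made-up example
	sharedLimiter = gorocksdb.NewRateLimiter(30000000, 100*1000, 10) // 30000000 matches rate_bytes_per_sec above
)

func newDBOptions(useSharedCache, useSharedRateLimiter bool) *gorocksdb.Options {
	opts := gorocksdb.NewDefaultOptions()
	bbto := gorocksdb.NewDefaultBlockBasedTableOptions()
	if useSharedCache {
		bbto.SetBlockCache(sharedCache) // all namespaces hit the same LRU block cache
	}
	opts.SetBlockBasedTableFactory(bbto)
	if useSharedRateLimiter {
		opts.SetRateLimiter(sharedLimiter) // compaction/flush IO budget shared across namespaces
	}
	return opts
}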
25 changes: 13 additions & 12 deletions node/config.go
@@ -38,18 +38,19 @@ type RaftGroupConfig struct {
 
 type MachineConfig struct {
 	// server node id
-	NodeID            uint64                `json:"node_id"`
-	BroadcastAddr     string                `json:"broadcast_addr"`
-	HttpAPIPort       int                   `json:"http_api_port"`
-	LocalRaftAddr     string                `json:"local_raft_addr"`
-	DataRootDir       string                `json:"data_root_dir"`
-	ElectionTick      int                   `json:"election_tick"`
-	TickMs            int                   `json:"tick_ms"`
-	KeepWAL           int                   `json:"keep_wal"`
-	LearnerRole       string                `json:"learner_role"`
-	RemoteSyncCluster string                `json:"remote_sync_cluster"`
-	StateMachineType  string                `json:"state_machine_type"`
-	RocksDBOpts       rockredis.RockOptions `json:"rocksdb_opts"`
+	NodeID              uint64                      `json:"node_id"`
+	BroadcastAddr       string                      `json:"broadcast_addr"`
+	HttpAPIPort         int                         `json:"http_api_port"`
+	LocalRaftAddr       string                      `json:"local_raft_addr"`
+	DataRootDir         string                      `json:"data_root_dir"`
+	ElectionTick        int                         `json:"election_tick"`
+	TickMs              int                         `json:"tick_ms"`
+	KeepWAL             int                         `json:"keep_wal"`
+	LearnerRole         string                      `json:"learner_role"`
+	RemoteSyncCluster   string                      `json:"remote_sync_cluster"`
+	StateMachineType    string                      `json:"state_machine_type"`
+	RocksDBOpts         rockredis.RockOptions       `json:"rocksdb_opts"`
+	RocksDBSharedConfig *rockredis.SharedRockConfig
 }
 
 type ReplicaInfo struct {
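The new RocksDBSharedConfig field has no json tag, so it is not parsed from the config file; the intent, as the namespace.go and kvstore.go changes below suggest, is that it is built once per process from RocksDBOpts and then handed to every namespace. A hypothetical sketch in package node, assuming a constructor named rockredis.NewSharedRockConfig, which this diff does not show:

// Hypothetical wiring in package node; NewSharedRockConfig is an assumed name.
func setupSharedRockConfig(conf *MachineConfig) {
	// Create the process-wide shared resources (block cache, rate limiter, background
	// thread pool) once, so every namespace's RocksDB instance can reuse them.
	conf.RocksDBSharedConfig = rockredis.NewSharedRockConfig(conf.RocksDBOpts) // assumed constructor
	// The namespace manager destroys them again in Stop() (see node/namespace.go below).
}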
2 changes: 2 additions & 0 deletions node/kvstore.go
@@ -19,6 +19,7 @@ type KVOptions struct {
 	EngType          string
 	ExpirationPolicy common.ExpirationPolicy
 	RockOpts         rockredis.RockOptions
+	SharedConfig     *rockredis.SharedRockConfig
 }
 
 func NewKVStore(kvopts *KVOptions) (*KVStore, error) {
@@ -40,6 +41,7 @@ func (s *KVStore) openDB() error {
 	cfg.DataDir = s.opts.DataDir
 	cfg.RockOptions = s.opts.RockOpts
 	cfg.ExpirationPolicy = s.opts.ExpirationPolicy
+	cfg.SharedConfig = s.opts.SharedConfig
 	s.RockDB, err = rockredis.OpenRockDB(cfg)
 	if err != nil {
 		nodeLog.Warningf("failed to open rocksdb: %v", err)
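For illustration, how a caller might pass the shared config through KVOptions when opening a store; only the fields shown in this diff are taken from the code, while DataDir, the engine type string and the expPolicy variable are assumptions for the sketch.

// Illustrative call site, not part of this PR: reuse one SharedRockConfig across stores.
func openStoreWithSharedConfig(dataRoot string, machineConf *MachineConfig, expPolicy common.ExpirationPolicy) (*KVStore, error) {
	kvOpts := &KVOptions{
		DataDir:          path.Join(dataRoot, "test-ns"), // DataDir is assumed to exist on KVOptions (see openDB above)
		EngType:          "rockredis",                    // assumed engine type string
		ExpirationPolicy: expPolicy,
		RockOpts:         machineConf.RocksDBOpts,
		SharedConfig:     machineConf.RocksDBSharedConfig,
	}
	store, err := NewKVStore(kvOpts)
	if err != nil {
		nodeLog.Warningf("failed to open kv store: %v", err)
	}
	return store, err
}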
40 changes: 40 additions & 0 deletions node/namespace.go
@@ -242,6 +242,9 @@ func (nsm *NamespaceMgr) Stop() {
 	}
 	nsm.wg.Wait()
 	nodeLog.Infof("namespace manager stopped")
+	if nsm.machineConf.RocksDBSharedConfig != nil {
+		nsm.machineConf.RocksDBSharedConfig.Destroy()
+	}
 }
 
 func (nsm *NamespaceMgr) IsAllRecoveryDone() bool {
@@ -283,6 +286,7 @@ func (nsm *NamespaceMgr) InitNamespaceNode(conf *NamespaceConfig, raftID uint64,
 		EngType:          conf.EngType,
 		RockOpts:         nsm.machineConf.RocksDBOpts,
 		ExpirationPolicy: expPolicy,
+		SharedConfig:     nsm.machineConf.RocksDBSharedConfig,
 	}
 	rockredis.FillDefaultOptions(&kvOpts.RockOpts)
 
@@ -633,3 +637,39 @@ func (nsm *NamespaceMgr) clearUnusedRaftPeer() {
 		}
 	}
 }
+
+type PerfReport struct {
+	Group  string
+	Report string
+}
+
+func (nsm *NamespaceMgr) RunPerf(leaderOnly bool, level int, rt int) map[string]string {
+	nsm.mutex.RLock()
+	nodes := make([]*KVNode, 0, len(nsm.kvNodes))
+	for _, n := range nsm.kvNodes {
+		if !n.IsReady() {
+			continue
+		}
+		if leaderOnly && !n.Node.IsLead() {
+			continue
+		}
+		nodes = append(nodes, n.Node)
+	}
+	nsm.mutex.RUnlock()
+	reportCh := make(chan PerfReport, 1)
+	nsStats := make(map[string]string, len(nsm.kvNodes))
+	for _, n := range nodes {
+		go func(kv *KVNode) {
+			perfReport := kv.RunPerf(level, rt)
+			reportCh <- PerfReport{Group: kv.ns, Report: perfReport}
+		}(n)
+	}
+	for {
+		r := <-reportCh
+		nsStats[r.Group] = r.Report
+		if len(nsStats) >= len(nodes) {
+			break
+		}
+	}
+	return nsStats
+}
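A quick usage sketch for the new entry point; the wrapper function and the level/duration values below are illustrative and not part of this PR, and rt is assumed to be seconds based on the comment added in node/node.go below.

// Illustrative only (package node): sample perf on leader replicas for 10 seconds at level 2.
func dumpPerfReports(nsMgr *NamespaceMgr) {
	reports := nsMgr.RunPerf(true, 2, 10)
	for ns, report := range reports {
		nodeLog.Infof("perf report for namespace %v: %v", ns, report)
	}
}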
19 changes: 17 additions & 2 deletions node/node.go
@@ -287,6 +287,16 @@ func (nd *KVNode) GetLocalMemberInfo() *common.MemberInfo {
 	return &m
 }
 
+// run perf for rt seconds and return perf result string
+func (nd *KVNode) RunPerf(level int, rt int) string {
+	if s, ok := nd.sm.(*kvStoreSM); ok {
+		report := s.store.RunPerf(level, rt)
+		nd.rn.Infof("perf: %v", report)
+		return report
+	}
+	return ""
+}
+
 func (nd *KVNode) GetDBInternalStats() string {
 	if s, ok := nd.sm.(*kvStoreSM); ok {
 		return s.store.GetStatistics()
@@ -434,7 +444,7 @@ func (nd *KVNode) handleProposeReq() {
 			cancel()
 			cost = time.Now().UnixNano() - start
 		}
-		if cost >= int64(proposeTimeout.Nanoseconds())/2 {
+		if cost >= int64(time.Second.Nanoseconds())/2 {
 			nd.rn.Infof("slow for batch propose: %v, cost %v", len(reqList.Reqs), cost)
 		}
 		for i := range reqList.Reqs {
@@ -572,12 +582,17 @@ func (nd *KVNode) queueRequest(req *internalReq) (interface{}, error) {
 		err = common.ErrStopped
 	}
 	if req.reqData.Header.DataType == int32(RedisReq) {
-		nd.clusterWriteStats.UpdateWriteStats(int64(len(req.reqData.Data)), time.Since(start).Nanoseconds()/1000)
+		cost := time.Since(start)
+		nd.clusterWriteStats.UpdateWriteStats(int64(len(req.reqData.Data)), cost.Nanoseconds()/1000)
 		if err == nil && !nd.IsWriteReady() {
 			nd.rn.Infof("write request %v on raft success but raft member is less than replicator",
 				req.reqData.String())
 			return nil, errRaftNotReadyForWrite
 		}
+		if cost >= time.Second {
+			nd.rn.Infof("write request %v slow cost: %v",
+				req.reqData.String(), cost)
+		}
 	}
 	return rsp, err
 }
6 changes: 5 additions & 1 deletion node/raft.go
@@ -826,13 +826,17 @@ func (rc *raftNode) serveChannels() {
 				return
 			}
 			if rd.SoftState != nil {
-				isMeNewLeader = rd.RaftState == raft.StateLeader
+				isMeNewLeader = (rd.RaftState == raft.StateLeader)
 				oldLead := atomic.LoadUint64(&rc.lead)
 				isMeLosingLeader := (oldLead == uint64(rc.config.ID)) && !isMeNewLeader
 				if rd.SoftState.Lead != raft.None && oldLead != rd.SoftState.Lead {
 					rc.Infof("leader changed from %v to %v", oldLead, rd.SoftState)
 					atomic.StoreInt64(&rc.lastLeaderChangedTs, time.Now().UnixNano())
 				}
+				if rd.SoftState.Lead == raft.None && oldLead != raft.None {
+					// TODO: handle proposal drop if leader is lost
+					//rc.triggerLeaderLost()
+				}
 				if isMeNewLeader || isMeLosingLeader {
 					rc.triggerLeaderChanged()
 				}
6 changes: 3 additions & 3 deletions node/state_machine.go
@@ -536,7 +536,7 @@ func (kvsm *kvStoreSM) ApplyRaftRequest(isReplaying bool, reqList BatchInternalR
 	}
 
 	cost := time.Since(start)
-	if cost >= proposeTimeout/2 {
+	if cost >= time.Second {
 		kvsm.Infof("slow for batch write db: %v, cost %v", len(reqList.Reqs), cost)
 	}
 	// used for grpc raft proposal, will notify that all the raft logs in this batch is done.
@@ -651,7 +651,7 @@ func (kvsm *kvStoreSM) processBatching(cmdName string, reqList BatchInternalRaft
 	err := kvsm.store.CommitBatchWrite()
 	dupCheckMap = make(map[string]bool, len(reqList.Reqs))
 	batchCost := time.Since(batchStart)
-	if nodeLog.Level() >= common.LOG_DETAIL {
+	if nodeLog.Level() > common.LOG_DETAIL {
 		kvsm.Infof("batching command number: %v", len(batchReqIDList))
 	}
 	// write the future response or error
@@ -663,7 +663,7 @@
 		}
 	}
 	if batchCost > dbWriteSlow || (nodeLog.Level() >= common.LOG_DEBUG && batchCost > dbWriteSlow/2) {
-		kvsm.Infof("slow batch write command: %v, batch: %v, cost: %v",
+		kvsm.Infof("slow batch write db, command: %v, batch: %v, cost: %v",
 			cmdName, len(batchReqIDList), batchCost)
 	}
 	if len(batchReqIDList) > 0 {