Merge pull request #46 from absolute8511/optimize-range-iterator
Optimize range related performance
absolute8511 authored May 3, 2018
2 parents 0d220bf + c8d2467 commit d4f8f59
Showing 26 changed files with 921 additions and 408 deletions.
6 changes: 3 additions & 3 deletions cluster/pdnode_coord/pd_coordinator.go
@@ -662,9 +662,10 @@ func (pdCoord *PDCoordinator) doCheckNamespaces(monitorChan chan struct{}, faile
currentNodes, currentNodesEpoch := pdCoord.getCurrentNodesWithRemoving()
cluster.CoordLog().Infof("do check namespaces (%v), current nodes: %v, ...", len(namespaces), len(currentNodes))
checkOK := true
fullReady := true
defer func() {
if checkOK {
if fullCheck {
if fullCheck && fullReady {
atomic.StoreInt32(&pdCoord.isClusterUnstable, 0)
pdCoord.doSchemaCheck()
}
@@ -776,8 +777,7 @@ func (pdCoord *PDCoordinator) doCheckNamespaces(monitorChan chan struct{}, faile
} else {
delete(partitions, nsInfo.Partition)
if ok, err := IsAllISRFullReady(&nsInfo); err != nil || !ok {
checkOK = false
atomic.StoreInt32(&pdCoord.isClusterUnstable, 1)
fullReady = false
cluster.CoordLog().Infof("namespace %v isr is not full ready", nsInfo.GetDesp())
continue
}
2 changes: 1 addition & 1 deletion cluster/pdnode_coord/place_driver.go
@@ -147,7 +147,7 @@ func IsRaftNodeFullReady(nsInfo *cluster.PartitionMetaInfo, nid string) (bool, e
}
_, err = common.APIRequest("GET",
"http://"+net.JoinHostPort(nip, httpPort)+common.APIIsRaftSynced+"/"+nsInfo.GetDesp(),
nil, time.Second*3, nil)
nil, time.Second*5, nil)
if err != nil {
cluster.CoordLog().Infof("failed (%v) to check sync state for namespace %v: %v", nip, nsInfo.GetDesp(), err)
return false, err
5 changes: 3 additions & 2 deletions common/stats.go
@@ -67,8 +67,9 @@ func (ws *WriteStats) Copy() *WriteStats {
}

type TableStats struct {
Name string `json:"name"`
KeyNum int64 `json:"key_num"`
Name string `json:"name"`
KeyNum int64 `json:"key_num"`
DiskBytesUsage int64 `json:"disk_bytes_usage"`
}

type NamespaceStats struct {
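TableStats now carries per-table disk usage next to the key count. A minimal sketch of the resulting JSON, assuming the repository's common package import path (the example is illustrative and not part of this diff):

package main

import (
	"encoding/json"
	"fmt"

	"github.com/absolute8511/ZanRedisDB/common" // assumed import path
)

func main() {
	// One per-table entry as embedded in NamespaceStats.
	ts := common.TableStats{Name: "t1", KeyNum: 1024, DiskBytesUsage: 4 << 20}
	b, _ := json.Marshal(ts)
	fmt.Println(string(b))
	// Output: {"name":"t1","key_num":1024,"disk_bytes_usage":4194304}
}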
79 changes: 44 additions & 35 deletions node/namespace.go
@@ -34,6 +34,8 @@ var (
errNamespaceConfInvalid = errors.New("namespace config is invalid")
)

var perfLevel int32

type NamespaceNode struct {
Node *KVNode
conf *NamespaceConfig
@@ -494,10 +496,14 @@ func (nsm *NamespaceMgr) GetStats(leaderOnly bool) []common.NamespaceStats {
return nsStats
}

func (nsm *NamespaceMgr) OptimizeDB() {
func (nsm *NamespaceMgr) OptimizeDB(ns string, table string) {
nsm.mutex.RLock()
nodeList := make([]*NamespaceNode, 0, len(nsm.kvNodes))
for _, n := range nsm.kvNodes {
for k, n := range nsm.kvNodes {
baseName, _ := common.GetNamespaceAndPartition(k)
if ns != "" && ns != baseName {
continue
}
nodeList = append(nodeList, n)
}
nsm.mutex.RUnlock()
@@ -506,9 +512,34 @@ func (nsm *NamespaceMgr) OptimizeDB() {
return
}
if n.IsReady() {
n.Node.OptimizeDB()
n.Node.OptimizeDB(table)
}
}
}

func (nsm *NamespaceMgr) DeleteRange(ns string, dtr DeleteTableRange) error {
nsm.mutex.RLock()
nodeList := make([]*NamespaceNode, 0, len(nsm.kvNodes))
for k, n := range nsm.kvNodes {
baseName, _ := common.GetNamespaceAndPartition(k)
if ns != baseName {
continue
}
nodeList = append(nodeList, n)
}
nsm.mutex.RUnlock()
for _, n := range nodeList {
if atomic.LoadInt32(&nsm.stopping) == 1 {
return common.ErrStopped
}
if n.IsReady() {
err := n.Node.DeleteRange(dtr)
if err != nil {
return err
}
}
}
return nil
}

func (nsm *NamespaceMgr) onNamespaceDeleted(gid uint64, ns string) func() {
@@ -638,38 +669,16 @@ func (nsm *NamespaceMgr) clearUnusedRaftPeer() {
}
}

type PerfReport struct {
Group string
Report string
func SetPerfLevel(level int) {
atomic.StoreInt32(&perfLevel, int32(level))
rockredis.SetPerfLevel(level)
}

func (nsm *NamespaceMgr) RunPerf(leaderOnly bool, level int, rt int) map[string]string {
nsm.mutex.RLock()
nodes := make([]*KVNode, 0, len(nsm.kvNodes))
for _, n := range nsm.kvNodes {
if !n.IsReady() {
continue
}
if leaderOnly && !n.Node.IsLead() {
continue
}
nodes = append(nodes, n.Node)
}
nsm.mutex.RUnlock()
reportCh := make(chan PerfReport, 1)
nsStats := make(map[string]string, len(nsm.kvNodes))
for _, n := range nodes {
go func(kv *KVNode) {
perfReport := kv.RunPerf(level, rt)
reportCh <- PerfReport{Group: kv.ns, Report: perfReport}
}(n)
}
for {
r := <-reportCh
nsStats[r.Group] = r.Report
if len(nsStats) >= len(nodes) {
break
}
}
return nsStats
func IsPerfEnabled() bool {
lv := GetPerfLevel()
return rockredis.IsPerfEnabledLevel(lv)
}

func GetPerfLevel() int {
return int(atomic.LoadInt32(&perfLevel))
}
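With these changes the manager-level API can target a single namespace and table. A minimal usage sketch, assuming an already-initialized NamespaceMgr and the repository's import path; the helper and names below are illustrative, not part of this diff, and DeleteTableRange is defined in node/node.go further down:

package main

import (
	"log"

	"github.com/absolute8511/ZanRedisDB/node" // assumed import path
)

// optimizeAndTrim is a hypothetical caller; nsMgr serves namespace "test_ns".
func optimizeAndTrim(nsMgr *node.NamespaceMgr) {
	// Compact only one table of one namespace instead of every namespace.
	nsMgr.OptimizeDB("test_ns", "test_table")

	// Propose a ranged delete on every local partition of the namespace;
	// the call returns at the first partition that fails.
	dtr := node.DeleteTableRange{
		Table:     "test_table",
		StartFrom: []byte("a"),
		EndTo:     []byte("n"),
	}
	if err := nsMgr.DeleteRange("test_ns", dtr); err != nil {
		log.Printf("delete range failed: %v", err)
	}
}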
76 changes: 55 additions & 21 deletions node/node.go
@@ -47,8 +47,30 @@ const (
ProposeOp_ApplyRemoteSnap int = 3
ProposeOp_RemoteConfChange int = 4
ProposeOp_ApplySkippedRemoteSnap int = 5
ProposeOp_DeleteTable int = 6
)

type DeleteTableRange struct {
Table string `json:"table,omitempty"`
StartFrom []byte `json:"start_from,omitempty"`
EndTo []byte `json:"end_to,omitempty"`
// to avoid accidentally dropping all table data, this must be set to delete all data in the table
DeleteAll bool `json:"delete_all,omitempty"`
Dryrun bool `json:"dryrun,omitempty"`
}

func (dtr DeleteTableRange) CheckValid() error {
if dtr.Table == "" {
return errors.New("delete range must have table name")
}
if len(dtr.StartFrom) == 0 && len(dtr.EndTo) == 0 {
if !dtr.DeleteAll {
return errors.New("delete all must be true if deleting whole table")
}
}
return nil
}

type nodeProgress struct {
confState raftpb.ConfState
snapi uint64
@@ -214,18 +236,40 @@ func (nd *KVNode) Stop() {
nd.rn.Infof("node %v stopped", nd.ns)
}

func (nd *KVNode) OptimizeDB() {
nd.rn.Infof("node %v begin optimize db", nd.ns)
nd.sm.Optimize()
nd.rn.Infof("node %v end optimize db", nd.ns)
// since we can not know whether leader or follower is done on optimize
// we backup anyway after optimize
func (nd *KVNode) OptimizeDB(table string) {
nd.rn.Infof("node %v begin optimize db, table %v", nd.ns, table)
defer nd.rn.Infof("node %v end optimize db", nd.ns)
nd.sm.Optimize(table)
// an empty table means optimizing all data, so we back up to keep the optimized data
// after a restart
if table == "" {
// since we can not know whether leader or follower is done on optimize
// we backup anyway after optimize
p := &customProposeData{
ProposeOp: ProposeOp_Backup,
NeedBackup: true,
}
d, _ := json.Marshal(p)
nd.CustomPropose(d)
}
}

func (nd *KVNode) DeleteRange(drange DeleteTableRange) error {
if err := drange.CheckValid(); err != nil {
return err
}
d, _ := json.Marshal(drange)
p := &customProposeData{
ProposeOp: ProposeOp_Backup,
NeedBackup: true,
ProposeOp: ProposeOp_DeleteTable,
NeedBackup: false,
Data: d,
}
d, _ := json.Marshal(p)
nd.CustomPropose(d)
dd, _ := json.Marshal(p)
_, err := nd.CustomPropose(dd)
if err != nil {
nd.rn.Infof("node %v delete table range %v failed: %v", nd.ns, drange, err)
}
return err
}

func (nd *KVNode) switchForLearnerLeader(isLearnerLeader bool) {
@@ -287,16 +331,6 @@ func (nd *KVNode) GetLocalMemberInfo() *common.MemberInfo {
return &m
}

// run perf for rt seconds and return perf result string
func (nd *KVNode) RunPerf(level int, rt int) string {
if s, ok := nd.sm.(*kvStoreSM); ok {
report := s.store.RunPerf(level, rt)
nd.rn.Infof("perf: %v", report)
return report
}
return ""
}

func (nd *KVNode) GetDBInternalStats() string {
if s, ok := nd.sm.(*kvStoreSM); ok {
return s.store.GetStatistics()
@@ -766,7 +800,7 @@ func (nd *KVNode) IsRaftSynced(checkCommitIndex bool) bool {
if !checkCommitIndex {
return true
}
to := time.Second * 2
to := time.Second * 5
req := make([]byte, 8)
binary.BigEndian.PutUint64(req, nd.rn.reqIDGen.Next())
ctx, cancel := context.WithTimeout(context.Background(), to)
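The rules in CheckValid are worth spelling out: a table name is always required, and a delete with no start and no end key additionally requires DeleteAll, so a whole table cannot be wiped by accident. A small sketch, assuming the repository's import path (not part of this diff):

package main

import (
	"fmt"

	"github.com/absolute8511/ZanRedisDB/node" // assumed import path
)

func main() {
	// Explicit sub-range: valid.
	sub := node.DeleteTableRange{Table: "t1", StartFrom: []byte("k1"), EndTo: []byte("k9")}
	fmt.Println(sub.CheckValid()) // <nil>

	// No range and DeleteAll unset: rejected, since it would wipe the whole table.
	whole := node.DeleteTableRange{Table: "t1"}
	fmt.Println(whole.CheckValid()) // error

	// A whole-table delete must be opted into explicitly.
	whole.DeleteAll = true
	fmt.Println(whole.CheckValid()) // <nil>
}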
5 changes: 5 additions & 0 deletions node/remote_sync_mgr.go
@@ -236,6 +236,11 @@ func (nd *KVNode) ApplyRemoteSnapshot(skip bool, name string, term uint64, index
if skip {
nd.rn.Infof("got skipped snapshot from remote cluster %v : %v-%v", name, term, index)
} else {
// we should disallow applying a remote snapshot while we are running as the master cluster
if !IsSyncerOnly() {
nd.rn.Infof("cluster %v snapshot is not allowed: %v-%v", name, term, index)
return errors.New("apply remote snapshot is not allowed while not in syncer only mode")
}
oldS, ok := nd.remoteSyncedStates.GetApplyingSnap(name)
if !ok {
return errors.New("no remote snapshot waiting apply")
35 changes: 26 additions & 9 deletions node/state_machine.go
@@ -36,7 +36,7 @@ type StateMachine interface {
RestoreFromSnapshot(startup bool, raftSnapshot raftpb.Snapshot, stop chan struct{}) error
Destroy()
CleanData() error
Optimize()
Optimize(string)
GetStats() common.NamespaceStats
Start() error
Close()
@@ -96,7 +96,7 @@ func (esm *emptySM) Destroy() {
func (esm *emptySM) CleanData() error {
return nil
}
func (esm *emptySM) Optimize() {
func (esm *emptySM) Optimize(t string) {

}
func (esm *emptySM) GetStats() common.NamespaceStats {
@@ -172,8 +172,12 @@ func (kvsm *kvStoreSM) Close() {
kvsm.store.Close()
}

func (kvsm *kvStoreSM) Optimize() {
kvsm.store.CompactRange()
func (kvsm *kvStoreSM) Optimize(table string) {
if table == "" {
kvsm.store.CompactRange()
} else {
kvsm.store.CompactTableRange(table)
}
}

func (kvsm *kvStoreSM) GetDBInternalStats() string {
@@ -185,17 +189,19 @@ func (kvsm *kvStoreSM) GetStats() common.NamespaceStats {
var ns common.NamespaceStats
ns.InternalStats = kvsm.store.GetInternalStatus()
ns.DBWriteStats = kvsm.dbWriteStats.Copy()

for t := range tbs {
cnt, err := kvsm.store.GetTableKeyCount(t)
if err != nil {
continue
diskUsages := kvsm.store.GetBTablesSizes(tbs)
for i, t := range tbs {
cnt, _ := kvsm.store.GetTableKeyCount(t)
if cnt <= 0 {
cnt = kvsm.store.GetTableApproximateNumInRange(string(t), nil, nil)
}
var ts common.TableStats
ts.Name = string(t)
ts.KeyNum = cnt
ts.DiskBytesUsage = diskUsages[i]
ns.TStats = append(ns.TStats, ts)
}

return ns
}

@@ -489,6 +495,7 @@ func (kvsm *kvStoreSM) ApplyRaftRequest(isReplaying bool, reqList BatchInternalR
(nodeLog.Level() >= common.LOG_DEBUG && cmdCost > dbWriteSlow/2) {
kvsm.Infof("slow write command: %v, cost: %v", string(cmd.Raw), cmdCost)
}

kvsm.dbWriteStats.UpdateWriteStats(int64(len(cmd.Raw)), cmdCost.Nanoseconds()/1000)
// write the future response or error
if err != nil {
@@ -560,6 +567,15 @@ func (kvsm *kvStoreSM) handleCustomRequest(req *InternalRaftRequest, reqID uint6
kvsm.Infof("got force backup request")
forceBackup = true
kvsm.w.Trigger(reqID, nil)
} else if p.ProposeOp == ProposeOp_DeleteTable {
var dr DeleteTableRange
err = json.Unmarshal(p.Data, &dr)
if err != nil {
kvsm.Infof("invalid delete table range data: %v", string(p.Data))
} else {
err = kvsm.store.DeleteTableRange(dr.Dryrun, dr.Table, dr.StartFrom, dr.EndTo)
}
kvsm.w.Trigger(reqID, err)
} else if p.ProposeOp == ProposeOp_RemoteConfChange {
var cc raftpb.ConfChange
cc.Unmarshal(p.Data)
@@ -648,6 +664,7 @@ func (kvsm *kvStoreSM) handleCustomRequest(req *InternalRaftRequest, reqID uint6
// return if configure changed and whether need force backup
func (kvsm *kvStoreSM) processBatching(cmdName string, reqList BatchInternalRaftRequest, batchStart time.Time, batchReqIDList []uint64, batchReqRspList []interface{},
dupCheckMap map[string]bool) ([]uint64, []interface{}, map[string]bool) {

err := kvsm.store.CommitBatchWrite()
dupCheckMap = make(map[string]bool, len(reqList.Reqs))
batchCost := time.Since(batchStart)
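On the apply path above, p.Data holds the JSON-encoded DeleteTableRange from the proposal. Since StartFrom and EndTo are []byte, encoding/json carries them as base64. A sketch of the wire form, assuming the repository's import path (not part of this diff):

package main

import (
	"encoding/json"
	"fmt"

	"github.com/absolute8511/ZanRedisDB/node" // assumed import path
)

func main() {
	dtr := node.DeleteTableRange{
		Table:     "t1",
		StartFrom: []byte("a"),
		EndTo:     []byte("b"),
		Dryrun:    true, // passed through to the store's DeleteTableRange
	}
	b, _ := json.Marshal(dtr)
	fmt.Println(string(b))
	// Output: {"table":"t1","start_from":"YQ==","end_to":"Yg==","dryrun":true}
}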
2 changes: 1 addition & 1 deletion node/syncer_learner.go
@@ -89,7 +89,7 @@ func (sm *logSyncerSM) Errorf(f string, args ...interface{}) {
nodeLog.ErrorDepth(1, fmt.Sprintf("%v-%v: %s", sm.fullNS, sm.ID, msg))
}

func (sm *logSyncerSM) Optimize() {
func (sm *logSyncerSM) Optimize(t string) {
}

func (sm *logSyncerSM) GetDBInternalStats() string {
4 changes: 2 additions & 2 deletions pdserver/server_test.go
@@ -267,7 +267,7 @@ func TestLeaderLost(t *testing.T) {
oldRaftReplicaID := nsNode.Node.GetLocalMemberInfo().ID
// call this to propose some request to write raft logs
for i := 0; i < 50; i++ {
nsNode.Node.OptimizeDB()
nsNode.Node.OptimizeDB("")
}
leader.Stop()

@@ -321,7 +321,7 @@ func TestFollowerLost(t *testing.T) {
assert.NotNil(t, leader)
// call this to propose some request to write raft logs
for i := 0; i < 50; i++ {
nsNode.Node.OptimizeDB()
nsNode.Node.OptimizeDB("")
}
follower, followerNode := getFollowerNode(t, ns, 0)
oldFollowerReplicaID := followerNode.GetRaftID()