Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Syncer stats support timestamp #50

Merged
merged 5 commits into from
May 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions common/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,11 @@ type NamespaceStats struct {
}

type LogSyncStats struct {
Name string `json:"name"`
Term uint64 `json:"term"`
Index uint64 `json:"index"`
IsLeader bool `json:"is_leader"`
Name string `json:"name"`
Term uint64 `json:"term"`
Index uint64 `json:"index"`
Timestamp int64 `json:"timestamp"`
IsLeader bool `json:"is_leader"`
}

type ScanStats struct {
Expand Down
11 changes: 6 additions & 5 deletions node/log_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,19 +181,19 @@ func (s *RemoteLogSender) doSendOnce(r []*BatchInternalRaftRequest) error {
}

in := &syncerpb.RaftReqs{RaftLog: raftLogs}
if nodeLog.Level() >= common.LOG_DETAIL {
nodeLog.Debugf("sending log : %v", addr, in.String())
if nodeLog.Level() > common.LOG_DETAIL {
nodeLog.Debugf("sending(%v) log : %v", addr, in.String())
}
ctx, cancel := context.WithTimeout(context.Background(), sendLogTimeout)
defer cancel()
rpcErr, err := c.ApplyRaftReqs(ctx, in)
rpcErr, err := c.ApplyRaftReqs(ctx, in, grpc.MaxCallSendMsgSize(256<<20))
if err != nil {
nodeLog.Infof("sending(%v) log failed: %v, %v", addr, err.Error(), in.String())
nodeLog.Infof("sending(%v) log failed: %v", addr, err.Error())
return err
}
if rpcErr != nil && rpcErr.ErrCode != http.StatusOK &&
rpcErr.ErrCode != 0 {
nodeLog.Infof("sending(%v) log failed: %v, %v", addr, rpcErr, in.String())
nodeLog.Infof("sending(%v) log failed: %v", addr, rpcErr)
return errors.New(rpcErr.String())
}
return nil
Expand Down Expand Up @@ -383,6 +383,7 @@ func (s *RemoteLogSender) getRemoteSyncedRaftOnce() (SyncedState, error) {
}
state.SyncedTerm = rsp.Term
state.SyncedIndex = rsp.Index
state.Timestamp = rsp.Timestamp
nodeLog.Debugf("remote(%v) raft group %v synced : %v", addr, s.grpName, state)
return state, nil
}
Expand Down
24 changes: 23 additions & 1 deletion node/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,27 @@ func (nsm *NamespaceMgr) GetDBStats(leaderOnly bool) map[string]string {
return nsStats
}

// GetLogSyncStatsInSyncer collects the log sync statistics of every ready
// namespace node that runs as a sync learner. It returns two parallel
// slices: receive-side stats and synced(send)-side stats, each entry's
// Name field set to the namespace key. Nodes that are not ready, or whose
// state machine is not a log syncer (nil stats), are skipped.
func (nsm *NamespaceMgr) GetLogSyncStatsInSyncer() ([]common.LogSyncStats, []common.LogSyncStats) {
	nsm.mutex.RLock()
	// defer instead of a trailing explicit RUnlock: the read lock is now
	// released even if a node method panics mid-iteration.
	defer nsm.mutex.RUnlock()
	nsRecvStats := make([]common.LogSyncStats, 0, len(nsm.kvNodes))
	nsSyncStats := make([]common.LogSyncStats, 0, len(nsm.kvNodes))
	for name, node := range nsm.kvNodes {
		if !node.IsReady() {
			continue
		}
		recvStats, syncStats := node.Node.GetLogSyncStatsInSyncLearner()
		if recvStats == nil || syncStats == nil {
			// this namespace is not running as a sync learner
			continue
		}
		recvStats.Name = name
		syncStats.Name = name
		nsRecvStats = append(nsRecvStats, *recvStats)
		nsSyncStats = append(nsSyncStats, *syncStats)
	}
	return nsRecvStats, nsSyncStats
}

func (nsm *NamespaceMgr) GetLogSyncStats(leaderOnly bool, srcClusterName string) []common.LogSyncStats {
if srcClusterName == "" {
return nil
Expand All @@ -461,7 +482,7 @@ func (nsm *NamespaceMgr) GetLogSyncStats(leaderOnly bool, srcClusterName string)
if leaderOnly && !n.Node.IsLead() {
continue
}
term, index := n.Node.GetRemoteClusterSyncedRaft(srcClusterName)
term, index, ts := n.Node.GetRemoteClusterSyncedRaft(srcClusterName)
if term == 0 && index == 0 {
continue
}
Expand All @@ -470,6 +491,7 @@ func (nsm *NamespaceMgr) GetLogSyncStats(leaderOnly bool, srcClusterName string)
s.IsLeader = n.Node.IsLead()
s.Term = term
s.Index = index
s.Timestamp = ts
nsStats = append(nsStats, s)
}
nsm.mutex.RUnlock()
Expand Down
2 changes: 1 addition & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ func (nd *KVNode) handleProposeReq() {
}()
for {
pc := nd.reqProposeC
if len(reqList.Reqs) >= proposeQueueLen*4 {
if len(reqList.Reqs) >= proposeQueueLen*2 {
pc = nil
}
select {
Expand Down
36 changes: 28 additions & 8 deletions node/remote_sync_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ import (
"errors"
"sync"
"time"

"github.com/absolute8511/ZanRedisDB/common"
)

// SyncedState records how far a remote cluster has applied a raft group's
// log: the term and index of the last synced entry plus the timestamp
// carried by that entry (used for sync-lag reporting).
type SyncedState struct {
	SyncedTerm  uint64 `json:"synced_term,omitempty"`
	SyncedIndex uint64 `json:"synced_index,omitempty"`
	Timestamp   int64  `json:"timestamp,omitempty"`
	// disableEqual is a slice field that makes the struct non-comparable,
	// so comparing two SyncedState values with == is a compile error;
	// use IsSame instead.
	disableEqual []byte
}

func (ss *SyncedState) IsNewer(other *SyncedState) bool {
Expand All @@ -19,6 +24,10 @@ func (ss *SyncedState) IsNewer(other *SyncedState) bool {
return false
}

// IsSame reports whether ss and other refer to the same raft log
// position. Only term and index participate in the comparison; the
// timestamp is deliberately ignored.
func (ss *SyncedState) IsSame(other *SyncedState) bool {
	if ss.SyncedTerm != other.SyncedTerm {
		return false
	}
	return ss.SyncedIndex == other.SyncedIndex
}

func (ss *SyncedState) IsNewer2(term uint64, index uint64) bool {
if ss.SyncedTerm >= term && ss.SyncedIndex >= index {
return true
Expand Down Expand Up @@ -68,7 +77,7 @@ func newRemoteSyncedStateMgr() *remoteSyncedStateMgr {
func (rss *remoteSyncedStateMgr) RemoveApplyingSnap(name string, state SyncedState) {
rss.Lock()
sas, ok := rss.remoteSnapshotsApplying[name]
if ok && sas.SS == state {
if ok && sas.SS.IsSame(&state) {
delete(rss.remoteSnapshotsApplying, name)
}
rss.Unlock()
Expand Down Expand Up @@ -109,7 +118,7 @@ func (rss *remoteSyncedStateMgr) AddApplyingSnap(name string, state SyncedState)
func (rss *remoteSyncedStateMgr) UpdateApplyingSnapStatus(name string, ss SyncedState, status int) {
rss.Lock()
sas, ok := rss.remoteSnapshotsApplying[name]
if ok && status < len(applyStatusMsgs) && ss == sas.SS {
if ok && status < len(applyStatusMsgs) && ss.IsSame(&sas.SS) {
if sas.StatusCode != status {
sas.StatusCode = status
sas.Status = applyStatusMsgs[status]
Expand Down Expand Up @@ -175,7 +184,8 @@ func (nd *KVNode) isAlreadyApplied(reqList BatchInternalRaftRequest) bool {

// return as (cluster name, is transferring remote snapshot, is applying remote snapshot)
func (nd *KVNode) preprocessRemoteSnapApply(reqList BatchInternalRaftRequest) (bool, bool) {
ss := SyncedState{SyncedTerm: reqList.OrigTerm, SyncedIndex: reqList.OrigIndex}
ss := SyncedState{SyncedTerm: reqList.OrigTerm,
SyncedIndex: reqList.OrigIndex, Timestamp: reqList.Timestamp}
for _, req := range reqList.Reqs {
if req.Header.DataType == int32(CustomReq) {
var cr customProposeData
Expand All @@ -199,7 +209,7 @@ func (nd *KVNode) preprocessRemoteSnapApply(reqList BatchInternalRaftRequest) (b

func (nd *KVNode) postprocessRemoteSnapApply(reqList BatchInternalRaftRequest,
isRemoteSnapTransfer bool, isRemoteSnapApply bool, retErr error) {
ss := SyncedState{SyncedTerm: reqList.OrigTerm, SyncedIndex: reqList.OrigIndex}
ss := SyncedState{SyncedTerm: reqList.OrigTerm, SyncedIndex: reqList.OrigIndex, Timestamp: reqList.Timestamp}
// for remote snapshot transfer, we need wait apply success before update sync state
if !isRemoteSnapTransfer {
if retErr != errIgnoredRemoteApply {
Expand All @@ -221,12 +231,22 @@ func (nd *KVNode) postprocessRemoteSnapApply(reqList BatchInternalRaftRequest,
}
}

func (nd *KVNode) SetRemoteClusterSyncedRaft(name string, term uint64, index uint64) {
nd.remoteSyncedStates.UpdateState(name, SyncedState{SyncedTerm: term, SyncedIndex: index})
func (nd *KVNode) SetRemoteClusterSyncedRaft(name string, term uint64, index uint64, ts int64) {
nd.remoteSyncedStates.UpdateState(name, SyncedState{SyncedTerm: term, SyncedIndex: index, Timestamp: ts})
}
func (nd *KVNode) GetRemoteClusterSyncedRaft(name string) (uint64, uint64) {
func (nd *KVNode) GetRemoteClusterSyncedRaft(name string) (uint64, uint64, int64) {
state, _ := nd.remoteSyncedStates.GetState(name)
return state.SyncedTerm, state.SyncedIndex
return state.SyncedTerm, state.SyncedIndex, state.Timestamp
}

// GetLogSyncStatsInSyncLearner returns the receive-side and synced-side
// log sync statistics when this node's state machine is a log syncer
// learner. It returns (nil, nil) when the node is not a sync learner.
func (nd *KVNode) GetLogSyncStatsInSyncLearner() (*common.LogSyncStats, *common.LogSyncStats) {
	logSyncer, ok := nd.sm.(*logSyncerSM)
	if !ok {
		// not running as a sync learner
		return nil, nil
	}
	// recvStats/syncStats: avoid naming a local "sync", which would
	// shadow the imported sync package in this file.
	recvStats, syncStats := logSyncer.GetLogSyncStats()
	return &recvStats, &syncStats
}

func (nd *KVNode) ApplyRemoteSnapshot(skip bool, name string, term uint64, index uint64) error {
Expand Down
91 changes: 65 additions & 26 deletions node/syncer_learner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func EnableForTest() {
}

const (
logSendBufferLen = 100
logSendBufferLen = 64
)

var syncerNormalInit = false
Expand All @@ -29,14 +29,17 @@ func SetSyncerNormalInit() {
syncerNormalInit = true
}

var syncLearnerRecvStats common.WriteStats
var syncLearnerDoneStats common.WriteStats

type logSyncerSM struct {
clusterInfo common.IClusterInfo
fullNS string
machineConfig MachineConfig
ID uint64
syncedCnt int64
syncedIndex uint64
syncedTerm uint64
receivedState SyncedState
syncedState SyncedState
lgSender *RemoteLogSender
stopping int32
sendCh chan *BatchInternalRaftRequest
Expand Down Expand Up @@ -96,13 +99,30 @@ func (sm *logSyncerSM) GetDBInternalStats() string {
return ""
}

// GetLogLatencyStats returns copies of the package-level latency stats
// gathered by the sync learner: the receive-side stats and the
// done(sent)-side stats.
func GetLogLatencyStats() (*common.WriteStats, *common.WriteStats) {
	recv := syncLearnerRecvStats.Copy()
	done := syncLearnerDoneStats.Copy()
	return recv, done
}

// GetLogSyncStats returns a point-in-time view of the received-log stats
// and the synced-log stats for this namespace. Each field is read with an
// individual atomic load, so fields are individually consistent but the
// (term, index, timestamp) triple is not guaranteed to be one atomic
// snapshot — presumably acceptable since this feeds stats reporting only.
func (sm *logSyncerSM) GetLogSyncStats() (common.LogSyncStats, common.LogSyncStats) {
	var recvStats common.LogSyncStats
	var syncStats common.LogSyncStats
	syncStats.Name = sm.fullNS
	syncStats.Term, syncStats.Index, syncStats.Timestamp = sm.getSyncedState()
	recvStats.Term = atomic.LoadUint64(&sm.receivedState.SyncedTerm)
	recvStats.Index = atomic.LoadUint64(&sm.receivedState.SyncedIndex)
	recvStats.Timestamp = atomic.LoadInt64(&sm.receivedState.Timestamp)
	recvStats.Name = sm.fullNS
	return recvStats, syncStats
}

func (sm *logSyncerSM) GetStats() common.NamespaceStats {
var ns common.NamespaceStats
stat := make(map[string]interface{})
stat["role"] = common.LearnerRoleLogSyncer
stat["synced"] = atomic.LoadInt64(&sm.syncedCnt)
stat["synced_index"] = atomic.LoadUint64(&sm.syncedIndex)
stat["synced_term"] = atomic.LoadUint64(&sm.syncedTerm)
stat["synced_index"] = atomic.LoadUint64(&sm.syncedState.SyncedIndex)
stat["synced_term"] = atomic.LoadUint64(&sm.syncedState.SyncedTerm)
stat["synced_timestamp"] = atomic.LoadInt64(&sm.syncedState.Timestamp)
ns.InternalStats = stat
return ns
}
Expand Down Expand Up @@ -136,32 +156,49 @@ func (sm *logSyncerSM) Close() {
sm.wg.Wait()
}

// setReceivedState records the raft term/index and the entry timestamp of
// the most recently received log. The three stores are individually
// atomic, not a single atomic snapshot of the whole state.
func (sm *logSyncerSM) setReceivedState(term uint64, index uint64, ts int64) {
	atomic.StoreUint64(&sm.receivedState.SyncedTerm, term)
	atomic.StoreUint64(&sm.receivedState.SyncedIndex, index)
	atomic.StoreInt64(&sm.receivedState.Timestamp, ts)
}

// setSyncedState records the raft term/index and the entry timestamp of
// the last log confirmed synced to the remote cluster. The three stores
// are individually atomic, not a single atomic snapshot.
func (sm *logSyncerSM) setSyncedState(term uint64, index uint64, ts int64) {
	atomic.StoreUint64(&sm.syncedState.SyncedTerm, term)
	atomic.StoreUint64(&sm.syncedState.SyncedIndex, index)
	atomic.StoreInt64(&sm.syncedState.Timestamp, ts)
}

// getSyncedState loads and returns the synced raft term, index and
// timestamp. Each value is loaded atomically on its own, so the triple
// is not one consistent snapshot.
func (sm *logSyncerSM) getSyncedState() (uint64, uint64, int64) {
	return atomic.LoadUint64(&sm.syncedState.SyncedTerm),
		atomic.LoadUint64(&sm.syncedState.SyncedIndex),
		atomic.LoadInt64(&sm.syncedState.Timestamp)
}

func (sm *logSyncerSM) switchIgnoreSend(send bool) {
old := atomic.LoadInt32(&sm.ignoreSend)

syncedTerm := atomic.LoadUint64(&sm.syncedTerm)
syncedIndex := atomic.LoadUint64(&sm.syncedIndex)
syncedTerm, syncedIndex, syncedTs := sm.getSyncedState()
if send {
if old == 0 {
return
}
sm.Infof("switch to send log really at: %v-%v", syncedTerm, syncedIndex)
sm.Infof("switch to send log really at: %v-%v-%v", syncedTerm, syncedIndex, syncedTs)
atomic.StoreInt32(&sm.ignoreSend, 0)
} else {
if old == 1 {
return
}
sm.Infof("switch to ignore send log at: %v-%v", syncedTerm, syncedIndex)
sm.Infof("switch to ignore send log at: %v-%v-%v", syncedTerm, syncedIndex, syncedTs)
atomic.StoreInt32(&sm.ignoreSend, 1)
}
}

func (sm *logSyncerSM) handlerRaftLogs() {
defer func() {
sm.lgSender.Stop()
syncedTerm := atomic.LoadUint64(&sm.syncedTerm)
syncedIndex := atomic.LoadUint64(&sm.syncedIndex)
sm.Infof("raft log syncer send loop exit at synced: %v-%v", syncedTerm, syncedIndex)
syncedTerm, syncedIndex, syncedTs := sm.getSyncedState()
sm.Infof("raft log syncer send loop exit at synced: %v-%v-%v", syncedTerm, syncedIndex, syncedTs)
}()
raftLogs := make([]*BatchInternalRaftRequest, 0, logSendBufferLen)
var last *BatchInternalRaftRequest
Expand Down Expand Up @@ -225,17 +262,19 @@ func (sm *logSyncerSM) handlerRaftLogs() {
case <-sm.sendStop:
return
default:
syncedTerm := atomic.LoadUint64(&sm.syncedTerm)
syncedIndex := atomic.LoadUint64(&sm.syncedIndex)
sm.Errorf("failed to send raft log to remote: %v, %v, current: %v-%v",
err, raftLogs, syncedTerm, syncedIndex)
syncedTerm, syncedIndex, syncedTs := sm.getSyncedState()
sm.Errorf("failed to send raft log to remote: %v, %v, current: %v-%v-%v",
err, len(raftLogs), syncedTerm, syncedIndex, syncedTs)
}
continue
}
if handled {
atomic.AddInt64(&sm.syncedCnt, int64(len(raftLogs)))
atomic.StoreUint64(&sm.syncedIndex, last.OrigIndex)
atomic.StoreUint64(&sm.syncedTerm, last.OrigTerm)
sm.setSyncedState(last.OrigTerm, last.OrigIndex, last.Timestamp)
t := time.Now().UnixNano()
for _, rl := range raftLogs {
syncLearnerDoneStats.UpdateLatencyStats((t - rl.Timestamp) / time.Microsecond.Nanoseconds())
}
raftLogs = raftLogs[:0]
}
}
Expand Down Expand Up @@ -278,8 +317,7 @@ func (sm *logSyncerSM) waitIgnoreUntilChanged(term uint64, index uint64, stop ch
for {
if atomic.LoadInt32(&sm.ignoreSend) == 1 {
// check local to avoid call rpc too much
syncTerm := atomic.LoadUint64(&sm.syncedTerm)
syncIndex := atomic.LoadUint64(&sm.syncedIndex)
syncTerm, syncIndex, _ := sm.getSyncedState()
if syncTerm >= term && syncIndex >= index {
return true, nil
}
Expand All @@ -288,8 +326,7 @@ func (sm *logSyncerSM) waitIgnoreUntilChanged(term uint64, index uint64, stop ch
sm.Infof("failed to get the synced state from remote: %v, at %v-%v", err, term, index)
} else {
if state.IsNewer2(term, index) {
atomic.StoreUint64(&sm.syncedIndex, state.SyncedIndex)
atomic.StoreUint64(&sm.syncedTerm, state.SyncedTerm)
sm.setSyncedState(state.SyncedTerm, state.SyncedIndex, state.Timestamp)
return true, nil
}
}
Expand Down Expand Up @@ -319,8 +356,7 @@ func (sm *logSyncerSM) RestoreFromSnapshot(startup bool, raftSnapshot raftpb.Sna
}
if state.IsNewer2(raftSnapshot.Metadata.Term, raftSnapshot.Metadata.Index) {
sm.Infof("ignored restore snapshot since remote has newer raft: %v than %v", state, raftSnapshot.Metadata.String())
atomic.StoreUint64(&sm.syncedIndex, raftSnapshot.Metadata.Index)
atomic.StoreUint64(&sm.syncedTerm, raftSnapshot.Metadata.Term)
sm.setSyncedState(raftSnapshot.Metadata.Term, raftSnapshot.Metadata.Index, 0)
return nil
}

Expand Down Expand Up @@ -389,8 +425,7 @@ func (sm *logSyncerSM) RestoreFromSnapshot(startup bool, raftSnapshot raftpb.Sna
}

sm.Infof("apply snap done %v", raftSnapshot.Metadata)
atomic.StoreUint64(&sm.syncedIndex, raftSnapshot.Metadata.Index)
atomic.StoreUint64(&sm.syncedTerm, raftSnapshot.Metadata.Term)
sm.setSyncedState(raftSnapshot.Metadata.Term, raftSnapshot.Metadata.Index, 0)
return nil
}

Expand Down Expand Up @@ -425,6 +460,10 @@ func (sm *logSyncerSM) ApplyRaftRequest(isReplaying bool, reqList BatchInternalR
sm.Infof("ignore sync from cluster syncer, %v-%v:%v", term, index, reqList.String())
return false, nil
}
sm.setReceivedState(term, index, reqList.Timestamp)
latency := time.Now().UnixNano() - reqList.Timestamp
syncLearnerRecvStats.UpdateLatencyStats(latency / time.Microsecond.Nanoseconds())

forceBackup := false
reqList.OrigTerm = term
reqList.OrigIndex = index
Expand Down
Loading