Skip to content

Commit

Permalink
Merge pull request #36 from absolute8511/raft-learner
Browse files Browse the repository at this point in the history
Raft learner
  • Loading branch information
absolute8511 authored Jan 2, 2018
2 parents be1d934 + 5c06d7e commit a62c356
Show file tree
Hide file tree
Showing 14 changed files with 750 additions and 83 deletions.
6 changes: 3 additions & 3 deletions node/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (nn *NamespaceNode) Start(forceStandaloneCluster bool) error {
}

func (nn *NamespaceNode) TransferMyLeader(to uint64, toRaftID uint64) error {
waitTimeout := time.Duration(nn.Node.machineConfig.ElectionTick)*time.Duration(nn.Node.machineConfig.TickMs)* time.Millisecond
waitTimeout := time.Duration(nn.Node.machineConfig.ElectionTick) * time.Duration(nn.Node.machineConfig.TickMs) * time.Millisecond
ctx, cancel := context.WithTimeout(context.Background(), waitTimeout)
defer cancel()
oldLeader := nn.Node.rn.Lead()
Expand Down Expand Up @@ -399,12 +399,12 @@ func (nsm *NamespaceMgr) GetNamespaceNodeFromGID(gid uint64) *NamespaceNode {
defer nsm.mutex.RUnlock()
gn, ok := nsm.groups[gid]
if !ok {
nodeLog.Errorf("group name not found %v ", gid)
nodeLog.Debugf("group name not found %v ", gid)
return nil
}
kv, ok := nsm.kvNodes[gn]
if !ok {
nodeLog.Errorf("kv namespace not found %v ", gn)
nodeLog.Infof("kv namespace not found %v ", gn)
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions node/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ import (

"github.com/absolute8511/ZanRedisDB/common"
"github.com/absolute8511/ZanRedisDB/pkg/fileutil"
"github.com/absolute8511/ZanRedisDB/pkg/idutil"
"github.com/absolute8511/ZanRedisDB/pkg/types"
"github.com/absolute8511/ZanRedisDB/raft"
"github.com/absolute8511/ZanRedisDB/raft/raftpb"
"github.com/absolute8511/ZanRedisDB/snap"
"github.com/absolute8511/ZanRedisDB/transport/rafthttp"
"github.com/absolute8511/ZanRedisDB/wal"
"github.com/absolute8511/ZanRedisDB/wal/walpb"
"github.com/absolute8511/ZanRedisDB/pkg/idutil"
"github.com/absolute8511/ZanRedisDB/pkg/types"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -611,7 +611,7 @@ func (rc *raftNode) beginSnapshot(snapi uint64, confState raftpb.ConfState) erro
if err != nil {
return err
}
rc.Infof("get snapshot object done: %v", snapi)
rc.Infof("get snapshot object done: %v, state: %v", snapi, confState.String())

rc.wgAsync.Add(1)
go func() {
Expand Down
16 changes: 9 additions & 7 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,11 +350,11 @@ func (n *node) run(r *raft) {
case mdrop := <-n.recvc:
m := mdrop.m
// filter out response message from unknown From.
from, ok := r.prs[m.From]
if ok || !IsResponseMsg(m.Type) {
from := r.getProgress(m.From)
if from != nil || !IsResponseMsg(m.Type) {
if m.Type == pb.MsgTransferLeader {
if m.FromGroup.NodeId == 0 {
if !ok {
if from == nil {
if m.From == r.id {
m.FromGroup = r.group
} else {
Expand All @@ -367,8 +367,8 @@ func (n *node) run(r *raft) {
}
}
if m.ToGroup.NodeId == 0 {
g, ok := r.prs[m.To]
if !ok {
pr := r.getProgress(m.To)
if pr == nil {
if m.To == r.id {
m.ToGroup = r.group
} else {
Expand All @@ -377,13 +377,13 @@ func (n *node) run(r *raft) {
continue
}
} else {
m.ToGroup = g.group
m.ToGroup = pr.group
}
}
} else {
// if we missing the peer node group info, try update it from
// raft message
if ok && from.group.NodeId == 0 && m.FromGroup.NodeId > 0 &&
if from != nil && from.group.NodeId == 0 && m.FromGroup.NodeId > 0 &&
m.FromGroup.GroupId == r.group.GroupId {
from.group = m.FromGroup
}
Expand All @@ -402,6 +402,8 @@ func (n *node) run(r *raft) {
switch cc.Type {
case pb.ConfChangeAddNode:
r.addNode(cc.ReplicaID, cc.NodeGroup)
case pb.ConfChangeAddLearnerNode:
r.addLearner(cc.ReplicaID, cc.NodeGroup)
case pb.ConfChangeRemoveNode:
// block incoming proposal when local node is
// removed
Expand Down
3 changes: 3 additions & 0 deletions raft/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package raft

import (
"fmt"

pb "github.com/absolute8511/ZanRedisDB/raft/raftpb"
)

Expand Down Expand Up @@ -82,6 +83,8 @@ type Progress struct {

// group indicate which node the raft follower is belonged
group pb.Group
// IsLearner is true if this progress is tracked for a learner.
IsLearner bool
}

func (pr *Progress) resetState(state ProgressStateType) {
Expand Down
Loading

0 comments on commit a62c356

Please sign in to comment.