Skip to content

Commit

Permalink
Merge pull request #38 from absolute8511/raft-learner
Browse files Browse the repository at this point in the history
handle learner nodes while apply config change
  • Loading branch information
absolute8511 authored Jan 2, 2018
2 parents a62c356 + 0f3a476 commit 1169f80
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 7 deletions.
6 changes: 6 additions & 0 deletions node/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,12 @@ func getIDsAndGroups(snap *raftpb.Snapshot, ents []raftpb.Entry) ([]uint64, map[
for _, grp := range snap.Metadata.ConfState.Groups {
grps[grp.RaftReplicaId] = *grp
}
for _, id := range snap.Metadata.ConfState.Learners {
ids[id] = true
}
for _, grp := range snap.Metadata.ConfState.LearnerGroups {
grps[grp.RaftReplicaId] = *grp
}
}
for _, e := range ents {
if e.Type != raftpb.EntryConfChange {
Expand Down
10 changes: 8 additions & 2 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,10 @@ func (n *node) run(r *raft) {
if cc.ReplicaID == None {
r.resetPendingConf()
select {
case n.confstatec <- pb.ConfState{Nodes: r.nodes(), Groups: r.groups()}:
case n.confstatec <- pb.ConfState{Nodes: r.nodes(),
Groups: r.groups(),
Learners: r.learnerNodes(),
LearnerGroups: r.learnerGroups()}:
case <-n.done:
}
break
Expand All @@ -417,7 +420,10 @@ func (n *node) run(r *raft) {
panic("unexpected conf type")
}
select {
case n.confstatec <- pb.ConfState{Nodes: r.nodes(), Groups: r.groups()}:
case n.confstatec <- pb.ConfState{Nodes: r.nodes(),
Groups: r.groups(),
Learners: r.learnerNodes(),
LearnerGroups: r.learnerGroups()}:
case <-n.done:
}
case <-n.tickc:
Expand Down
59 changes: 58 additions & 1 deletion raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"testing"
"time"

"github.com/absolute8511/ZanRedisDB/raft/raftpb"
"github.com/absolute8511/ZanRedisDB/pkg/testutil"
"github.com/absolute8511/ZanRedisDB/raft/raftpb"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -772,3 +772,60 @@ func TestIsHardStateEqual(t *testing.T) {
}
}
}

func TestNodeProposeAddLearnerNode(t *testing.T) {
grp := raftpb.Group{
NodeId: 2,
RaftReplicaId: 2,
GroupId: 1,
}

ticker := time.NewTicker(time.Millisecond * 100)
defer ticker.Stop()
n := newNode()
s := NewMemoryStorage()
r := newTestRaft(1, []uint64{1}, 10, 1, s)
go n.run(r)
n.Campaign(context.TODO())
stop := make(chan struct{})
done := make(chan struct{})
applyConfChan := make(chan struct{})
go func() {
defer close(done)
for {
select {
case <-stop:
return
case <-ticker.C:
n.Tick()
case rd := <-n.Ready():
s.Append(rd.Entries)
t.Logf("raft: %v", rd.Entries)
for _, ent := range rd.Entries {
if ent.Type == raftpb.EntryConfChange {
var cc raftpb.ConfChange
cc.Unmarshal(ent.Data)
state := n.ApplyConfChange(cc)
if len(state.Learners) == 0 || state.Learners[0] != cc.ReplicaID {
t.Fatalf("apply conf change should return new added learner: %v", state.String())
}
if len(state.LearnerGroups) == 0 || state.LearnerGroups[0].String() != cc.NodeGroup.String() {
t.Fatalf("apply conf change should return new added learner group: %v", state.String())
}
if len(state.Nodes) != 1 {
t.Fatalf("add learner should not change the nodes: %v", state.String())
}
t.Logf("apply raft conf %v changed to: %v", cc, state.String())
applyConfChan <- struct{}{}
}
}
n.Advance()
}
}
}()
cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddLearnerNode, ReplicaID: 2, NodeGroup: grp}
n.ProposeConfChange(context.TODO(), cc)
<-applyConfChan
close(stop)
<-done
}
20 changes: 19 additions & 1 deletion raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,9 @@ func newRaft(c *Config) *raft {
for _, n := range r.nodes() {
nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
}
for _, n := range r.learnerNodes() {
nodesStrs = append(nodesStrs, fmt.Sprintf("learner-%x", n))
}

r.logger.Infof("newRaft %x(%v) [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
r.id, r.group.Name, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm())
Expand All @@ -401,10 +404,16 @@ func (r *raft) hardState() pb.HardState {
func (r *raft) quorum() int { return len(r.prs)/2 + 1 }

func (r *raft) nodes() []uint64 {
nodes := make([]uint64, 0, len(r.prs)+len(r.learnerPrs))
nodes := make([]uint64, 0, len(r.prs))
for id := range r.prs {
nodes = append(nodes, id)
}
sort.Sort(uint64Slice(nodes))
return nodes
}

func (r *raft) learnerNodes() []uint64 {
nodes := make([]uint64, 0, len(r.learnerPrs))
for id := range r.learnerPrs {
nodes = append(nodes, id)
}
Expand All @@ -425,6 +434,15 @@ func (r *raft) groups() []*pb.Group {
return groups
}

func (r *raft) learnerGroups() []*pb.Group {
groups := make([]*pb.Group, 0, len(r.learnerPrs))
for _, pr := range r.learnerPrs {
newg := pr.group
groups = append(groups, &newg)
}
return groups
}

// send persists state to stable storage and then sends to its mailbox.
func (r *raft) send(m pb.Message) {
m.From = r.id
Expand Down
3 changes: 2 additions & 1 deletion raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2504,7 +2504,7 @@ func TestRestoreWithLearner(t *testing.T) {
t.Errorf("%x is not learner, want yes", sm.id)
}
sg := sm.nodes()
if len(sg) != len(s.Metadata.ConfState.Nodes)+len(s.Metadata.ConfState.Learners) {
if len(sg)+len(sm.learnerNodes()) != len(s.Metadata.ConfState.Nodes)+len(s.Metadata.ConfState.Learners) {
t.Errorf("sm.Nodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState)
}
for _, n := range s.Metadata.ConfState.Nodes {
Expand Down Expand Up @@ -2873,6 +2873,7 @@ func TestAddLearner(t *testing.T) {
t.Errorf("pendingConf = %v, want false", r.pendingConf)
}
nodes := r.nodes()
nodes = append(nodes, r.learnerNodes()...)
wnodes := []uint64{1, 2}
if !reflect.DeepEqual(nodes, wnodes) {
t.Errorf("nodes = %v, want %v", nodes, wnodes)
Expand Down
6 changes: 4 additions & 2 deletions raft/rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error {
func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
if cc.ReplicaID == None {
rn.raft.resetPendingConf()
return &pb.ConfState{Nodes: rn.raft.nodes(), Groups: rn.raft.groups()}
return &pb.ConfState{Nodes: rn.raft.nodes(), Groups: rn.raft.groups(),
Learners: rn.raft.learnerNodes(), LearnerGroups: rn.raft.learnerGroups()}
}
switch cc.Type {
case pb.ConfChangeAddNode:
Expand All @@ -188,7 +189,8 @@ func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
default:
panic("unexpected conf type")
}
return &pb.ConfState{Nodes: rn.raft.nodes(), Groups: rn.raft.groups()}
return &pb.ConfState{Nodes: rn.raft.nodes(), Groups: rn.raft.groups(),
Learners: rn.raft.learnerNodes(), LearnerGroups: rn.raft.learnerGroups()}
}

// Step advances the state machine using the given message.
Expand Down
4 changes: 4 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,7 @@ func TestRestartCluster(t *testing.T) {
// TODO:
// stop all nodes in cluster and start one by one
}

func TestRestartWithForceAlone(t *testing.T) {
// TODO: test force restart with alone
}

0 comments on commit 1169f80

Please sign in to comment.