Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle learner nodes while apply config change #38

Merged
merged 1 commit into from
Jan 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions node/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,12 @@ func getIDsAndGroups(snap *raftpb.Snapshot, ents []raftpb.Entry) ([]uint64, map[
for _, grp := range snap.Metadata.ConfState.Groups {
grps[grp.RaftReplicaId] = *grp
}
for _, id := range snap.Metadata.ConfState.Learners {
ids[id] = true
}
for _, grp := range snap.Metadata.ConfState.LearnerGroups {
grps[grp.RaftReplicaId] = *grp
}
}
for _, e := range ents {
if e.Type != raftpb.EntryConfChange {
Expand Down
10 changes: 8 additions & 2 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,10 @@ func (n *node) run(r *raft) {
if cc.ReplicaID == None {
r.resetPendingConf()
select {
case n.confstatec <- pb.ConfState{Nodes: r.nodes(), Groups: r.groups()}:
case n.confstatec <- pb.ConfState{Nodes: r.nodes(),
Groups: r.groups(),
Learners: r.learnerNodes(),
LearnerGroups: r.learnerGroups()}:
case <-n.done:
}
break
Expand All @@ -417,7 +420,10 @@ func (n *node) run(r *raft) {
panic("unexpected conf type")
}
select {
case n.confstatec <- pb.ConfState{Nodes: r.nodes(), Groups: r.groups()}:
case n.confstatec <- pb.ConfState{Nodes: r.nodes(),
Groups: r.groups(),
Learners: r.learnerNodes(),
LearnerGroups: r.learnerGroups()}:
case <-n.done:
}
case <-n.tickc:
Expand Down
59 changes: 58 additions & 1 deletion raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"testing"
"time"

"github.com/absolute8511/ZanRedisDB/raft/raftpb"
"github.com/absolute8511/ZanRedisDB/pkg/testutil"
"github.com/absolute8511/ZanRedisDB/raft/raftpb"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -772,3 +772,60 @@ func TestIsHardStateEqual(t *testing.T) {
}
}
}

func TestNodeProposeAddLearnerNode(t *testing.T) {
grp := raftpb.Group{
NodeId: 2,
RaftReplicaId: 2,
GroupId: 1,
}

ticker := time.NewTicker(time.Millisecond * 100)
defer ticker.Stop()
n := newNode()
s := NewMemoryStorage()
r := newTestRaft(1, []uint64{1}, 10, 1, s)
go n.run(r)
n.Campaign(context.TODO())
stop := make(chan struct{})
done := make(chan struct{})
applyConfChan := make(chan struct{})
go func() {
defer close(done)
for {
select {
case <-stop:
return
case <-ticker.C:
n.Tick()
case rd := <-n.Ready():
s.Append(rd.Entries)
t.Logf("raft: %v", rd.Entries)
for _, ent := range rd.Entries {
if ent.Type == raftpb.EntryConfChange {
var cc raftpb.ConfChange
cc.Unmarshal(ent.Data)
state := n.ApplyConfChange(cc)
if len(state.Learners) == 0 || state.Learners[0] != cc.ReplicaID {
t.Fatalf("apply conf change should return new added learner: %v", state.String())
}
if len(state.LearnerGroups) == 0 || state.LearnerGroups[0].String() != cc.NodeGroup.String() {
t.Fatalf("apply conf change should return new added learner group: %v", state.String())
}
if len(state.Nodes) != 1 {
t.Fatalf("add learner should not change the nodes: %v", state.String())
}
t.Logf("apply raft conf %v changed to: %v", cc, state.String())
applyConfChan <- struct{}{}
}
}
n.Advance()
}
}
}()
cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddLearnerNode, ReplicaID: 2, NodeGroup: grp}
n.ProposeConfChange(context.TODO(), cc)
<-applyConfChan
close(stop)
<-done
}
20 changes: 19 additions & 1 deletion raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,9 @@ func newRaft(c *Config) *raft {
for _, n := range r.nodes() {
nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
}
for _, n := range r.learnerNodes() {
nodesStrs = append(nodesStrs, fmt.Sprintf("learner-%x", n))
}

r.logger.Infof("newRaft %x(%v) [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
r.id, r.group.Name, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm())
Expand All @@ -401,10 +404,16 @@ func (r *raft) hardState() pb.HardState {
func (r *raft) quorum() int { return len(r.prs)/2 + 1 }

func (r *raft) nodes() []uint64 {
nodes := make([]uint64, 0, len(r.prs)+len(r.learnerPrs))
nodes := make([]uint64, 0, len(r.prs))
for id := range r.prs {
nodes = append(nodes, id)
}
sort.Sort(uint64Slice(nodes))
return nodes
}

func (r *raft) learnerNodes() []uint64 {
nodes := make([]uint64, 0, len(r.learnerPrs))
for id := range r.learnerPrs {
nodes = append(nodes, id)
}
Expand All @@ -425,6 +434,15 @@ func (r *raft) groups() []*pb.Group {
return groups
}

func (r *raft) learnerGroups() []*pb.Group {
groups := make([]*pb.Group, 0, len(r.learnerPrs))
for _, pr := range r.learnerPrs {
newg := pr.group
groups = append(groups, &newg)
}
return groups
}

// send persists state to stable storage and then sends to its mailbox.
func (r *raft) send(m pb.Message) {
m.From = r.id
Expand Down
3 changes: 2 additions & 1 deletion raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2504,7 +2504,7 @@ func TestRestoreWithLearner(t *testing.T) {
t.Errorf("%x is not learner, want yes", sm.id)
}
sg := sm.nodes()
if len(sg) != len(s.Metadata.ConfState.Nodes)+len(s.Metadata.ConfState.Learners) {
if len(sg)+len(sm.learnerNodes()) != len(s.Metadata.ConfState.Nodes)+len(s.Metadata.ConfState.Learners) {
t.Errorf("sm.Nodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState)
}
for _, n := range s.Metadata.ConfState.Nodes {
Expand Down Expand Up @@ -2873,6 +2873,7 @@ func TestAddLearner(t *testing.T) {
t.Errorf("pendingConf = %v, want false", r.pendingConf)
}
nodes := r.nodes()
nodes = append(nodes, r.learnerNodes()...)
wnodes := []uint64{1, 2}
if !reflect.DeepEqual(nodes, wnodes) {
t.Errorf("nodes = %v, want %v", nodes, wnodes)
Expand Down
6 changes: 4 additions & 2 deletions raft/rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error {
func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
if cc.ReplicaID == None {
rn.raft.resetPendingConf()
return &pb.ConfState{Nodes: rn.raft.nodes(), Groups: rn.raft.groups()}
return &pb.ConfState{Nodes: rn.raft.nodes(), Groups: rn.raft.groups(),
Learners: rn.raft.learnerNodes(), LearnerGroups: rn.raft.learnerGroups()}
}
switch cc.Type {
case pb.ConfChangeAddNode:
Expand All @@ -188,7 +189,8 @@ func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
default:
panic("unexpected conf type")
}
return &pb.ConfState{Nodes: rn.raft.nodes(), Groups: rn.raft.groups()}
return &pb.ConfState{Nodes: rn.raft.nodes(), Groups: rn.raft.groups(),
Learners: rn.raft.learnerNodes(), LearnerGroups: rn.raft.learnerGroups()}
}

// Step advances the state machine using the given message.
Expand Down
4 changes: 4 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,7 @@ func TestRestartCluster(t *testing.T) {
// TODO:
// stop all nodes in cluster and start one by one
}

func TestRestartWithForceAlone(t *testing.T) {
// TODO: test force restart with alone
}