Skip to content

Commit

Permalink
Merge pull request #41 from absolute8511/log-syncer
Browse files Browse the repository at this point in the history
Log syncer
  • Loading branch information
absolute8511 authored Apr 8, 2018
2 parents e7e3888 + 0c8433e commit 83341e9
Show file tree
Hide file tree
Showing 62 changed files with 6,278 additions and 592 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ script:
- wget -c https://github.com/coreos/etcd/releases/download/v2.3.8/etcd-v2.3.8-linux-amd64.tar.gz
- tar -xvzf etcd-v2.3.8-linux-amd64.tar.gz
- ./etcd-v2.3.8-linux-amd64/etcd -name=test-etcd0 -initial-advertise-peer-urls=http://127.0.0.1:2380 -listen-client-urls=http://127.0.0.1:2379 -advertise-client-urls=http://127.0.0.1:2379 -listen-peer-urls=http://127.0.0.1:2380 -initial-cluster="test-etcd0=http://127.0.0.1:2380" -initial-cluster-state=new --data-dir ./test-etcd > etcd.log 2>&1 &
- go get -u golang.org/x/sys/...
- ./test.sh
notifications:
email: false
Expand Down
1 change: 1 addition & 0 deletions Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ github.com/absolute8511/glog
github.com/absolute8511/go-zanredisdb
github.com/absolute8511/xlock2
github.com/gogo/protobuf 342cbe0a04158f6dcb03ca0079991a51a4248c02
google.golang.org/grpc 7cea4cc846bcf00cbb27595b07da5de875ef7de9
github.com/ugorji/go 708a42d246822952f38190a8d8c4e6b16a0e600c
github.com/coreos/etcd dd0d5902177b6336b8cf344e6cbf1962b5981dde
github.com/coreos/go-semver/semver
Expand Down
1 change: 1 addition & 0 deletions apps/placedriver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ var (
logLevel = flagSet.Int("log-level", 1, "log verbose level")
logDir = flagSet.String("log-dir", "", "directory for log file")
dataDir = flagSet.String("data-dir", "", "directory for data")
learnerRole = flagSet.String("learner-role", "", "learner role for pd")
balanceInterval = common.StringArray{}
)

Expand Down
4 changes: 2 additions & 2 deletions build-pb.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
set -e
GOGOROOT="${GOPATH}/src/github.com/gogo/protobuf"
GOGOPATH="${GOGOROOT}:${GOGOROOT}/protobuf"
DIRS="./node ./raft/raftpb ./snap/snappb ./wal/walpb ./rockredis"
DIRS="./node ./raft/raftpb ./snap/snappb ./wal/walpb ./rockredis ./syncerpb"
echo $GOROOT
echo $GOPATH
for dir in ${DIRS}; do
pushd ${dir}
protoc --proto_path=$GOPATH:$GOGOPATH:./ --gofast_out=. *.proto
protoc --proto_path=$GOPATH:$GOGOPATH:./ --gofast_out=plugins=grpc:. *.proto
popd
done
5 changes: 4 additions & 1 deletion cluster/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,10 @@ func GenNodeID(n *NodeInfo, extra string) string {
tmpbuf.WriteString(":")
tmpbuf.WriteString(n.NodeIP)
tmpbuf.WriteString(":")
tmpbuf.WriteString(n.RpcPort)
// rpc port is not used in older version,
// in order to make it compatible we should keep rpc port empty
// it is enough to identify the node id with ip+redisport+httpport
//tmpbuf.WriteString(n.RpcPort)
tmpbuf.WriteString(":")
tmpbuf.WriteString(n.RedisPort)
tmpbuf.WriteString(":")
Expand Down
242 changes: 242 additions & 0 deletions cluster/datanode_coord/data_learner_coord.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
package datanode_coord

import (
"sort"
"sync/atomic"
"time"

"github.com/absolute8511/ZanRedisDB/cluster"
"github.com/absolute8511/ZanRedisDB/common"
node "github.com/absolute8511/ZanRedisDB/node"
)

func (dc *DataCoordinator) loadLocalNamespaceForLearners() error {
if dc.localNSMgr == nil {
cluster.CoordLog().Infof("no namespace manager")
return nil
}
namespaceMap, _, err := dc.register.GetAllNamespaces()
if err != nil {
if err == cluster.ErrKeyNotFound {
return nil
}
return err
}
sortedParts := make(PartitionList, 0)
for namespaceName, namespaceParts := range namespaceMap {
sortedParts = sortedParts[:0]
for _, part := range namespaceParts {
sortedParts = append(sortedParts, part)
}
sort.Sort(sortedParts)
for _, nsInfo := range sortedParts {
localNamespace := dc.localNSMgr.GetNamespaceNode(nsInfo.GetDesp())
if localNamespace != nil {
// already loaded
joinErr := dc.ensureJoinNamespaceGroupForLearner(nsInfo, localNamespace, false)
if joinErr != nil && joinErr != cluster.ErrNamespaceConfInvalid {
// we ensure join group as order for partitions
break
}
if cluster.CoordLog().Level() >= common.LOG_DETAIL {
cluster.CoordLog().Debugf("%v namespace %v already loaded", dc.GetMyID(), nsInfo.GetDesp())
}
continue
}
if !dc.isInLearnerGroup(nsInfo, nil) {
continue
}
cluster.CoordLog().Infof("loading namespace as learner: %v", nsInfo.GetDesp())
if namespaceName == "" {
continue
}
checkErr := dc.checkLocalNamespaceMagicCode(&nsInfo, true)
if checkErr != nil {
cluster.CoordLog().Errorf("failed to check namespace :%v, err:%v", nsInfo.GetDesp(), checkErr)
continue
}

localNamespace, coordErr := dc.updateLocalNamespace(&nsInfo, false)
if coordErr != nil {
cluster.CoordLog().Errorf("failed to init/update local namespace %v: %v", nsInfo.GetDesp(), coordErr)
continue
}

dyConf := &node.NamespaceDynamicConf{}
localNamespace.SetDynamicInfo(*dyConf)
localErr := dc.checkAndFixLocalNamespaceData(&nsInfo, localNamespace)
if localErr != nil {
cluster.CoordLog().Errorf("check local namespace %v data need to be fixed:%v", nsInfo.GetDesp(), localErr)
localNamespace.SetDataFixState(true)
}
joinErr := dc.ensureJoinNamespaceGroupForLearner(nsInfo, localNamespace, true)
if joinErr != nil && joinErr != cluster.ErrNamespaceConfInvalid {
// we ensure join group as order for partitions
break
}
}
}
return nil
}

func (dc *DataCoordinator) ensureJoinNamespaceGroupForLearner(nsInfo cluster.PartitionMetaInfo,
localNamespace *node.NamespaceNode, firstLoad bool) *cluster.CoordErr {

lrns := nsInfo.LearnerNodes[dc.learnerRole]
if len(lrns) > 0 && lrns[0] == dc.GetMyID() {
localNamespace.SwitchForLearnerLeader(true)
} else {
localNamespace.SwitchForLearnerLeader(false)
}

dyConf := &node.NamespaceDynamicConf{}
localNamespace.SetDynamicInfo(*dyConf)
if localNamespace.IsDataNeedFix() {
// clean local data
}
localNamespace.SetDataFixState(false)
if !dc.isInLearnerGroup(nsInfo, localNamespace) {
return nil
}
raftID, ok := nsInfo.RaftIDs[dc.GetMyID()]
if !ok {
cluster.CoordLog().Warningf("namespace %v failed to get raft id %v while check join", nsInfo.GetDesp(),
nsInfo.RaftIDs)
return cluster.ErrNamespaceConfInvalid
}
var joinErr *cluster.CoordErr
retry := 0
startCheck := time.Now()
requestJoined := make(map[string]bool)
for time.Since(startCheck) < EnsureJoinCheckWait {
mems := localNamespace.GetLearners()
alreadyJoined := false
for _, m := range mems {
if m.NodeID == dc.GetMyRegID() &&
m.GroupName == nsInfo.GetDesp() &&
m.ID == raftID {
alreadyJoined = true
break
}
}
if alreadyJoined {
joinErr = nil
break
} else {
joinErr = cluster.ErrNamespaceWaitingSync
var remote string
cnt := 0
isr := nsInfo.GetISR()
if len(isr) == 0 {
isr = nsInfo.RaftNodes
}
for cnt <= len(isr) {
remote = isr[retry%len(isr)]
retry++
cnt++
if remote == dc.GetMyID() {
continue
}
}
time.Sleep(time.Millisecond * 100)
if _, ok := requestJoined[remote]; !ok {
err := dc.requestJoinNamespaceGroup(raftID, &nsInfo, localNamespace, remote, true)
if err == nil {
requestJoined[remote] = true
}
}
select {
case <-dc.stopChan:
return cluster.ErrNamespaceExiting
case <-time.After(time.Second / 2):
}
}
}
if joinErr != nil {
dc.tryCheckNamespaces()
cluster.CoordLog().Infof("local namespace join failed: %v, retry later: %v", joinErr, nsInfo.GetDesp())
} else if retry > 0 {
cluster.CoordLog().Infof("local namespace join done: %v", nsInfo.GetDesp())
}
return joinErr
}

func (dc *DataCoordinator) isInLearnerGroup(nsInfo cluster.PartitionMetaInfo, localNamespace *node.NamespaceNode) bool {
// removing node can stop local raft only when all the other members
// are notified to remove this node
// Mostly, the remove node proposal will handle the raft node stop, however
// there are some situations to be checked here.
inMeta := false
for _, nid := range nsInfo.LearnerNodes[dc.learnerRole] {
if nid == dc.GetMyID() {
if localNamespace == nil {
inMeta = true
break
}
replicaID := nsInfo.RaftIDs[nid]
if replicaID == localNamespace.GetRaftID() {
inMeta = true
break
}
}
}
return inMeta
}

func (dc *DataCoordinator) checkForUnsyncedLogSyncers() {
ticker := time.NewTicker(CheckUnsyncedLearnerInterval)
cluster.CoordLog().Infof("%v begin to check for unsynced log syncers", dc.GetMyID())
defer cluster.CoordLog().Infof("%v check for unsynced log syncers quit", dc.GetMyID())
defer dc.wg.Done()

doWork := func() {
if atomic.LoadInt32(&dc.stopping) == 1 {
return
}
// try load local namespace if any namespace raft group changed
err := dc.loadLocalNamespaceForLearners()
if err != nil {
cluster.CoordLog().Infof("%v load local error : %v", dc.GetMyID(), err.Error())
}

// check local namespaces with cluster to remove the unneed data
tmpChecks := dc.localNSMgr.GetNamespaces()
for name, localNamespace := range tmpChecks {
namespace, pid := common.GetNamespaceAndPartition(name)
if namespace == "" {
cluster.CoordLog().Warningf("namespace invalid: %v", name)
continue
}
nsInfo, err := dc.register.GetNamespacePartInfo(namespace, pid)
if err != nil {
cluster.CoordLog().Infof("got namespace %v meta failed: %v", name, err)
if err == cluster.ErrKeyNotFound {
dc.forceRemoveLocalNamespace(localNamespace)
} else {
dc.tryCheckNamespacesIn(time.Second * 5)
}
continue
}

if !dc.isInLearnerGroup(*nsInfo, localNamespace) {
cluster.CoordLog().Infof("namespace %v removed since not in learner group", name, nsInfo.LearnerNodes)
dc.forceRemoveLocalNamespace(localNamespace)
continue
}
}
}

nsChangedChan := dc.register.GetNamespacesNotifyChan()
for {
select {
case <-dc.stopChan:
return
case <-dc.tryCheckUnsynced:
doWork()
time.Sleep(time.Millisecond * 100)
case <-ticker.C:
doWork()
case <-nsChangedChan:
}
}
}
Loading

0 comments on commit 83341e9

Please sign in to comment.