Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize transfer leader #32

Merged
merged 15 commits into from
Dec 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ go:
- 1.7.x
- 1.8.x
env:
- GOARCH=amd64
- GOARCH=386
- TEST_RACE=true
- TEST_RACE=false
- GOARCH=amd64 TEST_RACE=false
- GOARCH=amd64 TEST_RACE=true
- GOARCH=386 TEST_RACE=false
- GOARCH=386 TEST_RACE=true
sudo: required
addons:
apt:
Expand Down
6 changes: 4 additions & 2 deletions apps/placedriver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package main
import (
"flag"
"fmt"
"github.com/absolute8511/ZanRedisDB/common"
"github.com/absolute8511/ZanRedisDB/pdserver"
"log"
"os"
"path/filepath"
"syscall"
"time"

"github.com/absolute8511/ZanRedisDB/common"
"github.com/absolute8511/ZanRedisDB/pdserver"

"github.com/BurntSushi/toml"
"github.com/absolute8511/glog"
"github.com/judwhite/go-svc/svc"
Expand All @@ -35,6 +36,7 @@ var (

logLevel = flagSet.Int("log-level", 1, "log verbose level")
logDir = flagSet.String("log-dir", "", "directory for log file")
dataDir = flagSet.String("data-dir", "", "directory for data")
balanceInterval = common.StringArray{}
)

Expand Down
1 change: 1 addition & 0 deletions cluster/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,4 +248,5 @@ type Options struct {
AutoBalanceAndMigrate bool
BalanceStart int
BalanceEnd int
DataDir string
}
69 changes: 61 additions & 8 deletions cluster/datanode_coord/data_node_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var (
ErrNamespaceNotReady = cluster.NewCoordErr("namespace node is not ready", cluster.CoordLocalErr)
ErrNamespaceInvalid = errors.New("namespace name is invalid")
ErrNamespaceNotFound = errors.New("namespace is not found")
TransferLeaderWait = time.Second * 20
)

const (
Expand Down Expand Up @@ -423,22 +424,55 @@ func (dc *DataCoordinator) getNamespaceRaftLeader(nsInfo *cluster.PartitionMetaI
return m.NodeID
}

func (dc *DataCoordinator) transferMyNamespaceLeader(nsInfo *cluster.PartitionMetaInfo, nid string) {
func (dc *DataCoordinator) transferMyNamespaceLeader(nsInfo *cluster.PartitionMetaInfo, nid string, force bool) bool {
nsNode := dc.localNSMgr.GetNamespaceNode(nsInfo.GetDesp())
if nsNode == nil {
return
return false
}
toRaftID, ok := nsInfo.RaftIDs[nid]
if !ok {
cluster.CoordLog().Warningf("transfer namespace %v leader to %v failed for missing raft id: %v",
nsInfo.GetDesp(), nid, nsInfo.RaftIDs)
return
return false
}

if !force {
if time.Now().UnixNano()-nsNode.GetLastLeaderChangedTime() < time.Minute.Nanoseconds() {
return false
}
if !dc.isReplicaReadyForRaft(nsNode, toRaftID, nid) {
return false
}
}
cluster.CoordLog().Infof("begin transfer namespace %v leader to %v", nsInfo.GetDesp(), nid)
err := nsNode.TransferMyLeader(cluster.ExtractRegIDFromGenID(nid), toRaftID)
if err != nil {
cluster.CoordLog().Infof("failed to transfer namespace %v leader to %v: %v", nsInfo.GetDesp(), nid, err)
return false
}
return true
}

// isReplicaReadyForRaft reports, from the leader's view, whether the replica
// identified by toRaftID on node nodeID is updated enough to take over
// leadership: its raft log must be nearly current and the remote node must
// report all of its namespaces ready over its HTTP API.
func (dc *DataCoordinator) isReplicaReadyForRaft(nsNode *node.NamespaceNode, toRaftID uint64, nodeID string) bool {
	if !nsNode.IsReady() {
		return false
	}
	if !nsNode.Node.IsReplicaRaftReady(toRaftID) {
		// Log the current raft status to help diagnose why the replica lags.
		stats := nsNode.Node.GetRaftStatus()
		cluster.CoordLog().Infof("namespace %v raft status still not ready for node:%v, %v",
			nsNode.FullName(), toRaftID, stats)
		return false
	}
	// Double-check with the remote node itself that it considers all of its
	// namespaces ready before handing leadership over.
	nip, _, _, httpPort := cluster.ExtractNodeInfoFromID(nodeID)
	code, err := common.APIRequest("GET",
		"http://"+net.JoinHostPort(nip, httpPort)+common.APINodeAllReady,
		nil, time.Second, nil)
	if err != nil {
		cluster.CoordLog().Infof("not ready from %v for transfer leader: %v, %v", nip, code, err.Error())
		return false
	}
	return true
}

func (dc *DataCoordinator) checkForUnsyncedNamespaces() {
Expand All @@ -449,6 +483,8 @@ func (dc *DataCoordinator) checkForUnsyncedNamespaces() {
m common.MemberInfo
}
pendingRemovings := make(map[string]map[uint64]pendingRemoveInfo)
// avoid transferring too many partitions at the same time
lastTransferCheckedTime := time.Now()
doWork := func() {
if atomic.LoadInt32(&dc.stopping) == 1 {
return
Expand Down Expand Up @@ -537,8 +573,13 @@ func (dc *DataCoordinator) checkForUnsyncedNamespaces() {
if cluster.FindSlice(isrList, dc.GetMyID()) == -1 {
cluster.CoordLog().Infof("namespace %v leader is not in isr: %v, maybe removing",
namespaceMeta.GetDesp(), isrList)
if time.Now().UnixNano()-localNamespace.GetLastLeaderChangedTime() > time.Minute.Nanoseconds() {
dc.transferMyNamespaceLeader(namespaceMeta, isrList[0])
done := false
if time.Since(lastTransferCheckedTime) >= TransferLeaderWait {
done = dc.transferMyNamespaceLeader(namespaceMeta, isrList[0], false)
lastTransferCheckedTime = time.Now()
}
if !done {
go dc.tryCheckNamespacesIn(TransferLeaderWait)
}
continue
}
Expand All @@ -549,8 +590,14 @@ func (dc *DataCoordinator) checkForUnsyncedNamespaces() {
// all the cluster nodes.
// avoid transfer while some node is leaving, so wait enough time to
// allow node leaving
if time.Now().UnixNano()-localNamespace.GetLastLeaderChangedTime() > time.Minute.Nanoseconds() {
dc.transferMyNamespaceLeader(namespaceMeta, isrList[0])
// also avoid transferring the leader while some node is still catching up during recovery from a restart
done := false
if time.Since(lastTransferCheckedTime) >= TransferLeaderWait {
done = dc.transferMyNamespaceLeader(namespaceMeta, isrList[0], false)
lastTransferCheckedTime = time.Now()
}
if !done {
go dc.tryCheckNamespacesIn(TransferLeaderWait)
}
} else {
// check if any replica is not joined to members
Expand Down Expand Up @@ -658,6 +705,7 @@ func (dc *DataCoordinator) checkForUnsyncedNamespaces() {
return
case <-dc.tryCheckUnsynced:
doWork()
time.Sleep(time.Millisecond * 100)
case <-ticker.C:
doWork()
case <-nsChangedChan:
Expand Down Expand Up @@ -780,6 +828,11 @@ func (dc *DataCoordinator) requestJoinNamespaceGroup(raftID uint64, nsInfo *clus
return nil
}

// tryCheckNamespacesIn triggers a namespace consistency re-check after the
// given wait duration has elapsed. It blocks for the full duration, so it is
// intended to be run in its own goroutine (as done when a leader transfer is
// deferred), keeping the caller's check loop unblocked.
func (dc *DataCoordinator) tryCheckNamespacesIn(wait time.Duration) {
	time.Sleep(wait)
	dc.tryCheckNamespaces()
}

func (dc *DataCoordinator) tryCheckNamespaces() {
time.Sleep(time.Second)
select {
Expand Down Expand Up @@ -1085,7 +1138,7 @@ func (dc *DataCoordinator) prepareLeavingCluster() {
if newLeader == dc.GetMyID() {
continue
}
dc.transferMyNamespaceLeader(nsInfo.GetCopy(), newLeader)
dc.transferMyNamespaceLeader(nsInfo.GetCopy(), newLeader, true)
break
}
}
Expand Down
53 changes: 52 additions & 1 deletion cluster/pdnode_coord/pd_coordinator.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package pdnode_coord

import (
"encoding/json"
"errors"
"io/ioutil"
"path"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/absolute8511/ZanRedisDB/cluster"
"github.com/absolute8511/ZanRedisDB/common"
)

var (
Expand Down Expand Up @@ -52,6 +57,7 @@ type PDCoordinator struct {
doChecking int32
autoBalance bool
stableNodeNum int32
dataDir string
}

func NewPDCoordinator(clusterID string, n *cluster.NodeInfo, opts *cluster.Options) *PDCoordinator {
Expand All @@ -64,11 +70,12 @@ func NewPDCoordinator(clusterID string, n *cluster.NodeInfo, opts *cluster.Optio
checkNamespaceFailChan: make(chan cluster.NamespaceNameInfo, 3),
stopChan: make(chan struct{}),
monitorChan: make(chan struct{}),
autoBalance: opts.AutoBalanceAndMigrate,
}
coord.dpm = NewDataPlacement(coord)
if opts != nil {
coord.dpm.SetBalanceInterval(opts.BalanceStart, opts.BalanceEnd)
coord.autoBalance = opts.AutoBalanceAndMigrate
coord.dataDir = opts.DataDir
}
return coord
}
Expand Down Expand Up @@ -152,6 +159,40 @@ func (pdCoord *PDCoordinator) handleLeadership() {
}
}

// saveClusterToFile snapshots the namespace meta, the cluster meta and every
// namespace's schemas to local files so they can be restored in case of etcd
// data disaster. Snapshots are written under the configured data dir (or a
// fresh temp dir when none is configured) using a nanosecond-timestamp file
// prefix so successive snapshots never collide.
func (pdCoord *PDCoordinator) saveClusterToFile(newNamespaces map[string]map[int]cluster.PartitionMetaInfo) {
	cluster.CoordLog().Infof("begin save namespace meta ")
	// Marshal of these plain meta structs cannot reasonably fail, so the
	// errors are intentionally ignored here.
	d, _ := json.Marshal(newNamespaces)
	dataDir := pdCoord.dataDir
	if dataDir == "" {
		var err error
		dataDir, err = ioutil.TempDir("", "zankv-meta")
		if err != nil {
			cluster.CoordLog().Infof("init temp dir failed: %v", err.Error())
			return
		}
	}
	prefix := strconv.Itoa(int(time.Now().UnixNano()))
	prefix = path.Join(dataDir, prefix)
	// A failed write must not abort the rest of the snapshot, but it must be
	// logged: this file is the disaster-recovery copy of the etcd data.
	if err := ioutil.WriteFile(prefix+".meta.ns", d, common.FILE_PERM); err != nil {
		cluster.CoordLog().Infof("save namespace meta file failed: %v", err.Error())
	}
	meta, err := pdCoord.register.GetClusterMetaInfo()
	if err != nil {
		cluster.CoordLog().Infof("get cluster meta failed: %v", err.Error())
	} else {
		d, _ := json.Marshal(meta)
		if werr := ioutil.WriteFile(prefix+".meta.cluster", d, common.FILE_PERM); werr != nil {
			cluster.CoordLog().Infof("save cluster meta file failed: %v", werr.Error())
		}
	}
	for ns := range newNamespaces {
		schemas, err := pdCoord.register.GetNamespaceSchemas(ns)
		if err != nil {
			cluster.CoordLog().Infof("namespace schemas failed: %v", err.Error())
			continue
		}
		d, _ = json.Marshal(schemas)
		if werr := ioutil.WriteFile(prefix+".schemas."+ns, d, common.FILE_PERM); werr != nil {
			cluster.CoordLog().Infof("save namespace schemas file failed: %v", werr.Error())
		}
	}
	cluster.CoordLog().Infof("namespace meta saved to : %v", dataDir)
}

func (pdCoord *PDCoordinator) notifyLeaderChanged(monitorChan chan struct{}) {
if pdCoord.leaderNode.GetID() != pdCoord.myNode.GetID() {
cluster.CoordLog().Infof("I am slave (%v). Leader is: %v", pdCoord.myNode, pdCoord.leaderNode)
Expand All @@ -175,6 +216,8 @@ func (pdCoord *PDCoordinator) notifyLeaderChanged(monitorChan chan struct{}) {
cluster.CoordLog().Errorf("load namespace info failed: %v", err)
} else {
cluster.CoordLog().Infof("namespace loaded : %v", len(newNamespaces))
// save to file in case of etcd data disaster
pdCoord.saveClusterToFile(newNamespaces)
}
}

Expand Down Expand Up @@ -456,12 +499,20 @@ func (pdCoord *PDCoordinator) checkNamespaces(monitorChan chan struct{}) {
if pdCoord.register == nil {
return
}
lastSaved := time.Now()
for {
select {
case <-monitorChan:
return
case <-ticker.C:
pdCoord.doCheckNamespaces(monitorChan, nil, waitingMigrateNamespace, true)
if time.Since(lastSaved) > time.Hour*12 {
allNamespaces, _, err := pdCoord.register.GetAllNamespaces()
if err == nil {
lastSaved = time.Now()
pdCoord.saveClusterToFile(allNamespaces)
}
}
case failedInfo := <-pdCoord.checkNamespaceFailChan:
pdCoord.doCheckNamespaces(monitorChan, &failedInfo, waitingMigrateNamespace, failedInfo.NamespaceName == "")
}
Expand Down
3 changes: 2 additions & 1 deletion cluster/pdnode_coord/place_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,9 +461,10 @@ func (dp *DataPlacement) rebalanceNamespace(monitorChan chan struct{}) (bool, bo
}
}
}
// wait raft leader election
select {
case <-monitorChan:
case <-time.After(time.Second):
case <-time.After(time.Second * 5):
}
}
}
Expand Down
1 change: 1 addition & 0 deletions cluster/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ type PDRegister interface {
Unregister(nodeData *NodeInfo) error
// the cluster root modify index
GetClusterEpoch() (EpochType, error)
GetClusterMetaInfo() (ClusterMetaInfo, error)
AcquireAndWatchLeader(leader chan *NodeInfo, stop chan struct{})

GetDataNodes() ([]NodeInfo, error)
Expand Down
13 changes: 13 additions & 0 deletions cluster/register_etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,19 @@ func (etcdReg *PDEtcdRegister) PrepareNamespaceMinGID() (int64, error) {
return clusterMeta.MaxGID, err
}

// GetClusterMetaInfo reads the cluster-wide meta info from etcd and decodes
// it from JSON. A missing key is normalized to ErrKeyNotFound; any other
// etcd or decoding error is returned unchanged alongside a zero-valued meta.
func (etcdReg *PDEtcdRegister) GetClusterMetaInfo() (ClusterMetaInfo, error) {
	var meta ClusterMetaInfo
	rsp, getErr := etcdReg.client.Get(etcdReg.getClusterMetaPath(), false, false)
	if getErr != nil {
		if client.IsKeyNotFound(getErr) {
			return meta, ErrKeyNotFound
		}
		return meta, getErr
	}
	if decErr := json.Unmarshal([]byte(rsp.Node.Value), &meta); decErr != nil {
		return meta, decErr
	}
	return meta, nil
}

func (etcdReg *PDEtcdRegister) GetClusterEpoch() (EpochType, error) {
rsp, err := etcdReg.client.Get(etcdReg.clusterPath, false, false)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion common/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var (
var (
ErrInvalidCommand = errors.New("invalid command")
ErrStopped = errors.New("the node stopped")
ErrTimeout = errors.New("queue request timeout")
ErrQueueTimeout = errors.New("queue request timeout")
ErrInvalidArgs = errors.New("invalid arguments")
ErrInvalidRedisKey = errors.New("invalid redis key")
ErrInvalidScanType = errors.New("invalid scan type")
Expand Down
1 change: 1 addition & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const (
APIGetLeader = "/cluster/leader"
APICheckBackup = "/cluster/checkbackup"
APIGetIndexes = "/schema/indexes"
APINodeAllReady = "/node/allready"
)

const (
Expand Down
Loading