
Commit b217934
Merge branch 'check-clean-expired' into 'master'
fix scan namespaces and use magic code to handle the deleted namespace

See merge request !15
absolute8511 committed Nov 26, 2020
2 parents 9e37072 + 7c60866
Showing 9 changed files with 514 additions and 133 deletions.
9 changes: 0 additions & 9 deletions cluster/datanode_coord/data_learner_coord.go
@@ -91,22 +91,13 @@ func (dc *DataCoordinator) loadLocalNamespaceForLearners() error {
if namespaceName == "" {
continue
}
checkErr := dc.checkLocalNamespaceMagicCode(&nsInfo, true)
if checkErr != nil {
cluster.CoordLog().Errorf("failed to check namespace :%v, err:%v", nsInfo.GetDesp(), checkErr)
continue
}

localNamespace, coordErr := dc.updateLocalNamespace(&nsInfo, false)
if coordErr != nil {
cluster.CoordLog().Errorf("failed to init/update local namespace %v: %v", nsInfo.GetDesp(), coordErr)
continue
}

dyConf := &node.NamespaceDynamicConf{
nsInfo.Replica,
}
localNamespace.SetDynamicInfo(*dyConf)
localErr := dc.checkAndFixLocalNamespaceData(&nsInfo, localNamespace)
if localErr != nil {
cluster.CoordLog().Errorf("check local namespace %v data need to be fixed:%v", nsInfo.GetDesp(), localErr)
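For context, the checkLocalNamespaceMagicCode wrapper removed here (and from the load path in data_node_coordinator.go below) delegated to localNSMgr.CheckMagicCode; the commit appears to move magic-code verification from every namespace load to a single pass at node startup. A minimal sketch of what such a per-namespace check typically does — the on-disk layout and names below are assumptions for illustration, not the project's actual implementation:

package sketch

import (
	"fmt"
	"os"
	"path/filepath"
	"strconv"
)

// checkMagicCode compares the cluster-assigned magic code with the one
// recorded next to the local namespace data; a mismatch means the local
// files belong to an earlier incarnation of the namespace.
// (Hypothetical sketch; the real CheckMagicCode differs.)
func checkMagicCode(dataRoot, ns string, expected int64, tryFix bool) error {
	fn := filepath.Join(dataRoot, ns, "magic")
	b, err := os.ReadFile(fn)
	if os.IsNotExist(err) {
		if tryFix {
			// first load on this node: record the expected code for later checks
			return os.WriteFile(fn, []byte(strconv.FormatInt(expected, 10)), 0644)
		}
		return nil
	}
	if err != nil {
		return err
	}
	local, err := strconv.ParseInt(string(b), 10, 64)
	if err != nil {
		return err
	}
	if local != expected {
		return fmt.Errorf("magic code mismatch for %s: local %d, cluster %d", ns, local, expected)
	}
	return nil
}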
48 changes: 26 additions & 22 deletions cluster/datanode_coord/data_node_coordinator.go
@@ -204,6 +204,30 @@ func (dc *DataCoordinator) Start() error {
}
if dc.localNSMgr != nil {
dc.localNSMgr.Start()
localNsMagics := dc.localNSMgr.CheckLocalNamespaces()
checkFailed := false
for ns, localMagic := range localNsMagics {
namespace, _ := common.GetNamespaceAndPartition(ns)
if namespace == "" {
continue
}
// check whether the magic code mismatches or the namespace was already removed by the cluster
nsMeta, err := dc.register.GetNamespaceMetaInfo(namespace)
if err != nil && err != cluster.ErrKeyNotFound {
cluster.CoordLog().Warningf("failed to get namespace meta %s from register : %v", ns, err.Error())
return err
}
if err == cluster.ErrKeyNotFound {
dc.localNSMgr.CleanSharedNsFiles(namespace)
} else if nsMeta.MagicCode > 0 && localMagic > 0 && localMagic != nsMeta.MagicCode {
cluster.CoordLog().Errorf("clean left namespace %v data, since magic code not match : %v, %v", ns, localMagic, nsMeta.MagicCode)
// we cannot clean shared namespace data here, since only parts of the namespace may be mismatched
checkFailed = true
}
}
if checkFailed {
return errors.New("start failed since local data check failed")
}
}
dc.wg.Add(1)
go dc.watchPD()
@@ -265,18 +289,6 @@ func (dc *DataCoordinator) watchPD() {
}
}

func (dc *DataCoordinator) checkLocalNamespaceMagicCode(nsInfo *cluster.PartitionMetaInfo, tryFix bool) error {
if nsInfo.MagicCode <= 0 {
return nil
}
err := dc.localNSMgr.CheckMagicCode(nsInfo.GetDesp(), nsInfo.MagicCode, tryFix)
if err != nil {
cluster.CoordLog().Infof("namespace %v check magic code error: %v", nsInfo.GetDesp(), err)
return err
}
return nil
}

func (dc *DataCoordinator) loadLocalNamespaceData() error {
if dc.localNSMgr == nil {
cluster.CoordLog().Infof("no namespace manager")
@@ -320,26 +332,17 @@ func (dc *DataCoordinator) loadLocalNamespaceData() error {
cluster.CoordLog().Debugf("%v namespace %v already loaded", dc.GetMyID(), nsInfo.GetDesp())
continue
}
cluster.CoordLog().Infof("loading namespace: %v", nsInfo.GetDesp())
cluster.CoordLog().Infof("mynode %v loading namespace: %v, %v", dc.GetMyID(), nsInfo.GetDesp(), nsInfo)
if namespaceName == "" {
continue
}
checkErr := dc.checkLocalNamespaceMagicCode(&nsInfo, true)
if checkErr != nil {
cluster.CoordLog().Errorf("failed to check namespace :%v, err:%v", nsInfo.GetDesp(), checkErr)
continue
}

localNamespace, coordErr := dc.updateLocalNamespace(&nsInfo, false)
if coordErr != nil {
cluster.CoordLog().Errorf("failed to init/update local namespace %v: %v", nsInfo.GetDesp(), coordErr)
continue
}

dyConf := &node.NamespaceDynamicConf{
nsInfo.Replica,
}
localNamespace.SetDynamicInfo(*dyConf)
localErr := dc.checkAndFixLocalNamespaceData(&nsInfo, localNamespace)
if localErr != nil {
cluster.CoordLog().Errorf("check local namespace %v data need to be fixed:%v", nsInfo.GetDesp(), localErr)
@@ -717,6 +720,7 @@ func (dc *DataCoordinator) checkForUnsyncedNamespaces() {
_, err = dc.register.GetNamespaceMetaInfo(namespace)
if err == cluster.ErrKeyNotFound {
dc.forceRemoveLocalNamespace(localNamespace)
dc.localNSMgr.CleanSharedNsFiles(namespace)
}
} else {
dc.tryCheckNamespacesIn(time.Second * 5)
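The new block in Start() above is the heart of the fix: every locally stored namespace is checked against the registry before the node serves traffic, so data left behind by a cluster-deleted namespace is cleaned up and stale data from an older incarnation blocks startup. A condensed, self-contained sketch of that decision logic — the register interface, error value, and partition-name split are stubs mirroring the diff, not the repo's exact types:

package sketch

import (
	"errors"
	"log"
	"strings"
)

var errKeyNotFound = errors.New("key not found")

type nsMeta struct{ MagicCode int64 }

type register interface {
	GetNamespaceMetaInfo(ns string) (nsMeta, error)
}

// checkLocalData mirrors the startup check added in Start(): namespaces that
// were deleted cluster-wide get their shared files cleaned, while a magic-code
// mismatch aborts startup so an operator can inspect the stale data.
func checkLocalData(reg register, localMagics map[string]int64, clean func(ns string)) error {
	checkFailed := false
	for ns, localMagic := range localMagics {
		// ns is "namespace-partition", e.g. "testns-2"
		idx := strings.LastIndex(ns, "-")
		if idx <= 0 {
			continue
		}
		base := ns[:idx]
		meta, err := reg.GetNamespaceMetaInfo(base)
		if err == errKeyNotFound {
			// removed by the cluster: shared files are safe to delete
			clean(base)
			continue
		}
		if err != nil {
			return err
		}
		if meta.MagicCode > 0 && localMagic > 0 && localMagic != meta.MagicCode {
			// only some partitions may mismatch, so nothing is deleted here
			log.Printf("stale data for %v: local magic %v, cluster magic %v", ns, localMagic, meta.MagicCode)
			checkFailed = true
		}
	}
	if checkFailed {
		return errors.New("start failed since local data check failed")
	}
	return nil
}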
92 changes: 57 additions & 35 deletions cluster/register_etcd.go
@@ -109,18 +109,19 @@ func exchangeNodeValue(c *EtcdClient, nodePath string, initValue string,
type EtcdRegister struct {
nsMutex sync.Mutex

client *EtcdClient
clusterID string
namespaceRoot string
clusterPath string
pdNodeRootPath string
allNamespaceInfos map[string]map[int]PartitionMetaInfo
nsEpoch EpochType
ifNamespaceChanged int32
watchNamespaceStopCh chan struct{}
nsChangedChan chan struct{}
triggerScanCh chan struct{}
wg sync.WaitGroup
client *EtcdClient
clusterID string
namespaceRoot string
clusterPath string
pdNodeRootPath string
allNamespaceInfos map[string]map[int]PartitionMetaInfo
nsEpoch EpochType
ifNamespaceChanged int32
watchNamespaceStopCh chan struct{}
nsChangedChan chan struct{}
triggerScanCh chan struct{}
wg sync.WaitGroup
watchedNsClusterIndex uint64
}

func NewEtcdRegister(host string) (*EtcdRegister, error) {
@@ -148,6 +149,11 @@ func (etcdReg *EtcdRegister) InitClusterID(id string) {

func (etcdReg *EtcdRegister) Start() {
etcdReg.watchNamespaceStopCh = make(chan struct{})
etcdReg.nsMutex.Lock()
etcdReg.ifNamespaceChanged = 1
etcdReg.allNamespaceInfos = make(map[string]map[int]PartitionMetaInfo)
etcdReg.nsEpoch = 0
etcdReg.nsMutex.Unlock()
etcdReg.wg.Add(2)
go func() {
defer etcdReg.wg.Done()
@@ -240,6 +246,7 @@ func (etcdReg *EtcdRegister) refreshNamespaces(stopC <-chan struct{}) {
case <-etcdReg.triggerScanCh:
if atomic.LoadInt32(&etcdReg.ifNamespaceChanged) == 1 {
etcdReg.scanNamespaces()
time.Sleep(time.Millisecond * 100)
}
case <-ticker.C:
if atomic.LoadInt32(&etcdReg.ifNamespaceChanged) == 1 {
@@ -259,30 +266,33 @@ func (etcdReg *EtcdRegister) watchNamespaces(stopC <-chan struct{}) {
}
}()
for {
_, err := watcher.Next(ctx)
rsp, err := watcher.Next(ctx)
if err != nil {
if err == context.Canceled {
coordLog.Infof("watch key[%s] canceled.", etcdReg.namespaceRoot)
return
}
atomic.StoreInt32(&etcdReg.ifNamespaceChanged, 1)
coordLog.Errorf("watcher key[%s] error: %s", etcdReg.namespaceRoot, err.Error())
coordLog.Warningf("watcher key[%s] error: %s", etcdReg.namespaceRoot, err.Error())
if IsEtcdWatchExpired(err) {
rsp, err := etcdReg.client.Get(etcdReg.namespaceRoot, false, true)
if err != nil {
coordLog.Errorf("rewatch and get key[%s] error: %s", etcdReg.namespaceRoot, err.Error())
time.Sleep(time.Second)
continue
}
coordLog.Errorf("watch expired key[%s] : %v", etcdReg.namespaceRoot, rsp)
coordLog.Infof("watch expired, rewatch namespace change at cluster index %v", rsp.Index)
atomic.StoreUint64(&etcdReg.watchedNsClusterIndex, rsp.Index)
watcher = etcdReg.client.Watch(etcdReg.namespaceRoot, rsp.Index+1, true)
// an expired watch should be treated as a change of the node
} else {
time.Sleep(5 * time.Second)
continue
}
} else {
atomic.StoreUint64(&etcdReg.watchedNsClusterIndex, rsp.Index)
coordLog.Infof("namespace changed %v, cluster index %v", rsp, rsp.Index)
}
coordLog.Debugf("namespace changed.")
atomic.StoreInt32(&etcdReg.ifNamespaceChanged, 1)
select {
case etcdReg.triggerScanCh <- struct{}{}:
@@ -301,7 +311,7 @@ func (etcdReg *EtcdRegister) scanNamespaces() (map[string]map[int]PartitionMetaI

// since the scan is triggered by the watch, we need to read the newest data from quorum
// to avoid getting stale data from a follower when no further update events arrive on the leader.
rsp, err := etcdReg.client.GetNewest(etcdReg.namespaceRoot, true, true)
rsp, err := etcdReg.client.Get(etcdReg.namespaceRoot, false, true)
if err != nil {
atomic.StoreInt32(&etcdReg.ifNamespaceChanged, 1)
if client.IsKeyNotFound(err) {
@@ -318,12 +328,18 @@
metaMap := make(map[string]NamespaceMetaInfo)
replicasMap := make(map[string]map[string]PartitionReplicaInfo)
leaderMap := make(map[string]map[string]RealLeader)
maxEpoch := etcdReg.processNamespaceNode(rsp.Node.Nodes, metaMap, replicasMap, leaderMap)
err = etcdReg.processNamespaceNode(rsp.Node.Nodes, metaMap, replicasMap, leaderMap)
if err != nil {
atomic.StoreInt32(&etcdReg.ifNamespaceChanged, 1)
etcdReg.nsMutex.Lock()
nsInfos := etcdReg.allNamespaceInfos
nsEpoch := etcdReg.nsEpoch
etcdReg.nsMutex.Unlock()
coordLog.Infof("refreshing namespaces failed: %v, use old info instead", err)
return nsInfos, nsEpoch, err
}

nsInfos := make(map[string]map[int]PartitionMetaInfo)
if EpochType(rsp.Node.ModifiedIndex) > maxEpoch {
maxEpoch = EpochType(rsp.Node.ModifiedIndex)
}
for k, v := range replicasMap {
meta, ok := metaMap[k]
if !ok {
@@ -356,11 +372,22 @@
}

etcdReg.nsMutex.Lock()
etcdReg.allNamespaceInfos = nsInfos
if maxEpoch != etcdReg.nsEpoch {
coordLog.Infof("ns epoch changed from %v to : %v ", etcdReg.nsEpoch, maxEpoch)
// here we must use the cluster index for the whole tree, since deleting a node may decrease the max modified index of the whole tree
maxEpoch := EpochType(rsp.Index)
if maxEpoch > etcdReg.nsEpoch {
etcdReg.allNamespaceInfos = nsInfos
coordLog.Infof("ns epoch changed from %v to : %v , watched: %v", etcdReg.nsEpoch, maxEpoch, atomic.LoadUint64(&etcdReg.watchedNsClusterIndex))
etcdReg.nsEpoch = maxEpoch
} else {
coordLog.Infof("ns epoch changed %v not newer than current: %v , watched: %v", maxEpoch, etcdReg.nsEpoch, atomic.LoadUint64(&etcdReg.watchedNsClusterIndex))
nsInfos = etcdReg.allNamespaceInfos
maxEpoch = etcdReg.nsEpoch
}
if rsp.Index < atomic.LoadUint64(&etcdReg.watchedNsClusterIndex) {
// need to scan again next time, since this cluster index is older than the newest watched index
// note the response index may be larger than the watched one, because changes outside the namespace root also increase the cluster index
atomic.StoreInt32(&etcdReg.ifNamespaceChanged, 1)
}
etcdReg.nsEpoch = maxEpoch
etcdReg.nsMutex.Unlock()

return nsInfos, maxEpoch, nil
@@ -369,19 +396,14 @@ func (etcdReg *EtcdRegister) processNamespaceNode(nodes client.Nodes,
func (etcdReg *EtcdRegister) processNamespaceNode(nodes client.Nodes,
metaMap map[string]NamespaceMetaInfo,
replicasMap map[string]map[string]PartitionReplicaInfo,
leaderMap map[string]map[string]RealLeader) EpochType {
maxEpoch := EpochType(0)
leaderMap map[string]map[string]RealLeader) error {
for _, node := range nodes {
if node.Nodes != nil {
newEpoch := etcdReg.processNamespaceNode(node.Nodes, metaMap, replicasMap, leaderMap)
if newEpoch > maxEpoch {
maxEpoch = newEpoch
err := etcdReg.processNamespaceNode(node.Nodes, metaMap, replicasMap, leaderMap)
if err != nil {
return err
}
}
if EpochType(node.ModifiedIndex) > maxEpoch {
maxEpoch = EpochType(node.ModifiedIndex)
}

if node.Dir {
continue
}
@@ -444,7 +466,7 @@ func (etcdReg *EtcdRegister) processNamespaceNode(nodes client.Nodes,
}
}
}
return maxEpoch
return nil
}

func (etcdReg *EtcdRegister) GetNamespacePartInfo(ns string, partition int) (*PartitionMetaInfo, error) {
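The epoch handling above is the subtle part of the scan fix. Previously the epoch was the maximum ModifiedIndex found in the namespace subtree, which can move backwards when a key is deleted, so a scan right after a delete could be discarded as stale; the etcd cluster index (rsp.Index) only grows, and comparing it against watchedNsClusterIndex catches reads served from an older view than the watcher has already seen. A condensed sketch of that logic, with the surrounding types stubbed for illustration:

package sketch

import (
	"sync"
	"sync/atomic"
)

type epochType uint64
type partitionMetaInfo struct{}

type nsCache struct {
	mu           sync.Mutex
	all          map[string]map[int]partitionMetaInfo
	nsEpoch      epochType
	watchedIndex uint64 // updated by the watcher from every watch response
	changed      int32  // 1 when another scan is needed
}

// applyScan mirrors the tail of scanNamespaces: the cluster index serves as
// the epoch because, unlike the subtree's max ModifiedIndex, it never moves
// backwards after a delete.
func (c *nsCache) applyScan(nsInfos map[string]map[int]partitionMetaInfo, clusterIndex uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if epoch := epochType(clusterIndex); epoch > c.nsEpoch {
		c.all = nsInfos
		c.nsEpoch = epoch
	}
	// If this read lagged behind the watcher, flag another scan. The response
	// index can also exceed the watched one, since unrelated keys outside the
	// namespace root advance the cluster index too.
	if clusterIndex < atomic.LoadUint64(&c.watchedIndex) {
		atomic.StoreInt32(&c.changed, 1)
	}
}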
(diffs for the remaining 6 changed files are not shown)
