Skip to content

Commit

Permalink
Merge pull request #35 from youzan/fix-checkbackup-timeout
Browse files Browse the repository at this point in the history
Optimize the sync between clusters
  • Loading branch information
absolute8511 authored Jun 29, 2019
2 parents d2b9563 + 0682924 commit 68c64f3
Show file tree
Hide file tree
Showing 23 changed files with 682 additions and 93 deletions.
6 changes: 3 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ required = [

[[constraint]]
name = "github.com/youzan/go-zanredisdb"
version = "0.3.4"
version = "0.3.5"

[[constraint]]
branch = "master"
Expand Down
8 changes: 6 additions & 2 deletions cluster/datanode_coord/data_learner_coord.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ func (dc *DataCoordinator) loadLocalNamespaceForLearners() error {
continue
}

dyConf := &node.NamespaceDynamicConf{}
dyConf := &node.NamespaceDynamicConf{
nsInfo.Replica,
}
localNamespace.SetDynamicInfo(*dyConf)
localErr := dc.checkAndFixLocalNamespaceData(&nsInfo, localNamespace)
if localErr != nil {
Expand All @@ -90,7 +92,9 @@ func (dc *DataCoordinator) ensureJoinNamespaceGroupForLearner(nsInfo cluster.Par
localNamespace.SwitchForLearnerLeader(false)
}

dyConf := &node.NamespaceDynamicConf{}
dyConf := &node.NamespaceDynamicConf{
nsInfo.Replica,
}
localNamespace.SetDynamicInfo(*dyConf)
if localNamespace.IsDataNeedFix() {
// clean local data
Expand Down
26 changes: 19 additions & 7 deletions cluster/datanode_coord/data_node_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (dc *DataCoordinator) Start() error {

dc.wg.Add(1)
go dc.checkForUnsyncedNamespaces()
} else if dc.learnerRole == common.LearnerRoleLogSyncer {
} else if common.IsRoleLogSyncer(dc.learnerRole) {
dc.loadLocalNamespaceForLearners()
dc.wg.Add(1)
go dc.checkForUnsyncedLogSyncers()
Expand Down Expand Up @@ -286,7 +286,9 @@ func (dc *DataCoordinator) loadLocalNamespaceData() error {
continue
}

dyConf := &node.NamespaceDynamicConf{}
dyConf := &node.NamespaceDynamicConf{
nsInfo.Replica,
}
localNamespace.SetDynamicInfo(*dyConf)
localErr := dc.checkAndFixLocalNamespaceData(&nsInfo, localNamespace)
if localErr != nil {
Expand Down Expand Up @@ -885,7 +887,8 @@ func (dc *DataCoordinator) getRaftAddrForNode(nid string) (string, *cluster.Coor
return node.RaftTransportAddr, nil
}

func (dc *DataCoordinator) prepareNamespaceConf(nsInfo *cluster.PartitionMetaInfo, raftID uint64, join bool) (*node.NamespaceConfig, *cluster.CoordErr) {
func (dc *DataCoordinator) prepareNamespaceConf(nsInfo *cluster.PartitionMetaInfo, raftID uint64,
join bool, forceStandaloneCluster bool) (*node.NamespaceConfig, *cluster.CoordErr) {
var err *cluster.CoordErr
nsConf := node.NewNSConfig()
nsConf.BaseName = nsInfo.Name
Expand Down Expand Up @@ -931,7 +934,12 @@ func (dc *DataCoordinator) prepareNamespaceConf(nsInfo *cluster.PartitionMetaInf
}
if !join && len(nsConf.RaftGroupConf.SeedNodes) <= nsInfo.Replica/2 {
cluster.CoordLog().Warningf("seed nodes for namespace %v not enough: %v", nsInfo.GetDesp(), nsConf.RaftGroupConf)
return nil, cluster.ErrNamespaceConfInvalid
// We should allow a single node to form a new raft cluster while in disaster re-init mode.
// In this case, we only have one seed node during init. (Note: writes may not be allowed during init,
// since the number of raft nodes is smaller than half of the replicas.)
if !forceStandaloneCluster {
return nil, cluster.ErrNamespaceConfInvalid
}
}
return nsConf, nil
}
Expand Down Expand Up @@ -1006,7 +1014,9 @@ func (dc *DataCoordinator) ensureJoinNamespaceGroup(nsInfo cluster.PartitionMeta
return cluster.ErrCatchupRunningBusy
}

dyConf := &node.NamespaceDynamicConf{}
dyConf := &node.NamespaceDynamicConf{
nsInfo.Replica,
}
localNamespace.SetDynamicInfo(*dyConf)
if localNamespace.IsDataNeedFix() {
// clean local data
Expand Down Expand Up @@ -1128,7 +1138,7 @@ func (dc *DataCoordinator) updateLocalNamespace(nsInfo *cluster.PartitionMetaInf
if forceStandaloneCluster {
join = false
}
nsConf, err := dc.prepareNamespaceConf(nsInfo, raftID, join)
nsConf, err := dc.prepareNamespaceConf(nsInfo, raftID, join, forceStandaloneCluster)
if err != nil {
go dc.tryCheckNamespaces()
cluster.CoordLog().Warningf("prepare join namespace %v failed: %v", nsInfo.GetDesp(), err)
Expand Down Expand Up @@ -1157,7 +1167,9 @@ func (dc *DataCoordinator) updateLocalNamespace(nsInfo *cluster.PartitionMetaInf
cluster.CoordLog().Warningf("local namespace %v init magic code failed: %v", nsInfo.GetDesp(), localErr)
return localNode, cluster.ErrLocalInitNamespaceFailed
}
dyConf := &node.NamespaceDynamicConf{}
dyConf := &node.NamespaceDynamicConf{
nsConf.Replicator,
}
localNode.SetDynamicInfo(*dyConf)
if err := localNode.Start(forceStandaloneCluster); err != nil {
return nil, cluster.ErrLocalInitNamespaceFailed
Expand Down
2 changes: 1 addition & 1 deletion cluster/register_etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -1016,7 +1016,7 @@ func NewDNEtcdRegister(host string) (*DNEtcdRegister, error) {

func (etcdReg *DNEtcdRegister) Register(nodeData *NodeInfo) error {
if nodeData.LearnerRole != "" &&
nodeData.LearnerRole != common.LearnerRoleLogSyncer &&
!common.IsRoleLogSyncer(nodeData.LearnerRole) &&
nodeData.LearnerRole != common.LearnerRoleSearcher {
return ErrLearnerRoleUnsupported
}
Expand Down
87 changes: 87 additions & 0 deletions common/dynamic_conf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package common

import (
"fmt"
"sort"
"strconv"
"sync"
"sync/atomic"
)

// Keys understood by the dynamic configuration accessors in this file.
const (
// ConfCheckSnapTimeout is registered in init as an integer config with default 60.
// NOTE(review): the unit is not visible here (presumably seconds) — confirm.
ConfCheckSnapTimeout = "check_snap_timeout"
// ConfCheckRaftTimeout is registered in init as an integer config with default 5.
// NOTE(review): the unit is not visible here (presumably seconds) — confirm.
ConfCheckRaftTimeout = "check_raft_timeout"
// NOTE(review): init registers no integer entry for this key, so
// SetIntDynamicConf ignores it; presumably it is used as a string config — confirm.
ConfIgnoreStartupNoBackup = "ignore_startup_nobackup"
// NOTE(review): init registers no integer entry for this key either, so
// SetIntDynamicConf ignores it; presumably it is used as a string config — confirm.
ConfIgnoreRemoteFileSync = "ignore_remote_file_sync"
// ConfMaxRemoteRecover is registered in init as an integer config with default 2.
ConfMaxRemoteRecover = "max_remote_recover"
)

// intConfMap maps an integer config key to the address of its value.
// In this file, keys are inserted only by init; the setter and getter never
// add keys and touch the pointed-to values through sync/atomic.
var intConfMap map[string]*int64
// strConfMap stores string config values by key; unlike intConfMap, new keys
// can be added at any time through SetStrDynamicConf.
var strConfMap sync.Map

func init() {
intConfMap = make(map[string]*int64)
snapCheckTimeout := int64(60)
intConfMap[ConfCheckSnapTimeout] = &snapCheckTimeout
raftCheckTimeout := int64(5)
intConfMap[ConfCheckRaftTimeout] = &raftCheckTimeout
emptyInt := int64(0)
intConfMap["empty_int"] = &emptyInt
maxRemoteRecover := int64(2)
intConfMap[ConfMaxRemoteRecover] = &maxRemoteRecover

strConfMap.Store("test_str", "test_str")
}

// DumpDynamicConf returns every integer and string dynamic config formatted
// as "key:value", sorted in increasing string order.
func DumpDynamicConf() []string {
	dump := make([]string, 0, len(intConfMap)*2)

	for name, cell := range intConfMap {
		cur := int(atomic.LoadInt64(cell))
		dump = append(dump, name+":"+strconv.Itoa(cur))
	}

	collect := func(name, val interface{}) bool {
		dump = append(dump, fmt.Sprintf("%v:%v", name, val))
		// Returning true keeps Range going over the remaining entries.
		return true
	}
	strConfMap.Range(collect)

	// Map iteration order is random, so sort for a stable result.
	sort.Strings(dump)
	return dump
}

// SetIntDynamicConf atomically stores newV for the integer config k.
// Keys that are not already present in intConfMap are silently ignored;
// this function never adds a key.
func SetIntDynamicConf(k string, newV int) {
	cell, known := intConfMap[k]
	if !known {
		return
	}
	atomic.StoreInt64(cell, int64(newV))
}

// IsConfSetted reports whether k has a non-zero integer value or a non-empty
// string value. An integer config whose value is 0 and a string config whose
// value is "" are both reported as not set.
func IsConfSetted(k string) bool {
	// The string lookup only happens when the integer lookup yields 0.
	return GetIntDynamicConf(k) != 0 || GetStrDynamicConf(k) != ""
}

// GetIntDynamicConf atomically loads the current value of the integer config
// k. It returns 0 when k is not present in intConfMap.
func GetIntDynamicConf(k string) int {
	cell, known := intConfMap[k]
	if !known {
		return 0
	}
	return int(atomic.LoadInt64(cell))
}

// SetStrDynamicConf stores newV as the string config for k in strConfMap,
// creating the entry if it does not exist yet.
func SetStrDynamicConf(k string, newV string) {
strConfMap.Store(k, newV)
}

// GetStrDynamicConf returns the string config stored for k, or "" when no
// entry exists for k.
func GetStrDynamicConf(k string) string {
	if stored, found := strConfMap.Load(k); found {
		// Unchecked assertion: this panics if a non-string value was stored.
		return stored.(string)
	}
	return ""
}
150 changes: 150 additions & 0 deletions common/dynamic_conf_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package common

import (
"reflect"
"strconv"
"sync"
"testing"
)

// TestDumpDynamicConf verifies that DumpDynamicConf lists the entries
// registered by init as sorted "key:value" strings.
func TestDumpDynamicConf(t *testing.T) {
	cases := []struct {
		name string
		want []string
	}{
		{
			name: "dump",
			want: []string{
				"check_raft_timeout:5",
				"check_snap_timeout:60",
				"empty_int:0",
				"max_remote_recover:2",
				"test_str:test_str",
			},
		},
	}
	for i := range cases {
		c := cases[i]
		t.Run(c.name, func(t *testing.T) {
			got := DumpDynamicConf()
			if reflect.DeepEqual(got, c.want) {
				return
			}
			t.Errorf("DumpDynamicConf() = %v, want %v", got, c.want)
		})
	}
}

// TestGetIntDynamicConf checks GetIntDynamicConf for a registered default, a
// value changed through SetIntDynamicConf, an unknown key, and an unknown key
// that a caller tried to set (which must stay 0 because the setter ignores
// unregistered keys).
//
// The cases are order dependent: the first case reads the default and the
// value is changed to 2 right after it, which the second case then observes.
func TestGetIntDynamicConf(t *testing.T) {
	type args struct {
		k string
	}
	tests := []struct {
		name string
		args args
		want int
	}{
		{"get default check_snap_timeout", args{ConfCheckSnapTimeout}, 60},
		{"get changed check_snap_timeout", args{ConfCheckSnapTimeout}, 2},
		{"get non exist", args{"noexist"}, 0},
		{"get after set non exist", args{"noexist-set"}, 0},
	}

	// The config lives in package-global state; put back whatever value was
	// there on entry so this test does not leak the changed timeout into
	// other tests or into a repeated run.
	origSnap := GetIntDynamicConf(ConfCheckSnapTimeout)
	defer SetIntDynamicConf(ConfCheckSnapTimeout, origSnap)

	SetIntDynamicConf("noexist-set", 2)
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := GetIntDynamicConf(tt.args.k); got != tt.want {
				t.Errorf("GetIntDynamicConf() = %v, want %v", got, tt.want)
			}
			// Applied after the check so that only the first case sees the default.
			SetIntDynamicConf(ConfCheckSnapTimeout, 2)
		})
	}
}

func TestGetStrDynamicConf(t *testing.T) {
type args struct {
k string
}
tests := []struct {
name string
args args
want string
}{
{"get default test_str", args{"test_str"}, "test_str"},
{"get changed test_str", args{"test_str"}, "test_str_changed"},
{"get non exist", args{"noexist"}, ""},
{"get after set non exist", args{"noexist-set"}, "set-noexist"},
}
SetStrDynamicConf("noexist-set", "set-noexist")
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := GetStrDynamicConf(tt.args.k); got != tt.want {
t.Errorf("GetStrDynamicConf() = %v, want %v", got, tt.want)
}
SetStrDynamicConf("test_str", "test_str_changed")
})
}
}

// TestIsConfSetted checks IsConfSetted against integer and string configs
// that are registered, zero/empty, missing, or created during the test.
// A case's optional pre hook runs immediately before its check.
func TestIsConfSetted(t *testing.T) {
	type args struct {
		k string
	}
	tests := []struct {
		pre  func()
		name string
		args args
		want bool
	}{
		{nil, "check default check_snap_timeout", args{"check_snap_timeout"}, true},
		{func() { SetIntDynamicConf("check_snap_timeout", 0) }, "check empty check_snap_timeout", args{"check_snap_timeout"}, false},
		{nil, "check non exist", args{"noexist"}, false},
		{nil, "check empty str conf", args{"empty_str"}, false},
		{nil, "check empty int conf", args{"empty_int"}, false},
		{nil, "check non exist str", args{"noexist-set-str"}, false},
		{func() { SetStrDynamicConf("noexist-set-str", "v") }, "check after set non exist str", args{"noexist-set-str"}, true},
	}

	// The configs are package-global. Without this, the snapshot timeout
	// stays at 0 and "noexist-set-str" stays set after the test, which breaks
	// the first and sixth cases on a repeated run and changes what later
	// tests observe. The setter/getter API has no delete, so the string key is
	// reset to "", which IsConfSetted reports as not set.
	origSnap := GetIntDynamicConf("check_snap_timeout")
	defer func() {
		SetIntDynamicConf("check_snap_timeout", origSnap)
		SetStrDynamicConf("noexist-set-str", "")
	}()

	SetStrDynamicConf("empty_str", "")
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if tt.pre != nil {
				tt.pre()
			}
			if got := IsConfSetted(tt.args.k); got != tt.want {
				t.Errorf("IsConfSetted() = %v, want %v", got, tt.want)
			}
		})
	}
}

// TestConfRace runs two reader goroutines and one writer goroutine against
// the dynamic config accessors at the same time. It makes no assertions; it
// is meant to be run under the race detector (go test -race).
func TestConfRace(t *testing.T) {
	const rounds = 1000000

	// The writer overwrites package-global configs. Put the entry values back
	// once all goroutines have finished so later tests and repeated runs are
	// not affected. The setter/getter API has no delete, so the "noexist"
	// string key is reset to "", which readers treat like a missing key.
	origSnap := GetIntDynamicConf("check_snap_timeout")
	origRaft := GetIntDynamicConf("check_raft_timeout")
	origStr := GetStrDynamicConf("test_str")
	defer func() {
		SetIntDynamicConf("check_snap_timeout", origSnap)
		SetIntDynamicConf("check_raft_timeout", origRaft)
		SetStrDynamicConf("test_str", origStr)
		SetStrDynamicConf("noexist", "")
	}()

	var wg sync.WaitGroup

	// reader repeatedly loads existing and missing keys of both kinds.
	reader := func() {
		defer wg.Done()
		for i := 0; i < rounds; i++ {
			GetIntDynamicConf("check_snap_timeout")
			GetIntDynamicConf("check_raft_timeout")
			GetStrDynamicConf("test_str")
			GetIntDynamicConf("noexist")
			GetStrDynamicConf("noexist")
		}
	}

	// writer repeatedly stores to existing and missing keys of both kinds.
	writer := func() {
		defer wg.Done()
		for i := 0; i < rounds; i++ {
			SetIntDynamicConf("check_snap_timeout", i)
			SetIntDynamicConf("check_raft_timeout", i)
			SetStrDynamicConf("test_str", strconv.Itoa(i))
			SetIntDynamicConf("noexist", i)
			SetStrDynamicConf("noexist", "v")
		}
	}

	wg.Add(3)
	go reader()
	go writer()
	go reader()
	wg.Wait()
}
4 changes: 4 additions & 0 deletions common/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ const (
LearnerRoleSearcher = "role_searcher"
)

// IsRoleLogSyncer reports whether role begins with LearnerRoleLogSyncer, so
// the plain log syncer role and any role that extends it with a suffix both
// match.
func IsRoleLogSyncer(role string) bool {
	hasSyncerPrefix := strings.HasPrefix(role, LearnerRoleLogSyncer)
	return hasSyncerPrefix
}

var (
SCAN_CURSOR_SEP = []byte(";")
SCAN_NODE_SEP = []byte(":")
Expand Down
Loading

0 comments on commit 68c64f3

Please sign in to comment.