Skip to content

Commit

Permalink
Merge pull request #37 from youzan/improve-raft-perf
Browse files Browse the repository at this point in the history
fix merged write is not stopped while disallow write and more test case
  • Loading branch information
absolute8511 authored Jul 8, 2019
2 parents 52d0437 + 427b03b commit 1d2dbc6
Show file tree
Hide file tree
Showing 14 changed files with 326 additions and 60 deletions.
57 changes: 33 additions & 24 deletions apps/backup/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var (
table = flagSet.String("table", "", "table name of backup")
backType = flagSet.String("type", "all", "which type you want to backup,split by ',' for multiple")
qps = flagSet.Int("qps", 1000, "qps")
readable = flagSet.Bool("readable", false, "output can be readable as text")
pass = flagSet.String("pass", "", "password of zankv")
)

Expand Down Expand Up @@ -126,19 +127,23 @@ func backupCommon(tp []byte, ch chan interface{}, file *os.File, f writeFunc) {
lenBuf := make([]byte, 4)
key := item[0].([]byte)
keyLen := len(key)
binary.BigEndian.PutUint32(lenBuf, uint32(keyLen))
n, err := file.Write(lenBuf)
if err != nil {
fmt.Printf("write key's len error.[ns=%s, table=%s, key=%s, err=%v]\n", *ns, *table, string(key), err)
return
}

if n != 4 {
fmt.Printf("write key's len length error. [ns=%s, table=%s, key=%s, len=%d]\n", *ns, *table, string(key), n)
return
}

n, err = file.Write(key)
if *readable {
file.WriteString("\n")
} else {
binary.BigEndian.PutUint32(lenBuf, uint32(keyLen))
n, err := file.Write(lenBuf)
if err != nil {
fmt.Printf("write key's len error.[ns=%s, table=%s, key=%s, err=%v]\n", *ns, *table, string(key), err)
return
}

if n != 4 {
fmt.Printf("write key's len length error. [ns=%s, table=%s, key=%s, len=%d]\n", *ns, *table, string(key), n)
return
}
}

n, err := file.Write(key)
if err != nil {
fmt.Printf("write key error. [ns=%s, table=%s, key=%s, err=%v]\n", *ns, *table, string(key), err)
return
Expand All @@ -165,21 +170,25 @@ func backupCommon(tp []byte, ch chan interface{}, file *os.File, f writeFunc) {
func kvbackup(ch chan interface{}, file *os.File, client *sdk.ZanRedisClient) {
tp := []byte{0}
backupCommon(tp, ch, file, func(key []byte, item []interface{}, file *os.File) error {
lenBuf := make([]byte, 4)
value := item[0].([]byte)
valLen := len(value)
binary.BigEndian.PutUint32(lenBuf, uint32(valLen))
n, err := file.Write(lenBuf)
if err != nil {
fmt.Printf("write val's len error. [ns=%s, table=%s, key=%s, val=%v, err=%v]\n", *ns, *table, string(key), value, err)
return err
}
if n != 4 {
fmt.Printf("write val's len length error. [ns=%s, table=%s, key=%s, val=%v, len=%d]\n", *ns, *table, string(key), value, n)
return errWriteLen
if *readable {
file.WriteString("\n")
} else {
lenBuf := make([]byte, 4)
binary.BigEndian.PutUint32(lenBuf, uint32(valLen))
n, err := file.Write(lenBuf)
if err != nil {
fmt.Printf("write val's len error. [ns=%s, table=%s, key=%s, val=%v, err=%v]\n", *ns, *table, string(key), value, err)
return err
}
if n != 4 {
fmt.Printf("write val's len length error. [ns=%s, table=%s, key=%s, val=%v, len=%d]\n", *ns, *table, string(key), value, n)
return errWriteLen
}
}

n, err = file.Write(value)
n, err := file.Write(value)
if err != nil {
fmt.Printf("write val error. [ns=%s, table=%s, key=%s, val=%v, err=%v]\n", *ns, *table, string(key), value, err)
return err
Expand Down
4 changes: 2 additions & 2 deletions cluster/pdnode_coord/pd_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ var (
)

func ChangeIntervalForTest() {
waitMigrateInterval = time.Second * 3
waitRemoveRemovingNodeInterval = time.Second * 3
waitMigrateInterval = time.Second * 10
waitRemoveRemovingNodeInterval = time.Second * 5
nsCheckInterval = time.Second
balanceCheckInterval = time.Second * 5
}
Expand Down
5 changes: 5 additions & 0 deletions cluster/register_etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,7 @@ func (etcdReg *PDEtcdRegister) DeleteNamespacePart(ns string, partition int) err
return err
}
}
atomic.StoreInt32(&etcdReg.ifNamespaceChanged, 1)
return nil
}

Expand All @@ -966,13 +967,15 @@ func (etcdReg *PDEtcdRegister) UpdateNamespacePartReplicaInfo(ns string, partiti
return err
}
replicaInfo.epoch = EpochType(rsp.Node.ModifiedIndex)
atomic.StoreInt32(&etcdReg.ifNamespaceChanged, 1)
return nil
}
rsp, err := etcdReg.client.CompareAndSwap(etcdReg.getNamespaceReplicaInfoPath(ns, partition), string(value), 0, "", uint64(oldGen))
if err != nil {
return err
}
replicaInfo.epoch = EpochType(rsp.Node.ModifiedIndex)
atomic.StoreInt32(&etcdReg.ifNamespaceChanged, 1)
return nil
}

Expand All @@ -983,6 +986,7 @@ func (etcdReg *PDEtcdRegister) UpdateNamespaceSchema(ns string, table string, sc
return err
}
schema.Epoch = EpochType(rsp.Node.ModifiedIndex)
atomic.StoreInt32(&etcdReg.ifNamespaceChanged, 1)
return nil
}

Expand All @@ -992,6 +996,7 @@ func (etcdReg *PDEtcdRegister) UpdateNamespaceSchema(ns string, table string, sc
return err
}
schema.Epoch = EpochType(rsp.Node.ModifiedIndex)
atomic.StoreInt32(&etcdReg.ifNamespaceChanged, 1)
return nil
}

Expand Down
10 changes: 9 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1010,7 +1010,15 @@ func (nd *KVNode) applyAll(np *nodeProgress, applyEvent *applyInfo) (bool, bool)
}
firsti := applyEvent.ents[0].Index
if firsti > np.appliedi+1 {
nodeLog.Panicf("first index of committed entry[%d] should <= appliedi[%d] + 1", firsti, np.appliedi)
nodeLog.Errorf("first index of committed entry[%d] should <= appliedi[%d] + 1", firsti, np.appliedi)
go func() {
select {
case <-nd.stopChan:
default:
nd.Stop()
}
}()
return false, false
}
var ents []raftpb.Entry
if np.appliedi+1-firsti < uint64(len(applyEvent.ents)) {
Expand Down
1 change: 1 addition & 0 deletions node/node_cmd_reg.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ func (nd *KVNode) registerHandler() {
nd.router.RegisterMerge("hidx.from", nd.hindexSearchCommand)

nd.router.RegisterMerge("exists", wrapMergeCommandKK(nd.existsCommand))
// make sure the merged write command will be stopped if cluster is not allowed to write
nd.router.RegisterWriteMerge("del", wrapWriteMergeCommandKK(nd, nd.delCommand))
//nd.router.RegisterWriteMerge("mset", nd.msetCommand)
nd.router.RegisterWriteMerge("plset", wrapWriteMergeCommandKVKV(nd, nd.plsetCommand))
Expand Down
12 changes: 6 additions & 6 deletions pdserver/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ func getTestClient(t *testing.T, ns string) *zanredisdb.ZanRedisClient {
return c
}

func startTestClusterAndCheck(t *testing.T) (*Server, []dataNodeWrapper, string) {
pd, kvList, tmpDir := startDefaultTestCluster(t, 4)
func startTestClusterAndCheck(t *testing.T, n int) (*Server, []dataNodeWrapper, string) {
pd, kvList, tmpDir := startDefaultTestCluster(t, n)
time.Sleep(time.Second)
pduri := "http://127.0.0.1:" + pdHttpPort
uri := fmt.Sprintf("%s/datanodes", pduri)
Expand Down Expand Up @@ -243,9 +243,9 @@ func startTestClusterAndCheck(t *testing.T) (*Server, []dataNodeWrapper, string)
return pd, kvList, tmpDir
}

func ensureClusterReady(t *testing.T) {
func ensureClusterReady(t *testing.T, n int) {
testOnce.Do(func() {
gpdServer, gkvList, gtmpDir = startTestClusterAndCheck(t)
gpdServer, gkvList, gtmpDir = startTestClusterAndCheck(t, n)
},
)
}
Expand Down Expand Up @@ -379,10 +379,10 @@ func ensureDeleteNamespace(t *testing.T, pduri string, ns string) {
}

func TestClusterInitStart(t *testing.T) {
ensureClusterReady(t)
ensureClusterReady(t, 4)
}
func TestClusterSchemaAddIndex(t *testing.T) {
ensureClusterReady(t)
ensureClusterReady(t, 4)

time.Sleep(time.Second)
ns := "test_schema_ns"
Expand Down
Loading

0 comments on commit 1d2dbc6

Please sign in to comment.