Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize delete range #44

Merged
merged 2 commits into from
Apr 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions node/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func (nd *KVNode) scanCommand(cmd redcon.Command) (interface{}, error) {
// here cursor is the scan key for start, (table:key)
// and the response will return the next start key for next scan,
// (note: it is not the "0" as the redis scan to indicate the end of scan)
// advscan will stop when it crosses a table boundary
func (nd *KVNode) advanceScanCommand(cmd redcon.Command) (interface{}, error) {
if len(cmd.Args) < 3 {
return &common.ScanResult{Keys: nil, NextCursor: nil, PartionId: "", Error: common.ErrInvalidArgs}, common.ErrInvalidArgs
Expand Down
2 changes: 1 addition & 1 deletion rockredis/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ var (
const (
defaultScanCount int = 100
MAX_BATCH_NUM = 5000
RANGE_DELETE_NUM = 100000
RangeDeleteNum = 100
)

var (
Expand Down
1 change: 1 addition & 0 deletions rockredis/fullscan.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ func (db *RockDB) buildFullScanIterator(storeDataType byte, table,
return nil, err
}

dbLog.Debugf("full scan range: %v, %v, %v, %v", minKey, maxKey, string(minKey), string(maxKey))
// minKey = minKey[:0]
it, err := NewDBRangeLimitIterator(db.eng, minKey, maxKey, common.RangeOpen, 0, count+1, false)
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions rockredis/rockredis.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,16 @@ func (r *RockDB) DeleteTableRange(dt string, table string, start []byte, end []b
return nil
}

// GetTablesSizes returns the approximate stored size for each of the given
// tables. Currently unimplemented: it always returns nil.
// TODO: try all data types for each table and sum their per-range sizes.
func (r *RockDB) GetTablesSizes(tables []string) []int64 {
// try all data types for each table
return nil
}

// GetTableSizeInRange returns the approximate on-disk size of the data of
// type dt stored in table between start and end. Currently a stub: it
// always returns 0. The commented-out lines below sketch the intended
// implementation via the engine's GetApproximateSizes on the encoded
// table range — TODO confirm and enable.
func (r *RockDB) GetTableSizeInRange(dt string, table string, start []byte, end []byte) int64 {
//rg := getTableRange(dt, table, start, end)
//rgs := make([]gorocksdb.Range, 0, 1)
//rgs = append(rgs, rg)
//s := r.eng.GetApproximateSizes(rgs)
return 0
}

Expand Down
2 changes: 2 additions & 0 deletions rockredis/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ func checkScanCount(count int) int {
return count
}

// note: this scan will not stop when crossing tables; it scans from key until count is reached or there is no more data in the db.
func (db *RockDB) scanGenericUseBuffer(storeDataType byte, key []byte, count int,
match string, inputBuffer [][]byte) ([][]byte, error) {
r, err := buildMatchRegexp(match)
Expand All @@ -166,6 +167,7 @@ func (db *RockDB) scanGenericUseBuffer(storeDataType byte, key []byte, count int
if err != nil {
return nil, err
}
dbLog.Debugf("scan range: %v, %v", minKey, maxKey)
count = checkScanCount(count)

it, err := db.buildScanIterator(minKey, maxKey)
Expand Down
27 changes: 19 additions & 8 deletions rockredis/t_hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,8 +454,12 @@ func (db *RockDB) hDeleteAll(hkey []byte, wb *gorocksdb.WriteBatch, tableIndexes
}
start := hEncodeStartKey(table, rk)
stop := hEncodeStopKey(table, rk)
hlen, err := db.HLen(hkey)
if err != nil {
return err
}

if tableIndexes != nil {
if tableIndexes != nil || hlen <= RangeDeleteNum {
it, err := NewDBRangeIterator(db.eng, start, stop, common.RangeROpen, false)
if err != nil {
return err
Expand All @@ -464,17 +468,24 @@ func (db *RockDB) hDeleteAll(hkey []byte, wb *gorocksdb.WriteBatch, tableIndexes

for ; it.Valid(); it.Next() {
rawk := it.RefKey()
_, _, field, _ := hDecodeHashKey(rawk)
if hindex := tableIndexes.GetHIndexNoLock(string(field)); hindex != nil {
oldV := it.RefValue()
if len(oldV) >= tsLen {
oldV = oldV[:len(oldV)-tsLen]
if hlen <= RangeDeleteNum {
wb.Delete(rawk)
}
if tableIndexes != nil {
_, _, field, _ := hDecodeHashKey(rawk)
if hindex := tableIndexes.GetHIndexNoLock(string(field)); hindex != nil {
oldV := it.RefValue()
if len(oldV) >= tsLen {
oldV = oldV[:len(oldV)-tsLen]
}
hindex.RemoveRec(oldV, hkey, wb)
}
hindex.RemoveRec(oldV, hkey, wb)
}
}
}
wb.DeleteRange(start, stop)
if hlen > RangeDeleteNum {
wb.DeleteRange(start, stop)
}
wb.Delete(sk)
return nil
}
Expand Down
47 changes: 39 additions & 8 deletions rockredis/t_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,11 +410,23 @@ func (db *RockDB) ltrim2(ts int64, key []byte, startP, stopP int64) error {
}

if start > 0 {
wb.DeleteRange(lEncodeListKey(table, rk, headSeq), lEncodeListKey(table, rk, headSeq+start))
if start > RangeDeleteNum {
wb.DeleteRange(lEncodeListKey(table, rk, headSeq), lEncodeListKey(table, rk, headSeq+start))
} else {
for i := int64(0); i < start; i++ {
wb.Delete(lEncodeListKey(table, rk, headSeq+i))
}
}
}
if stop < int64(llen-1) {
wb.DeleteRange(lEncodeListKey(table, rk, headSeq+int64(stop+1)),
lEncodeListKey(table, rk, headSeq+llen))
if llen-stop > RangeDeleteNum {
wb.DeleteRange(lEncodeListKey(table, rk, headSeq+int64(stop+1)),
lEncodeListKey(table, rk, headSeq+llen))
} else {
for i := int64(stop + 1); i < llen; i++ {
wb.Delete(lEncodeListKey(table, rk, headSeq+i))
}
}
}

newLen, err := db.lSetMeta(ek, headSeq+start, headSeq+stop, ts, wb)
Expand Down Expand Up @@ -475,10 +487,17 @@ func (db *RockDB) ltrim(ts int64, key []byte, trimSize, whereSeq int64) (int64,
tailSeq = trimStartSeq - 1
}

itemStartKey := lEncodeListKey(table, rk, trimStartSeq)
itemEndKey := lEncodeListKey(table, rk, trimEndSeq)
wb.DeleteRange(itemStartKey, itemEndKey)
wb.Delete(itemEndKey)
if trimEndSeq-trimStartSeq > RangeDeleteNum {
itemStartKey := lEncodeListKey(table, rk, trimStartSeq)
itemEndKey := lEncodeListKey(table, rk, trimEndSeq)
wb.DeleteRange(itemStartKey, itemEndKey)
wb.Delete(itemEndKey)
} else {
for trimSeq := trimStartSeq; trimSeq <= trimEndSeq; trimSeq++ {
itemKey := lEncodeListKey(table, rk, trimSeq)
wb.Delete(itemKey)
}
}

size, err = db.lSetMeta(metaKey, headSeq, tailSeq, ts, wb)
if err != nil {
Expand Down Expand Up @@ -519,8 +538,20 @@ func (db *RockDB) lDelete(key []byte, wb *gorocksdb.WriteBatch) int64 {
startKey := lEncodeListKey(table, rk, headSeq)
stopKey := lEncodeListKey(table, rk, tailSeq)

wb.DeleteRange(startKey, stopKey)
if size > RangeDeleteNum {
wb.DeleteRange(startKey, stopKey)
} else {
rit, err := NewDBRangeIterator(db.eng, startKey, stopKey, common.RangeClose, false)
if err != nil {
return 0
}
for ; rit.Valid(); rit.Next() {
wb.Delete(rit.RefKey())
}
rit.Close()
}
// delete range is [left, right), so we also need to delete the end key explicitly

wb.Delete(stopKey)
if size > 0 {
db.IncrTableKeyCount(table, -1, wb)
Expand Down
13 changes: 12 additions & 1 deletion rockredis/t_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,18 @@ func (db *RockDB) sDelete(key []byte, wb *gorocksdb.WriteBatch) int64 {
if err != nil {
return 0
}
wb.DeleteRange(start, stop)
if num > RangeDeleteNum {
wb.DeleteRange(start, stop)
} else {
it, err := NewDBRangeIterator(db.eng, start, stop, common.RangeROpen, false)
if err != nil {
return 0
}
for ; it.Valid(); it.Next() {
wb.Delete(it.RefKey())
}
it.Close()
}
if num > 0 {
db.IncrTableKeyCount(table, -1, wb)
db.delExpire(SetType, key, wb)
Expand Down
38 changes: 22 additions & 16 deletions rockredis/t_zset.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@ func (db *RockDB) zrank(key []byte, member []byte, reverse bool) (int64, error)
return -1, nil
}

func (db *RockDB) zRemAll(key []byte, wb *gorocksdb.WriteBatch) (int64, error) {
func (db *RockDB) zRemAll(ts int64, key []byte, wb *gorocksdb.WriteBatch) (int64, error) {
num, err := db.ZCard(key)
if err != nil {
return 0, err
Expand All @@ -687,19 +687,25 @@ func (db *RockDB) zRemAll(key []byte, wb *gorocksdb.WriteBatch) (int64, error) {
if err != nil {
return 0, err
}
sk := zEncodeSizeKey(key)

minKey := zEncodeStartKey(table, rk)
maxKey := zEncodeStopKey(table, rk)
wb.DeleteRange(minKey, maxKey)

minSetKey := zEncodeStartSetKey(table, rk)
maxSetKey := zEncodeStopSetKey(table, rk)
wb.DeleteRange(minSetKey, maxSetKey)
if num > 0 {
db.IncrTableKeyCount(table, -1, wb)
db.delExpire(ZSetType, key, wb)
if num > RangeDeleteNum {
sk := zEncodeSizeKey(key)
wb.DeleteRange(minKey, maxKey)

minSetKey := zEncodeStartSetKey(table, rk)
maxSetKey := zEncodeStopSetKey(table, rk)
wb.DeleteRange(minSetKey, maxSetKey)
if num > 0 {
db.IncrTableKeyCount(table, -1, wb)
db.delExpire(ZSetType, key, wb)
}
wb.Delete(sk)
} else {
rmCnt, err := db.zRemRangeBytes(ts, key, minKey, maxKey, 0, -1, wb)
return rmCnt, err
}
wb.Delete(sk)
return num, nil
}

Expand All @@ -712,7 +718,7 @@ func (db *RockDB) zRemRangeBytes(ts int64, key []byte, minKey []byte, maxKey []b
if offset == 0 {
total, err := db.ZCard(key)
if err == nil && int64(count) >= total {
return db.zRemAll(key, wb)
return db.zRemAll(ts, key, wb)
}
}
if count >= MAX_BATCH_NUM {
Expand Down Expand Up @@ -884,7 +890,7 @@ func (db *RockDB) zParseLimit(key []byte, start int, stop int) (offset int, coun
func (db *RockDB) ZClear(key []byte) (int64, error) {
db.wb.Clear()

rmCnt, err := db.zRemAll(key, db.wb)
rmCnt, err := db.zRemAll(0, key, db.wb)
if err == nil {
err = db.eng.Write(db.defaultWriteOpts, db.wb)
}
Expand All @@ -900,7 +906,7 @@ func (db *RockDB) ZMclear(keys ...[]byte) (int64, error) {
// note: zRemAll cannot be batched, so we need to clear and commit
// after each key.
db.wb.Clear()
if _, err := db.zRemAll(key, db.wb); err != nil {
if _, err := db.zRemAll(0, key, db.wb); err != nil {
return deleted, err
}
err := db.eng.Write(db.defaultWriteOpts, db.wb)
Expand All @@ -918,7 +924,7 @@ func (db *RockDB) zMclearWithBatch(wb *gorocksdb.WriteBatch, keys ...[]byte) err
return errTooMuchBatchSize
}
for _, key := range keys {
if _, err := db.zRemAll(key, wb); err != nil {
if _, err := db.zRemAll(0, key, wb); err != nil {
return err
}
}
Expand Down Expand Up @@ -1089,7 +1095,7 @@ func (db *RockDB) ZRemRangeByLex(ts int64, key []byte, min []byte, max []byte, r
wb := db.wb
wb.Clear()
if min == nil && max == nil {
cnt, err := db.zRemAll(key, wb)
cnt, err := db.zRemAll(ts, key, wb)
if err != nil {
return 0, err
}
Expand Down