From 20ae36f62c28abee5e8f7527f68d487c509bd2ed Mon Sep 17 00:00:00 2001 From: Yusuke Hata Date: Thu, 11 May 2023 16:43:34 +0900 Subject: [PATCH] tidy --- bitcask.go | 2 +- merge.go | 24 ++++++++++++------------ 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/bitcask.go b/bitcask.go index 0ff2927..85e5253 100644 --- a/bitcask.go +++ b/bitcask.go @@ -816,7 +816,7 @@ func Open(path string, funcs ...OptionFunc) (*Bitcask, error) { metadata: meta, repliEmit: repliEmitter, repliRecv: repliReciver, - merger: newMerger(), + merger: newMerger(opt, path), } if err := repliEmitter.Start(bitcask.repliSource(), opt.RepliBindIP, opt.RepliBindPort); err != nil { diff --git a/merge.go b/merge.go index cd5893e..cbecb01 100644 --- a/merge.go +++ b/merge.go @@ -33,6 +33,8 @@ const ( type merger struct { mutex *sync.RWMutex + opt *option + basedir string merging bool } @@ -162,15 +164,14 @@ func (m *merger) snapshotIndexer(b *Bitcask, lim *priorate.Limiter) (*snapshotTr b.mu.RLock() defer b.mu.RUnlock() - st, err := openSnapshotTrie(b.opt.TempDir) + st, err := openSnapshotTrie(m.opt.TempDir) if err != nil { return nil, errors.Wrap(err, "failed open snapshot trie") } var lastErr error b.trie.ForEach(func(node art.Node) bool { - r := lim.ReserveN(priorate.Low, time.Now(), indexer.FilerByteSize) - if r.OK() { + if r := lim.ReserveN(priorate.Low, time.Now(), indexer.FilerByteSize); r.OK() { if d := r.Delay(); 0 < d { time.Sleep(d) } @@ -189,7 +190,7 @@ func (m *merger) snapshotIndexer(b *Bitcask, lim *priorate.Limiter) (*snapshotTr } func (m *merger) renewMergedDB(b *Bitcask, mergeFileIds []datafile.FileID, st *snapshotTrie, lim *priorate.Limiter) (*mergeTempDB, []indexer.MergeFiler, error) { - temp, err := openMergeTempDB(b.path, b.opt) + temp, err := openMergeTempDB(m.opt, m.basedir) if err != nil { return nil, nil, errors.WithStack(err) } @@ -278,9 +279,11 @@ func (m *merger) moveDBFiles(b *Bitcask, fromDBPath string) error { return nil } -func newMerger() *merger { +func newMerger(opt *option, basedir string) *merger { return &merger{ mutex: new(sync.RWMutex), + opt: opt, + basedir: basedir, merging: false, } } @@ -342,8 +345,7 @@ func (t *mergeTempDB) mergeDatafileLocked(st *snapshotTrie, m map[datafile.FileI return nil } - rr := lim.ReserveN(priorate.Low, time.Now(), int(filer.Size)) - if rr.OK() { + if rr := lim.ReserveN(priorate.Low, time.Now(), int(filer.Size)); rr.OK() { if d := rr.Delay(); 0 < d { time.Sleep(d) } @@ -360,8 +362,7 @@ func (t *mergeTempDB) mergeDatafileLocked(st *snapshotTrie, m map[datafile.FileI return nil } - rw := lim.ReserveN(priorate.Low, time.Now(), int(e.TotalSize)) - if rw.OK() { + if rw := lim.ReserveN(priorate.Low, time.Now(), int(e.TotalSize)); rw.OK() { if d := rw.Delay(); 0 < d { time.Sleep(d) } @@ -420,7 +421,7 @@ func finalizeMergeTempDB(t *mergeTempDB) { t.Destroy(nil) } -func openMergeTempDB(basedir string, opt *option) (*mergeTempDB, error) { +func openMergeTempDB(opt *option, basedir string) (*mergeTempDB, error) { tempDir, err := os.MkdirTemp(basedir, mergeDirPattern) if err != nil { return nil, errors.WithStack(err) @@ -551,8 +552,7 @@ func truncate(path string, size int64, lim *priorate.Limiter) { for i := int64(0); i < truncateCount; i += 1 { nextSize := threshold * (truncateCount - i) - r := lim.ReserveN(priorate.Low, time.Now(), int(nextSize)) - if r.OK() { + if r := lim.ReserveN(priorate.Low, time.Now(), int(nextSize)); r.OK() { if d := r.Delay(); 0 < d { time.Sleep(d + defaultSlowTruncateWait) }