diff --git a/cache.go b/cache.go new file mode 100644 index 0000000..64c91cc --- /dev/null +++ b/cache.go @@ -0,0 +1,93 @@ +package lm2 + +import ( + "math/rand" + "sync" +) + +type recordCache struct { + cache map[int64]*record + maxKeyRecord *record + size int + preventPurge bool + lock sync.RWMutex +} + +func newCache(size int) *recordCache { + return &recordCache{ + cache: map[int64]*record{}, + maxKeyRecord: nil, + size: size, + } +} + +func (rc *recordCache) findLastLessThan(key string) int64 { + rc.lock.RLock() + defer rc.lock.RUnlock() + + if rc.maxKeyRecord != nil { + if rc.maxKeyRecord.Key < key { + return rc.maxKeyRecord.Offset + } + } + max := "" + maxOffset := int64(0) + + for offset, record := range rc.cache { + if record.Key >= key { + continue + } + if record.Key > max { + max = record.Key + maxOffset = offset + } + } + return maxOffset +} + +func (rc *recordCache) push(rec *record) { + rc.lock.RLock() + + if rc.maxKeyRecord == nil || rc.maxKeyRecord.Key < rec.Key { + rc.lock.RUnlock() + + rc.lock.Lock() + if rc.maxKeyRecord == nil || rc.maxKeyRecord.Key < rec.Key { + rc.maxKeyRecord = rec + } + rc.lock.Unlock() + + return + } + + if len(rc.cache) == rc.size && rand.Float32() >= cacheProb { + rc.lock.RUnlock() + return + } + + rc.lock.RUnlock() + rc.lock.Lock() + + rc.cache[rec.Offset] = rec + if !rc.preventPurge { + rc.purge() + } + + rc.lock.Unlock() +} + +func (rc *recordCache) purge() { + purged := 0 + for len(rc.cache) > rc.size { + deletedKey := int64(0) + for k := range rc.cache { + if k == rc.maxKeyRecord.Offset { + continue + } + deletedKey = k + break + } + delete(rc.cache, deletedKey) + purged++ + } +} diff --git a/lm2.go b/lm2.go index 6f2e062..97b05e9 100644 --- a/lm2.go +++ b/lm2.go @@ -7,7 +7,6 @@ import ( "fmt" "math/rand" "os" - "sort" "sync" "sync/atomic" ) @@ -31,114 +30,6 @@ var ( fileVersion = [8]byte{'l', 'm', '2', '_', '0', '0', '1', '\n'} ) -type recordCache struct { - cache map[int64]*record - maxKeyRecord *record - size int - preventPurge bool - lock sync.RWMutex - updatesSinceSave int - - f *os.File -} - -func newCache(size int) (*recordCache, error) { - return &recordCache{ - cache: map[int64]*record{}, - maxKeyRecord: nil, - size: size, - }, nil -} - -func (rc *recordCache) findLastLessThan(key string) int64 { - rc.lock.RLock() - defer rc.lock.RUnlock() - - if rc.maxKeyRecord != nil { - if rc.maxKeyRecord.Key < key { - return rc.maxKeyRecord.Offset - } - } - max := "" - maxOffset := int64(0) - - for offset, record := range rc.cache { - if record.Key >= key { - continue - } - if record.Key > max { - max = record.Key - maxOffset = offset - } - } - return maxOffset -} - -func (rc *recordCache) push(rec *record) { - rc.lock.RLock() - - if rc.maxKeyRecord == nil || rc.maxKeyRecord.Key < rec.Key { - rc.lock.RUnlock() - - rc.lock.Lock() - if rc.maxKeyRecord == nil || rc.maxKeyRecord.Key < rec.Key { - rc.maxKeyRecord = rec - } - rc.lock.Unlock() - - return - } - - if len(rc.cache) == rc.size && rand.Float32() >= cacheProb { - rc.lock.RUnlock() - return - } - - rc.lock.RUnlock() - rc.lock.Lock() - - rc.cache[rec.Offset] = rec - rc.updatesSinceSave++ - if !rc.preventPurge { - rc.purge() - } - - rc.lock.Unlock() -} - -func (rc *recordCache) save() { - _, err := rc.f.Seek(0, 0) - if err != nil { - return - } - b := bytes.NewBuffer(make([]byte, 0, rc.size)) - for offset := range rc.cache { - binary.Write(b, binary.LittleEndian, offset) - } - rc.f.Write(b.Bytes()) - rc.f.Sync() - rc.updatesSinceSave = 0 -} - -func (rc *recordCache) purge() { - purged := 0 - 
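// cache.go lifts the record cache out of lm2.go and drops the
// file-backed save() path (updatesSinceSave and the *os.File field are
// gone), which is why newCache no longer needs to return an error. The
// admission policy is unchanged: once the cache is full, a push only
// lands with probability cacheProb. A standalone sketch of that policy,
// assuming cacheProb is a small constant like 0.1 (its actual value is
// defined elsewhere in the package):

package main

import (
	"fmt"
	"math/rand"
)

const cacheProb = 0.1 // assumed value, for illustration only

func main() {
	admitted := 0
	for i := 0; i < 100000; i++ {
		// recordCache.push takes this branch when the cache is at
		// capacity: most pushes are rejected, so only roughly a
		// cacheProb fraction of records is sampled into the cache.
		if rand.Float32() >= cacheProb {
			continue
		}
		admitted++
	}
	fmt.Printf("admitted %d of 100000 pushes\n", admitted)
}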
for len(rc.cache) > rc.size { - deletedKey := int64(0) - for k := range rc.cache { - if k == rc.maxKeyRecord.Offset { - continue - } - deletedKey = k - break - } - delete(rc.cache, deletedKey) - purged++ - } - if rc.updatesSinceSave > 4*rc.size { - rc.save() - } -} - // Collection represents an ordered linked list map. type Collection struct { fileHeader @@ -152,7 +43,8 @@ type Collection struct { // internalState is 0 if OK, 1 if inconsistent. internalState uint32 - metaLock sync.RWMutex + metaLock sync.RWMutex + writeLock sync.Mutex readAt func(b []byte, off int64) (n int, err error) writeAt func(b []byte, off int64) (n int, err error) @@ -247,12 +139,9 @@ func (c *Collection) readRecord(offset int64) (*record, error) { c.cache.lock.RUnlock() recordHeaderBytes := [recordHeaderSize]byte{} - n, err := c.readAt(recordHeaderBytes[:], offset) + _, err := c.readAt(recordHeaderBytes[:], offset) if err != nil { - return nil, err - } - if n != recordHeaderSize { - return nil, errors.New("lm2: partial read") + return nil, fmt.Errorf("lm2: partial read (%s)", err) } header := recordHeader{} @@ -262,12 +151,9 @@ func (c *Collection) readRecord(offset int64) (*record, error) { } keyValBuf := make([]byte, int(header.KeyLen)+int(header.ValLen)) - n, err = c.readAt(keyValBuf, offset+recordHeaderSize) + _, err = c.readAt(keyValBuf, offset+recordHeaderSize) if err != nil { - return nil, err - } - if n != len(keyValBuf) { - return nil, errors.New("lm2: partial read") + return nil, fmt.Errorf("lm2: partial read (%s)", err) } key := string(keyValBuf[:int(header.KeyLen)]) @@ -303,324 +189,6 @@ func (c *Collection) nextRecord(rec *record, level int) (*record, error) { return nextRec, nil } -func writeRecord(rec *record, buf *bytes.Buffer) error { - rec.KeyLen = uint16(len(rec.Key)) - rec.ValLen = uint32(len(rec.Value)) - - err := binary.Write(buf, binary.LittleEndian, rec.recordHeader) - if err != nil { - return err - } - - _, err = buf.WriteString(rec.Key) - if err != nil { - return err - } - _, err = buf.WriteString(rec.Value) - if err != nil { - return err - } - - return nil -} - -func (c *Collection) writeSentinel() (int64, error) { - offset, err := c.f.Seek(0, 2) - if err != nil { - return 0, err - } - sentinel := sentinelRecord{ - Magic: sentinelMagic, - Offset: offset, - } - err = binary.Write(c.f, binary.LittleEndian, sentinel) - if err != nil { - return 0, err - } - return offset + 12, nil -} - -func (c *Collection) findLastLessThanOrEqual(key string, startingOffset int64, level int, equal bool) (int64, error) { - offset := startingOffset - - headOffset := atomic.LoadInt64(&c.Next[level]) - if headOffset == 0 { - // Empty collection. 
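// The Collection struct above gains a writeLock alongside metaLock:
// writers (Update in the new update.go, and CompactFunc in lm2.go) hold
// writeLock for their entire run, while metaLock keeps guarding shared
// metadata against readers. The readRecord changes in this hunk also
// drop the explicit short-read checks; os.File.ReadAt already returns a
// non-nil error whenever it reads fewer than len(b) bytes. A standalone
// analog of the two-lock scheme (store, update, and version are
// illustrative names, not part of the package):

package main

import (
	"fmt"
	"sync"
)

type store struct {
	writeLock sync.Mutex   // serializes writers end to end
	metaLock  sync.RWMutex // guards shared metadata against readers
	version   int64
}

func (s *store) update() {
	s.writeLock.Lock() // at most one writer in flight
	defer s.writeLock.Unlock()

	s.metaLock.Lock() // readers are excluded only for this short window
	s.version++
	s.metaLock.Unlock()
}

func main() {
	s := &store{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.update()
		}()
	}
	wg.Wait()
	fmt.Println("final version:", s.version)
}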
- return 0, nil - } - - var rec *record - var err error - if offset == 0 { - // read the head - rec, err = c.readRecord(headOffset) - if err != nil { - return 0, err - } - if rec.Key > key { // we have a new head - return 0, nil - } - - if level == maxLevels-1 { - cacheResult := c.cache.findLastLessThan(key) - if cacheResult != 0 { - rec, err = c.readRecord(cacheResult) - if err != nil { - return 0, err - } - } - } - - offset = rec.Offset - } else { - rec, err = c.readRecord(offset) - if err != nil { - return 0, err - } - } - - for rec != nil { - rec.lock.RLock() - if (!equal && rec.Key == key) || rec.Key > key { - rec.lock.RUnlock() - break - } - offset = rec.Offset - oldRec := rec - rec, err = c.nextRecord(oldRec, level) - if err != nil { - return 0, err - } - oldRec.lock.RUnlock() - } - - return offset, nil -} - -// Update atomically and durably applies a WriteBatch (a set of updates) to the collection. -// It returns the new version (on success) and an error. -func (c *Collection) Update(wb *WriteBatch) (int64, error) { - if atomic.LoadUint32(&c.internalState) != 0 { - return 0, ErrInternal - } - - c.metaLock.Lock() - defer c.metaLock.Unlock() - - c.dirtyLock.Lock() - c.dirty = map[int64]*record{} - c.dirtyLock.Unlock() - defer func() { - c.dirtyLock.Lock() - c.dirty = nil - c.dirtyLock.Unlock() - }() - - // Clean up WriteBatch. - wb.cleanup() - - // Find and load records that will be modified into the cache. - - mergedSetDeleteKeys := map[string]struct{}{} - for key := range wb.sets { - mergedSetDeleteKeys[key] = struct{}{} - } - keys := []string{} - for key := range mergedSetDeleteKeys { - keys = append(keys, key) - } - - // Sort keys to be inserted or deleted. - sort.Strings(keys) - - walEntry := newWALEntry() - appendBuf := bytes.NewBuffer(nil) - currentOffset, err := c.f.Seek(0, 2) - if err != nil { - atomic.StoreUint32(&c.internalState, 1) - return 0, errors.New("lm2: couldn't get current file offset") - } - - overwrittenRecords := []int64{} - startingOffsets := [maxLevels]int64{} - for _, key := range keys { - value := wb.sets[key] - level := generateLevel() - newRecordOffset := currentOffset + int64(appendBuf.Len()) - rec := &record{ - recordHeader: recordHeader{ - Next: [maxLevels]int64{}, - }, - Offset: newRecordOffset, - Key: key, - Value: value, - } - - for i := maxLevels - 1; i > level; i-- { - offset, err := c.findLastLessThanOrEqual(key, startingOffsets[i], i, true) - if err != nil { - return 0, err - } - if offset > 0 { - startingOffsets[i] = offset - if i > 0 { - startingOffsets[i-1] = offset - } - } - } - - for ; level >= 0; level-- { - offset, err := c.findLastLessThanOrEqual(key, startingOffsets[level], level, true) - if err != nil { - return 0, err - } - if offset == 0 { - // Insert at head - atomic.StoreInt64(&rec.Next[level], c.fileHeader.Next[level]) - atomic.StoreInt64(&c.fileHeader.Next[level], newRecordOffset) - } else { - // Have a previous record - var prevRec *record - if prev := c.getDirty(offset); prev != nil { - prevRec = prev - } else { - prevRec, err = c.readRecord(offset) - if err != nil { - atomic.StoreUint32(&c.internalState, 1) - return 0, err - } - } - atomic.StoreInt64(&rec.Next[level], prevRec.Next[level]) - atomic.StoreInt64(&prevRec.Next[level], newRecordOffset) - c.setDirty(prevRec.Offset, prevRec) - walEntry.Push(newWALRecord(prevRec.Offset, prevRec.recordHeader.bytes())) - - if prevRec.Key == key && prevRec.Deleted == 0 { - overwrittenRecords = append(overwrittenRecords, prevRec.Offset) - } - - if level > 0 { - startingOffsets[level-1] = 
prevRec.Offset - } - } - - startingOffsets[level] = newRecordOffset - - err = writeRecord(rec, appendBuf) - if err != nil { - atomic.StoreUint32(&c.internalState, 1) - return 0, err - } - c.setDirty(newRecordOffset, rec) - } - } - - n, err := c.f.Write(appendBuf.Bytes()) - if err != nil { - atomic.StoreUint32(&c.internalState, 1) - return 0, errors.New("lm2: appending records failed") - } - if n != appendBuf.Len() { - atomic.StoreUint32(&c.internalState, 1) - return 0, errors.New("lm2: partial write") - } - - // Write sentinel record. - - currentOffset, err = c.writeSentinel() - if err != nil { - atomic.StoreUint32(&c.internalState, 1) - return 0, err - } - - // fsync data file. - err = c.f.Sync() - if err != nil { - atomic.StoreUint32(&c.internalState, 1) - return 0, err - } - - c.dirtyLock.Lock() - for _, dirtyRec := range c.dirty { - walEntry.Push(newWALRecord(dirtyRec.Offset, dirtyRec.recordHeader.bytes())) - } - c.dirtyLock.Unlock() - - for key := range wb.deletes { - offset := int64(0) - for level := maxLevels - 1; level >= 0; level-- { - offset, err = c.findLastLessThanOrEqual(key, offset, level, true) - if err != nil { - atomic.StoreUint32(&c.internalState, 1) - return 0, err - } - } - if offset == 0 { - continue - } - rec, err := c.readRecord(offset) - if err != nil { - atomic.StoreUint32(&c.internalState, 1) - return 0, err - } - if rec.Key != key { - continue - } - rec.lock.Lock() - rec.Deleted = currentOffset - rec.lock.Unlock() - walEntry.Push(newWALRecord(rec.Offset, rec.recordHeader.bytes())) - } - - for _, offset := range overwrittenRecords { - var rec *record - if dirtyRec := c.getDirty(offset); dirtyRec != nil { - rec = dirtyRec - } else { - rec, err = c.readRecord(offset) - if err != nil { - atomic.StoreUint32(&c.internalState, 1) - return 0, err - } - } - rec.lock.Lock() - rec.Deleted = currentOffset - rec.lock.Unlock() - walEntry.Push(newWALRecord(rec.Offset, rec.recordHeader.bytes())) - } - - // ^ record changes should have been serialized + buffered. Write those entries - // out to the WAL. - c.LastCommit = currentOffset - walEntry.Push(newWALRecord(0, c.fileHeader.bytes())) - _, err = c.wal.Append(walEntry) - if err != nil { - atomic.StoreUint32(&c.internalState, 1) - return 0, err - } - - // Update + fsync data file header. - for _, walRec := range walEntry.records { - n, err := c.writeAt(walRec.Data, walRec.Offset) - if err != nil { - atomic.StoreUint32(&c.internalState, 1) - return 0, err - } - if int64(n) != walRec.Size { - atomic.StoreUint32(&c.internalState, 1) - return 0, errors.New("lm2: incomplete data write") - } - } - - err = c.f.Sync() - if err != nil { - atomic.StoreUint32(&c.internalState, 1) - return 0, err - } - - return c.LastCommit, nil -} - // NewCollection creates a new collection with a data file at file. // cacheSize represents the size of the collection cache. func NewCollection(file string, cacheSize int) (*Collection, error) { @@ -639,16 +207,10 @@ func NewCollection(file string, cacheSize int) (*Collection, error) { f.Close() return nil, err } - cache, err := newCache(cacheSize) - if err != nil { - f.Close() - wal.Close() - return nil, err - } c := &Collection{ f: f, wal: wal, - cache: cache, + cache: newCache(cacheSize), readAt: f.ReadAt, writeAt: f.WriteAt, } @@ -674,10 +236,25 @@ func OpenCollection(file string, cacheSize int) (*Collection, error) { f, err := os.OpenFile(file, os.O_RDWR, 0666) if err != nil { if os.IsNotExist(err) { + // Check if there's a compacted version. 
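// OpenCollection now participates in compaction crash recovery. If the
// data file is gone but a finished ".compact" file exists, the rename
// that Compact was about to perform is completed here; if both files
// exist, the leftover ".compact" (and its WAL) are stale and removed. A
// standalone sketch of that decision, assuming the same naming
// convention (recoverCompacted is an illustrative helper, not part of
// the package):

package main

import (
	"fmt"
	"os"
)

func recoverCompacted(file string) (string, error) {
	if _, err := os.Stat(file); err == nil {
		// Primary data file exists; any ".compact" leftover is stale.
		os.Remove(file + ".compact")
		os.Remove(file + ".compact.wal")
		return file, nil
	}
	if _, err := os.Stat(file + ".compact"); err == nil {
		// Crash landed after compaction finished but before the rename.
		if err := os.Rename(file+".compact", file); err != nil {
			return "", fmt.Errorf("recovering compacted data file: %v", err)
		}
		return file, nil
	}
	return "", os.ErrNotExist
}

func main() {
	path, err := recoverCompacted("/tmp/example.lm2")
	fmt.Println(path, err)
}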
+ if _, err = os.Stat(file + ".compact"); err == nil { + // There is. + err = os.Rename(file+".compact", file) + if err != nil { + return nil, fmt.Errorf("lm2: error recovering compacted data file: %v", err) + } + return OpenCollection(file, cacheSize) + } return nil, ErrDoesNotExist } return nil, fmt.Errorf("lm2: error opening data file: %v", err) } + // Check if there's a compacted version. + if _, err = os.Stat(file + ".compact"); err == nil { + // There is. Remove it and its wal. + os.Remove(file + ".compact") + os.Remove(file + ".compact.wal") + } wal, err := openWAL(file + ".wal") if os.IsNotExist(err) { @@ -687,16 +264,11 @@ func OpenCollection(file string, cacheSize int) (*Collection, error) { f.Close() return nil, fmt.Errorf("lm2: error WAL: %v", err) } - cache, err := newCache(cacheSize) - if err != nil { - f.Close() - wal.Close() - return nil, err - } + c := &Collection{ f: f, wal: wal, - cache: cache, + cache: newCache(cacheSize), readAt: f.ReadAt, writeAt: f.WriteAt, } @@ -718,14 +290,10 @@ func OpenCollection(file string, cacheSize int) (*Collection, error) { } else { // Apply last WAL entry again. for _, walRec := range lastEntry.records { - n, err := c.writeAt(walRec.Data, walRec.Offset) + _, err := c.writeAt(walRec.Data, walRec.Offset) if err != nil { c.Close() - return nil, err - } - if int64(n) != walRec.Size { - c.Close() - return nil, errors.New("lm2: incomplete data write") + return nil, fmt.Errorf("lm2: partial write (%s)", err) } } @@ -769,6 +337,7 @@ func (c *Collection) Close() { // Internal state is OK. Safe to delete WAL. c.wal.Destroy() } + atomic.StoreUint32(&c.internalState, 1) } // Version returns the last committed version. @@ -793,3 +362,62 @@ func (c *Collection) Destroy() error { } return nil } + +// Compact rewrites a collection to clean up deleted records and optimize +// data layout on disk. +// NOTE: The collection is closed after compaction, so you'll have to reopen it. +func (c *Collection) Compact() error { + return c.CompactFunc(func(key, value string) (string, string, bool) { + return key, value, true + }) +} + +// CompactFunc compacts with a custom compaction function. f is called with +// each key-value pair, and it should return the new key and value for that record +// if they should be changed, and whether to keep the record. +// Returning false will skip the record. +// NOTE: The collection is closed after compaction, so you'll have to reopen it. 
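+// For example (a sketch; the "tmp:" key prefix and error handling are
+// illustrative, and the strings package would need to be imported):
+//
+//	err := c.CompactFunc(func(key, value string) (string, string, bool) {
+//		if strings.HasPrefix(key, "tmp:") {
+//			return "", "", false // drop temporary records
+//		}
+//		return key, value, true // keep the record unchanged
+//	})
+//	if err != nil {
+//		// handle the error
+//	}
+//	c, err = OpenCollection(file, cacheSize) // reopen after compaction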
+func (c *Collection) CompactFunc(f func(key, value string) (string, string, bool)) error { + c.writeLock.Lock() + defer c.writeLock.Unlock() + newCollection, err := NewCollection(c.f.Name()+".compact", 10) + if err != nil { + return err + } + cur, err := c.NewCursor() + if err != nil { + return err + } + const batchSize = 1000 + remaining := batchSize + wb := NewWriteBatch() + for cur.Next() { + key, val, keep := f(cur.Key(), cur.Value()) + if !keep { + continue + } + wb.Set(key, val) + remaining-- + + if remaining == 0 { + _, err := newCollection.Update(wb) + if err != nil { + return err + } + remaining = batchSize + wb = NewWriteBatch() + } + } + if remaining < batchSize { + _, err := newCollection.Update(wb) + if err != nil { + return err + } + } + err = c.Destroy() + if err != nil { + return err + } + newCollection.Close() + return os.Rename(newCollection.f.Name(), c.f.Name()) +} diff --git a/lm2_test.go b/lm2_test.go index b9bbde1..a038d2f 100644 --- a/lm2_test.go +++ b/lm2_test.go @@ -793,3 +793,206 @@ func TestLm2Log(t *testing.T) { } t.Logf("%+v", c.Stats()) } + +func TestCompact(t *testing.T) { + expected := [][2]string{ + {"key1", "a"}, + {"key2", "2"}, + {"key3", "c"}, + } + + c, err := NewCollection("/tmp/test_compact.lm2", 100) + if err != nil { + t.Fatal(err) + } + + wb := NewWriteBatch() + wb.Set("key2", "2") + t.Log("Set", "key2", "2") + _, err = c.Update(wb) + if err != nil { + t.Fatal(err) + } + + wb = NewWriteBatch() + wb.Set("key1", "1") + t.Log("Set", "key1", "1") + _, err = c.Update(wb) + if err != nil { + t.Fatal(err) + } + + wb = NewWriteBatch() + wb.Set("key3", "3") + t.Log("Set", "key3", "3") + _, err = c.Update(wb) + if err != nil { + t.Fatal(err) + } + + wb = NewWriteBatch() + wb.Set("key1", "a") + t.Log("Set", "key1", "a") + wb.Set("key3", "c") + t.Log("Set", "key3", "c") + _, err = c.Update(wb) + if err != nil { + t.Fatal(err) + } + + verifyOrder(t, c) + + err = c.Compact() + if err != nil { + t.Fatal(err) + } + + c, err = OpenCollection("/tmp/test_compact.lm2", 100) + defer c.Destroy() + + cur, err := c.NewCursor() + if err != nil { + t.Fatal(err) + } + + i := 0 + for cur.Next() { + if i == len(expected) { + t.Fatal("unexpected key", cur.Key()) + } + if cur.Key() != expected[i][0] || cur.Value() != expected[i][1] { + t.Errorf("expected %v => %v, got %v => %v", + expected[i][0], expected[i][1], cur.Key(), cur.Value()) + } else { + t.Logf("got %v => %v", cur.Key(), cur.Value()) + } + i++ + } + if err = cur.Err(); err != nil { + t.Fatal(err) + } + t.Logf("%+v", c.Stats()) +} + +func TestCopyCompact(t *testing.T) { + c, err := NewCollection("/tmp/test_copycompact.lm2", 100) + if err != nil { + t.Fatal(err) + } + + const N = 1000 + firstWriteStart := time.Now() + for i := 0; i < N; i++ { + key := fmt.Sprintf("%019d-%019d-%019d-%019d-%019d-%019d-%019d-%019d", + rand.Int63(), rand.Int63(), rand.Int63(), rand.Int63(), + rand.Int63(), rand.Int63(), rand.Int63(), rand.Int63()) + val := fmt.Sprint(i) + wb := NewWriteBatch() + wb.Set(key, val) + if _, err := c.Update(wb); err != nil { + t.Fatal(err) + } + } + t.Log("First write pass time:", time.Now().Sub(firstWriteStart)) + verifyOrder(t, c) + + compactStart := time.Now() + err = c.Compact() + if err != nil { + c.Destroy() + t.Fatal(err) + } + t.Log("Compact time:", time.Now().Sub(compactStart)) + + c, err = OpenCollection("/tmp/test_copycompact.lm2", 100) + defer c.Destroy() + + count := verifyOrder(t, c) + if count != N { + t.Error("expected count", N, "got", count) + } +} + +func TestCompactSkipKey(t *testing.T) { + 
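+	// Like TestCompact, but the compaction callback filters out "key2",
+	// so only key1/key3 (with their latest values) should survive the
+	// rewrite and the reopen below.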
expected := [][2]string{ + {"key1", "a"}, + {"key3", "c"}, + } + + c, err := NewCollection("/tmp/test_compactskip.lm2", 100) + if err != nil { + t.Fatal(err) + } + + wb := NewWriteBatch() + wb.Set("key2", "2") + t.Log("Set", "key2", "2") + _, err = c.Update(wb) + if err != nil { + t.Fatal(err) + } + + wb = NewWriteBatch() + wb.Set("key1", "1") + t.Log("Set", "key1", "1") + _, err = c.Update(wb) + if err != nil { + t.Fatal(err) + } + + wb = NewWriteBatch() + wb.Set("key3", "3") + t.Log("Set", "key3", "3") + _, err = c.Update(wb) + if err != nil { + t.Fatal(err) + } + + wb = NewWriteBatch() + wb.Set("key1", "a") + t.Log("Set", "key1", "a") + wb.Set("key3", "c") + t.Log("Set", "key3", "c") + _, err = c.Update(wb) + if err != nil { + t.Fatal(err) + } + + verifyOrder(t, c) + + err = c.CompactFunc(func(key, val string) (string, string, bool) { + if key == "key2" { + return "", "", false + } + return key, val, true + }) + if err != nil { + t.Fatal(err) + } + + c, err = OpenCollection("/tmp/test_compactskip.lm2", 100) + defer c.Destroy() + + cur, err := c.NewCursor() + if err != nil { + t.Fatal(err) + } + + i := 0 + for cur.Next() { + if i == len(expected) { + t.Fatal("unexpected key", cur.Key()) + } + if cur.Key() != expected[i][0] || cur.Value() != expected[i][1] { + t.Errorf("expected %v => %v, got %v => %v", + expected[i][0], expected[i][1], cur.Key(), cur.Value()) + } else { + t.Logf("got %v => %v", cur.Key(), cur.Value()) + } + i++ + } + if err = cur.Err(); err != nil { + t.Fatal(err) + } + t.Logf("%+v", c.Stats()) +} diff --git a/update.go b/update.go new file mode 100644 index 0000000..736ea16 --- /dev/null +++ b/update.go @@ -0,0 +1,324 @@ +package lm2 + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "io" + "sort" + "sync/atomic" +) + +func writeRecord(rec *record, buf *bytes.Buffer) error { + rec.KeyLen = uint16(len(rec.Key)) + rec.ValLen = uint32(len(rec.Value)) + + err := binary.Write(buf, binary.LittleEndian, rec.recordHeader) + if err != nil { + return err + } + + _, err = buf.WriteString(rec.Key) + if err != nil { + return err + } + _, err = buf.WriteString(rec.Value) + if err != nil { + return err + } + + return nil +} + +func (c *Collection) writeSentinel() (int64, error) { + offset, err := c.f.Seek(0, 2) + if err != nil { + return 0, err + } + sentinel := sentinelRecord{ + Magic: sentinelMagic, + Offset: offset, + } + err = binary.Write(c.f, binary.LittleEndian, sentinel) + if err != nil { + return 0, err + } + return offset + 12, nil +} + +func (c *Collection) findLastLessThanOrEqual(key string, startingOffset int64, level int, equal bool) (int64, error) { + offset := startingOffset + + headOffset := atomic.LoadInt64(&c.Next[level]) + if headOffset == 0 { + // Empty collection. 
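+		// A zero offset doubles as "no predecessor at this level":
+		// callers treat it as "insert at the head".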
+ return 0, nil + } + + var rec *record + var err error + if offset == 0 { + // read the head + rec, err = c.readRecord(headOffset) + if err != nil { + return 0, err + } + if rec.Key > key { // we have a new head + return 0, nil + } + + if level == maxLevels-1 { + cacheResult := c.cache.findLastLessThan(key) + if cacheResult != 0 { + rec, err = c.readRecord(cacheResult) + if err != nil { + return 0, err + } + } + } + + offset = rec.Offset + } else { + rec, err = c.readRecord(offset) + if err != nil { + return 0, err + } + } + + for rec != nil { + rec.lock.RLock() + if (!equal && rec.Key == key) || rec.Key > key { + rec.lock.RUnlock() + break + } + offset = rec.Offset + oldRec := rec + rec, err = c.nextRecord(oldRec, level) + if err != nil { + return 0, err + } + oldRec.lock.RUnlock() + } + + return offset, nil +} + +// Update atomically and durably applies a WriteBatch (a set of updates) to the collection. +// It returns the new version (on success) and an error. +func (c *Collection) Update(wb *WriteBatch) (int64, error) { + c.writeLock.Lock() + defer c.writeLock.Unlock() + + if atomic.LoadUint32(&c.internalState) != 0 { + return 0, ErrInternal + } + + c.metaLock.Lock() + defer c.metaLock.Unlock() + + c.dirtyLock.Lock() + c.dirty = map[int64]*record{} + c.dirtyLock.Unlock() + defer func() { + c.dirtyLock.Lock() + c.dirty = nil + c.dirtyLock.Unlock() + }() + + // Clean up WriteBatch. + wb.cleanup() + + // Find and load records that will be modified into the cache. + + mergedSetDeleteKeys := map[string]struct{}{} + for key := range wb.sets { + mergedSetDeleteKeys[key] = struct{}{} + } + keys := []string{} + for key := range mergedSetDeleteKeys { + keys = append(keys, key) + } + + // Sort keys to be inserted or deleted. + sort.Strings(keys) + + walEntry := newWALEntry() + appendBuf := bytes.NewBuffer(nil) + currentOffset, err := c.f.Seek(0, 2) + if err != nil { + atomic.StoreUint32(&c.internalState, 1) + return 0, errors.New("lm2: couldn't get current file offset") + } + + overwrittenRecords := []int64{} + startingOffsets := [maxLevels]int64{} + for _, key := range keys { + value := wb.sets[key] + level := generateLevel() + newRecordOffset := currentOffset + int64(appendBuf.Len()) + rec := &record{ + recordHeader: recordHeader{ + Next: [maxLevels]int64{}, + }, + Offset: newRecordOffset, + Key: key, + Value: value, + } + + for i := maxLevels - 1; i > level; i-- { + offset, err := c.findLastLessThanOrEqual(key, startingOffsets[i], i, true) + if err != nil { + return 0, err + } + if offset > 0 { + startingOffsets[i] = offset + if i > 0 { + startingOffsets[i-1] = offset + } + } + } + + for ; level >= 0; level-- { + offset, err := c.findLastLessThanOrEqual(key, startingOffsets[level], level, true) + if err != nil { + return 0, err + } + if offset == 0 { + // Insert at head + atomic.StoreInt64(&rec.Next[level], c.fileHeader.Next[level]) + atomic.StoreInt64(&c.fileHeader.Next[level], newRecordOffset) + } else { + // Have a previous record + var prevRec *record + if prev := c.getDirty(offset); prev != nil { + prevRec = prev + } else { + prevRec, err = c.readRecord(offset) + if err != nil { + atomic.StoreUint32(&c.internalState, 1) + return 0, err + } + } + atomic.StoreInt64(&rec.Next[level], prevRec.Next[level]) + atomic.StoreInt64(&prevRec.Next[level], newRecordOffset) + c.setDirty(prevRec.Offset, prevRec) + walEntry.Push(newWALRecord(prevRec.Offset, prevRec.recordHeader.bytes())) + + if prevRec.Key == key && prevRec.Deleted == 0 { + overwrittenRecords = append(overwrittenRecords, prevRec.Offset) + 
} + + if level > 0 { + startingOffsets[level-1] = prevRec.Offset + } + } + + startingOffsets[level] = newRecordOffset + + err = writeRecord(rec, appendBuf) + if err != nil { + atomic.StoreUint32(&c.internalState, 1) + return 0, err + } + c.setDirty(newRecordOffset, rec) + } + } + + _, err = io.Copy(c.f, appendBuf) + if err != nil { + atomic.StoreUint32(&c.internalState, 1) + return 0, fmt.Errorf("lm2: appending records failed (%s)", err) + } + + // Write sentinel record. + + currentOffset, err = c.writeSentinel() + if err != nil { + atomic.StoreUint32(&c.internalState, 1) + return 0, err + } + + // fsync data file. + err = c.f.Sync() + if err != nil { + atomic.StoreUint32(&c.internalState, 1) + return 0, err + } + + c.dirtyLock.Lock() + for _, dirtyRec := range c.dirty { + walEntry.Push(newWALRecord(dirtyRec.Offset, dirtyRec.recordHeader.bytes())) + } + c.dirtyLock.Unlock() + + for key := range wb.deletes { + offset := int64(0) + for level := maxLevels - 1; level >= 0; level-- { + offset, err = c.findLastLessThanOrEqual(key, offset, level, true) + if err != nil { + atomic.StoreUint32(&c.internalState, 1) + return 0, err + } + } + if offset == 0 { + continue + } + rec, err := c.readRecord(offset) + if err != nil { + atomic.StoreUint32(&c.internalState, 1) + return 0, err + } + if rec.Key != key { + continue + } + rec.lock.Lock() + rec.Deleted = currentOffset + rec.lock.Unlock() + walEntry.Push(newWALRecord(rec.Offset, rec.recordHeader.bytes())) + } + + for _, offset := range overwrittenRecords { + var rec *record + if dirtyRec := c.getDirty(offset); dirtyRec != nil { + rec = dirtyRec + } else { + rec, err = c.readRecord(offset) + if err != nil { + atomic.StoreUint32(&c.internalState, 1) + return 0, err + } + } + rec.lock.Lock() + rec.Deleted = currentOffset + rec.lock.Unlock() + walEntry.Push(newWALRecord(rec.Offset, rec.recordHeader.bytes())) + } + + // ^ record changes should have been serialized + buffered. Write those entries + // out to the WAL. + c.LastCommit = currentOffset + walEntry.Push(newWALRecord(0, c.fileHeader.bytes())) + _, err = c.wal.Append(walEntry) + if err != nil { + atomic.StoreUint32(&c.internalState, 1) + return 0, err + } + + // Update + fsync data file header. 
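+	// At this point the appended records are durable in the data file
+	// and every in-place patch is durable in the WAL, so the writeAt
+	// calls below are safe to redo: if we crash before the final fsync,
+	// OpenCollection replays the same WAL entry on the next open.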
+ for _, walRec := range walEntry.records { + _, err := c.writeAt(walRec.Data, walRec.Offset) + if err != nil { + atomic.StoreUint32(&c.internalState, 1) + return 0, fmt.Errorf("lm2: partial write (%s)", err) + } + } + + err = c.f.Sync() + if err != nil { + atomic.StoreUint32(&c.internalState, 1) + return 0, err + } + + return c.LastCommit, nil +} diff --git a/wal_test.go b/wal_test.go new file mode 100644 index 0000000..419c89d --- /dev/null +++ b/wal_test.go @@ -0,0 +1,44 @@ +package lm2 + +import ( + "bytes" + "testing" +) + +func TestWAL(t *testing.T) { + wal, err := newWAL("/tmp/test.wal") + if err != nil { + t.Fatal(err) + } + entry := newWALEntry() + record := newWALRecord(4321, []byte("test record")) + entry.Push(record) + if entry.NumRecords != 1 { + t.Errorf("expected entry.NumRecords to be %d, got %d", 1, entry.NumRecords) + } + _, err = wal.Append(entry) + if err != nil { + t.Error(err) + } + wal.Close() + + wal, err = openWAL("/tmp/test.wal") + if err != nil { + t.Fatal(err) + } + defer wal.Destroy() + readEntry, err := wal.readEntry() + if err != nil { + t.Fatal(err) + } + if len(readEntry.records) != 1 { + t.Fatalf("expected %d record, got %d", 1, len(readEntry.records)) + } + rec := readEntry.records[0] + if rec.Offset != 4321 { + t.Errorf("expected offset %d, got %d", 4321, rec.Offset) + } + if !bytes.Equal(rec.Data, []byte("test record")) { + t.Errorf("expected record data %v, got %v", []byte("test record"), rec.Data) + } +}
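Taken together, the new public surface reads: batch writes with Update, rewrite the data file with Compact or CompactFunc, then reopen. A minimal end-to-end sketch (the import path is assumed from the package name; adjust it to wherever lm2 actually lives):

package main

import (
	"fmt"
	"log"

	"github.com/Preetam/lm2" // assumed import path
)

func main() {
	c, err := lm2.NewCollection("/tmp/demo.lm2", 100)
	if err != nil {
		log.Fatal(err)
	}

	// Batch a few writes; Update commits them atomically and durably.
	wb := lm2.NewWriteBatch()
	wb.Set("key1", "a")
	wb.Set("key2", "b")
	if _, err := c.Update(wb); err != nil {
		log.Fatal(err)
	}

	// Rewrite the file to drop dead records and tighten the layout.
	if err := c.Compact(); err != nil {
		log.Fatal(err)
	}

	// Compact closes the collection, so reopen it before reading.
	c, err = lm2.OpenCollection("/tmp/demo.lm2", 100)
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	cur, err := c.NewCursor()
	if err != nil {
		log.Fatal(err)
	}
	for cur.Next() {
		fmt.Println(cur.Key(), "=>", cur.Value())
	}
	if err := cur.Err(); err != nil {
		log.Fatal(err)
	}
}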