Skip to content

Commit

Permalink
create ErrInternal and check for inconsistent internal state (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
Preetam authored Mar 5, 2017
1 parent 9d9ca95 commit 8f9b8f1
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 1 deletion.
15 changes: 15 additions & 0 deletions cursor.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package lm2

import "sync/atomic"

// Cursor represents a snapshot cursor.
type Cursor struct {
collection *Collection
Expand All @@ -11,6 +13,10 @@ type Cursor struct {
// NewCursor returns a new cursor with a snapshot view of the
// current collection state.
func (c *Collection) NewCursor() (*Cursor, error) {
if atomic.LoadUint32(&c.internalState) != 0 {
return nil, ErrInternal
}

c.metaLock.RLock()
defer c.metaLock.RUnlock()
if c.Head == 0 {
Expand Down Expand Up @@ -62,6 +68,11 @@ func (c *Cursor) Valid() bool {
// Next moves the cursor to the next record. It returns true
// if it lands on a valid record.
func (c *Cursor) Next() bool {
if atomic.LoadUint32(&c.collection.internalState) != 0 {
c.current = nil
return false
}

if !c.Valid() {
return false
}
Expand Down Expand Up @@ -120,6 +131,10 @@ func (c *Cursor) Value() string {
// Seek positions the cursor at the last key less than
// or equal to the provided key.
func (c *Cursor) Seek(key string) {
if atomic.LoadUint32(&c.collection.internalState) != 0 {
return
}

var rec *record
var err error
offset := c.collection.cache.findLastLessThan(key)
Expand Down
34 changes: 33 additions & 1 deletion lm2.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"sort"
"sync"
"sync/atomic"
)

const sentinelMagic = 0xDEAD10CC
Expand All @@ -18,6 +19,9 @@ var (
// ErrDoesNotExist is returned when a collection's data file
// doesn't exist.
ErrDoesNotExist = errors.New("lm2: does not exist")
// ErrInternal is returned when the internal state of the collection
// is invalid. The collection should be closed and reopened.
ErrInternal = errors.New("lm2: internal error")
)

// Collection represents an ordered linked list map.
Expand All @@ -28,6 +32,9 @@ type Collection struct {
cache *recordCache
stats Stats

// internalState is 0 if OK, 1 if inconsistent.
internalState uint32

metaLock sync.RWMutex
}

Expand Down Expand Up @@ -412,6 +419,10 @@ func (c *Collection) findLastLessThanOrEqual(key string, startingOffset int64) (
// Update atomically and durably applies a WriteBatch (a set of updates) to the collection.
// It returns the new version (on success) and an error.
func (c *Collection) Update(wb *WriteBatch) (int64, error) {
if atomic.LoadUint32(&c.internalState) != 0 {
return 0, ErrInternal
}

c.metaLock.Lock()
defer c.metaLock.Unlock()

Expand Down Expand Up @@ -483,6 +494,7 @@ func (c *Collection) Update(wb *WriteBatch) (int64, error) {

// NOTE: we shouldn't be reading any more records after this point.
// TODO: assert it.
// We'll assume that any errors after this point result in invalid internal state.

walEntry := newWALEntry()

Expand All @@ -492,6 +504,7 @@ func (c *Collection) Update(wb *WriteBatch) (int64, error) {
appendBuf := bytes.NewBuffer(nil)
currentOffset, err := c.f.Seek(0, 2)
if err != nil {
atomic.StoreUint32(&c.internalState, 1)
return 0, errors.New("lm2: couldn't get current file offset")
}
for _, key := range keys {
Expand Down Expand Up @@ -526,6 +539,7 @@ func (c *Collection) Update(wb *WriteBatch) (int64, error) {
newRecordOffset := currentOffset + int64(appendBuf.Len())
err = writeRecord(rec, newRecordOffset, appendBuf)
if err != nil {
atomic.StoreUint32(&c.internalState, 1)
return 0, err
}
c.Head = newRecordOffset
Expand All @@ -535,6 +549,7 @@ func (c *Collection) Update(wb *WriteBatch) (int64, error) {
}
prevRec, err := c.readRecord(offset)
if err != nil {
atomic.StoreUint32(&c.internalState, 1)
return 0, err
}
{
Expand All @@ -548,6 +563,7 @@ func (c *Collection) Update(wb *WriteBatch) (int64, error) {
if maxLessThan != "" {
prevRec, err = c.readRecord(newlyInserted[maxLessThan])
if err != nil {
atomic.StoreUint32(&c.internalState, 1)
return 0, err
}
}
Expand All @@ -562,6 +578,7 @@ func (c *Collection) Update(wb *WriteBatch) (int64, error) {
newRecordOffset := currentOffset + int64(appendBuf.Len())
err = writeRecord(rec, newRecordOffset, appendBuf)
if err != nil {
atomic.StoreUint32(&c.internalState, 1)
return 0, err
}
newlyInserted[key] = newRecordOffset
Expand All @@ -576,22 +593,26 @@ func (c *Collection) Update(wb *WriteBatch) (int64, error) {
}
n, err := c.f.Write(appendBuf.Bytes())
if err != nil {
atomic.StoreUint32(&c.internalState, 1)
return 0, errors.New("lm2: appending records failed")
}
if n != appendBuf.Len() {
atomic.StoreUint32(&c.internalState, 1)
return 0, errors.New("lm2: partial write")
}

// Write sentinel record.

currentOffset, err = c.writeSentinel()
if err != nil {
atomic.StoreUint32(&c.internalState, 1)
return 0, err
}

// fsync data file.
err = c.f.Sync()
if err != nil {
atomic.StoreUint32(&c.internalState, 1)
return 0, err
}

Expand All @@ -605,6 +626,7 @@ func (c *Collection) Update(wb *WriteBatch) (int64, error) {
}
rec, err := c.readRecord(offset)
if err != nil {
atomic.StoreUint32(&c.internalState, 1)
return 0, err
}
if rec.Deleted == 0 {
Expand All @@ -616,6 +638,7 @@ func (c *Collection) Update(wb *WriteBatch) (int64, error) {
for _, offset := range overwrittenRecords {
rec, err := c.readRecord(offset)
if err != nil {
atomic.StoreUint32(&c.internalState, 1)
return 0, err
}
rec.Deleted = currentOffset
Expand All @@ -628,6 +651,7 @@ func (c *Collection) Update(wb *WriteBatch) (int64, error) {
walEntry.Push(newWALRecord(0, c.fileHeader.bytes()))
_, err = c.wal.Append(walEntry)
if err != nil {
atomic.StoreUint32(&c.internalState, 1)
return 0, err
}

Expand All @@ -636,15 +660,23 @@ func (c *Collection) Update(wb *WriteBatch) (int64, error) {
for _, walRec := range walEntry.records {
n, err := c.f.WriteAt(walRec.Data, walRec.Offset)
if err != nil {
atomic.StoreUint32(&c.internalState, 1)
return 0, err
}
if int64(n) != walRec.Size {
atomic.StoreUint32(&c.internalState, 1)
return 0, errors.New("lm2: incomplete data write")
}
}

c.stats.incRecordsWritten(uint64(len(newlyInserted)))
return c.LastCommit, c.f.Sync()
err = c.f.Sync()
if err != nil {
atomic.StoreUint32(&c.internalState, 1)
return 0, err
}

return c.LastCommit, nil
}

// NewCollection creates a new collection with a data file at file.
Expand Down

0 comments on commit 8f9b8f1

Please sign in to comment.