diff --git a/cursor.go b/cursor.go
index 85711f6..ae00820 100644
--- a/cursor.go
+++ b/cursor.go
@@ -1,5 +1,7 @@
 package lm2
 
+import "sync/atomic"
+
 // Cursor represents a snapshot cursor.
 type Cursor struct {
 	collection *Collection
@@ -11,6 +13,10 @@ type Cursor struct {
 // NewCursor returns a new cursor with a snapshot view of the
 // current collection state.
 func (c *Collection) NewCursor() (*Cursor, error) {
+	if atomic.LoadUint32(&c.internalState) != 0 {
+		return nil, ErrInternal
+	}
+
 	c.metaLock.RLock()
 	defer c.metaLock.RUnlock()
 	if c.Head == 0 {
@@ -62,6 +68,11 @@ func (c *Cursor) Valid() bool {
 // Next moves the cursor to the next record. It returns true
 // if it lands on a valid record.
 func (c *Cursor) Next() bool {
+	if atomic.LoadUint32(&c.collection.internalState) != 0 {
+		c.current = nil
+		return false
+	}
+
 	if !c.Valid() {
 		return false
 	}
@@ -120,6 +131,10 @@ func (c *Cursor) Value() string {
 // Seek positions the cursor at the last key less than
 // or equal to the provided key.
 func (c *Cursor) Seek(key string) {
+	if atomic.LoadUint32(&c.collection.internalState) != 0 {
+		return
+	}
+
 	var rec *record
 	var err error
 	offset := c.collection.cache.findLastLessThan(key)
diff --git a/lm2.go b/lm2.go
index ffcd241..6da3718 100644
--- a/lm2.go
+++ b/lm2.go
@@ -10,6 +10,7 @@ import (
 	"os"
 	"sort"
 	"sync"
+	"sync/atomic"
 )
 
 const sentinelMagic = 0xDEAD10CC
@@ -18,6 +19,9 @@ var (
 	// ErrDoesNotExist is returned when a collection's data file
 	// doesn't exist.
 	ErrDoesNotExist = errors.New("lm2: does not exist")
+	// ErrInternal is returned when the internal state of the collection
+	// is invalid. The collection should be closed and reopened.
+	ErrInternal = errors.New("lm2: internal error")
 )
 
 // Collection represents an ordered linked list map.
@@ -28,6 +32,9 @@ type Collection struct {
 	cache      *recordCache
 	stats      Stats
 
+	// internalState is 0 if OK, 1 if inconsistent.
+	internalState uint32
+
 	metaLock sync.RWMutex
 }
@@ -412,6 +419,10 @@ func (c *Collection) findLastLessThanOrEqual(key string, startingOffset int64) (
 // Update atomically and durably applies a WriteBatch (a set of updates) to the collection.
 // It returns the new version (on success) and an error.
 func (c *Collection) Update(wb *WriteBatch) (int64, error) {
+	if atomic.LoadUint32(&c.internalState) != 0 {
+		return 0, ErrInternal
+	}
+
 	c.metaLock.Lock()
 	defer c.metaLock.Unlock()
@@ -483,6 +494,7 @@ func (c *Collection) Update(wb *WriteBatch) (int64, error) {
 
 	// NOTE: we shouldn't be reading any more records after this point.
 	// TODO: assert it.
+	// We'll assume that any errors after this point result in invalid internal state.
 
 	walEntry := newWALEntry()
@@ -492,6 +504,7 @@ func (c *Collection) Update(wb *WriteBatch) (int64, error) {
 	appendBuf := bytes.NewBuffer(nil)
 	currentOffset, err := c.f.Seek(0, 2)
 	if err != nil {
+		atomic.StoreUint32(&c.internalState, 1)
 		return 0, errors.New("lm2: couldn't get current file offset")
 	}
 	for _, key := range keys {
@@ -526,6 +539,7 @@ func (c *Collection) Update(wb *WriteBatch) (int64, error) {
 			newRecordOffset := currentOffset + int64(appendBuf.Len())
 			err = writeRecord(rec, newRecordOffset, appendBuf)
 			if err != nil {
+				atomic.StoreUint32(&c.internalState, 1)
 				return 0, err
 			}
 			c.Head = newRecordOffset
@@ -535,6 +549,7 @@ func (c *Collection) Update(wb *WriteBatch) (int64, error) {
 		}
 		prevRec, err := c.readRecord(offset)
 		if err != nil {
+			atomic.StoreUint32(&c.internalState, 1)
 			return 0, err
 		}
 		{
@@ -548,6 +563,7 @@ func (c *Collection) Update(wb *WriteBatch) (int64, error) {
 		if maxLessThan != "" {
 			prevRec, err = c.readRecord(newlyInserted[maxLessThan])
 			if err != nil {
+				atomic.StoreUint32(&c.internalState, 1)
 				return 0, err
 			}
 		}
@@ -562,6 +578,7 @@ func (c *Collection) Update(wb *WriteBatch) (int64, error) {
 		newRecordOffset := currentOffset + int64(appendBuf.Len())
 		err = writeRecord(rec, newRecordOffset, appendBuf)
 		if err != nil {
+			atomic.StoreUint32(&c.internalState, 1)
 			return 0, err
 		}
 		newlyInserted[key] = newRecordOffset
@@ -576,9 +593,11 @@ func (c *Collection) Update(wb *WriteBatch) (int64, error) {
 	}
 	n, err := c.f.Write(appendBuf.Bytes())
 	if err != nil {
+		atomic.StoreUint32(&c.internalState, 1)
 		return 0, errors.New("lm2: appending records failed")
 	}
 	if n != appendBuf.Len() {
+		atomic.StoreUint32(&c.internalState, 1)
 		return 0, errors.New("lm2: partial write")
 	}
 
@@ -586,12 +605,14 @@ func (c *Collection) Update(wb *WriteBatch) (int64, error) {
 
 	currentOffset, err = c.writeSentinel()
 	if err != nil {
+		atomic.StoreUint32(&c.internalState, 1)
 		return 0, err
 	}
 
 	// fsync data file.
 	err = c.f.Sync()
 	if err != nil {
+		atomic.StoreUint32(&c.internalState, 1)
 		return 0, err
 	}
@@ -605,6 +626,7 @@ func (c *Collection) Update(wb *WriteBatch) (int64, error) {
 		}
 		rec, err := c.readRecord(offset)
 		if err != nil {
+			atomic.StoreUint32(&c.internalState, 1)
 			return 0, err
 		}
 		if rec.Deleted == 0 {
@@ -616,6 +638,7 @@ func (c *Collection) Update(wb *WriteBatch) (int64, error) {
 	for _, offset := range overwrittenRecords {
 		rec, err := c.readRecord(offset)
 		if err != nil {
+			atomic.StoreUint32(&c.internalState, 1)
 			return 0, err
 		}
 		rec.Deleted = currentOffset
@@ -628,6 +651,7 @@ func (c *Collection) Update(wb *WriteBatch) (int64, error) {
 	walEntry.Push(newWALRecord(0, c.fileHeader.bytes()))
 	_, err = c.wal.Append(walEntry)
 	if err != nil {
+		atomic.StoreUint32(&c.internalState, 1)
 		return 0, err
 	}
 
@@ -636,15 +660,23 @@ func (c *Collection) Update(wb *WriteBatch) (int64, error) {
 	for _, walRec := range walEntry.records {
 		n, err := c.f.WriteAt(walRec.Data, walRec.Offset)
 		if err != nil {
+			atomic.StoreUint32(&c.internalState, 1)
 			return 0, err
 		}
 		if int64(n) != walRec.Size {
+			atomic.StoreUint32(&c.internalState, 1)
 			return 0, errors.New("lm2: incomplete data write")
 		}
 	}
 
 	c.stats.incRecordsWritten(uint64(len(newlyInserted)))
-	return c.LastCommit, c.f.Sync()
+	err = c.f.Sync()
+	if err != nil {
+		atomic.StoreUint32(&c.internalState, 1)
+		return 0, err
+	}
+
+	return c.LastCommit, nil
 }
 
 // NewCollection creates a new collection with a data file at file.
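
The new `ErrInternal` doc comment tells callers to close and reopen the collection once the flag trips. Below is a minimal caller-side sketch of that recovery path, not part of the diff: only `Update` and `ErrInternal` are confirmed above, while `OpenCollection`, `Close`, the cache-size argument, and the import path are assumptions for illustration.

```go
package recovery

import "github.com/Preetam/lm2" // assumed import path

// applyWithRecovery applies wb to c. On ErrInternal it closes and
// reopens the collection, as the ErrInternal doc comment advises,
// then retries the batch once (assuming wb is safe to reapply).
// OpenCollection and Close are assumed to exist with these rough
// shapes; adjust to the real lm2 API.
func applyWithRecovery(c *lm2.Collection, path string, wb *lm2.WriteBatch) (*lm2.Collection, int64, error) {
	version, err := c.Update(wb)
	if err != lm2.ErrInternal {
		// Success, or an ordinary error that leaves the collection usable.
		return c, version, err
	}

	// internalState tripped: in-memory bookkeeping may disagree with
	// the data file, so discard this handle and reopen from disk.
	c.Close()
	reopened, err := lm2.OpenCollection(path, 100) // assumed signature; 100 = cache size
	if err != nil {
		return nil, 0, err
	}
	version, err = reopened.Update(wb)
	return reopened, version, err
}
```

Once the flag is set, every entry point guarded above (`NewCursor`, `Next`, `Seek`, `Update`) refuses to touch the data, so a poisoned collection fails fast instead of serving records from a half-applied batch; reading the flag with `sync/atomic` lets the cursor methods check it without taking `metaLock`.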