Skip to content

Commit

Permalink
Only keep 1 WAL entry (#22)
Browse files Browse the repository at this point in the history
* Only keep 1 WAL entry; #21

* cleanup and remove more unused code
  • Loading branch information
Preetam authored Jan 7, 2017
1 parent 95719a7 commit 4ee9e5a
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 98 deletions.
44 changes: 16 additions & 28 deletions lm2.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ type Collection struct {
}

type fileHeader struct {
Head int64
LastCommit int64
LastValidLogEntry int64
Head int64
LastCommit int64
}

func (h fileHeader) bytes() []byte {
Expand Down Expand Up @@ -611,13 +610,11 @@ func (c *Collection) Update(wb *WriteBatch) (int64, error) {
// out to the WAL.
c.LastCommit = currentOffset
walEntry.Push(newWALRecord(0, c.fileHeader.bytes()))
logCommit, err := c.wal.Append(walEntry)
_, err = c.wal.Append(walEntry)
if err != nil {
return 0, err
}

c.LastValidLogEntry = logCommit

// Update + fsync data file header.

for _, walRec := range walEntry.records {
Expand Down Expand Up @@ -668,7 +665,7 @@ func NewCollection(file string, cacheSize int) (*Collection, error) {

// write file header
c.fileHeader.Head = 0
c.fileHeader.LastCommit = int64(8 * 3)
c.fileHeader.LastCommit = int64(8 * 2)
c.f.Seek(0, 0)
err = binary.Write(c.f, binary.LittleEndian, c.fileHeader)
if err != nil {
Expand Down Expand Up @@ -710,7 +707,7 @@ func OpenCollection(file string, cacheSize int) (*Collection, error) {
}
c.cache.c = c

// read file header
// Read file header.
c.f.Seek(0, 0)
err = binary.Read(c.f, binary.LittleEndian, &c.fileHeader)
if err != nil {
Expand All @@ -722,26 +719,9 @@ func OpenCollection(file string, cacheSize int) (*Collection, error) {
lastEntry, err := c.wal.ReadLastEntry()
if err != nil {
// Maybe latest WAL write didn't succeed.
// Read the last known good one.
err = c.wal.SetOffset(c.LastValidLogEntry)
if err != nil {
// Nothing else to do. Bail out.
c.Close()
return nil, err
}
lastEntry, err = c.wal.ReadEntry()
if err != nil {
if c.wal.fileSize > 0 {
// Nothing else to do. Bail out.
c.Close()
return nil, err
}
} else {
c.wal.Truncate()
}
}

if lastEntry != nil {
// Truncate.
c.wal.Truncate()
} else {
// Apply last WAL entry again.
for _, walRec := range lastEntry.records {
n, err := c.f.WriteAt(walRec.Data, walRec.Offset)
Expand All @@ -754,6 +734,14 @@ func OpenCollection(file string, cacheSize int) (*Collection, error) {
return nil, errors.New("lm2: incomplete data write")
}
}

// Reread file header because it could have been updated
c.f.Seek(0, 0)
err = binary.Read(c.f, binary.LittleEndian, &c.fileHeader)
if err != nil {
c.Close()
return nil, fmt.Errorf("lm2: error reading file header: %v", err)
}
}

c.f.Truncate(c.LastCommit)
Expand Down
84 changes: 14 additions & 70 deletions wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ const (
)

type wal struct {
f *os.File
fileSize int64
lastGoodOffset int64
f *os.File
}

type walEntryHeader struct {
Expand All @@ -25,8 +23,7 @@ type walEntryHeader struct {
}

type walEntryFooter struct {
Magic uint32
StartOffset int64
Magic uint32
}

type walEntry struct {
Expand All @@ -52,8 +49,7 @@ func newWALEntry() *walEntry {
NumRecords: 0,
},
walEntryFooter: walEntryFooter{
Magic: walFooterMagic,
StartOffset: 0,
Magic: walFooterMagic,
},
}
}
Expand Down Expand Up @@ -86,15 +82,8 @@ func openWAL(filename string) (*wal, error) {
return nil, err
}

stat, err := f.Stat()
if err != nil {
f.Close()
return nil, err
}

return &wal{
f: f,
fileSize: stat.Size(),
f: f,
}, nil
}

Expand All @@ -114,13 +103,6 @@ func newWAL(filename string) (*wal, error) {
}

func (w *wal) Append(entry *walEntry) (int64, error) {
startOffset, err := w.f.Seek(0, 2)
if err != nil {
w.Truncate()
return 0, errors.New("lm2: couldn't get offset")
}
entry.StartOffset = startOffset

buf := bytes.NewBuffer(nil)
for _, rec := range entry.records {
buf.Write(rec.Bytes())
Expand All @@ -129,71 +111,40 @@ func (w *wal) Append(entry *walEntry) (int64, error) {

binary.Write(buf, binary.LittleEndian, entry.walEntryFooter)

err = binary.Write(w.f, binary.LittleEndian, entry.walEntryHeader)
if err != nil {
w.Truncate()
return 0, err
}
headerBuf := bytes.NewBuffer(nil)
binary.Write(headerBuf, binary.LittleEndian, entry.walEntryHeader)
entryBytes := append(headerBuf.Bytes(), buf.Bytes()...)

n, err := w.f.Write(buf.Bytes())
n, err := w.f.WriteAt(entryBytes, 0)
if err != nil {
w.Truncate()
return 0, err
}

if n != buf.Len() {
if n != len(entryBytes) {
w.Truncate()
return 0, errors.New("lm2: incomplete WAL write")
}

currentOffset, err := w.f.Seek(0, 2)
if err != nil {
w.Truncate()
return 0, errors.New("lm2: couldn't get offset")
}

err = w.f.Sync()
if err != nil {
w.Truncate()
return 0, err
}

w.lastGoodOffset = currentOffset
return startOffset, nil
return 0, nil
}

func (w *wal) ReadLastEntry() (*walEntry, error) {
// Read footer
const footerSize = 12
_, err := w.f.Seek(-footerSize, 2)
if err != nil {
return nil, errors.New("lm2: error seeking to WAL footer")
}
footer := walEntryFooter{}
err = binary.Read(w.f, binary.LittleEndian, &footer)
if err != nil {
return nil, errors.New("lm2: error reading WAL footer")
}
if footer.Magic != walFooterMagic {
return nil, errors.New("lm2: invalid WAL footer magic")
}

// Read entry.

_, err = w.f.Seek(footer.StartOffset, 0)
_, err := w.f.Seek(0, 0)
if err != nil {
return nil, errors.New("lm2: error seeking to WAL entry start")
}

return w.ReadEntry()
}

func (w *wal) SetOffset(offset int64) error {
_, err := w.f.Seek(offset, 0)
return err
return w.readEntry()
}

func (w *wal) ReadEntry() (*walEntry, error) {
func (w *wal) readEntry() (*walEntry, error) {
entry := newWALEntry()

err := binary.Read(w.f, binary.LittleEndian, &entry.walEntryHeader)
Expand Down Expand Up @@ -243,18 +194,11 @@ func (w *wal) ReadEntry() (*walEntry, error) {
return nil, errors.New("lm2: invalid WAL footer magic")
}

currentOffset, err := w.f.Seek(0, 2)
if err != nil {
return nil, errors.New("lm2: couldn't get offset")
}

w.lastGoodOffset = currentOffset

return entry, nil
}

func (w *wal) Truncate() error {
return w.f.Truncate(w.lastGoodOffset)
return w.f.Truncate(0)
}

func (w *wal) Close() {
Expand Down

0 comments on commit 4ee9e5a

Please sign in to comment.