
Commit

chore: cosmetic fixes
mr-karan committed Dec 7, 2022
1 parent 7630fa4 commit e54c7c3
Showing 8 changed files with 81 additions and 79 deletions.
24 changes: 7 additions & 17 deletions TODO.md
@@ -6,20 +6,6 @@ barrel.Open(dir="/data/barrel")

Create a `barrel.db` file inside `/data/barrel` which is the working data directory.

- timer.Timer -> check if file needs to be rotated (5MB)
- rename current active file
- create new file

- timer.Timer -> merge all these files every 30 minutes
- loop over all inactive files
- delete records not required


.Put("hello") -> "world"

.Put("hello) -> "bye"


### Writing

- [x] Encode the header
@@ -38,12 +24,16 @@ Create a `barrel.db` file inside `/data/barrel` which is the working data directory.
### Background

- [ ] Merge old files
- [ ] Hints file
- [ ] GC cleanup of old/expired/deleted keys

- [x] Hints file
- [x] GC cleanup of old/expired/deleted keys
- [ ] Compaction routine
- [ ] Rotate file if size increases
### Starting program

- [ ] Load data from hints file for faster boot time

### Misc

- [ ] Create specific mutex for different things

## Test Cases
6 changes: 2 additions & 4 deletions cmd/server/main.go
@@ -42,7 +42,7 @@ func main() {
mux.HandleFunc("get", app.get)
mux.HandleFunc("del", app.delete)

err = redcon.ListenAndServe(addr,
if err := redcon.ListenAndServe(addr,
mux.ServeRESP,
func(conn redcon.Conn) bool {
// use this function to accept or deny the connection.
@@ -51,9 +51,7 @@ func main() {
func(conn redcon.Conn, err error) {
// this is called when the connection has been closed
},
)
if err != nil {
); err != nil {
lo.Fatal("error starting server: %w", err)
}

}
3 changes: 2 additions & 1 deletion internal/datafile/datafile.go
@@ -84,7 +84,7 @@ func (d *DataFile) Read(pos int, size int) ([]byte, error) {
// Read the file with the given offset.
n, err := d.reader.ReadAt(record, start)
if err != nil {
return nil, fmt.Errorf("error reading data from file: %v", err)
return nil, err
}

// Check if the size of bytes read matches the record size.
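
For context, the read path being edited here boils down to `ReadAt` plus a length check. A standalone sketch of that pattern, assuming a plain `*os.File` (the actual `DataFile` wraps its own reader and is not reproduced here):

```go
package sketch

import (
	"fmt"
	"os"
)

// readRecord reads exactly size bytes starting at offset pos.
// Illustrative only; not the package's DataFile.Read implementation.
func readRecord(f *os.File, pos, size int) ([]byte, error) {
	record := make([]byte, size)

	// ReadAt reads len(record) bytes at the given offset, or reports why it could not.
	n, err := f.ReadAt(record, int64(pos))
	if err != nil {
		return nil, err
	}

	// Defensive check: the number of bytes read must match the record size.
	if n != size {
		return nil, fmt.Errorf("short read: got %d bytes, want %d", n, size)
	}

	return record, nil
}
```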
@@ -115,6 +115,7 @@ func (d *DataFile) Close() error {
if err := d.writer.Close(); err != nil {
return err
}

if err := d.reader.Close(); err != nil {
return err
}
4 changes: 2 additions & 2 deletions pkg/barrel/barrel.go
@@ -96,8 +96,8 @@ func Init(opts Opts) (*Barrel, error) {
// opts.MergeInterval = time.Second * 5
// }
// go barrel.MergeFiles(opts.MergeInterval)
// // Spawn a goroutine which checks for the file size of the active file at periodic interval.
// go barrel.ExamineFileSize(time.Minute * 1)
// Spawn a goroutine which checks for the file size of the active file at periodic interval.
go barrel.ExamineFileSize(time.Minute * 1)

return barrel, nil
}
45 changes: 42 additions & 3 deletions pkg/barrel/compact.go
@@ -1,16 +1,15 @@
package barrel

import (
"path/filepath"
"time"

"github.com/mr-karan/barreldb/internal/datafile"
)

// ExamineFileSize runs cleanup operations at every configured interval.
// ExamineFileSize checks for file size at a periodic interval.
// It examines the file size of the active db file and marks it as stale
// if the file size exceeds the configured size.
// Additionally it runs a merge operation which compacts the stale files.
// This produces a hints file as well which is used for faster startup time.
func (b *Barrel) ExamineFileSize(evalInterval time.Duration) {
var (
evalTicker = time.NewTicker(evalInterval).C
@@ -36,6 +35,7 @@ func (b *Barrel) rotateDF() error {
}

// If the file is below the threshold of max size, do no action.
b.lo.Debug("checking if db file has exceeded max_size", "current_size", size, "max_size", b.opts.MaxFileSize)
if size < b.opts.MaxFileSize {
return nil
}
@@ -61,3 +61,42 @@ func (b *Barrel) rotateDF() error {

return nil
}

// GenerateHints encodes the contents of the in-memory hashtable
// as `gob` and writes the data to a hints file.
func (b *Barrel) GenerateHints() error {
b.Lock()
defer b.Unlock()

path := filepath.Join(b.opts.Dir, HINTS_FILE)
if err := b.keydir.Encode(path); err != nil {
return err
}

return nil
}
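
`keydir.Encode` is not part of this diff; under the assumption that the keydir is a map of key → record metadata, a gob-based encode/decode pair would look roughly like this (the type and field names here are illustrative, not the package's actual ones). The matching `Decode` is what enables the "load data from hints file for faster boot time" TODO item, since the keydir can be rebuilt without replaying the data files:

```go
package sketch

import (
	"encoding/gob"
	"os"
)

// Meta mirrors the kind of metadata a keydir entry holds (assumed shape).
type Meta struct {
	FileID     int
	RecordPos  int
	RecordSize int
	Timestamp  int64
}

// KeyDir maps a key to the location of its latest record on disk.
type KeyDir map[string]Meta

// Encode gob-encodes the keydir and writes it to path (e.g. the hints file).
func (k KeyDir) Encode(path string) error {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer f.Close()
	return gob.NewEncoder(f).Encode(k)
}

// Decode loads a previously written hints file back into the keydir.
// k must be a non-nil map so the decoded entries are visible to the caller.
func (k KeyDir) Decode(path string) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()
	return gob.NewDecoder(f).Decode(&k)
}
```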

// CleanupExpired removes the expired keys.
func (b *Barrel) CleanupExpired() error {
b.Lock()
defer b.Unlock()

// Iterate over all keys and delete all keys which are expired.
for k := range b.keydir {
record, err := b.get(k)
if err != nil {
b.lo.Error("error fetching key", "key", k, "error", err)
continue
}
if record.isExpired() {
b.lo.Debug("deleting key since it's expired", "key", k)
// Delete the key.
if err := b.delete(k); err != nil {
b.lo.Error("error deleting key", "key", k, "error", err)
continue
}
}
}

return nil
}
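
`record.isExpired()` lived in the now-deleted `expiry.go` and is not shown in this diff. Given that `put()` below stores the expiry as a `uint32` Unix timestamp with 0 meaning "no expiry", a plausible sketch of the check is (type stubs included only to make the snippet self-contained):

```go
package sketch

import "time"

// Minimal stand-ins for the package's Header/Record types (assumed shape).
type Header struct {
	Expiry uint32 // Unix timestamp; 0 means the key never expires
}

type Record struct {
	Header Header
}

// isExpired reports whether the record's expiry timestamp has passed.
func (r Record) isExpired() bool {
	if r.Header.Expiry == 0 {
		return false // no expiry set
	}
	return time.Now().Unix() > int64(r.Header.Expiry)
}
```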
25 changes: 0 additions & 25 deletions pkg/barrel/expiry.go

This file was deleted.

17 changes: 0 additions & 17 deletions pkg/barrel/hints.go

This file was deleted.

36 changes: 26 additions & 10 deletions pkg/barrel/ops.go
@@ -5,6 +5,8 @@ import (
"fmt"
"hash/crc32"
"time"

"github.com/mr-karan/barreldb/internal/datafile"
)

func (b *Barrel) get(k string) (Record, error) {
@@ -17,22 +19,37 @@ func (b *Barrel) get(k string) (Record, error) {
var (
// Header object for decoding the binary data into it.
header Header
reader *datafile.DataFile
)

// Set the current file ID as the default.
reader = b.df

// Check if the ID is different from the current ID.
if meta.FileID != b.df.ID() {
reader, ok = b.stale[meta.FileID]
if !ok {
return Record{}, fmt.Errorf("error looking up for the db file for the given id: %d", meta.FileID)
}
reader.Open()
defer reader.Close()
}

// Read the file with the given offset.
data, err := b.df.Read(meta.RecordPos, meta.RecordSize)
data, err := reader.Read(meta.RecordPos, meta.RecordSize)
if err != nil {
return Record{}, fmt.Errorf("error reading data from file: %v", err)
}

// Decode the header.
header.decode(data)

// Get the offset position in record to start reading the value from.
valPos := meta.RecordSize - int(header.ValSize)

// Read the value from the record.
val := data[valPos:]
var (
// Get the offset position in record to start reading the value from.
valPos = meta.RecordSize - int(header.ValSize)
// Read the value from the record.
val = data[valPos:]
)

record := Record{
Header: header,
@@ -52,7 +69,7 @@ func (b *Barrel) put(k string, val []byte, expiry *time.Time) error {
ValSize: uint32(len(val)),
}

// Check for expiry
// Check for expiry.
if expiry != nil {
header.Expiry = uint32(expiry.Unix())
} else {
@@ -65,11 +82,10 @@ func (b *Barrel) put(k string, val []byte, expiry *time.Time) error {
Value: val,
}

// Create a buffer for writing data to it.
// buf := bytes.NewBuffer([]byte{})

// Get the buffer from the pool for writing data.
buf := b.bufPool.Get().(*bytes.Buffer)
defer b.bufPool.Put(buf)
// Resetting the buffer is important since the length of bytes written should be reset on each `set` operation.
defer buf.Reset()

// Encode header.
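
The pooled-buffer pattern used in `put()` above, shown in isolation — a minimal sketch assuming a `sync.Pool` of `*bytes.Buffer`; `encodeRecord` is a hypothetical stand-in for encoding the header, key and value. Because deferred calls run last-in-first-out, `buf.Reset()` executes before `bufPool.Put(buf)`, so the buffer always goes back to the pool empty:

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

var bufPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

// encodeRecord writes a payload through a pooled buffer and returns its length.
func encodeRecord(payload []byte) int {
	// Grab a buffer from the pool instead of allocating a fresh one per write.
	buf := bufPool.Get().(*bytes.Buffer)
	defer bufPool.Put(buf)
	// Reset before the buffer is returned, so the next caller starts empty
	// rather than appending to leftover bytes from a previous operation.
	defer buf.Reset()

	buf.Write(payload) // stand-in for encoding header + key + value
	return buf.Len()
}

func main() {
	fmt.Println(encodeRecord([]byte("hello"))) // 5
	fmt.Println(encodeRecord([]byte("world"))) // 5
}
```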
