Skip to content

Commit

Permalink
format
Browse files Browse the repository at this point in the history
  • Loading branch information
qichengzx committed Feb 27, 2023
1 parent 1f1730b commit c579b4e
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 40 deletions.
55 changes: 32 additions & 23 deletions db_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,18 @@ func (bf *BitFile) populateFilesMap(dir string) (uint32, error) {
return 0, err
}

found := make(map[uint32]struct{})
var maxFid uint32 = 0
var (
found = make(map[uint32]struct{}, len(files))
maxFid uint32 = 0
)

for _, file := range files {
fid, err := getFid(file.Name())
if err != nil {
return 0, err
}
if _, ok := found[fid]; ok {
return 0, errors.New("Duplicate file found.")
return 0, errors.New("duplicate file found")
}
found[fid] = struct{}{}
if maxFid < fid {
Expand All @@ -65,14 +68,14 @@ func (bf *BitFile) write(key, value []byte) (*entry, error) {

offset := bf.offset + uint64(HeaderSize+keySize)

_, err := bf.fp.WriteAt(buf, int64(bf.offset))
n, err := bf.fp.Write(buf)
if err != nil {
return nil, err
}

bf.offset += uint64(entrySize)
bf.offset += uint64(n)

entry := newEntry(bf.fid, valueSize, offset, uint64(ts))
entry := newEntry(bf.fid, keySize, valueSize, offset, uint64(ts))
return entry, nil
}

Expand Down Expand Up @@ -108,7 +111,8 @@ func (bf *BitFile) openFile(dir string) (*os.File, error) {
if err != nil {
return nil, err
}
fp, err := os.OpenFile(file, os.O_CREATE|os.O_RDWR, 0644)

fp, err := os.OpenFile(file, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0644)
if err != nil {
return nil, err
}
Expand All @@ -134,6 +138,10 @@ func (bf *BitFile) newFid() string {
return fmt.Sprintf("%06d", bf.fid)
}

func (bf *BitFile) newEntryFromBuf(offset int64) (*entry, uint32) {
return newEntryFromBuf(bf.fp, bf.fid, offset)
}

func read(fp *os.File, offset int64, size uint32) ([]byte, error) {
buf := make([]byte, size)
if _, err := fp.ReadAt(buf, offset); err != nil {
Expand Down Expand Up @@ -161,7 +169,7 @@ func getFid(name string) (uint32, error) {
fsz := len(name)
fid, err := strconv.ParseUint(name[:fsz-5], 10, 32)
if err != nil {
return 0, errors.New("Unable to parse file id.")
return 0, errors.New("unable to parse file id")
}

return uint32(fid), nil
Expand All @@ -170,7 +178,7 @@ func getFid(name string) (uint32, error) {
func scanOldFiles(dir string) ([]os.DirEntry, error) {
files, err := os.ReadDir(dir)
if err != nil {
return nil, errors.New("Unable to open dir.")
return nil, errors.New("unable to open dir")
}
var entry []os.DirEntry
for _, file := range files {
Expand Down Expand Up @@ -208,29 +216,30 @@ func (bf *BitFiles) add(fid uint32, fp *BitFile) {
}

const (
lockFileName = "bitcask.lock"
mergeFileExt = ".merge"
)

func lock(dir string) (*os.File, error) {
return os.OpenFile(filepath.Join(dir, lockFileName), os.O_EXCL|os.O_CREATE|os.O_RDWR, os.ModePerm)
}

func newEntryFromBuf(fp *os.File, fid uint32, offset int64) (*entry, uint32, uint32) {
func newEntryFromBuf(fp *os.File, fid uint32, offset int64) (*entry, uint32) {
buf, err := read(fp, offset, HeaderSize)
if err != nil {
if err == io.EOF {
return nil, 0, 0
return nil, 0
}
}
ts := binary.BigEndian.Uint32(buf[4:8])
keySize := binary.BigEndian.Uint32(buf[8:12])
valueSize := binary.BigEndian.Uint32(buf[12:HeaderSize])

entrySize := getSize(keySize, valueSize)
keyByte := make([]byte, keySize)
if _, err := fp.ReadAt(keyByte, offset+HeaderSize); err != nil {
return nil, 0
}

entry := newEntry(fid, keySize, valueSize, uint64(offset)+uint64(HeaderSize+keySize), uint64(ts))
entry.key = keyByte

entry := newEntry(fid, valueSize, uint64(offset)+uint64(HeaderSize+keySize), uint64(ts))
return entry, keySize, entrySize
return entry, entrySize
}

func newMergeFileName(dir string, fid uint32) string {
Expand Down Expand Up @@ -284,30 +293,30 @@ func (b *Bitcask) merge() {
mergeOffset int64 = 0
)
for {
entry, keySize, entrySize := newEntryFromBuf(fp, fid, offset)
entry, entrySize := newEntryFromBuf(fp, fid, offset)
if entry == nil {
break
}

readOffset := offset + HeaderSize
offset += int64(entrySize)
keyByte, err := read(fp, readOffset, keySize)
keyByte, err := read(fp, readOffset, entry.keySize)
if err != nil {
continue
}

//check if the key was deleted
e, err := b.index.get(keyByte)
e, _ := b.index.get(keyByte)
if e == nil || entry.valueSize == 0 {
b.index.del(string(keyByte))
continue
}
valByte, err := read(fp, readOffset+int64(keySize), entry.valueSize)
valByte, err := read(fp, readOffset+int64(entry.keySize), entry.valueSize)
if err != nil {
continue
}

buf, _ := encode(keyByte, valByte, keySize, entry.valueSize, uint32(entry.timestamp), entrySize)
buf, _ := encode(keyByte, valByte, entry.keySize, entry.valueSize, uint32(entry.timestamp), entrySize)
_, err = mergeFp.WriteAt(buf, mergeOffset)
mergeOffset += int64(entrySize)
if err != nil {
Expand Down
9 changes: 6 additions & 3 deletions entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,19 @@ import (

type entry struct {
fileID uint32
keySize uint32
valueSize uint32
valueOffset uint64
timestamp uint64
key []byte
}

const HeaderSize = 16

func newEntry(fid, valueSize uint32, valueOffset, timestamp uint64) *entry {
func newEntry(fid, keySize, valueSize uint32, valueOffset, timestamp uint64) *entry {
return &entry{
fileID: fid,
keySize: keySize,
valueSize: valueSize,
valueOffset: valueOffset,
timestamp: timestamp,
Expand All @@ -25,11 +28,11 @@ func newEntry(fid, valueSize uint32, valueOffset, timestamp uint64) *entry {

func encode(key, value []byte, keySize, valueSize, ts, entrySize uint32) ([]byte, error) {
// crc32 | timestamp | keySize | valueSize | key | value
// 4 | 4 | 4 | 4 | 4 | 4
// 4 | 4 | 4 | 4 | 4 | 4
buf := make([]byte, entrySize)
binary.BigEndian.PutUint32(buf[4:8], ts)
binary.BigEndian.PutUint32(buf[8:12], keySize)
binary.BigEndian.PutUint32(buf[12:16], valueSize)
binary.BigEndian.PutUint32(buf[12:HeaderSize], valueSize)
copy(buf[HeaderSize:HeaderSize+keySize], key)
copy(buf[HeaderSize+keySize:HeaderSize+keySize+valueSize], value)

Expand Down
80 changes: 80 additions & 0 deletions hint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package bitcask

import (
"encoding/binary"
"fmt"
"os"
)

func putHint(fp *os.File, key []byte, keySize, valueSize, valueOffset uint32) (int, error) {
buf := encodeHintData(key, keySize, valueSize, valueOffset)

return fp.Write(buf)
}

type HintHeader struct {
ksize uint32
vsize uint32
valueOffset uint32
timestamp uint64
key []byte
}

const (
HintHeaderSize = 16
)

/*
keySize : valueSize : valueOffset : key
4 : 4 : 8 : xxxxx
*/
func encodeHintData(key []byte, keySize, valueSize, valueOffset uint32) []byte {
buf := make([]byte, HintHeaderSize+len(key))
binary.LittleEndian.PutUint32(buf[0:4], keySize)
binary.LittleEndian.PutUint32(buf[4:8], valueSize)
binary.LittleEndian.PutUint32(buf[8:HintHeaderSize], valueOffset)
copy(buf[HintHeaderSize:], []byte(key))

return buf
}

func decodeHintData(fp *os.File, offset int64) (HintHeader, error) {
buf, err := read(fp, offset, HintHeaderSize)
if err != nil {
return HintHeader{}, err
}

keySize := binary.LittleEndian.Uint32(buf[0:4])

kbuf, err := read(fp, offset+HintHeaderSize, keySize)
if err != nil {
return HintHeader{}, err
}

valueSize := binary.LittleEndian.Uint32(buf[4:8])
valueOffset := binary.LittleEndian.Uint32(buf[8:HintHeaderSize])

return HintHeader{
ksize: keySize,
vsize: valueSize,
valueOffset: valueOffset,
key: kbuf,
}, nil
}

func newHintFile(path string, fid uint32) (*os.File, error) {
fname := getHintFileName(path, fid)
return os.OpenFile(fname, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
}

func openHintFile(path string, fid uint32) (*os.File, error) {
fname := getHintFileName(path, fid)
return os.OpenFile(fname, os.O_RDONLY, 0)
}

const hintFileExt = ".hint"

func getHintFileName(path string, fid uint32) string {
return fmt.Sprintf("%s%s%d%s", path, string(os.PathSeparator), fid, hintFileExt)
}
46 changes: 32 additions & 14 deletions index.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,61 @@ package bitcask

import (
"errors"
"io"
"os"
"sync"
)

type index struct {
entries map[string]*entry
*sync.RWMutex
entrys map[string]*entry
mu *sync.RWMutex
}

var (
ErrKeyNotFound = errors.New("Key not found")
ErrKeyNotFound = errors.New("key not found")
)

func newIndex() *index {
return &index{
entries: make(map[string]*entry),
RWMutex: &sync.RWMutex{},
entrys: make(map[string]*entry),
mu: &sync.RWMutex{},
}
}

func (i *index) put(key string, entry *entry) {
i.Lock()
defer i.Unlock()
i.entries[key] = entry
i.mu.Lock()
i.entrys[key] = entry
i.mu.Unlock()
}

func (i *index) get(key []byte) (*entry, error) {
i.Lock()
defer i.Unlock()
if entry, ok := i.entries[string(key)]; ok {
i.mu.RLock()
defer i.mu.RUnlock()
if entry, ok := i.entrys[string(key)]; ok {
return entry, nil
}

return nil, ErrKeyNotFound
}

func (i *index) del(key string) {
i.Lock()
defer i.Unlock()
delete(i.entries, key)
i.mu.Lock()
delete(i.entrys, key)
i.mu.Unlock()
}

func (i *index) buildFromHint(fid uint32, hintFp *os.File) {
var offset int64 = 0
for {
header, err := decodeHintData(hintFp, offset)
if err != nil && err == io.EOF {
//TODO
break
}

entry := newEntry(fid, header.ksize, header.vsize, uint64(header.valueOffset), header.timestamp)
i.put(string(header.key), entry)

offset += int64(header.ksize) + int64(HintHeaderSize)
}
}

0 comments on commit c579b4e

Please sign in to comment.