Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tatanka/client: separate network parts from mesh protocol parts #3176

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions dex/lexi/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,18 @@ func TestIndex(t *testing.T) {
if i != 74 {
t.Fatal("never reached 74")
}

// Make sure we can iterate the table directly
i = 0
if err := tbl.Iterate(nil, func(it *Iter) error {
i++
return nil
}); err != nil {
t.Fatalf("Error iterating table: %v", err)
}
if i != 50 {
t.Fatal("table didn't have 50")
}
}

func TestDatum(t *testing.T) {
Expand Down
77 changes: 47 additions & 30 deletions dex/lexi/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,15 @@ func (idx *Index) UseDefaultIterationOptions(optss ...IterationOption) {
}
}

// Iter is an entry in the Index. The caller can use Iter to access and delete
// data associated with the index entry and it's datum.
// Iter is an entry in the Index or Table. The caller can use Iter to access and
// delete data associated with the entry and it's datum.
type Iter struct {
idx *Index
item *badger.Item
txn *badger.Txn
dbID DBID
d *datum
table *Table
isIndex bool
item *badger.Item
txn *badger.Txn
dbID DBID
d *datum
}

// V gives access to the datum bytes. The byte slice passed to f is only valid
Expand All @@ -138,26 +139,27 @@ func (i *Iter) K() ([]byte, error) {
return item.ValueCopy(nil)
}

// Entry is the actual index entry. These are the bytes returned by the
// generator passed to AddIndex.
// Entry is the actual index entry when iterating an Index. When iterating a
// Table, this method doesn't really have a use, so we'll just return the DBID.
func (i *Iter) Entry(f func(idxB []byte) error) error {
k := i.item.Key()
if len(k) < prefixSize+DBIDSize {
return fmt.Errorf("index entry too small. length = %d", len(k))
if i.isIndex {
if len(k) < prefixSize+DBIDSize {
return fmt.Errorf("index entry too small. length = %d", len(k))
}
return f(k[prefixSize : len(k)-DBIDSize])
}
if len(k) < prefixSize {
return fmt.Errorf("table key too small. length = %d", len(k))
}
return f(k[prefixSize : len(k)-DBIDSize])
return f(k[prefixSize:])
}

func (i *Iter) datum() (_ *datum, err error) {
if i.d != nil {
return i.d, nil
}
k := i.item.Key()
if len(k) < prefixSize+DBIDSize {
return nil, fmt.Errorf("invalid index entry length %d", len(k))
}
dbID := newDBIDFromBytes(k[len(k)-DBIDSize:])
i.d, err = i.idx.table.get(i.txn, dbID)
i.d, err = i.table.get(i.txn, i.dbID)
return i.d, err
}

Expand All @@ -167,7 +169,7 @@ func (i *Iter) Delete() error {
if err != nil {
return err
}
return i.idx.table.deleteDatum(i.txn, i.dbID, d)
return i.table.deleteDatum(i.txn, i.dbID, d)
}

// IndexBucket is any one of a number of common types whose binary encoding is
Expand Down Expand Up @@ -195,38 +197,53 @@ func parseIndexBucket(i IndexBucket) (b []byte, err error) {
// Iterate iterates the index, providing access to the index entry, datum, and
// datum key via the Iter.
func (idx *Index) Iterate(prefixI IndexBucket, f func(*Iter) error, iterOpts ...IterationOption) error {
return idx.iterate(idx.prefix, idx.table, idx.defaultIterationOptions, true, prefixI, f, iterOpts...)
}

// iterate iterates a table or index.
func (db *DB) iterate(keyPfix keyPrefix, table *Table, io iteratorOpts, isIndex bool, prefixI IndexBucket, f func(*Iter) error, iterOpts ...IterationOption) error {
prefix, err := parseIndexBucket(prefixI)
if err != nil {
return err
}
io := idx.defaultIterationOptions
for i := range iterOpts {
iterOpts[i](&io)
}
iterFunc := iteratePrefix
if io.reverse {
iterFunc = reverseIteratePrefix
}
viewUpdate := idx.View
viewUpdate := db.View
if io.update {
viewUpdate = idx.Update
viewUpdate = db.Update
}
var seek []byte
if len(io.seek) > 0 {
seek = prefixedKey(idx.prefix, io.seek)
seek = prefixedKey(keyPfix, io.seek)
}
return viewUpdate(func(txn *badger.Txn) error {
return iterFunc(txn, prefixedKey(idx.prefix, prefix), seek, func(iter *badger.Iterator) error {
return iterFunc(txn, prefixedKey(keyPfix, prefix), seek, func(iter *badger.Iterator) error {
item := iter.Item()
k := item.Key()
if len(k) < prefixSize+DBIDSize {
return fmt.Errorf("invalid index entry length %d", len(k))

var dbID DBID
if isIndex {
if len(k) < prefixSize+DBIDSize {
return fmt.Errorf("invalid index entry length %d", len(k))
}
dbID = newDBIDFromBytes(k[len(k)-DBIDSize:])
} else {
if len(k) != prefixSize+DBIDSize {
return fmt.Errorf("invalid table key length %d", len(k))
}
copy(dbID[:], k)
}
return f(&Iter{
idx: idx,
item: iter.Item(),
txn: txn,
dbID: newDBIDFromBytes(k[len(k)-DBIDSize:]),
isIndex: isIndex,
table: table,
item: iter.Item(),
txn: txn,
dbID: dbID,
})
})
})
Expand Down
41 changes: 41 additions & 0 deletions dex/lexi/json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// This code is available on the terms of the project LICENSE.md file,
// also available online at https://blueoakcouncil.org/license/1.0.0.

package lexi

import (
"encoding"
"encoding/json"
)

// lexiJSON is just a wrapper for something that is JSON-encodable.
type lexiJSON struct {
thing any
}

type BinaryMarshal interface {
encoding.BinaryMarshaler
encoding.BinaryUnmarshaler
}

// JSON can be used to encode JSON-encodable things.
func JSON(thing any) BinaryMarshal {
return &lexiJSON{thing}
}

// UnJSON can be used in index entry generator functions for some syntactic
// sugar.
func UnJSON(thing any) interface{} {
if lj, is := thing.(*lexiJSON); is {
return lj.thing
}
return struct{}{}
}

func (p *lexiJSON) MarshalBinary() ([]byte, error) {
return json.Marshal(p.thing)
}

func (p *lexiJSON) UnmarshalBinary(b []byte) error {
return json.Unmarshal(b, p.thing)
}
21 changes: 17 additions & 4 deletions dex/lexi/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ import (
// lookup and iteration.
type Table struct {
*DB
name string
prefix keyPrefix
indexes []*Index
defaultSetOptions setOpts
name string
prefix keyPrefix
indexes []*Index
defaultSetOptions setOpts
defaultIterationOptions iteratorOpts
}

// Table constructs a new table in the DB.
Expand Down Expand Up @@ -189,3 +190,15 @@ func (t *Table) deleteDatum(txn *badger.Txn, dbID DBID, d *datum) error {
}
return t.deleteDBID(txn, dbID)
}

// UseDefaultIterationOptions sets default options for Iterate.
func (t *Table) UseDefaultIterationOptions(optss ...IterationOption) {
for i := range optss {
optss[i](&t.defaultIterationOptions)
}
}

// Iterate iterates the table.
func (t *Table) Iterate(prefixI IndexBucket, f func(*Iter) error, iterOpts ...IterationOption) error {
return t.iterate(t.prefix, t, t.defaultIterationOptions, false, prefixI, f, iterOpts...)
}
Loading
Loading