Skip to content

Commit

Permalink
separate network parts from mesh protocol parts
Browse files Browse the repository at this point in the history
Separate the networking logic from the mesh protocol logic by
splitting the old TankaClient into two parts. MeshConn handles
connections to both tatanka nodes and peers. Mesh handles the
client mesh protocol.

Mesh has its own database. That is where things like order info
and bonds will be stored.
  • Loading branch information
buck54321 committed Feb 4, 2025
1 parent 777762a commit 59589d4
Show file tree
Hide file tree
Showing 13 changed files with 1,353 additions and 962 deletions.
12 changes: 12 additions & 0 deletions dex/lexi/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,18 @@ func TestIndex(t *testing.T) {
if i != 74 {
t.Fatal("never reached 74")
}

// Make sure we can iterate the table directly
i = 0
if err := tbl.Iterate(nil, func(it *Iter) error {
i++
return nil
}); err != nil {
t.Fatalf("Error iterating table: %v", err)
}
if i != 50 {
t.Fatal("table didn't have 50")
}
}

func TestDatum(t *testing.T) {
Expand Down
77 changes: 47 additions & 30 deletions dex/lexi/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,15 @@ func (idx *Index) UseDefaultIterationOptions(optss ...IterationOption) {
}
}

// Iter is an entry in the Index. The caller can use Iter to access and delete
// data associated with the index entry and it's datum.
// Iter is an entry in the Index or Table. The caller can use Iter to access and
// delete data associated with the entry and it's datum.
type Iter struct {
idx *Index
item *badger.Item
txn *badger.Txn
dbID DBID
d *datum
table *Table
isIndex bool
item *badger.Item
txn *badger.Txn
dbID DBID
d *datum
}

// V gives access to the datum bytes. The byte slice passed to f is only valid
Expand All @@ -138,26 +139,27 @@ func (i *Iter) K() ([]byte, error) {
return item.ValueCopy(nil)
}

// Entry is the actual index entry. These are the bytes returned by the
// generator passed to AddIndex.
// Entry is the actual index entry when iterating an Index. When iterating a
// Table, this method doesn't really have a use, so we'll just return the DBID.
func (i *Iter) Entry(f func(idxB []byte) error) error {
k := i.item.Key()
if len(k) < prefixSize+DBIDSize {
return fmt.Errorf("index entry too small. length = %d", len(k))
if i.isIndex {
if len(k) < prefixSize+DBIDSize {
return fmt.Errorf("index entry too small. length = %d", len(k))
}
return f(k[prefixSize : len(k)-DBIDSize])
}
if len(k) < prefixSize {
return fmt.Errorf("table key too small. length = %d", len(k))
}
return f(k[prefixSize : len(k)-DBIDSize])
return f(k[prefixSize:])
}

func (i *Iter) datum() (_ *datum, err error) {
if i.d != nil {
return i.d, nil
}
k := i.item.Key()
if len(k) < prefixSize+DBIDSize {
return nil, fmt.Errorf("invalid index entry length %d", len(k))
}
dbID := newDBIDFromBytes(k[len(k)-DBIDSize:])
i.d, err = i.idx.table.get(i.txn, dbID)
i.d, err = i.table.get(i.txn, i.dbID)
return i.d, err
}

Expand All @@ -167,7 +169,7 @@ func (i *Iter) Delete() error {
if err != nil {
return err
}
return i.idx.table.deleteDatum(i.txn, i.dbID, d)
return i.table.deleteDatum(i.txn, i.dbID, d)
}

// IndexBucket is any one of a number of common types whose binary encoding is
Expand Down Expand Up @@ -195,38 +197,53 @@ func parseIndexBucket(i IndexBucket) (b []byte, err error) {
// Iterate iterates the index, providing access to the index entry, datum, and
// datum key via the Iter.
func (idx *Index) Iterate(prefixI IndexBucket, f func(*Iter) error, iterOpts ...IterationOption) error {
return idx.iterate(idx.prefix, idx.table, idx.defaultIterationOptions, true, prefixI, f, iterOpts...)
}

// iterate iterates a table or index.
func (db *DB) iterate(keyPfix keyPrefix, table *Table, io iteratorOpts, isIndex bool, prefixI IndexBucket, f func(*Iter) error, iterOpts ...IterationOption) error {
prefix, err := parseIndexBucket(prefixI)
if err != nil {
return err
}
io := idx.defaultIterationOptions
for i := range iterOpts {
iterOpts[i](&io)
}
iterFunc := iteratePrefix
if io.reverse {
iterFunc = reverseIteratePrefix
}
viewUpdate := idx.View
viewUpdate := db.View
if io.update {
viewUpdate = idx.Update
viewUpdate = db.Update
}
var seek []byte
if len(io.seek) > 0 {
seek = prefixedKey(idx.prefix, io.seek)
seek = prefixedKey(keyPfix, io.seek)
}
return viewUpdate(func(txn *badger.Txn) error {
return iterFunc(txn, prefixedKey(idx.prefix, prefix), seek, func(iter *badger.Iterator) error {
return iterFunc(txn, prefixedKey(keyPfix, prefix), seek, func(iter *badger.Iterator) error {
item := iter.Item()
k := item.Key()
if len(k) < prefixSize+DBIDSize {
return fmt.Errorf("invalid index entry length %d", len(k))

var dbID DBID
if isIndex {
if len(k) < prefixSize+DBIDSize {
return fmt.Errorf("invalid index entry length %d", len(k))
}
dbID = newDBIDFromBytes(k[len(k)-DBIDSize:])
} else {
if len(k) != prefixSize+DBIDSize {
return fmt.Errorf("invalid table key length %d", len(k))
}
copy(dbID[:], k)
}
return f(&Iter{
idx: idx,
item: iter.Item(),
txn: txn,
dbID: newDBIDFromBytes(k[len(k)-DBIDSize:]),
isIndex: isIndex,
table: table,
item: iter.Item(),
txn: txn,
dbID: dbID,
})
})
})
Expand Down
41 changes: 41 additions & 0 deletions dex/lexi/json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// This code is available on the terms of the project LICENSE.md file,
// also available online at https://blueoakcouncil.org/license/1.0.0.

package lexi

import (
"encoding"
"encoding/json"
)

// lexiJSON is just a wrapper for something that is JSON-encodable.
type lexiJSON struct {
thing any
}

type BinaryMarshal interface {
encoding.BinaryMarshaler
encoding.BinaryUnmarshaler
}

// JSON can be used to encode JSON-encodable things.
func JSON(thing any) BinaryMarshal {
return &lexiJSON{thing}
}

// UnJSON can be used in index entry generator functions for some syntactic
// sugar.
func UnJSON(thing any) interface{} {
if lj, is := thing.(*lexiJSON); is {
return lj.thing
}
return struct{}{}
}

func (p *lexiJSON) MarshalBinary() ([]byte, error) {
return json.Marshal(p.thing)
}

func (p *lexiJSON) UnmarshalBinary(b []byte) error {
return json.Unmarshal(b, p.thing)
}
21 changes: 17 additions & 4 deletions dex/lexi/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ import (
// lookup and iteration.
type Table struct {
*DB
name string
prefix keyPrefix
indexes []*Index
defaultSetOptions setOpts
name string
prefix keyPrefix
indexes []*Index
defaultSetOptions setOpts
defaultIterationOptions iteratorOpts
}

// Table constructs a new table in the DB.
Expand Down Expand Up @@ -189,3 +190,15 @@ func (t *Table) deleteDatum(txn *badger.Txn, dbID DBID, d *datum) error {
}
return t.deleteDBID(txn, dbID)
}

// UseDefaultIterationOptions sets default options for Iterate.
func (t *Table) UseDefaultIterationOptions(optss ...IterationOption) {
for i := range optss {
optss[i](&t.defaultIterationOptions)
}
}

// Iterate iterates the table.
func (t *Table) Iterate(prefixI IndexBucket, f func(*Iter) error, iterOpts ...IterationOption) error {
return t.iterate(t.prefix, t, t.defaultIterationOptions, false, prefixI, f, iterOpts...)
}
Loading

0 comments on commit 59589d4

Please sign in to comment.