diff --git a/dex/lexi/db_test.go b/dex/lexi/db_test.go index d78c132707..e254d97bea 100644 --- a/dex/lexi/db_test.go +++ b/dex/lexi/db_test.go @@ -222,6 +222,18 @@ func TestIndex(t *testing.T) { if i != 74 { t.Fatal("never reached 74") } + + // Make sure we can iterate the table directly + i = 0 + if err := tbl.Iterate(nil, func(it *Iter) error { + i++ + return nil + }); err != nil { + t.Fatalf("Error iterating table: %v", err) + } + if i != 50 { + t.Fatal("table didn't have 50") + } } func TestDatum(t *testing.T) { diff --git a/dex/lexi/index.go b/dex/lexi/index.go index 13dc56f9b6..141b781241 100644 --- a/dex/lexi/index.go +++ b/dex/lexi/index.go @@ -108,14 +108,15 @@ func (idx *Index) UseDefaultIterationOptions(optss ...IterationOption) { } } -// Iter is an entry in the Index. The caller can use Iter to access and delete -// data associated with the index entry and it's datum. +// Iter is an entry in the Index or Table. The caller can use Iter to access and +// delete data associated with the entry and it's datum. type Iter struct { - idx *Index - item *badger.Item - txn *badger.Txn - dbID DBID - d *datum + table *Table + isIndex bool + item *badger.Item + txn *badger.Txn + dbID DBID + d *datum } // V gives access to the datum bytes. The byte slice passed to f is only valid @@ -138,26 +139,27 @@ func (i *Iter) K() ([]byte, error) { return item.ValueCopy(nil) } -// Entry is the actual index entry. These are the bytes returned by the -// generator passed to AddIndex. +// Entry is the actual index entry when iterating an Index. When iterating a +// Table, this method doesn't really have a use, so we'll just return the DBID. func (i *Iter) Entry(f func(idxB []byte) error) error { k := i.item.Key() - if len(k) < prefixSize+DBIDSize { - return fmt.Errorf("index entry too small. length = %d", len(k)) + if i.isIndex { + if len(k) < prefixSize+DBIDSize { + return fmt.Errorf("index entry too small. length = %d", len(k)) + } + return f(k[prefixSize : len(k)-DBIDSize]) + } + if len(k) < prefixSize { + return fmt.Errorf("table key too small. length = %d", len(k)) } - return f(k[prefixSize : len(k)-DBIDSize]) + return f(k[prefixSize:]) } func (i *Iter) datum() (_ *datum, err error) { if i.d != nil { return i.d, nil } - k := i.item.Key() - if len(k) < prefixSize+DBIDSize { - return nil, fmt.Errorf("invalid index entry length %d", len(k)) - } - dbID := newDBIDFromBytes(k[len(k)-DBIDSize:]) - i.d, err = i.idx.table.get(i.txn, dbID) + i.d, err = i.table.get(i.txn, i.dbID) return i.d, err } @@ -167,7 +169,7 @@ func (i *Iter) Delete() error { if err != nil { return err } - return i.idx.table.deleteDatum(i.txn, i.dbID, d) + return i.table.deleteDatum(i.txn, i.dbID, d) } // IndexBucket is any one of a number of common types whose binary encoding is @@ -195,11 +197,15 @@ func parseIndexBucket(i IndexBucket) (b []byte, err error) { // Iterate iterates the index, providing access to the index entry, datum, and // datum key via the Iter. func (idx *Index) Iterate(prefixI IndexBucket, f func(*Iter) error, iterOpts ...IterationOption) error { + return idx.iterate(idx.prefix, idx.table, idx.defaultIterationOptions, true, prefixI, f, iterOpts...) +} + +// iterate iterates a table or index. +func (db *DB) iterate(keyPfix keyPrefix, table *Table, io iteratorOpts, isIndex bool, prefixI IndexBucket, f func(*Iter) error, iterOpts ...IterationOption) error { prefix, err := parseIndexBucket(prefixI) if err != nil { return err } - io := idx.defaultIterationOptions for i := range iterOpts { iterOpts[i](&io) } @@ -207,26 +213,37 @@ func (idx *Index) Iterate(prefixI IndexBucket, f func(*Iter) error, iterOpts ... if io.reverse { iterFunc = reverseIteratePrefix } - viewUpdate := idx.View + viewUpdate := db.View if io.update { - viewUpdate = idx.Update + viewUpdate = db.Update } var seek []byte if len(io.seek) > 0 { - seek = prefixedKey(idx.prefix, io.seek) + seek = prefixedKey(keyPfix, io.seek) } return viewUpdate(func(txn *badger.Txn) error { - return iterFunc(txn, prefixedKey(idx.prefix, prefix), seek, func(iter *badger.Iterator) error { + return iterFunc(txn, prefixedKey(keyPfix, prefix), seek, func(iter *badger.Iterator) error { item := iter.Item() k := item.Key() - if len(k) < prefixSize+DBIDSize { - return fmt.Errorf("invalid index entry length %d", len(k)) + + var dbID DBID + if isIndex { + if len(k) < prefixSize+DBIDSize { + return fmt.Errorf("invalid index entry length %d", len(k)) + } + dbID = newDBIDFromBytes(k[len(k)-DBIDSize:]) + } else { + if len(k) != prefixSize+DBIDSize { + return fmt.Errorf("invalid table key length %d", len(k)) + } + copy(dbID[:], k) } return f(&Iter{ - idx: idx, - item: iter.Item(), - txn: txn, - dbID: newDBIDFromBytes(k[len(k)-DBIDSize:]), + isIndex: isIndex, + table: table, + item: iter.Item(), + txn: txn, + dbID: dbID, }) }) }) diff --git a/dex/lexi/json.go b/dex/lexi/json.go new file mode 100644 index 0000000000..ea20245d6e --- /dev/null +++ b/dex/lexi/json.go @@ -0,0 +1,41 @@ +// This code is available on the terms of the project LICENSE.md file, +// also available online at https://blueoakcouncil.org/license/1.0.0. + +package lexi + +import ( + "encoding" + "encoding/json" +) + +// lexiJSON is just a wrapper for something that is JSON-encodable. +type lexiJSON struct { + thing any +} + +type BinaryMarshal interface { + encoding.BinaryMarshaler + encoding.BinaryUnmarshaler +} + +// JSON can be used to encode JSON-encodable things. +func JSON(thing any) BinaryMarshal { + return &lexiJSON{thing} +} + +// UnJSON can be used in index entry generator functions for some syntactic +// sugar. +func UnJSON(thing any) interface{} { + if lj, is := thing.(*lexiJSON); is { + return lj.thing + } + return struct{}{} +} + +func (p *lexiJSON) MarshalBinary() ([]byte, error) { + return json.Marshal(p.thing) +} + +func (p *lexiJSON) UnmarshalBinary(b []byte) error { + return json.Unmarshal(b, p.thing) +} diff --git a/dex/lexi/table.go b/dex/lexi/table.go index 1a11c216ed..3afe37b4da 100644 --- a/dex/lexi/table.go +++ b/dex/lexi/table.go @@ -16,10 +16,11 @@ import ( // lookup and iteration. type Table struct { *DB - name string - prefix keyPrefix - indexes []*Index - defaultSetOptions setOpts + name string + prefix keyPrefix + indexes []*Index + defaultSetOptions setOpts + defaultIterationOptions iteratorOpts } // Table constructs a new table in the DB. @@ -189,3 +190,15 @@ func (t *Table) deleteDatum(txn *badger.Txn, dbID DBID, d *datum) error { } return t.deleteDBID(txn, dbID) } + +// UseDefaultIterationOptions sets default options for Iterate. +func (t *Table) UseDefaultIterationOptions(optss ...IterationOption) { + for i := range optss { + optss[i](&t.defaultIterationOptions) + } +} + +// Iterate iterates the table. +func (t *Table) Iterate(prefixI IndexBucket, f func(*Iter) error, iterOpts ...IterationOption) error { + return t.iterate(t.prefix, t, t.defaultIterationOptions, false, prefixI, f, iterOpts...) +} diff --git a/tatanka/client/client.go b/tatanka/client/client.go deleted file mode 100644 index cb512c7c88..0000000000 --- a/tatanka/client/client.go +++ /dev/null @@ -1,885 +0,0 @@ -// This code is available on the terms of the project LICENSE.md file, -// also available online at https://blueoakcouncil.org/license/1.0.0. - -package client - -import ( - "context" - "crypto/rand" - "crypto/rsa" - "encoding/binary" - "encoding/json" - "errors" - "fmt" - "math/big" - "strings" - "sync" - "sync/atomic" - "time" - - "decred.org/dcrdex/dex" - "decred.org/dcrdex/dex/fiatrates" - "decred.org/dcrdex/dex/msgjson" - "decred.org/dcrdex/tatanka/mj" - "decred.org/dcrdex/tatanka/tanka" - tcpclient "decred.org/dcrdex/tatanka/tcp/client" - "github.com/decred/dcrd/crypto/blake256" - "github.com/decred/dcrd/dcrec/secp256k1/v4" -) - -const rsaPrivateKeyLength = 2048 - -// NetworkBackend represents a peer's communication protocol. -type NetworkBackend interface { - Send(*msgjson.Message) error - Request(msg *msgjson.Message, respHandler func(*msgjson.Message)) error -} - -// tatanka is a Tatanka mesh server node. -type tatanka struct { - NetworkBackend - cm *dex.ConnectionMaster - peerID tanka.PeerID - pub *secp256k1.PublicKey - config atomic.Value // *mj.TatankaConfig -} - -type order struct { - *tanka.Order - oid tanka.ID32 - proposed map[tanka.ID32]*tanka.Match - accepted map[tanka.ID32]*tanka.Match -} - -type market struct { - log dex.Logger - - ordsMtx sync.RWMutex - ords map[tanka.ID32]*order -} - -func (m *market) addOrder(ord *tanka.Order) { - m.ordsMtx.Lock() - defer m.ordsMtx.Unlock() - oid := ord.ID() - if _, exists := m.ords[oid]; exists { - // ignore it then - return - } - m.ords[oid] = &order{ - Order: ord, - oid: oid, - proposed: make(map[tanka.ID32]*tanka.Match), - accepted: make(map[tanka.ID32]*tanka.Match), - } -} - -func (m *market) addMatchProposal(match *tanka.Match) { - m.ordsMtx.Lock() - defer m.ordsMtx.Unlock() - ord, found := m.ords[match.OrderID] - if !found { - m.log.Debugf("ignoring match proposal for unknown order %s", match.OrderID) - } - // Make sure it's not already known or accepted - mid := match.ID() - if ord.proposed[mid] != nil { - // Already known - return - } - if ord.accepted[mid] != nil { - // Already accepted - return - } - ord.proposed[mid] = match -} - -func (m *market) addMatchAcceptance(match *tanka.Match) { - m.ordsMtx.Lock() - defer m.ordsMtx.Unlock() - ord, found := m.ords[match.OrderID] - if !found { - m.log.Debugf("ignoring match proposal for unknown order %s", match.OrderID) - } - // Make sure it's not already known or accepted - mid := match.ID() - if ord.proposed[mid] != nil { - delete(ord.proposed, mid) - } - if ord.accepted[mid] != nil { - // Already accepted - return - } - ord.accepted[mid] = match -} - -// peer is a network peer with which we have established encrypted -// communication. -type peer struct { - id tanka.PeerID - pub *secp256k1.PublicKey - decryptionKey *rsa.PrivateKey // ours - encryptionKey *rsa.PublicKey -} - -// wireKey encrypts our RSA public key for transmission. -func (p *peer) wireKey() []byte { - modulusB := p.decryptionKey.PublicKey.N.Bytes() - encryptionKey := make([]byte, 8+len(modulusB)) - binary.BigEndian.PutUint64(encryptionKey[:8], uint64(p.decryptionKey.PublicKey.E)) - copy(encryptionKey[8:], modulusB) - return encryptionKey -} - -// https://stackoverflow.com/a/67035019 -func (p *peer) decryptRSA(enc []byte) ([]byte, error) { - msgLen := len(enc) - hasher := blake256.New() - step := p.decryptionKey.PublicKey.Size() - var b []byte - for start := 0; start < msgLen; start += step { - finish := start + step - if finish > msgLen { - finish = msgLen - } - block, err := rsa.DecryptOAEP(hasher, rand.Reader, p.decryptionKey, enc[start:finish], []byte{}) - if err != nil { - return nil, err - } - b = append(b, block...) - } - return b, nil -} - -func (p *peer) encryptRSA(b []byte) ([]byte, error) { - msgLen := len(b) - hasher := blake256.New() - step := p.encryptionKey.Size() - 2*hasher.Size() - 2 - var enc []byte - for start := 0; start < msgLen; start += step { - finish := start + step - if finish > msgLen { - finish = msgLen - } - block, err := rsa.EncryptOAEP(hasher, rand.Reader, p.encryptionKey, b[start:finish], []byte{}) - if err != nil { - return nil, err - } - enc = append(enc, block...) - } - return enc, nil -} - -// IncomingPeerConnect will be emitted when a peer requests a connection for -// the transmission of tankagrams. -type IncomingPeerConnect struct { - PeerID tanka.PeerID - Reject func() -} - -// IncomingTankagram will be emitted when we receive a tankagram from a -// connected peer. -type IncomingTankagram struct { - Msg *msgjson.Message - Respond func(thing interface{}) error -} - -// NewMarketSubscriber will be emitted when a new client subscribes to a market. -type NewMarketSubscriber struct { - MarketName string - PeerID tanka.PeerID -} - -// Config is the configuration for the TankaClient. -type Config struct { - Logger dex.Logger - PrivateKey *secp256k1.PrivateKey -} - -type TankaClient struct { - wg *sync.WaitGroup - peerID tanka.PeerID - priv *secp256k1.PrivateKey - log dex.Logger - requestHandlers map[string]func(*tatanka, *msgjson.Message) *msgjson.Error - notificationHandlers map[string]func(*tatanka, *msgjson.Message) - payloads chan interface{} - - tankaMtx sync.RWMutex - tankas map[tanka.PeerID]*tatanka - - peersMtx sync.RWMutex - peers map[tanka.PeerID]*peer - - marketsMtx sync.RWMutex - markets map[string]*market - - fiatRatesMtx sync.RWMutex - fiatRates map[string]*fiatrates.FiatRateInfo -} - -func New(cfg *Config) *TankaClient { - var peerID tanka.PeerID - copy(peerID[:], cfg.PrivateKey.PubKey().SerializeCompressed()) - - c := &TankaClient{ - log: cfg.Logger, - peerID: peerID, - priv: cfg.PrivateKey, - tankas: make(map[tanka.PeerID]*tatanka), - peers: make(map[tanka.PeerID]*peer), - markets: make(map[string]*market), - payloads: make(chan interface{}, 128), - fiatRates: make(map[string]*fiatrates.FiatRateInfo), - } - c.prepareHandlers() - return c -} - -func (c *TankaClient) ID() tanka.PeerID { - return c.peerID -} - -func (c *TankaClient) prepareHandlers() { - c.requestHandlers = map[string]func(*tatanka, *msgjson.Message) *msgjson.Error{ - mj.RouteTankagram: c.handleTankagram, - } - - c.notificationHandlers = map[string]func(*tatanka, *msgjson.Message){ - mj.RouteBroadcast: c.handleBroadcast, - mj.RouteRates: c.handleRates, - } -} - -func (c *TankaClient) Next() <-chan interface{} { - return c.payloads -} - -func (c *TankaClient) handleTankagram(tt *tatanka, tankagram *msgjson.Message) *msgjson.Error { - var gram mj.Tankagram - if err := tankagram.Unmarshal(&gram); err != nil { - return msgjson.NewError(mj.ErrBadRequest, "unmarshal error") - } - - c.peersMtx.Lock() - defer c.peersMtx.Unlock() - p, peerExisted := c.peers[gram.From] - if !peerExisted { - // TODO: We should do a little message verification before accepting - // new peers. - if gram.Message == nil || gram.Message.Route != mj.RouteEncryptionKey { - return msgjson.NewError(mj.ErrBadRequest, "where's your key?") - } - pub, err := secp256k1.ParsePubKey(gram.From[:]) - if err != nil { - c.log.Errorf("could not parse pubkey for tankagram from %s: %w", gram.From, err) - return msgjson.NewError(mj.ErrBadRequest, "bad pubkey") - } - priv, err := rsa.GenerateKey(rand.Reader, rsaPrivateKeyLength) - if err != nil { - return msgjson.NewError(mj.ErrInternal, "error generating rsa key: %v", err) - } - p = &peer{ - id: gram.From, - pub: pub, - decryptionKey: priv, - } - - msg := gram.Message - if err := mj.CheckSig(msg, p.pub); err != nil { - c.log.Errorf("%s sent a unencrypted message with a bad signature: %w", p.id, err) - return msgjson.NewError(mj.ErrBadRequest, "bad gram sig") - } - - var b dex.Bytes - if err := msg.Unmarshal(&b); err != nil { - c.log.Errorf("%s tankagram unmarshal error: %w", err) - return msgjson.NewError(mj.ErrBadRequest, "unmarshal key error") - } - - p.encryptionKey, err = decodePubkeyRSA(b) - if err != nil { - c.log.Errorf("error decoding RSA pub key from %s: %v", p.id, err) - return msgjson.NewError(mj.ErrBadRequest, "bad key encoding") - } - - if err := c.sendResult(tt, tankagram.ID, dex.Bytes(p.wireKey())); err != nil { - c.log.Errorf("error responding to encryption key message from peer %s", p.id) - } else { - c.peers[p.id] = p - c.emit(&IncomingPeerConnect{ - PeerID: p.id, - Reject: func() { - c.peersMtx.Lock() - delete(c.peers, p.id) - c.peersMtx.Unlock() - }, - }) - } - return nil - } - - // If this isn't the encryption key, this gram.Message is ignored and this - // is assumed to be encrypted. - if len(gram.EncryptedMsg) == 0 { - c.log.Errorf("%s sent a tankagram with no message or data", p.id) - return msgjson.NewError(mj.ErrBadRequest, "bad gram") - } - - b, err := p.decryptRSA(gram.EncryptedMsg) - if err != nil { - c.log.Errorf("%s sent an enrypted message that didn't decrypt: %v", p.id, err) - return msgjson.NewError(mj.ErrBadRequest, "bad encryption") - } - msg, err := msgjson.DecodeMessage(b) - if err != nil { - c.log.Errorf("%s sent a tankagram that didn't encode a message: %v", p.id, err) - return msgjson.NewError(mj.ErrBadRequest, "where's the message?") - } - - c.emit(&IncomingTankagram{ - Msg: msg, - Respond: func(thing interface{}) error { - b, err := json.Marshal(thing) - if err != nil { - return err - } - enc, err := p.encryptRSA(b) - if err != nil { - return err - } - return c.sendResult(tt, tankagram.ID, dex.Bytes(enc)) - }, - }) - - return nil -} - -func (c *TankaClient) handleBroadcast(tt *tatanka, msg *msgjson.Message) { - var bcast mj.Broadcast - if err := msg.Unmarshal(&bcast); err != nil { - c.log.Errorf("%s broadcast unmarshal error: %w", err) - return - } - switch bcast.Topic { - case mj.TopicMarket: - c.handleMarketBroadcast(tt, &bcast) - } - c.emit(bcast) -} - -func (c *TankaClient) handleRates(tt *tatanka, msg *msgjson.Message) { - var rm mj.RateMessage - if err := msg.Unmarshal(&rm); err != nil { - c.log.Errorf("%s rate message unmarshal error: %w", err) - return - } - switch rm.Topic { - case mj.TopicFiatRate: - c.fiatRatesMtx.Lock() - for ticker, rateInfo := range rm.Rates { - c.fiatRates[strings.ToLower(ticker)] = &fiatrates.FiatRateInfo{ - Value: rateInfo.Value, - LastUpdate: time.Now(), - } - } - c.fiatRatesMtx.Unlock() - } - c.emit(rm) -} - -func (c *TankaClient) SubscribeToFiatRates() error { - msg := mj.MustRequest(mj.RouteSubscribe, &mj.Subscription{ - Topic: mj.TopicFiatRate, - }) - - var nSuccessful int - for _, tt := range c.tankaNodes() { - var ok bool // true is only possible non-error payload. - if err := c.request(tt, msg, &ok); err != nil { - c.log.Errorf("Error subscribing to fiat rates with %s: %v", tt.peerID, err) - continue - } - nSuccessful++ - } - - if nSuccessful == 0 { - return errors.New("failed to subscribe to fiat rates on any servers") - } - - return nil -} - -func (c *TankaClient) FiatRate(assetID uint32) float64 { - c.fiatRatesMtx.RLock() - defer c.fiatRatesMtx.RUnlock() - sym := dex.BipIDSymbol(assetID) - rateInfo := c.fiatRates[sym] - if rateInfo != nil && time.Since(rateInfo.LastUpdate) < fiatrates.FiatRateDataExpiry && rateInfo.Value > 0 { - return rateInfo.Value - } - return 0 -} - -func (c *TankaClient) emit(thing interface{}) { - select { - case c.payloads <- thing: - default: - c.log.Errorf("payload channel is blocking") - } -} - -func (c *TankaClient) handleMarketBroadcast(_ *tatanka, bcast *mj.Broadcast) { - mktName := string(bcast.Subject) - c.marketsMtx.RLock() - mkt, found := c.markets[mktName] - c.marketsMtx.RUnlock() - if !found { - c.log.Debugf("received order notification for unknown market %q", mktName) - return - } - switch bcast.MessageType { - case mj.MessageTypeTrollBox: - var troll mj.Troll - if err := json.Unmarshal(bcast.Payload, &troll); err != nil { - c.log.Errorf("error unmarshaling trollbox message: %v", err) - return - } - fmt.Printf("trollbox message for market %s: %s\n", mktName, troll.Msg) - case mj.MessageTypeNewOrder: - var ord tanka.Order - if err := json.Unmarshal(bcast.Payload, &ord); err != nil { - c.log.Errorf("error unmarshaling new order: %v", err) - return - } - mkt.addOrder(&ord) - case mj.MessageTypeProposeMatch: - var match tanka.Match - if err := json.Unmarshal(bcast.Payload, &match); err != nil { - c.log.Errorf("error unmarshaling match proposal: %v", err) - return - } - mkt.addMatchProposal(&match) - case mj.MessageTypeAcceptMatch: - var match tanka.Match - if err := json.Unmarshal(bcast.Payload, &match); err != nil { - c.log.Errorf("error unmarshaling match proposal: %v", err) - return - } - mkt.addMatchAcceptance(&match) - case mj.MessageTypeNewSubscriber: - var ns mj.NewSubscriber - if err := json.Unmarshal(bcast.Payload, &ns); err != nil { - c.log.Errorf("error decoding new_subscriber payload: %v", err) - } - // c.emit(&NewMarketSubscriber{ - // MarketName: mktName, - // PeerID: bcast.PeerID, - // }) - default: - c.log.Errorf("received broadcast on %s -> %s with unknown message type %s", bcast.Topic, bcast.Subject) - } -} - -func (c *TankaClient) Broadcast(topic tanka.Topic, subject tanka.Subject, msgType mj.BroadcastMessageType, thing interface{}) error { - payload, err := json.Marshal(thing) - if err != nil { - return fmt.Errorf("error marshaling broadcast payload: %v", err) - } - note := mj.MustRequest(mj.RouteBroadcast, &mj.Broadcast{ - PeerID: c.peerID, - Topic: topic, - Subject: subject, - MessageType: msgType, - Payload: payload, - Stamp: time.Now(), - }) - var oks int - for _, tt := range c.tankas { - var ok bool - if err := c.request(tt, note, &ok); err != nil || !ok { - c.log.Errorf("error sending to %s: ok = %t, err = %v", tt.peerID, ok, err) - } else { - oks++ - } - } - if oks == 0 { - return errors.New("broadcast failed for all servers") - } - return nil -} - -func (c *TankaClient) Connect(ctx context.Context) (*sync.WaitGroup, error) { - var wg sync.WaitGroup - c.wg = &wg - - c.log.Infof("Starting TankaClient with peer ID %s", c.peerID) - - wg.Add(1) - go func() { - defer wg.Done() - <-ctx.Done() - c.tankaMtx.Lock() - for _, tt := range c.tankas { - tt.cm.Disconnect() - } - c.tankas = make(map[tanka.PeerID]*tatanka) - c.tankaMtx.Unlock() - }() - - return c.wg, nil -} - -func (c *TankaClient) AddTatankaNode(ctx context.Context, peerID tanka.PeerID, uri string, cert []byte) error { - pub, err := secp256k1.ParsePubKey(peerID[:]) - if err != nil { - return fmt.Errorf("error parsing pubkey from peer ID: %w", err) - } - - log := c.log.SubLogger("TCP") - cl, err := tcpclient.New(&tcpclient.Config{ - Logger: log, - URL: uri + "/ws", - Cert: cert, - HandleMessage: func(msg *msgjson.Message) *msgjson.Error { - return c.handleTatankaMessage(peerID, msg) - }, - }) - - if err != nil { - return fmt.Errorf("error creating connection: %w", err) - } - - cm := dex.NewConnectionMaster(cl) - if err := cm.ConnectOnce(ctx); err != nil { - return fmt.Errorf("error connecting: %w", err) - } - - c.tankaMtx.Lock() - defer c.tankaMtx.Unlock() - - if oldTanka, exists := c.tankas[peerID]; exists { - oldTanka.cm.Disconnect() - log.Infof("replacing existing connection") - } - - c.tankas[peerID] = &tatanka{ - NetworkBackend: cl, - cm: cm, - peerID: peerID, - pub: pub, - } - - return nil -} - -func (c *TankaClient) handleTatankaMessage(peerID tanka.PeerID, msg *msgjson.Message) *msgjson.Error { - if c.log.Level() == dex.LevelTrace { - c.log.Tracef("Client handling message from tatanka node: route = %s, payload = %s", msg.Route, mj.Truncate(msg.Payload)) - } - - c.tankaMtx.RLock() - tt, exists := c.tankas[peerID] - c.tankaMtx.RUnlock() - if !exists { - c.log.Errorf("%q message received from unknown peer %s", msg.Route, peerID) - return msgjson.NewError(mj.ErrAuth, "who the heck are you?") - } - - if err := mj.CheckSig(msg, tt.pub); err != nil { - // DRAFT TODO: Record for reputation somehow, no? - c.log.Errorf("tatanka node %s sent a bad signature. disconnecting", tt.peerID) - return msgjson.NewError(mj.ErrAuth, "bad sig") - } - - switch msg.Type { - case msgjson.Request: - handle, found := c.requestHandlers[msg.Route] - if !found { - // DRAFT NOTE: We should pontentially be more permissive of unknown - // routes in order to support minor network upgrades that add new - // routes. - c.log.Errorf("tatanka node %s sent a request to an unknown route %q", peerID, msg.Route) - return msgjson.NewError(mj.ErrBadRequest, "what is route %q?", msg.Route) - } - c.handleRequest(tt, msg, handle) - case msgjson.Notification: - handle, found := c.notificationHandlers[msg.Route] - if !found { - // DRAFT NOTE: We should pontentially be more permissive of unknown - // routes in order to support minor network upgrades that add new - // routes. - c.log.Errorf("tatanka node %s sent a notification to an unknown route %q", peerID, msg.Route) - return msgjson.NewError(mj.ErrBadRequest, "what is route %q?", msg.Route) - } - handle(tt, msg) - default: - c.log.Errorf("tatanka node %s send a message with an unhandleable type %d", msg.Type) - return msgjson.NewError(mj.ErrBadRequest, "message type %d doesn't work for me", msg.Type) - } - - return nil -} - -func (c *TankaClient) sendResult(tt *tatanka, msgID uint64, result interface{}) error { - resp, err := msgjson.NewResponse(msgID, result, nil) - if err != nil { - return err - } - if err := c.send(tt, resp); err != nil { - return err - } - return nil -} - -func (c *TankaClient) handleRequest(tt *tatanka, msg *msgjson.Message, handle func(*tatanka, *msgjson.Message) *msgjson.Error) { - if msgErr := handle(tt, msg); msgErr != nil { - respMsg := mj.MustResponse(msg.ID, nil, msgErr) - if err := c.send(tt, respMsg); err != nil { - c.log.Errorf("Send error: %v", err) - } - } -} - -func (c *TankaClient) Auth(peerID tanka.PeerID) error { - c.tankaMtx.RLock() - tt, found := c.tankas[peerID] - c.tankaMtx.RUnlock() - if !found { - return fmt.Errorf("cannot auth with unknown server %s", peerID) - } - return c.auth(tt) -} - -func (c *TankaClient) auth(tt *tatanka) error { - connectMsg := mj.MustRequest(mj.RouteConnect, &mj.Connect{ID: c.peerID}) - var cfg *mj.TatankaConfig - if err := c.request(tt, connectMsg, &cfg); err != nil { - return err - } - tt.config.Store(cfg) - return nil -} - -func (c *TankaClient) tankaNodes() []*tatanka { - c.tankaMtx.RLock() - defer c.tankaMtx.RUnlock() - tankas := make([]*tatanka, 0, len(c.tankas)) - for _, tanka := range c.tankas { - tankas = append(tankas, tanka) - } - return tankas -} - -func (c *TankaClient) PostBond(bond *tanka.Bond) error { - msg := mj.MustRequest(mj.RoutePostBond, []*tanka.Bond{bond}) - var success bool - for _, tt := range c.tankaNodes() { - var res bool - if err := c.request(tt, msg, &res); err != nil { - c.log.Errorf("error sending bond to tatanka node %s", tt.peerID) - } else { - success = true - } - } - if success { - return nil - } - return errors.New("failed to report bond to any tatanka nodes") -} - -func (c *TankaClient) SubscribeMarket(baseID, quoteID uint32) error { - mktName, err := dex.MarketName(baseID, quoteID) - if err != nil { - return fmt.Errorf("error constructing market name: %w", err) - } - - msg := mj.MustRequest(mj.RouteSubscribe, &mj.Subscription{ - Topic: mj.TopicMarket, - Subject: tanka.Subject(mktName), - }) - - c.marketsMtx.Lock() - defer c.marketsMtx.Unlock() - - ttNodes := c.tankaNodes() - subscribed := make([]*tatanka, 0, len(ttNodes)) - for _, tt := range ttNodes { - var ok bool // true is only possible non-error payload. - if err := c.request(tt, msg, &ok); err != nil { - c.log.Errorf("Error subscribing to %s market with %s: %w", mktName, tt.peerID, err) - continue - } - subscribed = append(subscribed, tt) - } - - if len(subscribed) == 0 { - return fmt.Errorf("failed to subscribe to market %s on any servers", mktName) - } else { - c.markets[mktName] = &market{ - log: c.log.SubLogger(mktName), - ords: make(map[tanka.ID32]*order), - } - } - - return nil -} - -func (c *TankaClient) send(tt *tatanka, msg *msgjson.Message) error { - mj.SignMessage(c.priv, msg) - return tt.Send(msg) -} - -func (c *TankaClient) request(tt *tatanka, msg *msgjson.Message, resp interface{}) error { - mj.SignMessage(c.priv, msg) - errChan := make(chan error) - if err := tt.Request(msg, func(msg *msgjson.Message) { - errChan <- msg.UnmarshalResult(&resp) - }); err != nil { - errChan <- fmt.Errorf("request error: %w", err) - } - - select { - case err := <-errChan: - if err != nil { - return fmt.Errorf("tankagram error: %w", err) - } - case <-time.After(time.Second * 30): - return errors.New("timed out waiting for tankagram result") - } - return nil -} - -func (c *TankaClient) ConnectPeer(peerID tanka.PeerID, hosts ...tanka.PeerID) (*mj.TankagramResult, error) { - c.peersMtx.Lock() - defer c.peersMtx.Unlock() - p, exists := c.peers[peerID] - if !exists { - priv, err := rsa.GenerateKey(rand.Reader, rsaPrivateKeyLength) - if err != nil { - return nil, fmt.Errorf("error generating rsa key: %v", err) - } - remotePub, err := secp256k1.ParsePubKey(peerID[:]) - if err != nil { - return nil, fmt.Errorf("error parsing remote pubkey: %v", err) - } - p = &peer{ - id: peerID, - pub: remotePub, - decryptionKey: priv, - } - } - - msg := mj.MustNotification(mj.RouteEncryptionKey, dex.Bytes(p.wireKey())) - mj.SignMessage(c.priv, msg) // We sign the embedded message separately. - - req := mj.MustRequest(mj.RouteTankagram, &mj.Tankagram{ - To: peerID, - From: c.peerID, - Message: msg, - }) - - var tts []*tatanka - if len(hosts) == 0 { - tts = c.tankaNodes() - } else { - tts = make([]*tatanka, 0, len(hosts)) - c.tankaMtx.RLock() - for _, host := range hosts { - tt, exists := c.tankas[host] - if !exists { - c.log.Warnf("Requested host %q is not known", host) - continue - } - tts = append(tts, tt) - } - c.tankaMtx.RUnlock() - } - if len(tts) == 0 { - return nil, errors.New("no hosts") - } - - for _, tt := range tts { - var r mj.TankagramResult - if err := c.request(tt, req, &r); err != nil { - c.log.Errorf("error sending rsa key to %s: %v", peerID, err) - continue - } - if r.Result == mj.TRTTransmitted { - // We need to get this to the caller, as a Tankagram result can - // be used as part of an audit request for reporting penalties. - pub, err := decodePubkeyRSA(r.Response) - if err != nil { - return nil, fmt.Errorf("error decoding RSA pub key from %s: %v", p.id, err) - } - p.encryptionKey = pub - c.peers[peerID] = p - return &r, nil - } - return &r, nil - } - return nil, errors.New("no path") -} - -func (c *TankaClient) SendTankagram(peerID tanka.PeerID, msg *msgjson.Message) (_ *mj.TankagramResult, _ json.RawMessage, err error) { - c.peersMtx.RLock() - p, known := c.peers[peerID] - c.peersMtx.RUnlock() - - if !known { - return nil, nil, fmt.Errorf("not connected to peer %s", peerID) - } - - mj.SignMessage(c.priv, msg) - tankaGram := &mj.Tankagram{ - From: c.peerID, - To: peerID, - } - tankaGram.EncryptedMsg, err = c.signAndEncryptTankagram(p, msg) - if err != nil { - return nil, nil, fmt.Errorf("error signing and encrypting tankagram for %s: %w", p.id, err) - } - wrappedMsg := mj.MustRequest(mj.RouteTankagram, tankaGram) - for _, tt := range c.tankaNodes() { - var r mj.TankagramResult - if err := c.request(tt, wrappedMsg, &r); err != nil { - c.log.Errorf("error sending tankagram to %s via %s: %v", p.id, tt.peerID, err) - continue - } - switch r.Result { - case mj.TRTTransmitted: - resB, err := p.decryptRSA(r.Response) - return &r, resB, err - case mj.TRTErrFromPeer, mj.TRTErrBadClient: - return &r, nil, nil - case mj.TRTNoPath, mj.TRTErrFromTanka: - continue - default: - return nil, nil, fmt.Errorf("unknown result %q", r.Result) - } - - } - return nil, nil, fmt.Errorf("no path") -} - -func (c *TankaClient) signAndEncryptTankagram(p *peer, msg *msgjson.Message) ([]byte, error) { - mj.SignMessage(c.priv, msg) - b, err := json.Marshal(msg) - if err != nil { - return nil, fmt.Errorf("error marshaling tankagram: %w", err) - } - return p.encryptRSA(b) -} - -func decodePubkeyRSA(b []byte) (*rsa.PublicKey, error) { - if len(b) < 9 { - return nil, fmt.Errorf("invalid payload length of %d", len(b)) - } - exponentB, modulusB := b[:8], b[8:] - exponent := int(binary.BigEndian.Uint64(exponentB)) - modulus := new(big.Int).SetBytes(modulusB) - return &rsa.PublicKey{ - E: exponent, - N: modulus, - }, nil -} diff --git a/tatanka/client/conn/conn.go b/tatanka/client/conn/conn.go new file mode 100644 index 0000000000..afd0971c12 --- /dev/null +++ b/tatanka/client/conn/conn.go @@ -0,0 +1,681 @@ +// This code is available on the terms of the project LICENSE.md file, +// also available online at https://blueoakcouncil.org/license/1.0.0. + +package conn + +import ( + "context" + "crypto/rand" + "crypto/rsa" + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "math/big" + "sync" + "sync/atomic" + "time" + + "decred.org/dcrdex/dex" + "decred.org/dcrdex/dex/msgjson" + "decred.org/dcrdex/tatanka/mj" + "decred.org/dcrdex/tatanka/tanka" + tcpclient "decred.org/dcrdex/tatanka/tcp/client" + "github.com/decred/dcrd/crypto/blake256" + "github.com/decred/dcrd/dcrec/secp256k1/v4" +) + +const rsaPrivateKeyLength = 2048 + +// NetworkBackend represents a peer's communication protocol. +type NetworkBackend interface { + Send(*msgjson.Message) error + Request(msg *msgjson.Message, respHandler func(*msgjson.Message)) error +} + +// tatanka is a Tatanka mesh server node. +type tatanka struct { + NetworkBackend + cm *dex.ConnectionMaster + url string + peerID tanka.PeerID + pub *secp256k1.PublicKey + config atomic.Value // *mj.TatankaConfig +} + +func (tt *tatanka) String() string { + return fmt.Sprintf("%s @ %s", tt.peerID, tt.url) +} + +// peer is a network peer with which we have established encrypted +// communication. +type peer struct { + id tanka.PeerID + pub *secp256k1.PublicKey + decryptionKey *rsa.PrivateKey // ours + encryptionKey *rsa.PublicKey +} + +// wireKey encrypts our RSA public key for transmission. +func (p *peer) wireKey() []byte { + modulusB := p.decryptionKey.PublicKey.N.Bytes() + encryptionKey := make([]byte, 8+len(modulusB)) + binary.BigEndian.PutUint64(encryptionKey[:8], uint64(p.decryptionKey.PublicKey.E)) + copy(encryptionKey[8:], modulusB) + return encryptionKey +} + +// https://stackoverflow.com/a/67035019 +func (p *peer) decryptRSA(enc []byte) ([]byte, error) { + msgLen := len(enc) + hasher := blake256.New() + step := p.decryptionKey.PublicKey.Size() + var b []byte + for start := 0; start < msgLen; start += step { + finish := start + step + if finish > msgLen { + finish = msgLen + } + block, err := rsa.DecryptOAEP(hasher, rand.Reader, p.decryptionKey, enc[start:finish], []byte{}) + if err != nil { + return nil, err + } + b = append(b, block...) + } + return b, nil +} + +func (p *peer) encryptRSA(b []byte) ([]byte, error) { + msgLen := len(b) + hasher := blake256.New() + step := p.encryptionKey.Size() - 2*hasher.Size() - 2 + var enc []byte + for start := 0; start < msgLen; start += step { + finish := start + step + if finish > msgLen { + finish = msgLen + } + block, err := rsa.EncryptOAEP(hasher, rand.Reader, p.encryptionKey, b[start:finish], []byte{}) + if err != nil { + return nil, err + } + enc = append(enc, block...) + } + return enc, nil +} + +// IncomingPeerConnect will be emitted when a peer requests a connection for +// the transmission of tankagrams. +type IncomingPeerConnect struct { + PeerID tanka.PeerID + Reject func() +} + +// IncomingTankagram will be emitted when we receive a tankagram from a +// connected peer. +type IncomingTankagram struct { + Msg *msgjson.Message + Respond func(thing interface{}) error +} + +// NewMarketSubscriber will be emitted when a new client subscribes to a market. +type NewMarketSubscriber struct { + MarketName string + PeerID tanka.PeerID +} + +// MessageHandlers are handlers for different types of messages from the Mesh. +type MessageHandlers struct { + HandleTatankaRequest func(tanka.PeerID, *msgjson.Message) *msgjson.Error + HandleTatankaNotification func(tanka.PeerID, *msgjson.Message) + HandlePeerMessage func(tanka.PeerID, any) *msgjson.Error +} + +// Config is the configuration for the MeshConn. +type Config struct { + EntryNode *TatankaCredentials + Logger dex.Logger + Handlers *MessageHandlers + PrivateKey *secp256k1.PrivateKey +} + +// TatankaCredentials are the connection credentials for a Tatanka node. +type TatankaCredentials struct { + PeerID tanka.PeerID + Addr string + Cert []byte + NoTLS bool +} + +// MeshConn is a Tatanka Mesh connection manager. MeshConn handles both tatanka +// nodes and regular peers. +type MeshConn struct { + log dex.Logger + handlers *MessageHandlers + entryNode *TatankaCredentials + + peerID tanka.PeerID + priv *secp256k1.PrivateKey + + tankaMtx sync.RWMutex + tatankaNodes map[tanka.PeerID]*tatanka + + peersMtx sync.RWMutex + peers map[tanka.PeerID]*peer +} + +// New is the constructor for a new MeshConn. +func New(cfg *Config) *MeshConn { + var peerID tanka.PeerID + copy(peerID[:], cfg.PrivateKey.PubKey().SerializeCompressed()) + c := &MeshConn{ + log: cfg.Logger, + handlers: cfg.Handlers, + priv: cfg.PrivateKey, + peerID: peerID, + entryNode: cfg.EntryNode, + tatankaNodes: make(map[tanka.PeerID]*tatanka), + peers: make(map[tanka.PeerID]*peer), + } + return c +} + +func (c *MeshConn) handleTankagram(tt *tatanka, tankagram *msgjson.Message) *msgjson.Error { + var gram mj.Tankagram + if err := tankagram.Unmarshal(&gram); err != nil { + return msgjson.NewError(mj.ErrBadRequest, "unmarshal error") + } + + c.peersMtx.Lock() + defer c.peersMtx.Unlock() + p, peerExisted := c.peers[gram.From] + if !peerExisted { + // TODO: We should do a little message verification before accepting + // new peers. + if gram.Message == nil || gram.Message.Route != mj.RouteEncryptionKey { + return msgjson.NewError(mj.ErrBadRequest, "where's your key?") + } + pub, err := secp256k1.ParsePubKey(gram.From[:]) + if err != nil { + c.log.Errorf("could not parse pubkey for tankagram from %s: %w", gram.From, err) + return msgjson.NewError(mj.ErrBadRequest, "bad pubkey") + } + priv, err := rsa.GenerateKey(rand.Reader, rsaPrivateKeyLength) + if err != nil { + return msgjson.NewError(mj.ErrInternal, "error generating rsa key: %v", err) + } + p = &peer{ + id: gram.From, + pub: pub, + decryptionKey: priv, + } + + msg := gram.Message + if err := mj.CheckSig(msg, p.pub); err != nil { + c.log.Errorf("%s sent a unencrypted message with a bad signature: %w", p.id, err) + return msgjson.NewError(mj.ErrBadRequest, "bad gram sig") + } + + var b dex.Bytes + if err := msg.Unmarshal(&b); err != nil { + c.log.Errorf("%s tankagram unmarshal error: %w", err) + return msgjson.NewError(mj.ErrBadRequest, "unmarshal key error") + } + + p.encryptionKey, err = decodePubkeyRSA(b) + if err != nil { + c.log.Errorf("error decoding RSA pub key from %s: %v", p.id, err) + return msgjson.NewError(mj.ErrBadRequest, "bad key encoding") + } + + if err := c.sendResult(tt, tankagram.ID, dex.Bytes(p.wireKey())); err != nil { + c.log.Errorf("error responding to encryption key message from peer %s", p.id) + } else { + c.peers[p.id] = p + c.handlers.HandlePeerMessage(p.id, &IncomingPeerConnect{ + PeerID: p.id, + Reject: func() { + c.peersMtx.Lock() + delete(c.peers, p.id) + c.peersMtx.Unlock() + }, + }) + } + return nil + } + + // If this isn't the encryption key, this gram.Message is ignored and this + // is assumed to be encrypted. + if len(gram.EncryptedMsg) == 0 { + c.log.Errorf("%s sent a tankagram with no message or data", p.id) + return msgjson.NewError(mj.ErrBadRequest, "bad gram") + } + + b, err := p.decryptRSA(gram.EncryptedMsg) + if err != nil { + c.log.Errorf("%s sent an enrypted message that didn't decrypt: %v", p.id, err) + return msgjson.NewError(mj.ErrBadRequest, "bad encryption") + } + msg, err := msgjson.DecodeMessage(b) + if err != nil { + c.log.Errorf("%s sent a tankagram that didn't encode a message: %v", p.id, err) + return msgjson.NewError(mj.ErrBadRequest, "where's the message?") + } + + c.handlers.HandlePeerMessage(p.id, &IncomingTankagram{ + Msg: msg, + Respond: func(thing interface{}) error { + b, err := json.Marshal(thing) + if err != nil { + return err + } + enc, err := p.encryptRSA(b) + if err != nil { + return err + } + return c.sendResult(tt, tankagram.ID, dex.Bytes(enc)) + }, + }) + + return nil +} + +func (c *MeshConn) Connect(ctx context.Context) (*sync.WaitGroup, error) { + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + <-ctx.Done() + c.tankaMtx.Lock() + for peerID, tt := range c.tatankaNodes { + c.log.Infof("Disconnecting old tatanka node %s", tt) + tt.cm.Disconnect() + delete(c.tatankaNodes, peerID) + } + c.tankaMtx.Unlock() + }() + + if err := c.addTatankaNode(ctx, c.entryNode); err != nil { + return nil, err + } + + return &wg, nil +} + +func (c *MeshConn) addTatankaNode(ctx context.Context, creds *TatankaCredentials) error { + peerID, uri, cert := creds.PeerID, "wss://"+creds.Addr, creds.Cert + if creds.NoTLS { + uri = "ws://" + creds.Addr + } + pub, err := secp256k1.ParsePubKey(peerID[:]) + if err != nil { + return fmt.Errorf("error parsing pubkey from peer ID: %w", err) + } + log := c.log.SubLogger("TCP") + cl, err := tcpclient.New(&tcpclient.Config{ + Logger: log, + URL: uri + "/ws", + Cert: cert, + HandleMessage: func(msg *msgjson.Message) *msgjson.Error { + return c.handleTatankaMessage(peerID, msg) + }, + }) + if err != nil { + return fmt.Errorf("error creating connection: %w", err) + } + + cm := dex.NewConnectionMaster(cl) + + c.tankaMtx.Lock() + defer c.tankaMtx.Unlock() + + if tt := c.tatankaNodes[peerID]; tt != nil { + tt.cm.Disconnect() + log.Infof("replacing existing connection for tatanka node %s", tt) + } + + c.tatankaNodes[peerID] = &tatanka{ + NetworkBackend: cl, + cm: cm, + url: uri, + peerID: peerID, + pub: pub, + } + + if err := cm.Connect(ctx); err != nil { + c.log.Errorf("error connecting to tatanka node %s at %s: %v. will keep trying to connect", err) + } + + return nil +} + +// Auth sends our connect message to the tatanka node. +func (c *MeshConn) Auth(tatankaID tanka.PeerID) error { + c.tankaMtx.RLock() + tt, found := c.tatankaNodes[tatankaID] + c.tankaMtx.RUnlock() + if !found { + return fmt.Errorf("cannot auth with unknown server %s", tatankaID) + } + connectMsg := mj.MustRequest(mj.RouteConnect, &mj.Connect{ID: c.peerID}) + mj.SignMessage(c.priv, connectMsg) + var cfg *mj.TatankaConfig + if err := c.requestTT(tt, connectMsg, &cfg, DefaultRequestTimeout); err != nil { + return err + } + tt.config.Store(cfg) + return nil +} + +func (c *MeshConn) tatanka(tatankaID tanka.PeerID) *tatanka { + c.tankaMtx.RLock() + defer c.tankaMtx.RUnlock() + return c.tatankaNodes[tatankaID] +} + +type TatankaSelectionMode string + +const ( + SelectionModeEntryNode TatankaSelectionMode = "EntryNode" + SelectionModeAll TatankaSelectionMode = "All" + SelectionModeAny TatankaSelectionMode = "Any" +) + +// tatankas generates a list of tatanka nodes. +func (c *MeshConn) tatankas(mode TatankaSelectionMode) (tts []*tatanka, _ error) { + c.tankaMtx.RLock() + defer c.tankaMtx.RUnlock() + en := c.tatankaNodes[c.entryNode.PeerID] + switch mode { + case SelectionModeEntryNode: + if en == nil { + return nil, errors.New("no entry node initialized") + } + if !en.cm.On() { + return nil, errors.New("entry node no connected") + } + return []*tatanka{en}, nil + case SelectionModeAll, SelectionModeAny: + tts := make([]*tatanka, 0, len(c.tatankaNodes)) + var skipID tanka.PeerID + // Entry node always goes first, if available. + if en != nil && en.cm.On() { + tts = append(tts, en) + skipID = en.peerID + } + for peerID, tt := range c.tatankaNodes { + if tt.cm.On() && peerID != skipID { + tts = append(tts, tt) + } + } + if len(tts) == 0 { + return nil, errors.New("no tatanka nodes available") + } + return tts, nil + default: + return nil, fmt.Errorf("unknown tatanka selection mode %q", mode) + } +} + +func (c *MeshConn) handleTatankaMessage(tatankaID tanka.PeerID, msg *msgjson.Message) *msgjson.Error { + if c.log.Level() == dex.LevelTrace { + c.log.Tracef("Client handling message from tatanka node: route = %s, payload = %s", msg.Route, mj.Truncate(msg.Payload)) + } + + tt := c.tatanka(tatankaID) + if tt == nil { + c.log.Error("Message received from unknown tatanka node") + return msgjson.NewError(mj.ErrUnknownSender, "who are you?") + } + + if err := mj.CheckSig(msg, tt.pub); err != nil { + // DRAFT TODO: Record for reputation somehow, no? + c.log.Errorf("tatanka node %s sent a bad signature. disconnecting", tt.peerID) + return msgjson.NewError(mj.ErrAuth, "bad sig") + } + + if msg.Type == msgjson.Request && msg.Route == mj.RouteTankagram { + return c.handleTankagram(tt, msg) + } + + switch msg.Type { + case msgjson.Request: + return c.handlers.HandleTatankaRequest(tatankaID, msg) + case msgjson.Notification: + c.handlers.HandleTatankaNotification(tatankaID, msg) + return nil + default: + c.log.Errorf("tatanka node %s send a message with an unhandleable type %d", msg.Type) + return msgjson.NewError(mj.ErrBadRequest, "message type %d doesn't work for me", msg.Type) + } +} + +func (c *MeshConn) sendResult(tt *tatanka, msgID uint64, result interface{}) error { + resp, err := msgjson.NewResponse(msgID, result, nil) + if err != nil { + return err + } + return tt.Send(resp) +} + +const DefaultRequestTimeout = 30 * time.Second + +type requestConfig struct { + timeout time.Duration + errCodeFunc func(code int) + selectionMode TatankaSelectionMode + examineFunc func(tatankaURL string, tatankaID tanka.PeerID) (ok bool) +} + +// RequestOption is an optional modifier to request behavior. +type RequestOption func(cfg *requestConfig) + +// WithTimeout sets the time out for the request. If no timeout is specified +// DefaultRequestTimeout is used. +func WithTimeout(d time.Duration) RequestOption { + return func(cfg *requestConfig) { + cfg.timeout = d + } +} + +// WithErrorCode enables the caller to inspect the error code when messaging +// fails. +func WithErrorCode(f func(code int)) RequestOption { + return func(cfg *requestConfig) { + cfg.errCodeFunc = f + } +} + +// WithSelectionMode set which tatanka nodes are selected for the request. +func WithSelectionMode(mode TatankaSelectionMode) RequestOption { + return func(cfg *requestConfig) { + cfg.selectionMode = mode + } +} + +// WithExamination allows the caller to check the result before returning, and +// continue trying other tatanka nodes if necessary. +func WithExamination(f func(tatankaURL string, tatankaID tanka.PeerID) (resultOK bool)) RequestOption { + return func(cfg *requestConfig) { + cfg.examineFunc = f + } +} + +// RequestMesh sends a request to the Mesh. +func (c *MeshConn) RequestMesh(msg *msgjson.Message, thing any, opts ...RequestOption) error { + cfg := &requestConfig{ + timeout: DefaultRequestTimeout, + selectionMode: SelectionModeEntryNode, + } + for _, opt := range opts { + opt(cfg) + } + + tts, err := c.tatankas(cfg.selectionMode) + if err != nil { + return err + } + + for _, tt := range tts { + if err := c.requestTT(tt, msg, thing, cfg.timeout); err == nil { + var resultOK bool = true + if cfg.examineFunc != nil { + resultOK = cfg.examineFunc(tt.url, tt.peerID) + } + switch cfg.selectionMode { + case SelectionModeAny, SelectionModeEntryNode: + if resultOK { + return nil + } + } + } else { // err != nil + var msgErr *msgjson.Error + if cfg.errCodeFunc != nil && errors.As(err, &msgErr) { + cfg.errCodeFunc(msgErr.Code) + } + switch cfg.selectionMode { + case SelectionModeEntryNode: + return err + } + } + } + + return errors.New("failed to request from any tatanka nodes") +} + +func (c *MeshConn) requestTT(tt *tatanka, msg *msgjson.Message, thing any, timeout time.Duration) (err error) { + errChan := make(chan error) + if err := tt.Request(msg, func(msg *msgjson.Message) { + errChan <- msg.UnmarshalResult(thing) + }); err != nil { + errChan <- fmt.Errorf("request error: %w", err) + } + + select { + case err = <-errChan: + case <-time.After(timeout): + return fmt.Errorf("timed out (%s) waiting for response from %s for route %q", timeout, tt, msg.Route) + } + return err +} + +// ConnectPeer connects to a peer by sending our encryption key and receiving +// theirs. +func (c *MeshConn) ConnectPeer(peerID tanka.PeerID) error { + c.peersMtx.Lock() + defer c.peersMtx.Unlock() + p, exists := c.peers[peerID] + if !exists { + priv, err := rsa.GenerateKey(rand.Reader, rsaPrivateKeyLength) + if err != nil { + return fmt.Errorf("error generating rsa key: %v", err) + } + remotePub, err := secp256k1.ParsePubKey(peerID[:]) + if err != nil { + return fmt.Errorf("error parsing remote pubkey: %v", err) + } + p = &peer{ + id: peerID, + pub: remotePub, + decryptionKey: priv, + } + } + + msg := mj.MustNotification(mj.RouteEncryptionKey, dex.Bytes(p.wireKey())) + mj.SignMessage(c.priv, msg) // We sign the embedded message separately. + + req := mj.MustRequest(mj.RouteTankagram, &mj.Tankagram{ + To: peerID, + From: c.peerID, + Message: msg, + }) + + var r mj.TankagramResult + if err := c.RequestMesh(req, &r, WithSelectionMode(SelectionModeAny), WithExamination(func(tatankaURL string, tatankaID tanka.PeerID) bool { + if r.Result == mj.TRTTransmitted { + return true + } + c.log.Errorf("Tankagram transmission failure connecting to %s via %s @ %s: %q", peerID, tatankaID, tatankaURL, r.Result) + return false + })); err != nil { + return err + } + + // We need to get this to the caller, as a Tankagram result can + // be used as part of an audit request for reporting penalties. + pub, err := decodePubkeyRSA(r.Response) + if err != nil { + return fmt.Errorf("error decoding RSA pub key from %s: %v", p.id, err) + } + p.encryptionKey = pub + c.peers[peerID] = p + return nil +} + +// RequestPeer sends a request to an already-connected peer. +func (c *MeshConn) RequestPeer(peerID tanka.PeerID, msg *msgjson.Message, thing interface{}) (_ *mj.TankagramResult, err error) { + c.peersMtx.RLock() + p, known := c.peers[peerID] + c.peersMtx.RUnlock() + + if !known { + return nil, fmt.Errorf("not connected to peer %s", peerID) + } + + mj.SignMessage(c.priv, msg) + tankaGram := &mj.Tankagram{ + From: c.peerID, + To: peerID, + } + tankaGram.EncryptedMsg, err = c.signAndEncryptTankagram(p, msg) + if err != nil { + return nil, fmt.Errorf("error signing and encrypting tankagram for %s: %w", p.id, err) + } + var res mj.TankagramResult + wrappedMsg := mj.MustRequest(mj.RouteTankagram, tankaGram) + if err := c.RequestMesh(wrappedMsg, &res, WithSelectionMode(SelectionModeAny), WithExamination(func(tatankaURL string, tatankaID tanka.PeerID) bool { + if res.Result == mj.TRTTransmitted { + return true + } + c.log.Errorf("Tankagram transmission failure sending %s to %s via %s @ %s: %q", msg.Route, peerID, tatankaID, tatankaURL, res.Result) + return false + })); err != nil { + return nil, err + } + respB, err := p.decryptRSA(res.Response) + if err != nil { + return nil, fmt.Errorf("message to %s transmitted, but errored while decoding response: %w", p.id, err) + } + if thing != nil { + if len(respB) == 0 { + return nil, fmt.Errorf("empty response from %s when non-empty response expected", p.id) + } + if err = json.Unmarshal(respB, thing); err != nil { + return nil, fmt.Errorf("error unmarshaling result from peer %s: %w", p.id, err) + } + } + return &res, nil +} + +func (c *MeshConn) signAndEncryptTankagram(p *peer, msg *msgjson.Message) ([]byte, error) { + mj.SignMessage(c.priv, msg) + b, err := json.Marshal(msg) + if err != nil { + return nil, fmt.Errorf("error marshaling tankagram: %w", err) + } + return p.encryptRSA(b) +} + +func decodePubkeyRSA(b []byte) (*rsa.PublicKey, error) { + if len(b) < 9 { + return nil, fmt.Errorf("invalid payload length of %d", len(b)) + } + exponentB, modulusB := b[:8], b[8:] + exponent := int(binary.BigEndian.Uint64(exponentB)) + modulus := new(big.Int).SetBytes(modulusB) + return &rsa.PublicKey{ + E: exponent, + N: modulus, + }, nil +} diff --git a/tatanka/client/client_test.go b/tatanka/client/conn/conn_test.go similarity index 98% rename from tatanka/client/client_test.go rename to tatanka/client/conn/conn_test.go index 3569596c86..c9ee8f5d07 100644 --- a/tatanka/client/client_test.go +++ b/tatanka/client/conn/conn_test.go @@ -1,4 +1,4 @@ -package client +package conn import ( "bytes" diff --git a/tatanka/client/mesh/mesh.go b/tatanka/client/mesh/mesh.go new file mode 100644 index 0000000000..31f8788576 --- /dev/null +++ b/tatanka/client/mesh/mesh.go @@ -0,0 +1,318 @@ +// This code is available on the terms of the project LICENSE.md file, +// also available online at https://blueoakcouncil.org/license/1.0.0. + +package mesh + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strings" + "sync" + "time" + + "decred.org/dcrdex/dex" + "decred.org/dcrdex/dex/fiatrates" + "decred.org/dcrdex/dex/lexi" + "decred.org/dcrdex/dex/msgjson" + "decred.org/dcrdex/tatanka/client/conn" + "decred.org/dcrdex/tatanka/mj" + "decred.org/dcrdex/tatanka/tanka" + "github.com/decred/dcrd/dcrec/secp256k1/v4" +) + +// Config is the configuration settings for Mesh. +type Config struct { + DataDir string + PrivateKey *secp256k1.PrivateKey + Logger dex.Logger + EntryNode *TatankaCredentials +} + +// Mesh is a manager for operations on the Tatanka Mesh Network. +type Mesh struct { + priv *secp256k1.PrivateKey + peerID tanka.PeerID + // cfg *Config + log dex.Logger + entryNode *TatankaCredentials + conn *meshConn + payloads chan interface{} + + dataDir string + db *lexi.DB + dbCM *dex.ConnectionMaster + bondTable *lexi.Table + + marketsMtx sync.RWMutex + markets map[string]*market + + fiatRatesMtx sync.RWMutex + fiatRates map[string]*fiatrates.FiatRateInfo +} + +// New is the constructor for a new Mesh. +func New(cfg *Config) (*Mesh, error) { + var peerID tanka.PeerID + copy(peerID[:], cfg.PrivateKey.PubKey().SerializeCompressed()) + + if cfg.DataDir == "" { + return nil, errors.New("no data directory provided") + } + + mesh := &Mesh{ + priv: cfg.PrivateKey, + peerID: peerID, + log: cfg.Logger, + dataDir: cfg.DataDir, + entryNode: cfg.EntryNode, + payloads: make(chan interface{}, 128), + markets: make(map[string]*market), + fiatRates: make(map[string]*fiatrates.FiatRateInfo), + } + + if err := mesh.initializeDB(); err != nil { + return nil, fmt.Errorf("failed to initialize database: %w", err) + } + + return mesh, nil +} + +// Connect initializes the Mesh. +func (m *Mesh) Connect(ctx context.Context) (*sync.WaitGroup, error) { + var wg sync.WaitGroup + + dbCM := dex.NewConnectionMaster(m.db) + if err := dbCM.ConnectOnce(ctx); err != nil { + return nil, fmt.Errorf("couldn't start database: %w", err) + } + m.dbCM = dbCM + + mesh := conn.New(&conn.Config{ + EntryNode: m.entryNode, + Logger: m.log.SubLogger("tTC"), + Handlers: &conn.MessageHandlers{ + HandleTatankaRequest: func(tatankaID tanka.PeerID, msg *msgjson.Message) *msgjson.Error { + return m.handleTatankaRequest(tatankaID, msg) + }, + HandleTatankaNotification: func(tatankaID tanka.PeerID, msg *msgjson.Message) { + m.handleTatankaNotification(tatankaID, msg) + }, + HandlePeerMessage: func(peerID tanka.PeerID, msg any) *msgjson.Error { + return m.handlePeerRequest(peerID, msg) + }, + }, + PrivateKey: m.priv, + }) + + meshCM := dex.NewConnectionMaster(mesh) + if err := meshCM.ConnectOnce(ctx); err != nil { + return nil, fmt.Errorf("ConnectOnce error: %w", err) + } + + m.conn = &meshConn{mesh, meshCM} + + wg.Add(1) + go func() { + <-dbCM.Done() + <-meshCM.Done() + wg.Done() + }() + + return &wg, nil +} + +// ID returns our peer ID on Mesh. +func (m *Mesh) ID() tanka.PeerID { + return m.peerID +} + +func (m *Mesh) initializeDB() error { + db, err := lexi.New(&lexi.Config{ + Path: m.dataDir, + Log: m.log.SubLogger("DB"), + }) + if err != nil { + return err + } + m.db = db + + m.bondTable, err = db.Table("bond") + return err +} + +// Next emits certain types of messages. +func (m *Mesh) Next() <-chan any { + return m.payloads +} + +func (m *Mesh) emit(thing any) { + select { + case m.payloads <- thing: + default: + m.log.Errorf("payload channel is blocking") + } +} + +func (m *Mesh) handleTatankaRequest(tatankaID tanka.PeerID, msg *msgjson.Message) *msgjson.Error { + switch msg.Route { + default: + m.log.Debugf("Received a request from tatanka node %s for unknown route %q", tatankaID, msg.Route) + } + return nil +} + +func (m *Mesh) handleTatankaNotification(peerID tanka.PeerID, msg *msgjson.Message) { + switch msg.Route { + case mj.RouteBroadcast: + m.handleBroadcast(msg) + case mj.RouteRates: + m.handleRates(msg) + default: + m.emit(msg) + } +} + +func (m *Mesh) handlePeerRequest(peerID tanka.PeerID, msgI any) *msgjson.Error { + switch msgI.(type) { + case *conn.IncomingPeerConnect: + case *conn.IncomingTankagram: + } + m.emit(msgI) + return nil +} + +func (m *Mesh) Broadcast(topic tanka.Topic, subject tanka.Subject, msgType mj.BroadcastMessageType, thing interface{}) error { + payload, err := json.Marshal(thing) + if err != nil { + return fmt.Errorf("error marshaling broadcast payload: %v", err) + } + req := mj.MustRequest(mj.RouteBroadcast, &mj.Broadcast{ + PeerID: m.peerID, + Topic: topic, + Subject: subject, + MessageType: msgType, + Payload: payload, + Stamp: time.Now(), + }) + // Only possible non-error response is `true`. + var ok bool + return m.conn.RequestMesh(req, &ok) +} + +func (m *Mesh) SubscribeToFiatRates() error { + req := mj.MustRequest(mj.RouteSubscribe, &mj.Subscription{ + Topic: mj.TopicFiatRate, + }) + + // Only possible non-error response is `true`. + var ok bool + return m.conn.RequestMesh(req, &ok) +} + +// PostBond stores the bond in the database and sends it to the mesh. +func (m *Mesh) PostBond(bond *tanka.Bond) error { + k := bond.ID() + if err := m.bondTable.Set(lexi.B(k[:]), lexi.JSON(bond)); err != nil { + return fmt.Errorf("error storing bond in DB: %w", err) + } + req := mj.MustRequest(mj.RoutePostBond, []*tanka.Bond{bond}) + var res bool + return m.conn.RequestMesh(req, &res) +} + +// ActiveBonds retrieves the active bonds from the database. +func (m *Mesh) ActiveBonds() ([]*tanka.Bond, error) { + bonds := make([]*tanka.Bond, 0, 1) + return bonds, m.bondTable.Iterate(nil, func(it *lexi.Iter) error { + var bond tanka.Bond + if err := it.V(func(vB []byte) error { + return json.Unmarshal(vB, &bond) + }); err != nil { + return err + } + bonds = append(bonds, &bond) + return nil + }) +} + +func (m *Mesh) SubscribeMarket(baseID, quoteID uint32) error { + mktName, err := dex.MarketName(baseID, quoteID) + if err != nil { + return fmt.Errorf("error constructing market name: %w", err) + } + + req := mj.MustRequest(mj.RouteSubscribe, &mj.Subscription{ + Topic: mj.TopicMarket, + Subject: tanka.Subject(mktName), + }) + + m.marketsMtx.Lock() + defer m.marketsMtx.Unlock() + + // Only possible non-error response is `true`. + var ok bool + if err := m.conn.RequestMesh(req, &ok); err != nil { + return err + } + + m.markets[mktName] = &market{ + log: m.log.SubLogger(mktName), + ords: make(map[tanka.ID32]*order), + } + return nil +} + +func (m *Mesh) handleBroadcast(msg *msgjson.Message) { + var bcast mj.Broadcast + if err := msg.Unmarshal(&bcast); err != nil { + m.log.Errorf("%s broadcast unmarshal error: %w", err) + return + } + switch bcast.Topic { + case mj.TopicMarket: + m.handleMarketBroadcast(&bcast) + } + m.emit(&bcast) +} + +func (m *Mesh) handleRates(msg *msgjson.Message) { + var rm mj.RateMessage + if err := msg.Unmarshal(&rm); err != nil { + m.log.Errorf("%s rate message unmarshal error: %w", err) + return + } + switch rm.Topic { + case mj.TopicFiatRate: + m.fiatRatesMtx.Lock() + for ticker, rateInfo := range rm.Rates { + m.fiatRates[strings.ToLower(ticker)] = &fiatrates.FiatRateInfo{ + Value: rateInfo.Value, + LastUpdate: time.Now(), + } + } + m.fiatRatesMtx.Unlock() + } + m.emit(&rm) +} + +func (m *Mesh) ConnectPeer(peerID tanka.PeerID) error { + return m.conn.ConnectPeer(peerID) +} + +func (m *Mesh) Auth(tatankaID tanka.PeerID) error { + return m.conn.Auth(tatankaID) +} + +func (m *Mesh) RequestPeer(peerID tanka.PeerID, msg *msgjson.Message, thing interface{}) (*mj.TankagramResult, error) { + return m.conn.RequestPeer(peerID, msg, thing) +} + +type TatankaCredentials = conn.TatankaCredentials + +// meshConn is our representation of the connection to the mesh network. +type meshConn struct { + *conn.MeshConn + cm *dex.ConnectionMaster +} diff --git a/tatanka/client/mesh/trade.go b/tatanka/client/mesh/trade.go new file mode 100644 index 0000000000..c3a0b6efa7 --- /dev/null +++ b/tatanka/client/mesh/trade.go @@ -0,0 +1,145 @@ +package mesh + +import ( + "encoding/json" + "fmt" + "sync" + "time" + + "decred.org/dcrdex/dex" + "decred.org/dcrdex/dex/fiatrates" + "decred.org/dcrdex/tatanka/mj" + "decred.org/dcrdex/tatanka/tanka" +) + +type order struct { + *tanka.Order + oid tanka.ID32 + proposed map[tanka.ID32]*tanka.Match + accepted map[tanka.ID32]*tanka.Match +} + +type market struct { + log dex.Logger + + ordsMtx sync.RWMutex + ords map[tanka.ID32]*order +} + +func (m *market) addOrder(ord *tanka.Order) { + m.ordsMtx.Lock() + defer m.ordsMtx.Unlock() + oid := ord.ID() + if _, exists := m.ords[oid]; exists { + // ignore it then + return + } + m.ords[oid] = &order{ + Order: ord, + oid: oid, + proposed: make(map[tanka.ID32]*tanka.Match), + accepted: make(map[tanka.ID32]*tanka.Match), + } +} + +func (m *market) addMatchProposal(match *tanka.Match) { + m.ordsMtx.Lock() + defer m.ordsMtx.Unlock() + ord, found := m.ords[match.OrderID] + if !found { + m.log.Debugf("ignoring match proposal for unknown order %s", match.OrderID) + } + // Make sure it's not already known or accepted + mid := match.ID() + if ord.proposed[mid] != nil { + // Already known + return + } + if ord.accepted[mid] != nil { + // Already accepted + return + } + ord.proposed[mid] = match +} + +func (m *market) addMatchAcceptance(match *tanka.Match) { + m.ordsMtx.Lock() + defer m.ordsMtx.Unlock() + ord, found := m.ords[match.OrderID] + if !found { + m.log.Debugf("ignoring match proposal for unknown order %s", match.OrderID) + } + // Make sure it's not already known or accepted + mid := match.ID() + if ord.proposed[mid] != nil { + delete(ord.proposed, mid) + } + if ord.accepted[mid] != nil { + // Already accepted + return + } + ord.accepted[mid] = match +} + +func (m *Mesh) handleMarketBroadcast(bcast *mj.Broadcast) { + mktName := string(bcast.Subject) + m.marketsMtx.RLock() + mkt, found := m.markets[mktName] + m.marketsMtx.RUnlock() + if !found { + m.log.Debugf("received order notification for unknown market %q", mktName) + return + } + switch bcast.MessageType { + case mj.MessageTypeTrollBox: + var troll mj.Troll + if err := json.Unmarshal(bcast.Payload, &troll); err != nil { + m.log.Errorf("error unmarshaling trollbox message: %v", err) + return + } + fmt.Printf("trollbox message for market %s: %s\n", mktName, troll.Msg) + case mj.MessageTypeNewOrder: + var ord tanka.Order + if err := json.Unmarshal(bcast.Payload, &ord); err != nil { + m.log.Errorf("error unmarshaling new order: %v", err) + return + } + mkt.addOrder(&ord) + case mj.MessageTypeProposeMatch: + var match tanka.Match + if err := json.Unmarshal(bcast.Payload, &match); err != nil { + m.log.Errorf("error unmarshaling match proposal: %v", err) + return + } + mkt.addMatchProposal(&match) + case mj.MessageTypeAcceptMatch: + var match tanka.Match + if err := json.Unmarshal(bcast.Payload, &match); err != nil { + m.log.Errorf("error unmarshaling match proposal: %v", err) + return + } + mkt.addMatchAcceptance(&match) + case mj.MessageTypeNewSubscriber: + var ns mj.NewSubscriber + if err := json.Unmarshal(bcast.Payload, &ns); err != nil { + m.log.Errorf("error decoding new_subscriber payload: %v", err) + } + // c.emit(&NewMarketSubscriber{ + // MarketName: mktName, + // PeerID: bcast.PeerID, + // }) + default: + m.log.Errorf("received broadcast on %s -> %s with unknown message type %s", bcast.Topic, bcast.Subject) + } +} + +func (m *Mesh) FiatRate(assetID uint32) float64 { + m.fiatRatesMtx.RLock() + defer m.fiatRatesMtx.RUnlock() + sym := dex.BipIDSymbol(assetID) + rateInfo := m.fiatRates[sym] + if rateInfo != nil && time.Since(rateInfo.LastUpdate) < fiatrates.FiatRateDataExpiry && rateInfo.Value > 0 { + return rateInfo.Value + } + return 0 +} diff --git a/tatanka/client_messages.go b/tatanka/client_messages.go index c44ca80a55..0d7bcc19f8 100644 --- a/tatanka/client_messages.go +++ b/tatanka/client_messages.go @@ -155,10 +155,13 @@ func (t *Tatanka) handleClientMessage(cl tanka.Sender, msg *msgjson.Message) *ms return nil } - if err := mj.CheckSig(msg, c.PubKey); err != nil { - t.log.Errorf("Signature error for %q message from %q: %v", msg.Route, c.ID, err) - return msgjson.NewError(mj.ErrSig, "signature doesn't check") - } + // For wss connections, we'll only check sigs for RouteConnect. + // The TLS & WebSockets protocols collectively ensure that subsequent + // messages are the same client. + // if err := mj.CheckSig(msg, c.PubKey); err != nil { + // t.log.Errorf("Signature error for %q message from %q: %v", msg.Route, c.ID, err) + // return msgjson.NewError(mj.ErrSig, "signature doesn't check") + // } t.clientMtx.RLock() c, found := t.clients[peerID] @@ -278,12 +281,23 @@ func (t *Tatanka) handleSubscription(c *client, msg *msgjson.Message) *msgjson.E return msgjson.NewError(mj.ErrBadRequest, "is this payload a subscription?") } + newSubB, err := json.Marshal(&mj.NewSubscriber{ + PeerID: c.ID, + Topic: sub.Topic, + Subject: sub.Subject, + }) + if err != nil { + t.log.Errorf("error marshaling subscription from %s: %w", c.ID, err) + return msgjson.NewError(mj.ErrInternal, "why didn't the NewSubscriber marshal?") + } + bcast := &mj.Broadcast{ Topic: sub.Topic, Subject: sub.Subject, MessageType: mj.MessageTypeNewSubscriber, PeerID: c.ID, Stamp: time.Now(), + Payload: newSubB, } // Do a helper function here to keep things tidy below. diff --git a/tatanka/cmd/demo/main.go b/tatanka/cmd/demo/main.go index c81e46919e..3153f16fbf 100644 --- a/tatanka/cmd/demo/main.go +++ b/tatanka/cmd/demo/main.go @@ -19,7 +19,8 @@ import ( "decred.org/dcrdex/server/comms" "decred.org/dcrdex/tatanka" "decred.org/dcrdex/tatanka/chain/utxo" - tankaclient "decred.org/dcrdex/tatanka/client" + "decred.org/dcrdex/tatanka/client/conn" + "decred.org/dcrdex/tatanka/client/mesh" "decred.org/dcrdex/tatanka/mj" "decred.org/dcrdex/tatanka/tanka" "decred.org/dcrdex/tatanka/tcp" @@ -126,15 +127,17 @@ func mainErr() (err error) { time.Sleep(time.Second) - cl1, err := newClient(ctx, addrs[0].String(), pid0, 0) + cl1, shutdown1, err := newClient(ctx, addrs[0].String(), pid0, 0) if err != nil { return fmt.Errorf("error making first connected client: %v", err) } + defer shutdown1() - cl2, err := newClient(ctx, addrs[1].String(), pid1, 1) + cl2, shutdown2, err := newClient(ctx, addrs[1].String(), pid1, 1) if err != nil { return fmt.Errorf("error making second connected client: %v", err) } + defer shutdown2() // cm.Disconnect() @@ -152,7 +155,7 @@ func mainErr() (err error) { // Client 1 should receive a notification. select { case bcastI := <-cl1.Next(): - bcast, is := bcastI.(mj.Broadcast) + bcast, is := bcastI.(*mj.Broadcast) if !is { return fmt.Errorf("expected new subscription Broadcast, got %T", bcastI) } @@ -175,7 +178,7 @@ func mainErr() (err error) { select { case bcastI := <-cl1.Next(): - bcast, is := bcastI.(mj.Broadcast) + bcast, is := bcastI.(*mj.Broadcast) if !is { return fmt.Errorf("client 1 expected trollbox Broadcast bounceback, got %T", bcastI) } @@ -189,7 +192,7 @@ func mainErr() (err error) { select { case bcastI := <-cl2.Next(): - bcast, is := bcastI.(mj.Broadcast) + bcast, is := bcastI.(*mj.Broadcast) if !is { return fmt.Errorf("client 2 expected trollbox Broadcast, got %T", bcastI) } @@ -202,13 +205,13 @@ func mainErr() (err error) { } // Connect clients - if _, err := cl1.ConnectPeer(cl2.ID()); err != nil { + if err := cl1.ConnectPeer(cl2.ID()); err != nil { return fmt.Errorf("error connecting peers: %w", err) } select { case newPeerI := <-cl2.Next(): - if _, is := newPeerI.(*tankaclient.IncomingPeerConnect); !is { + if _, is := newPeerI.(*conn.IncomingPeerConnect); !is { return fmt.Errorf("expected IncomingPeerConnect, got %T", newPeerI) } fmt.Printf("Client 2's received the new peer notification: %+v \n", newPeerI) @@ -218,10 +221,11 @@ func mainErr() (err error) { // Send a tankagram const testRoute = "test_route" - respC := make(chan interface{}) + respC := make(chan any) go func() { msg := mj.MustRequest(testRoute, true) - r, resB, err := cl1.SendTankagram(cl2.ID(), msg) + var resp string + r, err := cl1.RequestPeer(cl2.ID(), msg, &resp) if r.Result != mj.TRTTransmitted { respC <- fmt.Errorf("not transmitted. %q", r.Result) return @@ -230,12 +234,12 @@ func mainErr() (err error) { respC <- err return } - respC <- resB + respC <- resp }() select { case gramI := <-cl2.Next(): - gram, is := gramI.(*tankaclient.IncomingTankagram) + gram, is := gramI.(*conn.IncomingTankagram) if !is { return fmt.Errorf("expected IncomingTankagram, got a %T", gramI) } @@ -252,13 +256,9 @@ func mainErr() (err error) { switch resp := respI.(type) { case error: return fmt.Errorf("error sending tankagram: %v", resp) - case json.RawMessage: - var s string - if err := json.Unmarshal(resp, &s); err != nil { - return fmt.Errorf("tankagram response didn't unmarshal: %w", err) - } - if s != "ok" { - return fmt.Errorf("wrong tankagram response %q", s) + case string: + if resp != "ok" { + return fmt.Errorf("wrong tankagram response %q", resp) } } case <-time.After(time.Second): @@ -272,7 +272,19 @@ func mainErr() (err error) { } // Wait for rate message. - <-cl1.Next() + timeout := time.NewTimer(time.Second) +out: + for { + select { + case msgI := <-cl1.Next(): + switch msgI.(type) { + case *mj.RateMessage: + break out + } + case <-timeout.C: + return errors.New("timed out waiting for rate message") + } + } want := len(chains) got := 0 @@ -302,7 +314,7 @@ func mainErr() (err error) { } type connectedClient struct { - *tankaclient.TankaClient + *mesh.Mesh cm *dex.ConnectionMaster } @@ -385,45 +397,54 @@ func newBootNode(addr string, peerID []byte) tatanka.BootNode { } } -func newClient(ctx context.Context, addr string, peerID tanka.PeerID, i int) (*connectedClient, error) { +func newClient(ctx context.Context, addr string, peerID tanka.PeerID, i int) (*connectedClient, context.CancelFunc, error) { log := logMaker.NewLogger(fmt.Sprintf("tCL[%d:%s]", i, addr), dex.LevelTrace) priv, _ := secp256k1.GeneratePrivateKey() - tc := tankaclient.New(&tankaclient.Config{ + dataDir, _ := os.MkdirTemp("", "") + shutdown := func() { + os.RemoveAll(dataDir) + } + + mesh, err := mesh.New(&mesh.Config{ + DataDir: dataDir, Logger: log.SubLogger("tTC"), PrivateKey: priv, + EntryNode: &mesh.TatankaCredentials{ + PeerID: peerID, + Addr: addr, + NoTLS: true, + }, }) - - cm := dex.NewConnectionMaster(tc) - if err := cm.ConnectOnce(ctx); err != nil { - return nil, fmt.Errorf("ConnectOnce error: %w", err) + if err != nil { + return nil, nil, err } - if err := tc.AddTatankaNode(ctx, peerID, "ws://"+addr, nil); err != nil { - cm.Disconnect() - return nil, fmt.Errorf("error adding server %q", addr) + cm := dex.NewConnectionMaster(mesh) + if err := cm.ConnectOnce(ctx); err != nil { + return nil, nil, fmt.Errorf("ConnectOnce error: %w", err) } - if err := tc.PostBond(&tanka.Bond{ - PeerID: tc.ID(), + if err := mesh.PostBond(&tanka.Bond{ + PeerID: mesh.ID(), AssetID: 42, CoinID: nil, Strength: 1, Expiration: time.Now().Add(time.Hour * 24 * 365), }); err != nil { cm.Disconnect() - return nil, fmt.Errorf("PostBond error: %v", err) + return nil, nil, fmt.Errorf("PostBond error: %v", err) } - if err := tc.Auth(peerID); err != nil { + if err := mesh.Auth(peerID); err != nil { cm.Disconnect() - return nil, fmt.Errorf("auth error: %v", err) + return nil, nil, fmt.Errorf("auth error: %v", err) } return &connectedClient{ - TankaClient: tc, - cm: cm, - }, nil + Mesh: mesh, + cm: cm, + }, shutdown, nil } func mustEncode(thing interface{}) json.RawMessage { diff --git a/tatanka/mj/types.go b/tatanka/mj/types.go index 280c37f898..539500399a 100644 --- a/tatanka/mj/types.go +++ b/tatanka/mj/types.go @@ -28,6 +28,7 @@ const ( ErrNoConfig ErrBannned ErrFailedRelay + ErrUnknownSender ) const ( diff --git a/tatanka/tanka/reputation.go b/tatanka/tanka/reputation.go index 6add2bb74c..3e74bcf65c 100644 --- a/tatanka/tanka/reputation.go +++ b/tatanka/tanka/reputation.go @@ -7,6 +7,8 @@ import ( "time" "decred.org/dcrdex/dex" + "decred.org/dcrdex/dex/encode" + "github.com/decred/dcrd/crypto/blake256" ) const ( @@ -27,6 +29,17 @@ type Bond struct { CoinID dex.Bytes `json:"coinID"` Strength uint64 `json:"strength"` Expiration time.Time `json:"expiration"` + // TODO (buck): Switch to Maturation. + Maturation time.Time `json:"maturation"` +} + +func (bond *Bond) ID() ID32 { + buf := make([]byte, PeerIDLength+4 /* asset ID */ +len(bond.CoinID)) + copy(buf[:PeerIDLength], bond.PeerID[:]) + copy(buf[PeerIDLength:PeerIDLength+4], encode.Uint32Bytes(bond.AssetID)) + copy(buf[PeerIDLength+4:], bond.CoinID[:]) + return blake256.Sum256(buf) + } type HTLCAudit struct{}