mixclient: Wait for KEs from all attempted sessions
This commit modifies the mixing client to consider fully formed sessions
(where all peers have sent a KE with matching sessions), even if other
sessions have been attempted afterwards.  Previously, only the most
recently attempted session would ever be used, and clients would only ever
ratchet down the set of peers they would mix with.

With this change, initial session disagreement can be recovered from more
gracefully, as some peers will form an alternate session matching the session
that other peers originally tried.
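
As a rough sketch of the selection rule (illustrative names only; attemptedSession
and selectSession below are not types or functions in the mixclient package),
whichever attempted session first has a KE from every peer can be chosen, even
when newer attempts exist:

// attemptedSession is a simplified, hypothetical stand-in for the
// client's per-session state.
type attemptedSession struct {
	sid        [32]byte
	totalPeers int
	keReceived int // peers whose KE references this session
}

// fullyFormed reports whether every peer has sent a KE agreeing on
// this session.
func (s *attemptedSession) fullyFormed() bool {
	return s.keReceived == s.totalPeers
}

// selectSession returns the first fully formed session among all
// attempts, rather than only ever considering the most recent one.
func selectSession(attempts []*attemptedSession) *attemptedSession {
	for _, s := range attempts {
		if s.fullyFormed() {
			return s
		}
	}
	return nil
}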

This commit also fixes a race that could prevent root-solving wallets from
publishing their solutions in a timely manner, which would eventually result
in non-solving wallets being blamed for the DC timeout.
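
The part of that change visible in blame.go is that outgoing messages are now
sent against an explicit deadline rather than through per-peer closures.  A
minimal sketch of a deadline-bounded send, under assumed names
(publishWithDeadline and the send callback are illustrative; the real client
threads the deadline through sendLocalPeerMsgs):

import (
	"context"
	"time"

	"github.com/decred/dcrd/mixing"
)

// publishWithDeadline sends a mixing message but gives up once the
// session deadline passes, so a slow send cannot silently hold back a
// solution that other peers are waiting on.
func publishWithDeadline(ctx context.Context, deadline time.Time,
	msg mixing.Message, send func(context.Context, mixing.Message) error) error {

	sendCtx, cancel := context.WithDeadline(ctx, deadline)
	defer cancel()
	return send(sendCtx, msg)
}
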
jrick committed Jan 23, 2025
1 parent 172eeb5 commit d81384a
Showing 4 changed files with 772 additions and 577 deletions.
82 changes: 44 additions & 38 deletions mixing/mixclient/blame.go
@@ -11,6 +11,7 @@ import (
"fmt"
"math/big"
"sort"
"time"

"github.com/decred/dcrd/chaincfg/chainhash"
"github.com/decred/dcrd/dcrec/secp256k1/v4"
@@ -20,6 +21,8 @@ import (
"github.com/decred/dcrd/wire"
)

var errBlameFailed = errors.New("blame failed")

// blamedIdentities identifies detected misbehaving peers.
//
// If a run returns a blamedIdentities error, these peers are immediately
@@ -48,7 +51,7 @@ func (e blamedIdentities) String() string {
}

func (c *Client) blame(ctx context.Context, sesRun *sessionRun) (err error) {
c.logf("Blaming for sid=%x", sesRun.sid[:])
sesRun.logf("running blame assignment")

mp := c.mixpool
prs := sesRun.prs
@@ -65,14 +68,11 @@ func (c *Client) blame(ctx context.Context, sesRun *sessionRun) (err error) {
}
}()

err = c.sendLocalPeerMsgs(ctx, sesRun, true, func(p *peer) mixing.Message {
// Send initial secrets messages from any peers who detected
// misbehavior.
if !p.triggeredBlame {
return nil
}
return p.rs
})
deadline := time.Now().Add(timeoutDuration)

// Send initial secrets messages from any peers who detected
// misbehavior.
err = c.sendLocalPeerMsgs(ctx, deadline, sesRun, 0)
if err != nil {
return err
}
@@ -87,15 +87,17 @@ func (c *Client) blame(ctx context.Context, sesRun *sessionRun) (err error) {
rsHashes = append(rsHashes, rs.Hash())
}

// Send remaining secrets messages.
err = c.sendLocalPeerMsgs(ctx, sesRun, true, func(p *peer) mixing.Message {
if p.triggeredBlame {
p.triggeredBlame = false
return nil
// Send remaining secrets messages with observed RS hashes from the
// initial peers who published secrets.
c.forLocalPeers(ctx, sesRun, func(p *peer) error {
if !p.triggeredBlame {
if p.rs != nil {
p.rs.SeenSecrets = rsHashes
}
}
p.rs.SeenSecrets = rsHashes
return p.rs
return nil
})
err = c.sendLocalPeerMsgs(ctx, deadline, sesRun, msgRS)
if err != nil {
return err
}
@@ -113,14 +115,14 @@ func (c *Client) blame(ctx context.Context, sesRun *sessionRun) (err error) {
}
if len(rss) != len(sesRun.peers) {
// Blame peers who did not send secrets
c.logf("received %d RSs for %d peers; blaming unresponsive peers",
sesRun.logf("received %d RSs for %d peers; blaming unresponsive peers",
len(rss), len(sesRun.peers))

for _, p := range sesRun.peers {
if p.rs != nil {
continue
}
c.logf("blaming %x for RS timeout", p.id[:])
sesRun.logf("blaming %x for RS timeout", p.id[:])
blamed = append(blamed, *p.id)
}
return blamed
@@ -142,10 +144,14 @@ func (c *Client) blame(ctx context.Context, sesRun *sessionRun) (err error) {
continue
}
id := &rs.Identity
c.logf("blaming %x for false failure accusation", id[:])
sesRun.logf("blaming %x for false failure accusation", id[:])
blamed = append(blamed, *id)
}
err = blamed
if len(blamed) > 0 {
err = blamed
} else {
err = errBlameFailed
}
}()

defer c.mu.Unlock()
@@ -159,7 +165,7 @@ func (c *Client) blame(ctx context.Context, sesRun *sessionRun) (err error) {
KELoop:
for _, p := range sesRun.peers {
if p.ke == nil {
c.logf("blaming %x for missing messages", p.id[:])
sesRun.logf("blaming %x for missing messages", p.id[:])
blamed = append(blamed, *p.id)
continue
}
@@ -169,15 +175,15 @@ KELoop:
cm := p.rs.Commitment(c.blake256Hasher)
c.blake256HasherMu.Unlock()
if cm != p.ke.Commitment {
c.logf("blaming %x for false commitment, got %x want %x",
sesRun.logf("blaming %x for false commitment, got %x want %x",
p.id[:], cm[:], p.ke.Commitment[:])
blamed = append(blamed, *p.id)
continue
}

// Blame peers whose seed is not the correct length (will panic chacha20prng).
if len(p.rs.Seed) != chacha20prng.SeedSize {
c.logf("blaming %x for bad seed size in RS message", p.id[:])
sesRun.logf("blaming %x for bad seed size in RS message", p.id[:])
blamed = append(blamed, *p.id)
continue
}
@@ -187,7 +193,7 @@ KELoop:
if mixing.InField(scratch.SetBytes(m)) {
continue
}
c.logf("blaming %x for SR message outside field", p.id[:])
sesRun.logf("blaming %x for SR message outside field", p.id[:])
blamed = append(blamed, *p.id)
continue KELoop
}
@@ -199,7 +205,7 @@ KELoop:
// Recover derived key exchange from PRNG.
p.kx, err = mixing.NewKX(p.prng)
if err != nil {
c.logf("blaming %x for bad KX", p.id[:])
sesRun.logf("blaming %x for bad KX", p.id[:])
blamed = append(blamed, *p.id)
continue
}
@@ -210,14 +216,14 @@ KELoop:
case !bytes.Equal(p.ke.ECDH[:], p.kx.ECDHPublicKey.SerializeCompressed()):
fallthrough
case !bytes.Equal(p.ke.PQPK[:], p.kx.PQPublicKey[:]):
c.logf("blaming %x for KE public keys not derived from their PRNG",
sesRun.logf("blaming %x for KE public keys not derived from their PRNG",
p.id[:])
blamed = append(blamed, *p.id)
continue KELoop
}
publishedECDHPub, err := secp256k1.ParsePubKey(p.ke.ECDH[:])
if err != nil {
c.logf("blaming %x for unparsable pubkey")
sesRun.logf("blaming %x for unparsable pubkey")
blamed = append(blamed, *p.id)
continue
}
@@ -229,7 +235,7 @@ KELoop:
start += mcount

if uint32(len(p.rs.SlotReserveMsgs)) != mcount || uint32(len(p.rs.DCNetMsgs)) != mcount {
c.logf("blaming %x for bad message count", p.id[:])
sesRun.logf("blaming %x for bad message count", p.id[:])
blamed = append(blamed, *p.id)
continue
}
@@ -261,19 +267,19 @@ KELoop:
// from their PRNG.
for i, p := range sesRun.peers {
if p.ct == nil {
c.logf("blaming %x for missing messages", p.id[:])
sesRun.logf("blaming %x for missing messages", p.id[:])
blamed = append(blamed, *p.id)
continue
}
if len(recoveredCTs[i]) != len(p.ct.Ciphertexts) {
c.logf("blaming %x for different ciphertexts count %d != %d",
sesRun.logf("blaming %x for different ciphertexts count %d != %d",
p.id[:], len(recoveredCTs[i]), len(p.ct.Ciphertexts))
blamed = append(blamed, *p.id)
continue
}
for j := range p.ct.Ciphertexts {
if !bytes.Equal(p.ct.Ciphertexts[j][:], recoveredCTs[i][j][:]) {
c.logf("blaming %x for different ciphertexts", p.id[:])
sesRun.logf("blaming %x for different ciphertexts", p.id[:])
blamed = append(blamed, *p.id)
break
}
@@ -294,7 +300,7 @@ KELoop:
for _, pids := range shared {
if len(pids) > 1 {
for i := range pids {
c.logf("blaming %x for shared SR message", pids[i][:])
sesRun.logf("blaming %x for shared SR message", pids[i][:])
}
blamed = append(blamed, pids...)
}
@@ -306,7 +312,7 @@ KELoop:
SRLoop:
for i, p := range sesRun.peers {
if p.sr == nil {
c.logf("blaming %x for missing messages", p.id[:])
sesRun.logf("blaming %x for missing messages", p.id[:])
blamed = append(blamed, *p.id)
continue
}
@@ -325,7 +331,7 @@ SRLoop:
var decapErr *mixing.DecapsulateError
if errors.As(err, &decapErr) {
submittingID := p.id
c.logf("blaming %x for unrecoverable secrets", submittingID[:])
sesRun.logf("blaming %x for unrecoverable secrets", submittingID[:])
blamed = append(blamed, *submittingID)
continue
}
@@ -343,7 +349,7 @@ SRLoop:
// Blame when committed mix does not match provided.
for k := range srMix {
if srMix[k].Cmp(scratch.SetBytes(p.sr.DCMix[j][k])) != 0 {
c.logf("blaming %x for bad SR mix", p.id[:])
sesRun.logf("blaming %x for bad SR mix", p.id[:])
blamed = append(blamed, *p.id)
continue SRLoop
}
@@ -376,7 +382,7 @@ DCLoop:
// deferred function) if no peers could be assigned blame is
// not likely to be seen under this situation.
if p.dc == nil {
c.logf("blaming %x for missing messages", p.id[:])
sesRun.logf("blaming %x for missing messages", p.id[:])
blamed = append(blamed, *p.id)
continue
}
@@ -386,7 +392,7 @@ DCLoop:
// message, and there must be mcount DC-net vectors.
mcount := p.pr.MessageCount
if uint32(len(p.dc.DCNet)) != mcount {
c.logf("blaming %x for missing DC mix vectors", p.id[:])
sesRun.logf("blaming %x for missing DC mix vectors", p.id[:])
blamed = append(blamed, *p.id)
continue
}
@@ -406,7 +412,7 @@ DCLoop:
// Blame when committed mix does not match provided.
for k := 0; k < len(dcMix); k++ {
if !dcMix.Equals(mixing.Vec(p.dc.DCNet[j])) {
c.logf("blaming %x for bad DC mix", p.id[:])
sesRun.logf("blaming %x for bad DC mix", p.id[:])
blamed = append(blamed, *p.id)
continue DCLoop
}