Skip to content

Commit

Permalink
Don't shift deadlines by 30s in following runs; the current run may h…
Browse files Browse the repository at this point in the history
…ave ran longer than 30s already
  • Loading branch information
jrick committed Dec 9, 2024
1 parent 102ff72 commit 70e58d7
Showing 1 changed file with 10 additions and 18 deletions.
28 changes: 10 additions & 18 deletions mixing/mixclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ type Wallet interface {
}

type deadlines struct {
epoch time.Time
recvKE time.Time
recvCT time.Time
recvSR time.Time
Expand All @@ -157,14 +156,6 @@ func (d *deadlines) start(begin time.Time) {
d.recvCM = add()
}

func (d *deadlines) shift() {
d.recvKE = d.recvCT
d.recvCT = d.recvSR
d.recvSR = d.recvDC
d.recvDC = d.recvCM
d.recvCM = d.recvCM.Add(timeoutDuration)
}

// peerRunState describes the peer state that changes across different
// sessions/runs.
type peerRunState struct {
Expand Down Expand Up @@ -277,6 +268,7 @@ type pairedSessions struct {
pairing []byte
runs []sessionRun

epoch time.Time
deadlines

// Track state of pairing completion.
Expand Down Expand Up @@ -955,7 +947,7 @@ func (c *Client) pairSession(ctx context.Context, ps *pairedSessions, prs []*wir
c.mu.Unlock()
}()

ps.deadlines.epoch = epoch
ps.epoch = epoch
ps.deadlines.start(epoch)

sid := mixing.SortPRsForSession(prs, unixEpoch)
Expand Down Expand Up @@ -1041,7 +1033,7 @@ func (c *Client) pairSession(ctx context.Context, ps *pairedSessions, prs []*wir
// instead of just going away.
if altses.err != nil {
r.logf("Unable to recreate session: %v", altses.err)
ps.deadlines.shift()
ps.deadlines.start(time.Now())
continue
}

Expand Down Expand Up @@ -1122,7 +1114,7 @@ func (c *Client) pairSession(ctx context.Context, ps *pairedSessions, prs []*wir
ps.runs = append(ps.runs, *rerun)
newRun = &ps.runs[len(ps.runs)-1]
}
ps.deadlines.shift()
ps.deadlines.start(time.Now())
if requirePeerAgreement {
ps.peerAgreementRunIdx = len(ps.runs) - 1
}
Expand Down Expand Up @@ -1225,7 +1217,7 @@ func (c *Client) run(ctx context.Context, ps *pairedSessions) (sesRun *sessionRu
prs := sesRun.prs

d := &ps.deadlines
unixEpoch := uint64(d.epoch.Unix())
unixEpoch := uint64(ps.epoch.Unix())

// A map of identity public keys to their PR position sort all
// messages in the same order as the PRs are ordered.
Expand Down Expand Up @@ -1358,7 +1350,7 @@ func (c *Client) run(ctx context.Context, ps *pairedSessions) (sesRun *sessionRu
// epoch. The session forming will be performed by a new
// goroutine started by the epoch ticker, possibly with
// additional PRs.
nextEpoch := d.epoch.Add(c.epoch)
nextEpoch := ps.epoch.Add(c.epoch)
if time.Now().Add(timeoutDuration).After(nextEpoch) {
c.logf("Aborting session %x after %d attempts",
sesRun.sid[:], len(ps.runs))
Expand All @@ -1368,7 +1360,7 @@ func (c *Client) run(ctx context.Context, ps *pairedSessions) (sesRun *sessionRu
// If peer agreement was never established, alternate sessions
// based on the seen PRs must be formed.
if !ps.peerAgreement {
return sesRun, c.alternateSession(ps.pairing, sesRun.prs, d)
return sesRun, c.alternateSession(ps, sesRun.prs)
}

return sesRun, err
Expand Down Expand Up @@ -2092,10 +2084,10 @@ func (e *alternateSession) Unwrap() error {
return e.err
}

func (c *Client) alternateSession(pairing []byte, prs []*wire.MsgMixPairReq, d *deadlines) *alternateSession {
unixEpoch := uint64(d.epoch.Unix())
func (c *Client) alternateSession(ps *pairedSessions, prs []*wire.MsgMixPairReq) *alternateSession {
unixEpoch := uint64(ps.epoch.Unix())

kes := c.mixpool.ReceiveKEsByPairing(pairing, unixEpoch)
kes := c.mixpool.ReceiveKEsByPairing(ps.pairing, unixEpoch)

// Sort KEs by identity first (just to group these together) followed
// by the total referenced PR counts in increasing order (most recent
Expand Down

0 comments on commit 70e58d7

Please sign in to comment.