Skip to content

Commit

Permalink
work around ticket purchase bug
Browse files Browse the repository at this point in the history
  • Loading branch information
buck54321 committed Nov 15, 2023
1 parent f1ec281 commit cb1bb14
Show file tree
Hide file tree
Showing 21 changed files with 470 additions and 137 deletions.
139 changes: 129 additions & 10 deletions client/asset/dcr/dcr.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,12 @@ type ExchangeWallet struct {
connected atomic.Bool

subsidyCache *blockchain.SubsidyCache

ticketBuyer struct {
running atomic.Bool
remaining atomic.Int32
unconfirmedTickets map[chainhash.Hash]struct{}
}
}

func (dcr *ExchangeWallet) config() *exchangeWalletConfig {
Expand Down Expand Up @@ -5019,6 +5025,7 @@ func (dcr *ExchangeWallet) StakeStatus() (*asset.TicketStakingStatus, error) {
TicketCount: sinfo.OwnMempoolTix + sinfo.Unspent + sinfo.Immature + sinfo.Voted + sinfo.Revoked,
Votes: sinfo.Voted,
Revokes: sinfo.Revoked,
Queued: uint32(dcr.ticketBuyer.remaining.Load()),
},
}, nil
}
Expand Down Expand Up @@ -5119,28 +5126,137 @@ func (dcr *ExchangeWallet) SetVSP(url string) error {

// PurchaseTickets purchases n number of tickets. Part of the asset.TicketBuyer
// interface.
func (dcr *ExchangeWallet) PurchaseTickets(n int, feeSuggestion uint64) ([]*asset.Ticket, error) {
func (dcr *ExchangeWallet) PurchaseTickets(n int, feeSuggestion uint64) error {
if n < 1 {
return nil, nil
return nil
}
if !dcr.connected.Load() {
return nil, errors.New("not connected, login first")
return errors.New("not connected, login first")
}
// I think we need to set this, otherwise we probably end up with default
// of DefaultRelayFeePerKb = 1e4 => 10 atoms/byte.
feePerKB := dcrutil.Amount(dcr.feeRateWithFallback(feeSuggestion) * 1000)
if err := dcr.wallet.SetTxFee(dcr.ctx, feePerKB); err != nil {
return nil, fmt.Errorf("error setting wallet tx fee: %w", err)
return fmt.Errorf("error setting wallet tx fee: %w", err)
}

remain := dcr.ticketBuyer.remaining.Add(int32(n))
dcr.emit.Data(ticketDataRoute, &TicketPurchaseUpdate{Remaining: uint32(remain)})
go dcr.runTicketBuyer()

return nil
}

const ticketDataRoute = "ticketPurchaseUpdate"

// TicketPurchaseUpdate is an update from the asynchronous ticket purchasing
// loop.
type TicketPurchaseUpdate struct {
Err string `json:"err,omitempty"`
Remaining uint32 `json:"remaining"`
Tickets []*asset.Ticket `json:"tickets"`
}

func (dcr *ExchangeWallet) runTicketBuyer() {
tb := &dcr.ticketBuyer
if !tb.running.CompareAndSwap(false, true) {
// already running
return
}
defer tb.running.Store(false)
var ok bool
defer func() {
if !ok {
tb.remaining.Store(0)
}
}()

if tb.unconfirmedTickets == nil {
tb.unconfirmedTickets = make(map[chainhash.Hash]struct{})
}

remain := tb.remaining.Load()
if remain < 1 {
return
}

var unconf int
for txHash := range tb.unconfirmedTickets {
tx, err := dcr.wallet.GetTransaction(dcr.ctx, &txHash)
if err != nil {
dcr.log.Errorf("GetTransaction error ticket tx %s: %v", txHash, err)
dcr.emit.Data(ticketDataRoute, &TicketPurchaseUpdate{Err: err.Error()})
return
}
if tx.Confirmations > 0 {
delete(tb.unconfirmedTickets, txHash)
} else {
unconf++
}
}
if unconf > 0 {
ok = true
dcr.log.Tracef("Skipping ticket purchase attempt because there are still %d unconfirmed tickets", unconf)
}

dcr.log.Tracef("Attempting to purchase %d tickets", remain)

bal, err := dcr.Balance()
if err != nil {
dcr.log.Errorf("GetBalance error: %v", err)
dcr.emit.Data(ticketDataRoute, &TicketPurchaseUpdate{Err: err.Error()})
return
}
sinfo, err := dcr.wallet.StakeInfo(dcr.ctx)
if err != nil {
dcr.log.Errorf("StakeInfo error: %v", err)
dcr.emit.Data(ticketDataRoute, &TicketPurchaseUpdate{Err: err.Error()})
return
}
if dcrutil.Amount(bal.Available) < sinfo.Sdiff*dcrutil.Amount(remain) {
dcr.log.Errorf("Insufficient balance %s to purches %d ticket at price %s: %v", dcrutil.Amount(bal.Available), remain, sinfo.Sdiff)
dcr.emit.Data(ticketDataRoute, &TicketPurchaseUpdate{Err: "insufficient balance"})
return
}

var tickets []*asset.Ticket
if !dcr.isNative() {
return dcr.wallet.PurchaseTickets(dcr.ctx, n, "", "")
tickets, err = dcr.wallet.PurchaseTickets(dcr.ctx, int(remain), "", "")
} else {
v := dcr.vspV.Load()
if v == nil {
err = errors.New("no vsp set")
} else {
vInfo := v.(*vsp)
tickets, err = dcr.wallet.PurchaseTickets(dcr.ctx, int(remain), vInfo.URL, vInfo.PubKey)
}
}
v := dcr.vspV.Load()
if v == nil {
return nil, errors.New("no vsp set")
if err != nil {
dcr.log.Errorf("PurchaseTickets error: %v", err)
dcr.emit.Data(ticketDataRoute, &TicketPurchaseUpdate{Err: err.Error()})
return
}
vInfo := v.(*vsp)
return dcr.wallet.PurchaseTickets(dcr.ctx, n, vInfo.URL, vInfo.PubKey)
purchased := int32(len(tickets))
remain = tb.remaining.Add(-purchased)
// sanity check
if remain < 0 {
remain = 0
tb.remaining.Store(remain)
}
dcr.emit.Data(ticketDataRoute, &TicketPurchaseUpdate{
Tickets: tickets,
Remaining: uint32(remain),
})
for _, ticket := range tickets {
txHash, err := chainhash.NewHashFromStr(ticket.Tx.Hash)
if err != nil {
dcr.log.Errorf("NewHashFromStr error for ticket hash %s: %v", ticket.Tx.Hash, err)
dcr.emit.Data(ticketDataRoute, &TicketPurchaseUpdate{Err: err.Error()})
return
}
tb.unconfirmedTickets[*txHash] = struct{}{}
}
ok = true
}

// SetVotingPreferences sets the vote choices for all active tickets and future
Expand Down Expand Up @@ -5334,11 +5450,14 @@ func (dcr *ExchangeWallet) emitTipChange(height int64) {
TicketCount: sinfo.OwnMempoolTix + sinfo.Unspent + sinfo.Immature + sinfo.Voted + sinfo.Revoked,
Votes: sinfo.Voted,
Revokes: sinfo.Revoked,
Queued: uint32(dcr.ticketBuyer.remaining.Load()),
},
VotingSubsidy: dcr.voteSubsidy(height),
}
}

dcr.emit.TipChange(uint64(height), data)
dcr.runTicketBuyer()
}

// monitorBlocks pings for new blocks and runs the tipChange callback function
Expand Down
160 changes: 156 additions & 4 deletions client/asset/dcr/dcr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -145,7 +146,7 @@ func tNewWalletMonitorBlocks(monitorBlocks bool) (*ExchangeWallet, *tRPCClient,
client := newTRPCClient()
log := tLogger.SubLogger("trpc")
walletCfg := &asset.WalletConfig{
Emit: asset.NewWalletEmitter(make(chan asset.WalletNotification, 128), BipID, log),
Emit: asset.NewWalletEmitter(client.emitC, BipID, log),
PeersChange: func(uint32, error) {},
}
walletCtx, shutdown := context.WithCancel(tCtx)
Expand Down Expand Up @@ -217,6 +218,11 @@ type tRPCClient struct {
lockedCoins []*wire.OutPoint // Last submitted to LockUnspent
listLockedErr error
estFeeErr error
emitC chan asset.WalletNotification
// tickets
purchasedTickets [][]*chainhash.Hash
purchaseTicketsErr error
stakeInfo walletjson.GetStakeInfoResult
}

type wireTxWithHeight struct {
Expand Down Expand Up @@ -330,6 +336,7 @@ func newTRPCClient() *tRPCClient {
signFunc: defaultSignFunc,
rawRes: make(map[string]json.RawMessage),
rawErr: make(map[string]error),
emitC: make(chan asset.WalletNotification, 128),
}
}

Expand Down Expand Up @@ -520,13 +527,23 @@ func (c *tRPCClient) Disconnected() bool {
}

func (c *tRPCClient) GetStakeInfo(ctx context.Context) (*walletjson.GetStakeInfoResult, error) {
return &walletjson.GetStakeInfoResult{}, nil
return &c.stakeInfo, nil
}

func (c *tRPCClient) PurchaseTicket(ctx context.Context, fromAccount string, spendLimit dcrutil.Amount, minConf *int,
ticketAddress stdaddr.Address, numTickets *int, poolAddress stdaddr.Address, poolFees *dcrutil.Amount,
expiry *int, ticketChange *bool, ticketFee *dcrutil.Amount) ([]*chainhash.Hash, error) {
return nil, nil
expiry *int, ticketChange *bool, ticketFee *dcrutil.Amount) (tix []*chainhash.Hash, _ error) {

if c.purchaseTicketsErr != nil {
return nil, c.purchaseTicketsErr
}

if len(c.purchasedTickets) > 0 {
tix = c.purchasedTickets[0]
c.purchasedTickets = c.purchasedTickets[1:]
}

return tix, nil
}

func (c *tRPCClient) GetTickets(ctx context.Context, includeImmature bool) ([]*chainhash.Hash, error) {
Expand Down Expand Up @@ -4380,3 +4397,138 @@ func TestConfirmRedemption(t *testing.T) {
}
}
}

func TestPurchaseTickets(t *testing.T) {
wallet, cl, shutdown := tNewWalletMonitorBlocks(false)
defer shutdown()
wallet.connected.Store(true)
cl.stakeInfo.Difficulty = dcrutil.Amount(1).ToCoin()
cl.balanceResult = &walletjson.GetBalanceResult{Balances: []walletjson.GetAccountBalanceResult{{AccountName: tAcctName}}}
setBalance := func(avail uint64, reserves uint64) {
cl.balanceResult.Balances[0].Spendable = dcrutil.Amount(avail).ToCoin()
wallet.bondReserves.Store(reserves)
}

var blocksToConfirm atomic.Int64
cl.walletTxFn = func() (*walletjson.GetTransactionResult, error) {
txHex, _ := makeTxHex(nil, []dex.Bytes{randBytes(25)})
var confs int64 = 1
if blocksToConfirm.Load() > 0 {
confs = 0
}
return &walletjson.GetTransactionResult{Hex: txHex, Confirmations: confs}, nil
}

var remains []uint32
checkRemains := func(exp ...uint32) {
t.Helper()
if len(remains) != len(exp) {
t.Fatalf("wrong number of remains, wanted %d, got %+v", len(exp), remains)
}
for i := 0; i < len(remains); i++ {
if remains[i] != exp[i] {
t.Fatalf("wrong remains updates: wanted %+v, got %+v", exp, remains)
}
}
}

waitForTicketLoopToExit := func() {
// Ensure the loop closes
timeout := time.After(time.Second)
for {
if !wallet.ticketBuyer.running.Load() {
break
}
select {
case <-time.After(time.Millisecond):
return
case <-timeout:
t.Fatalf("ticket loop didn't exit")
}
}
}

buyTickets := func(n int, wantErr bool) {
defer waitForTicketLoopToExit()
remains = make([]uint32, 0)
if err := wallet.PurchaseTickets(n, 100); err != nil {
t.Fatalf("initial PurchaseTickets error: %v", err)
}

var emitted int
timeout := time.After(time.Second)
out:
for {
var ni asset.WalletNotification
select {
case ni = <-cl.emitC:
case <-timeout:
t.Fatalf("timed out looking for ticket updates")
default:
blocksToConfirm.Add(-1)
wallet.runTicketBuyer()
continue
}
switch nt := ni.(type) {
case *asset.CustomWalletNote:
switch n := nt.Payload.(type) {
case *TicketPurchaseUpdate:
remains = append(remains, n.Remaining)
if n.Err != "" {
if wantErr {
return
}
t.Fatalf("Error received in TicketPurchaseUpdate: %s", n.Err)
}
if n.Remaining == 0 {
break out
}
emitted++
}

}
}
}

tixHashes := func(n int) []*chainhash.Hash {
hs := make([]*chainhash.Hash, n)
for i := 0; i < n; i++ {
var ticketHash chainhash.Hash
copy(ticketHash[:], randBytes(32))
hs[i] = &ticketHash
}
return hs
}

// Single ticket purchased right away.
cl.purchasedTickets = [][]*chainhash.Hash{tixHashes(1)}
setBalance(1, 0)
buyTickets(1, false)
checkRemains(1, 0)

// Multiple tickets purchased right away.
cl.purchasedTickets = [][]*chainhash.Hash{tixHashes(2)}
setBalance(2, 0)
buyTickets(2, false)
checkRemains(2, 0)

// Two tickets, purchased in two tries, skipping some tries for unconfirmed
// tickets.
blocksToConfirm.Store(3)
cl.purchasedTickets = [][]*chainhash.Hash{tixHashes(1), tixHashes(1)}
buyTickets(2, false)
checkRemains(2, 1, 0)

// (Wallet).PurchaseTickets error
cl.purchasedTickets = [][]*chainhash.Hash{tixHashes(4)}
cl.purchaseTicketsErr = errors.New("test error")
setBalance(4, 0)
buyTickets(4, true)
checkRemains(4, 0)

// Low-balance error
cl.purchasedTickets = [][]*chainhash.Hash{tixHashes(1)}
setBalance(1, 1) // reserves make our available balance 0
buyTickets(1, true)
checkRemains(1, 0)
}
Loading

0 comments on commit cb1bb14

Please sign in to comment.