Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi: Refactor wallet.NetworkBackend. #2320

Merged
merged 1 commit into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions chain/backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright (c) 2023 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

package chain

import (
"context"

"github.com/decred/dcrd/chaincfg/chainhash"
"github.com/decred/dcrd/dcrutil/v4"
"github.com/decred/dcrd/gcs/v4"
dcrdtypes "github.com/decred/dcrd/rpc/jsonrpc/types/v4"
"github.com/decred/dcrd/txscript/v4/stdaddr"
"github.com/decred/dcrd/wire"
"github.com/jrick/bitset"
)

// Blocks is part of the wallet.NetworkBackend interface.
func (s *Syncer) Blocks(ctx context.Context, blockHashes []*chainhash.Hash) ([]*wire.MsgBlock, error) {
return s.rpc.Blocks(ctx, blockHashes)
}

type filterProof = struct {
Filter *gcs.FilterV2
ProofIndex uint32
Proof []chainhash.Hash
}

// CFiltersV2 is part of the wallet.NetworkBackend interface.
func (s *Syncer) CFiltersV2(ctx context.Context, blockHashes []*chainhash.Hash) ([]filterProof, error) {
return s.rpc.CFiltersV2(ctx, blockHashes)
}

// PublishTransactions is part of the wallet.NetworkBackend interface.
func (s *Syncer) PublishTransactions(ctx context.Context, txs ...*wire.MsgTx) error {
return s.rpc.PublishTransactions(ctx, txs...)
}

// LoadTxFilter is part of the wallet.NetworkBackend interface.
func (s *Syncer) LoadTxFilter(ctx context.Context, reload bool, addrs []stdaddr.Address, outpoints []wire.OutPoint) error {
return s.rpc.LoadTxFilter(ctx, reload, addrs, outpoints)
}

// Rescan is part of the wallet.NetworkBackend interface.
func (s *Syncer) Rescan(ctx context.Context, blocks []chainhash.Hash, save func(block *chainhash.Hash, txs []*wire.MsgTx) error) error {
return s.rpc.Rescan(ctx, blocks, save)
}

// StakeDifficulty is part of the wallet.NetworkBackend interface.
func (s *Syncer) StakeDifficulty(ctx context.Context) (dcrutil.Amount, error) {
return s.rpc.StakeDifficulty(ctx)
}

// Deployments fulfills the DeploymentQuerier interface.
func (s *Syncer) Deployments(ctx context.Context) (map[string]dcrdtypes.AgendaInfo, error) {
info, err := s.rpc.GetBlockchainInfo(ctx)
if err != nil {
return nil, err
}
return info.Deployments, nil
}

// GetTxOut fulfills the LiveTicketQuerier interface.
func (s *Syncer) GetTxOut(ctx context.Context, txHash *chainhash.Hash, index uint32, tree int8, includeMempool bool) (*dcrdtypes.GetTxOutResult, error) {
return s.rpc.GetTxOut(ctx, txHash, index, tree, includeMempool)
}

// GetConfirmationHeight fulfills the LiveTicketQuerier interface.
func (s *Syncer) GetConfirmationHeight(ctx context.Context, txHash *chainhash.Hash) (int32, error) {
return s.rpc.GetConfirmationHeight(ctx, txHash)
}

// ExistsLiveTickets fulfills the LiveTicketQuerier interface.
func (s *Syncer) ExistsLiveTickets(ctx context.Context, tickets []*chainhash.Hash) (bitset.Bytes, error) {
return s.rpc.ExistsLiveTickets(ctx, tickets)
}

// UsedAddresses fulfills the usedAddressesQuerier interface.
func (s *Syncer) UsedAddresses(ctx context.Context, addrs []stdaddr.Address) (bitset.Bytes, error) {
return s.rpc.UsedAddresses(ctx, addrs)
}
59 changes: 50 additions & 9 deletions chain/sync.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2017-2020 The Decred developers
// Copyright (c) 2017-2023 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

Expand Down Expand Up @@ -30,7 +30,8 @@ var requiredAPIVersion = semver{Major: 8, Minor: 0, Patch: 0}
// Syncer implements wallet synchronization services by processing
// notifications from a dcrd JSON-RPC server.
type Syncer struct {
atomicWalletSynced atomic.Uint32 // CAS (synced=1) when wallet syncing complete
atomicWalletSynced atomic.Uint32 // CAS (synced=1) when wallet syncing complete
atomicTargetSyncHeight atomic.Int32

wallet *wallet.Wallet
opts *RPCOptions
Expand Down Expand Up @@ -94,6 +95,11 @@ func (s *Syncer) SetCallbacks(cb *Callbacks) {
s.cb = cb
}

// RPC returns the JSON-RPC client to the underlying dcrd node.
func (s *Syncer) RPC() *dcrd.RPC {
return s.rpc
}

// DisableDiscoverAccounts disables account discovery. This has an effect only
// if called before the main Run() executes the account discovery process.
func (s *Syncer) DisableDiscoverAccounts() {
Expand All @@ -102,6 +108,19 @@ func (s *Syncer) DisableDiscoverAccounts() {
s.mu.Unlock()
}

// Synced returns whether the syncer has completed syncing to the backend and
// the target height it is attempting to sync to.
func (s *Syncer) Synced(ctx context.Context) (bool, int32) {
synced := s.atomicWalletSynced.Load() == 1
var targetHeight int32
if !synced {
targetHeight = s.atomicTargetSyncHeight.Load()
} else {
_, targetHeight = s.wallet.MainChainTip(ctx)
}
return synced, targetHeight
}

// synced checks the atomic that controls wallet syncness and if previously
// unsynced, updates to synced and notifies the callback, if set.
func (s *Syncer) synced() {
Expand All @@ -111,6 +130,15 @@ func (s *Syncer) synced() {
}
}

// unsynced checks the atomic that controls wallet syncness and if previously
// synced, updates to unsynced and notifies the callback, if set.
func (s *Syncer) unsynced() {
swapped := s.atomicWalletSynced.CompareAndSwap(1, 0)
if swapped && s.cb != nil && s.cb.Synced != nil {
s.cb.Synced(false)
}
}

func (s *Syncer) fetchMissingCfiltersStart() {
if s.cb != nil && s.cb.FetchMissingCFiltersStarted != nil {
s.cb.FetchMissingCFiltersStarted()
Expand Down Expand Up @@ -185,12 +213,24 @@ func (s *Syncer) getHeaders(ctx context.Context) error {
return err
}

startedSynced := s.atomicWalletSynced.Load() == 1

cnet := s.wallet.ChainParams().Net
s.fetchHeadersStart()
for {
if err := ctx.Err(); err != nil {
return err
}

// If unsynced, update the target sync height.
if !startedSynced {
info, err := s.rpc.GetBlockchainInfo(ctx)
if err != nil {
return err
}
s.atomicTargetSyncHeight.Store(int32(info.Headers))
}

headers, err := s.rpc.Headers(ctx, locators, &hashStop)
if err != nil {
return err
Expand Down Expand Up @@ -312,15 +352,15 @@ func (s *Syncer) getHeaders(ctx context.Context) error {
discoverAccts := s.discoverAccts
s.mu.Unlock()
s.discoverAddressesStart()
err = s.wallet.DiscoverActiveAddresses(ctx, s.rpc, rescanPoint, discoverAccts, s.wallet.GapLimit())
err = s.wallet.DiscoverActiveAddresses(ctx, s, rescanPoint, discoverAccts, s.wallet.GapLimit())
if err != nil {
return err
}
s.discoverAddressesFinished()
s.mu.Lock()
s.discoverAccts = false
s.mu.Unlock()
err = s.wallet.LoadActiveDataFilters(ctx, s.rpc, true)
err = s.wallet.LoadActiveDataFilters(ctx, s, true)
if err != nil {
return err
}
Expand All @@ -331,7 +371,7 @@ func (s *Syncer) getHeaders(ctx context.Context) error {
return err
}
progress := make(chan wallet.RescanProgress, 1)
go s.wallet.RescanProgressFromHeight(ctx, s.rpc, int32(rescanBlock.Height), progress)
go s.wallet.RescanProgressFromHeight(ctx, s, int32(rescanBlock.Height), progress)

for p := range progress {
if p.Err != nil {
Expand All @@ -342,7 +382,7 @@ func (s *Syncer) getHeaders(ctx context.Context) error {
s.rescanFinished()

} else {
err = s.wallet.LoadActiveDataFilters(ctx, s.rpc, true)
err = s.wallet.LoadActiveDataFilters(ctx, s, true)
if err != nil {
return err
}
Expand Down Expand Up @@ -454,7 +494,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
}

// Associate the RPC client with the wallet and remove the association on return.
s.wallet.SetNetworkBackend(s.rpc)
s.wallet.SetNetworkBackend(s)
defer s.wallet.SetNetworkBackend(nil)

tipHash, tipHeight := s.wallet.MainChainTip(ctx)
Expand Down Expand Up @@ -489,7 +529,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
// Fetch any missing main chain compact filters.
s.fetchMissingCfiltersStart()
progress := make(chan wallet.MissingCFilterProgress, 1)
go s.wallet.FetchMissingCFiltersWithProgress(ctx, s.rpc, progress)
go s.wallet.FetchMissingCFiltersWithProgress(ctx, s, progress)
for p := range progress {
if p.Err != nil {
return p.Err
Expand All @@ -503,6 +543,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
if err != nil {
return err
}
defer s.unsynced()

// Request notifications for connected and disconnected blocks.
err = s.rpc.Call(ctx, "notifyblocks", nil)
Expand All @@ -517,7 +558,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
}

// Rebroadcast unmined transactions
err = s.wallet.PublishUnminedTransactions(ctx, s.rpc)
err = s.wallet.PublishUnminedTransactions(ctx, s)
if err != nil {
// Returning this error would end and (likely) restart sync in
// an endless loop. It's possible a transaction should be
Expand Down
29 changes: 16 additions & 13 deletions deployments/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"context"

"decred.org/dcrwallet/v4/errors"
"decred.org/dcrwallet/v4/rpc/client/dcrd"
"github.com/decred/dcrd/chaincfg/v3"
dcrdtypes "github.com/decred/dcrd/rpc/jsonrpc/types/v4"
"github.com/decred/dcrd/wire"
Expand Down Expand Up @@ -78,12 +77,20 @@ const (
activeStatus = dcrdtypes.AgendaInfoStatusActive
)

// Querier defines the interface for a chain backend that can (trustfully)
// query for agenda deployment information.
type Querier interface {
// Deployments should return information about existing agendas,
// including their deployment status.
Deployments(context.Context) (map[string]dcrdtypes.AgendaInfo, error)
}

// DCP0010Active returns whether the consensus rules for the next block with the
// current chain tip height requires the subsidy split as specified in DCP0010.
// DCP0010 is always active on simnet, and requires the RPC syncer to detect
// activation on mainnet and testnet3.
func DCP0010Active(ctx context.Context, height int32, params *chaincfg.Params,
syncer any) (bool, error) {
querier Querier) (bool, error) {

net := params.Net
rcai := int32(params.RuleChangeActivationInterval)
Expand All @@ -94,16 +101,14 @@ func DCP0010Active(ctx context.Context, height int32, params *chaincfg.Params,
if net != wire.MainNet && net != wire.TestNet3 {
return false, nil
}
rpc, ok := syncer.(*dcrd.RPC)
if !ok {
if querier == nil {
return false, errors.E(errors.Bug, "DCP0010 activation check requires RPC syncer")
}
var resp dcrdtypes.GetBlockChainInfoResult
err := rpc.Call(ctx, "getblockchaininfo", &resp)
deployments, err := querier.Deployments(ctx)
if err != nil {
return false, err
}
d, ok := resp.Deployments[chaincfg.VoteIDChangeSubsidySplit]
d, ok := deployments[chaincfg.VoteIDChangeSubsidySplit]
if !ok {
return false, nil
}
Expand All @@ -122,24 +127,22 @@ func DCP0010Active(ctx context.Context, height int32, params *chaincfg.Params,
// DCP0012. DCP0012 requires the RPC syncer to detect activation on mainnet,
// testnet3 and simnet.
func DCP0012Active(ctx context.Context, height int32, params *chaincfg.Params,
syncer any) (bool, error) {
querier Querier) (bool, error) {

net := params.Net
rcai := int32(params.RuleChangeActivationInterval)

if net != wire.MainNet && net != wire.TestNet3 && net != wire.SimNet {
return false, nil
}
rpc, ok := syncer.(*dcrd.RPC)
if !ok {
if querier == nil {
return false, errors.E(errors.Bug, "DCP0012 activation check requires RPC syncer")
}
var resp dcrdtypes.GetBlockChainInfoResult
err := rpc.Call(ctx, "getblockchaininfo", &resp)
deployments, err := querier.Deployments(ctx)
if err != nil {
return false, err
}
d, ok := resp.Deployments[chaincfg.VoteIDChangeSubsidySplitR2]
d, ok := deployments[chaincfg.VoteIDChangeSubsidySplitR2]
if !ok {
return false, nil
}
Expand Down
Loading
Loading