Skip to content

Commit

Permalink
btc/ltc: Do not use chain client notifications. (crypto-power#491)
Browse files Browse the repository at this point in the history
Notification from the chain client are only meant for one consumer, and
that consumer is btcwallet. Instead receive all notifications from
btcwallet.
  • Loading branch information
JoeGruffins authored Apr 29, 2024
1 parent c15cd49 commit a247b2f
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 427 deletions.
36 changes: 5 additions & 31 deletions libwallet/assets/btc/rescan.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"decred.org/dcrwallet/v3/errors"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcwallet/chain"
"github.com/btcsuite/btcwallet/waddrmgr"
w "github.com/btcsuite/btcwallet/wallet"
"github.com/btcsuite/btcwallet/walletdb"
Expand Down Expand Up @@ -333,25 +332,25 @@ func (asset *Asset) getBirthdayBlock() (int32, bool, error) {
return birthdayblock, isverified, err
}

func (asset *Asset) updateRescanProgress(progress *chain.RescanProgress) {
func (asset *Asset) updateRescanProgress(height int32) {
if asset.syncData.rescanStartHeight == nil {
asset.syncData.rescanStartHeight = &progress.Height
asset.syncData.rescanStartHeight = &height
}

headersFetchedSoFar := float64(progress.Height - *asset.syncData.rescanStartHeight)
headersFetchedSoFar := float64(height - *asset.syncData.rescanStartHeight)
if headersFetchedSoFar < 1 {
headersFetchedSoFar = 1
}

remainingHeaders := float64(asset.GetBestBlockHeight() - progress.Height)
remainingHeaders := float64(asset.GetBestBlockHeight() - height)
if remainingHeaders < 1 {
remainingHeaders = 1
}

allHeadersToFetch := headersFetchedSoFar + remainingHeaders

rescanProgressReport := &sharedW.HeadersRescanProgressReport{
CurrentRescanHeight: progress.Height,
CurrentRescanHeight: height,
TotalHeadersToScan: int32(allHeadersToFetch),
WalletID: asset.ID,
}
Expand Down Expand Up @@ -390,28 +389,3 @@ func (asset *Asset) getblockStamp(height int32) (*waddrmgr.BlockStamp, error) {
Timestamp: block.Header.Timestamp,
}, nil
}

// updateSyncedToBlock is used to update syncedTo block. Sometimes btcwallet might
// miss the trigger event to update syncedTo block so the update is done here
// regardless thus avoid handling the possible scenario where btcwallet might miss
// the syncedto store trigger event.
func (asset *Asset) updateSyncedToBlock(height int32) {
// Ignore blocks notifications received during the wallet recovery phase.
if !asset.IsSynced() || asset.IsRescanning() {
return
}

err := walletdb.Update(asset.Internal().BTC.Database(), func(dbtx walletdb.ReadWriteTx) error {
addrmgrNs := dbtx.ReadWriteBucket(wAddrMgrBkt)

bs, err := asset.getblockStamp(height)
if err != nil {
return err
}

return asset.Internal().BTC.Manager.SetSyncedTo(addrmgrNs, bs)
})
if err != nil {
log.Errorf("updating syncedTo block failed: Error: %v", err)
}
}
184 changes: 72 additions & 112 deletions libwallet/assets/btc/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ import (
)

const (
// syncIntervalGap defines the interval at which to publish and log progress
// without unnecessarily spamming the receiver.
syncIntervalGap = time.Second * 3

// start helps to synchronously execute compare-and-swap operation when
// initiating the notifications handler.
start uint32 = 1
Expand All @@ -34,7 +30,6 @@ type SyncData struct {

bestBlockheight int32 // Synced peers best block height.
syncstarted uint32
txlistening uint32
chainServiceStopped bool

syncing bool
Expand Down Expand Up @@ -222,114 +217,81 @@ func (asset *Asset) handleSyncUIUpdate() {
}

func (asset *Asset) handleNotifications() {
t := time.NewTicker(syncIntervalGap)
notes := asset.Internal().BTC.NtfnServer.TransactionNotifications()
defer notes.Done()

notificationsLoop:
for {
select {
case n, ok := <-asset.chainClient.Notifications():
if !ok {
continue notificationsLoop
}

switch n := n.(type) {
case chain.ClientConnected:
// Notification type sent is sent when the client connects or reconnects
// to the RPC server. It initialize the sync progress data report.

case chain.BlockConnected:
// Notification type is sent when a new block connects to the longest chain.
// Trigger the progress report only when the block to be reported
// is the best chaintip.

asset.updateSyncedToBlock(n.Height)

select {
case <-t.C:
// log sync progress always
asset.updateSyncProgress(n.Block.Height)
default:
}

case chain.BlockDisconnected:
select {
case <-t.C:
if !asset.IsSynced() {
// initial sync is inprogress.
asset.updateSyncProgress(n.Height)
} else {
// initial sync is complete
asset.publishBlockAttached(n.Height)
}
default:
case n := <-notes.C:
for _, block := range n.AttachedBlocks {
// When syncing historical data no tx are available.
// Txs are reported only when chain is synced and newly mined tx
// we discovered in the latest block.
for _, tx := range block.Transactions {
log.Debugf("(%v) Incoming mined tx with hash=%v block=%v",
asset.GetWalletName(), tx.Hash, block.Height)

// Publish the confirmed tx notification.
asset.publishTransactionConfirmed(tx.Hash.String(), block.Height)
}

case chain.FilteredBlockConnected:
// if relevants txs were detected. Atempt to send them first
for _, tx := range n.RelevantTxs {
asset.publishTransactionConfirmed(tx.Hash.String(), n.Block.Height)
}
asset.publishBlockAttached(block.Height)
}

// Update the progress at the interval of syncIntervalGap.
select {
case <-t.C:
asset.updateSyncProgress(n.Block.Height)
default:
}
txToCache := make([]*sharedW.Transaction, len(n.UnminedTransactions))

case *chain.RescanProgress:
// Notifications sent at interval of 10k blocks
asset.updateRescanProgress(n)
// handle txs hitting the mempool.
for i, tx := range n.UnminedTransactions {
log.Debugf("(%v) Incoming unmined tx with hash (%v)",
asset.GetWalletName(), tx.Hash.String())

case *chain.RescanFinished:
asset.syncData.mu.Lock()
// Address recovery is complete.
asset.syncData.isRescan = false
asset.syncData.mu.Unlock()
// decodeTxs
txToCache[i] = asset.decodeTransactionWithTxSummary(sharedW.UnminedTxHeight, tx)

// Notification type is sent when the rescan is completed.
asset.updateSyncProgress(n.Height)
asset.publishHeadersFetchComplete()

// once initial scan is complete reset the ticket to track every
// new block or transaction detected.
t.Reset(1 * time.Second)

// Only run the listener once the chain is synced and ready to listen
// for newly mined block. This prevents unnecessary CPU use spikes
// on startup when a wallet is syncing from scratch.
go asset.listenForTransactions()

// Since the initial run on a restored wallet, address discovery
// is complete, mark discovered accounts as true.
if asset.IsRestored && !asset.ContainsDiscoveredAccounts() {
// Update the assets birthday from genesis block to a date closer
// to when the privatekey was first used.
asset.updateAssetBirthday()
asset.MarkWalletAsDiscoveredAccounts()
}

asset.syncData.mu.Lock()
asset.syncData.isRescan = false
asset.syncData.mu.Unlock()

if asset.blocksRescanProgressListener != nil {
asset.blocksRescanProgressListener.OnBlocksRescanEnded(asset.ID, nil)
}
// publish mempool tx.
asset.mempoolTransactionNotification(txToCache[i])
}

asset.updateSyncedToBlock(n.Height)
if len(n.UnminedTransactions) > 0 {
// Since the tx cache receives a fresh update only when a new
// block is detected, update cache with the newly received mempool tx(s).
asset.txs.mu.Lock()
asset.txs.unminedTxs = append(txToCache, asset.txs.unminedTxs...)
asset.txs.mu.Unlock()
}

case <-asset.syncCtx.Done():
break notificationsLoop
}
}

// stop the ticker timer.
t.Stop()
// Signal that handleNotifications can be safely started next time its needed.
atomic.StoreUint32(&asset.syncData.syncstarted, stop)
}

func (asset *Asset) rescanFinished(height int32) {
// Notification type is sent when the rescan is completed.
asset.updateSyncProgress(height)
asset.publishHeadersFetchComplete()

// Since the initial run on a restored wallet, address discovery
// is complete, mark discovered accounts as true.
if asset.IsRestored && !asset.ContainsDiscoveredAccounts() {
// Update the assets birthday from genesis block to a date closer
// to when the privatekey was first used.
asset.updateAssetBirthday()
asset.MarkWalletAsDiscoveredAccounts()
}

asset.syncData.mu.Lock()
asset.syncData.isRescan = false
asset.syncData.mu.Unlock()

if asset.blocksRescanProgressListener != nil {
asset.blocksRescanProgressListener.OnBlocksRescanEnded(asset.ID, nil)
}
}

// prepareChain sets up the chain service and the chain source
func (asset *Asset) prepareChain() error {
exists, err := asset.WalletExists()
Expand Down Expand Up @@ -501,26 +463,15 @@ func (asset *Asset) startSync() error {
return err
}

log.Infof("Synchronizing wallet (%s) with network...", asset.GetWalletName())
// Initializes the goroutines handling chain notifications, rescan progress and handlers.
asset.Internal().BTC.SynchronizeRPC(asset.chainClient)

select {
// Wait for 5 seconds so that all goroutines initialized in SynchronizeRPC()
// can startup successfully. To be specific, btcwallet's handleChainNotifications()
// should have completed setting up by the time asset.handleNotifications() starts up.
// This 5 seconds delay is arbitrary chosen, and if found inadequate in future,
// it could be increased.
case <-time.After(time.Second * 5):
case <-asset.syncCtx.Done():
return nil
}

// Listen and handle incoming notification events.
if atomic.CompareAndSwapUint32(&asset.syncData.syncstarted, stop, start) {
go asset.handleNotifications()
}

log.Infof("Synchronizing wallet (%s) with network...", asset.GetWalletName())
// Initializes the goroutines handling chain notifications, rescan progress and handlers.
asset.Internal().BTC.SynchronizeRPC(asset.chainClient)

return nil
}

Expand Down Expand Up @@ -557,7 +508,16 @@ func (asset *Asset) waitForSyncCompletion() {
for {
select {
case <-t.C:
block, err := asset.chainClient.CS.BestBlock()
if err != nil {
log.Error("GetBestBlock hash for BTC failed, Err: ", err)
}
asset.updateSyncProgress(block.Height)
asset.updateRescanProgress(block.Height)

if asset.chainClient.IsCurrent() {
asset.rescanFinished(block.Height)

asset.syncData.mu.Lock()
asset.syncData.synced = true
asset.syncData.syncing = false
Expand Down Expand Up @@ -594,15 +554,15 @@ func (asset *Asset) SpvSync() (err error) {
asset.cancelSync = cancel
asset.notificationListenersMu.Unlock()

// Set wallet synced state to true when chainclient considers itself
// as synced with the network.
go asset.waitForSyncCompletion()

asset.syncData.mu.Lock()
asset.syncData.syncing = true
asset.syncData.synced = false
asset.syncData.mu.Unlock()

// Set wallet synced state to true when chainclient considers itself
// as synced with the network.
go asset.waitForSyncCompletion()

for _, listener := range asset.syncData.syncProgressListeners {
if listener.OnSyncStarted != nil {
listener.OnSyncStarted()
Expand Down
Loading

0 comments on commit a247b2f

Please sign in to comment.