Skip to content

Commit

Permalink
*review* fix synced semantics
Browse files Browse the repository at this point in the history
  • Loading branch information
matheusd committed Jan 22, 2024
1 parent 0954ba9 commit a918500
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 16 deletions.
32 changes: 24 additions & 8 deletions chain/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,11 @@ func (s *Syncer) DisableDiscoverAccounts() {
// the target height it is attempting to sync to.
func (s *Syncer) Synced(ctx context.Context) (bool, int32) {
synced := s.atomicWalletSynced.Load() == 1
targetHeight := s.atomicTargetSyncHeight.Load()
_, tipHeight := s.wallet.MainChainTip(ctx)
if tipHeight > targetHeight {
targetHeight = tipHeight
var targetHeight int32
if !synced {
targetHeight = s.atomicTargetSyncHeight.Load()
} else {
_, targetHeight = s.wallet.MainChainTip(ctx)
}
return synced, targetHeight
}
Expand All @@ -129,6 +130,15 @@ func (s *Syncer) synced() {
}
}

// unsynced checks the atomic that controls wallet syncness and if previously
// synced, updates to unsynced and notifies the callback, if set.
func (s *Syncer) unsynced() {
swapped := s.atomicWalletSynced.CompareAndSwap(1, 0)
if swapped && s.cb != nil && s.cb.Synced != nil {
s.cb.Synced(false)
}
}

func (s *Syncer) fetchMissingCfiltersStart() {
if s.cb != nil && s.cb.FetchMissingCFiltersStarted != nil {
s.cb.FetchMissingCFiltersStarted()
Expand Down Expand Up @@ -203,18 +213,23 @@ func (s *Syncer) getHeaders(ctx context.Context) error {
return err
}

startedSynced := s.atomicWalletSynced.Load() == 1

cnet := s.wallet.ChainParams().Net
s.fetchHeadersStart()
for {
if err := ctx.Err(); err != nil {
return err
}

info, err := s.rpc.GetBlockchainInfo(ctx)
if err != nil {
return err
// If unsynced, update the target sync height.
if !startedSynced {
info, err := s.rpc.GetBlockchainInfo(ctx)
if err != nil {
return err
}
s.atomicTargetSyncHeight.Store(int32(info.Headers))
}
s.atomicTargetSyncHeight.Store(int32(info.Headers))

headers, err := s.rpc.Headers(ctx, locators, &hashStop)
if err != nil {
Expand Down Expand Up @@ -528,6 +543,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
if err != nil {
return err
}
defer s.unsynced()

// Request notifications for connected and disconnected blocks.
err = s.rpc.Call(ctx, "notifyblocks", nil)
Expand Down
34 changes: 26 additions & 8 deletions spv/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,16 +156,31 @@ func (s *Syncer) synced() {
}
}

// Synced returns whether this wallet is completely synced to the network.
// unsynced checks the atomic that controls wallet syncness and if previously
// synced, updates to unsynced and notifies the callback, if set.
func (s *Syncer) unsynced() {
if s.atomicWalletSynced.CompareAndSwap(1, 0) &&
s.notifications != nil &&
s.notifications.Synced != nil {
s.notifications.Synced(false)
}
}

// Synced returns whether this wallet is completely synced to the network and
// the target height it is attempting to sync to.
func (s *Syncer) Synced(ctx context.Context) (bool, int32) {
synced := s.atomicWalletSynced.Load() == 1
_, targetHeight := s.wallet.MainChainTip(ctx)
s.forRemotes(func(rp *p2p.RemotePeer) error {
if rp.InitialHeight() > targetHeight {
targetHeight = rp.InitialHeight()
}
return nil
})
var targetHeight int32
if !synced {
s.forRemotes(func(rp *p2p.RemotePeer) error {
if rp.InitialHeight() > targetHeight {
targetHeight = rp.InitialHeight()
}
return nil
})
} else {
_, targetHeight = s.wallet.MainChainTip(ctx)
}

return synced, targetHeight
}
Expand Down Expand Up @@ -462,6 +477,9 @@ func (s *Syncer) connectAndRunPeer(ctx context.Context, raddr string) {
n = len(s.remotes)
s.remotesMu.Unlock()
s.peerDisconnected(n, raddr)
if n == 0 {
s.unsynced()
}
}()

// Perform peer startup.
Expand Down

0 comments on commit a918500

Please sign in to comment.