diff --git a/internal/netsync/manager.go b/internal/netsync/manager.go index 3f0911668..2b3504101 100644 --- a/internal/netsync/manager.go +++ b/internal/netsync/manager.go @@ -338,7 +338,7 @@ type SyncManager struct { rejectedTxns *apbf.Filter rejectedMixMsgs *apbf.Filter requestedTxns map[chainhash.Hash]struct{} - requestedBlocks map[chainhash.Hash]struct{} + requestedBlocks map[chainhash.Hash]*Peer requestedMixMsgs map[chainhash.Hash]struct{} progressLogger *progresslog.Logger syncPeer *Peer @@ -424,6 +424,16 @@ func (m *SyncManager) maybeUpdateNextNeededBlocks() { } } +// isRequestedBlock returns whether or not the given block hash has already been +// requested from any remote peer. +// +// This function is NOT safe for concurrent access. It must be called from the +// event handler goroutine. +func (m *SyncManager) isRequestedBlock(hash *chainhash.Hash) bool { + _, ok := m.requestedBlocks[*hash] + return ok +} + // fetchNextBlocks creates and sends a request to the provided peer for the next // blocks to be downloaded based on the current headers. func (m *SyncManager) fetchNextBlocks(peer *Peer) { @@ -458,12 +468,12 @@ func (m *SyncManager) fetchNextBlocks(peer *Peer) { // Skip blocks that have already been requested. The needed blocks // might have been updated above thereby potentially repopulating some // blocks that are still in flight. - if _, ok := m.requestedBlocks[*hash]; ok { + if m.isRequestedBlock(hash) { continue } iv := wire.NewInvVect(wire.InvTypeBlock, hash) - m.requestedBlocks[*hash] = struct{}{} + m.requestedBlocks[*hash] = peer peer.requestedBlocks[*hash] = struct{}{} gdmsg.AddInvVect(iv) } @@ -1454,14 +1464,19 @@ func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) { // Skip the block when it has already been requested or is otherwise // already known. hash := &headerHashes[i] - _, isRequestedBlock := m.requestedBlocks[*hash] - if isRequestedBlock || chain.HaveBlock(hash) { + if m.isRequestedBlock(hash) || chain.HaveBlock(hash) { continue } + // Stop requesting when the request would exceed the max size of the + // map used to track requests. + if len(m.requestedBlocks)+1 > maxRequestedBlocks { + break + } + + m.requestedBlocks[*hash] = peer + peer.requestedBlocks[*hash] = struct{}{} iv := wire.NewInvVect(wire.InvTypeBlock, hash) - limitAdd(m.requestedBlocks, *hash, maxRequestedBlocks) - limitAdd(peer.requestedBlocks, *hash, maxRequestedBlocks) gdmsg.AddInvVect(iv) } if len(gdmsg.InvList) > 0 { @@ -1920,12 +1935,9 @@ func (m *SyncManager) requestFromPeer(peer *Peer, blocks, voteHashes, // Add the blocks to the request. msgResp := wire.NewMsgGetData() for i := range blocks { - // If we've already requested this block, skip it. + // Skip the block when it has already been requested. bh := &blocks[i] - _, alreadyReqP := peer.requestedBlocks[*bh] - _, alreadyReqB := m.requestedBlocks[*bh] - - if alreadyReqP || alreadyReqB { + if m.isRequestedBlock(bh) { continue } @@ -1942,7 +1954,7 @@ func (m *SyncManager) requestFromPeer(peer *Peer, blocks, voteHashes, } peer.requestedBlocks[*bh] = struct{}{} - m.requestedBlocks[*bh] = struct{}{} + m.requestedBlocks[*bh] = peer } addTxsToRequest := func(txs []chainhash.Hash, txType stake.TxType) error { @@ -2169,7 +2181,7 @@ func New(config *Config) *SyncManager { rejectedTxns: apbf.NewFilter(maxRejectedTxns, rejectedTxnsFPRate), rejectedMixMsgs: apbf.NewFilter(maxRejectedMixMsgs, rejectedMixMsgsFPRate), requestedTxns: make(map[chainhash.Hash]struct{}), - requestedBlocks: make(map[chainhash.Hash]struct{}), + requestedBlocks: make(map[chainhash.Hash]*Peer), requestedMixMsgs: make(map[chainhash.Hash]struct{}), peers: make(map[*Peer]struct{}), minKnownWork: minKnownWork,