From 9a1eb3b9e976bffbd4d2f542d7716d94ddd916d4 Mon Sep 17 00:00:00 2001 From: conduition Date: Fri, 8 Dec 2023 00:40:07 +0000 Subject: [PATCH] logically separate mempool sync polling and mutation The existing Mempool::sync logic maintained a mutable reference to the Mempool even though it's not actually using that mutable privilege for the majority of the time that the sync method is running. We separate the fetching the polling and applying processes, so that we can make mempool polling async. --- src/mempool.rs | 129 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 84 insertions(+), 45 deletions(-) diff --git a/src/mempool.rs b/src/mempool.rs index 7ebee4522..ea3c2feda 100644 --- a/src/mempool.rs +++ b/src/mempool.rs @@ -1,4 +1,4 @@ -use anyhow::Result; +use anyhow::{Context, Result}; use std::collections::{BTreeSet, HashMap, HashSet}; use std::convert::TryFrom; @@ -36,6 +36,63 @@ pub(crate) struct Mempool { count: Gauge, } +/// An update to [`Mempool`]'s internal state. This can be fetched +/// asynchronously using [`MempoolSyncUpdate::poll`], and applied +/// using [`Mempool::apply_sync_update`]. +pub(crate) struct MempoolSyncUpdate { + new_entries: Vec, + removed_entries: HashSet, +} + +impl MempoolSyncUpdate { + /// Poll the bitcoin node and compute a [`MempoolSyncUpdate`] based on the given set of + /// `old_txids` which are already cached. + pub fn poll( + daemon: &Daemon, + old_txids: HashSet, + exit_flag: &ExitFlag, + ) -> Result { + let txids = daemon.get_mempool_txids()?; + debug!("loading {} mempool transactions", txids.len()); + + let new_txids = HashSet::::from_iter(txids); + + let to_add = &new_txids - &old_txids; + let to_remove = &old_txids - &new_txids; + + let to_add: Vec = to_add.into_iter().collect(); + let mut new_entries = Vec::with_capacity(to_add.len()); + + for chunk in to_add.chunks(100) { + exit_flag.poll().context("mempool update interrupted")?; + let chunk_entries: Vec = chunk + .par_iter() + .filter_map(|txid| -> Option { + // skip missing mempool entries + let tx = daemon.get_transaction(txid, None).ok()?; + let entry = daemon.get_mempool_entry(txid).ok()?; + + Some(Entry { + txid: *txid, + tx, + vsize: entry.vsize, + fee: entry.fees.base, + has_unconfirmed_inputs: !entry.depends.is_empty(), + }) + }) + .collect(); + + new_entries.extend(chunk_entries); + } + + let update = MempoolSyncUpdate { + new_entries, + removed_entries: to_remove, + }; + Ok(update) + } +} + // Smallest possible txid fn txid_min() -> Txid { Txid::all_zeros() @@ -96,53 +153,17 @@ impl Mempool { .collect() } - pub fn sync(&mut self, daemon: &Daemon, exit_flag: &ExitFlag) { - let txids = match daemon.get_mempool_txids() { - Ok(txids) => txids, - Err(e) => { - warn!("mempool sync failed: {}", e); - return; - } - }; - debug!("loading {} mempool transactions", txids.len()); - - let new_txids = HashSet::::from_iter(txids); - let old_txids = HashSet::::from_iter(self.entries.keys().copied()); - - let to_add = &new_txids - &old_txids; - let to_remove = &old_txids - &new_txids; + /// Apply a [`MempoolSyncUpdate`] to the mempool state. + pub fn apply_sync_update(&mut self, update: MempoolSyncUpdate) { + let removed = update.removed_entries.len(); + let added = update.new_entries.len(); - let removed = to_remove.len(); - for txid in to_remove { - self.remove_entry(txid); + for txid_to_remove in update.removed_entries { + self.remove_entry(txid_to_remove); } - let to_add: Vec = to_add.into_iter().collect(); - let mut added = 0; - for chunk in to_add.chunks(100) { - if exit_flag.poll().is_err() { - info!("interrupted while syncing mempool"); - return; - } - let entries: Vec = chunk - .par_iter() - .filter_map(|txid| { - // skip missing mempool entries - let tx = daemon.get_transaction(txid, None).ok()?; - let entry = daemon.get_mempool_entry(txid).ok()?; - Some(Entry { - txid: *txid, - tx, - vsize: entry.vsize, - fee: entry.fees.base, - has_unconfirmed_inputs: !entry.depends.is_empty(), - }) - }) - .collect(); - added += entries.len(); - for entry in entries { - self.add_entry(entry); - } + for entry in update.new_entries { + self.add_entry(entry); } self.update_metrics(); @@ -165,6 +186,23 @@ impl Mempool { } } + pub fn sync(&mut self, daemon: &Daemon, exit_flag: &ExitFlag) { + let old_txids = HashSet::::from_iter(self.entries.keys().copied()); + + let poll_result = MempoolSyncUpdate::poll(daemon, old_txids, exit_flag); + + let sync_update = match poll_result { + Ok(sync_update) => sync_update, + Err(e) => { + warn!("mempool sync failed: {}", e); + return; + } + }; + + self.apply_sync_update(sync_update); + } + + /// Add a transaction entry to the mempool and update the fee histogram. fn add_entry(&mut self, entry: Entry) { for txi in &entry.tx.input { self.by_spending.insert((txi.previous_output, entry.txid)); @@ -182,6 +220,7 @@ impl Mempool { ); } + /// Remove a transaction entry from the mempool and update the fee histogram. fn remove_entry(&mut self, txid: Txid) { let entry = self.entries.remove(&txid).expect("missing tx from mempool"); for txi in entry.tx.input {