diff --git a/src/mempool.rs b/src/mempool.rs index 822b0b255..1e406d423 100644 --- a/src/mempool.rs +++ b/src/mempool.rs @@ -1,4 +1,4 @@ -use anyhow::Result; +use anyhow::{Context, Result}; use std::collections::{BTreeSet, HashMap, HashSet}; use std::convert::TryFrom; @@ -36,6 +36,63 @@ pub(crate) struct Mempool { count: Gauge, } +/// An update to [`Mempool`]'s internal state. This can be fetched +/// asynchronously using [`MempoolSyncUpdate::poll`], and applied +/// using [`Mempool::apply_sync_update`]. +pub(crate) struct MempoolSyncUpdate { + new_entries: Vec, + removed_entries: HashSet, +} + +impl MempoolSyncUpdate { + /// Poll the bitcoin node and compute a [`MempoolSyncUpdate`] based on the given set of + /// `old_txids` which are already cached. + pub fn poll( + daemon: &Daemon, + old_txids: HashSet, + exit_flag: &ExitFlag, + ) -> Result { + let txids = daemon.get_mempool_txids()?; + debug!("loading {} mempool transactions", txids.len()); + + let new_txids = HashSet::::from_iter(txids); + + let to_add = &new_txids - &old_txids; + let to_remove = &old_txids - &new_txids; + + let to_add: Vec = to_add.into_iter().collect(); + let mut new_entries = Vec::with_capacity(to_add.len()); + + for chunk in to_add.chunks(100) { + exit_flag.poll().context("mempool update interrupted")?; + let chunk_entries: Vec = chunk + .par_iter() + .filter_map(|txid| -> Option { + // skip missing mempool entries + let tx = daemon.get_transaction(txid, None).ok()?; + let entry = daemon.get_mempool_entry(txid).ok()?; + + Some(Entry { + txid: *txid, + tx, + vsize: entry.vsize, + fee: entry.fees.base, + has_unconfirmed_inputs: !entry.depends.is_empty(), + }) + }) + .collect(); + + new_entries.extend(chunk_entries); + } + + let update = MempoolSyncUpdate { + new_entries, + removed_entries: to_remove, + }; + Ok(update) + } +} + // Smallest possible txid fn txid_min() -> Txid { Txid::all_zeros() @@ -96,53 +153,17 @@ impl Mempool { .collect() } - pub fn sync(&mut self, daemon: &Daemon, exit_flag: &ExitFlag) { - let txids = match daemon.get_mempool_txids() { - Ok(txids) => txids, - Err(e) => { - warn!("mempool sync failed: {}", e); - return; - } - }; - debug!("loading {} mempool transactions", txids.len()); - - let new_txids = HashSet::::from_iter(txids); - let old_txids = HashSet::::from_iter(self.entries.keys().copied()); - - let to_add = &new_txids - &old_txids; - let to_remove = &old_txids - &new_txids; + /// Apply a [`MempoolSyncUpdate`] to the mempool state. + pub fn apply_sync_update(&mut self, update: MempoolSyncUpdate) { + let removed = update.removed_entries.len(); + let added = update.new_entries.len(); - let removed = to_remove.len(); - for txid in to_remove { - self.remove_entry(txid); + for txid_to_remove in update.removed_entries { + self.remove_entry(txid_to_remove); } - let to_add: Vec = to_add.into_iter().collect(); - let mut added = 0; - for chunk in to_add.chunks(100) { - if exit_flag.poll().is_err() { - info!("interrupted while syncing mempool"); - return; - } - let entries: Vec = chunk - .par_iter() - .filter_map(|txid| { - // skip missing mempool entries - let tx = daemon.get_transaction(txid, None).ok()?; - let entry = daemon.get_mempool_entry(txid).ok()?; - Some(Entry { - txid: *txid, - tx, - vsize: entry.vsize, - fee: entry.fees.base, - has_unconfirmed_inputs: !entry.depends.is_empty(), - }) - }) - .collect(); - added += entries.len(); - for entry in entries { - self.add_entry(entry); - } + for entry in update.new_entries { + self.add_entry(entry); } self.update_metrics(); @@ -165,6 +186,23 @@ impl Mempool { } } + pub fn sync(&mut self, daemon: &Daemon, exit_flag: &ExitFlag) { + let old_txids = HashSet::::from_iter(self.entries.keys().copied()); + + let poll_result = MempoolSyncUpdate::poll(daemon, old_txids, exit_flag); + + let sync_update = match poll_result { + Ok(sync_update) => sync_update, + Err(e) => { + warn!("mempool sync failed: {}", e); + return; + } + }; + + self.apply_sync_update(sync_update); + } + + /// Add a transaction entry to the mempool and update the fee histogram. fn add_entry(&mut self, entry: Entry) { for txi in &entry.tx.input { self.by_spending.insert((txi.previous_output, entry.txid)); @@ -182,6 +220,7 @@ impl Mempool { ); } + /// Remove a transaction entry from the mempool and update the fee histogram. fn remove_entry(&mut self, txid: Txid) { let entry = self.entries.remove(&txid).expect("missing tx from mempool"); for txi in entry.tx.input {