Skip to content

Commit

Permalink
logically separate mempool sync polling and mutation
Browse files Browse the repository at this point in the history
The existing Mempool::sync logic maintained a mutable reference
to the Mempool even though it's not actually using that
mutable privilege for the majority of the time that the sync
method is running. We separate the fetching the polling and
applying processes, so that we can make mempool polling async.
  • Loading branch information
conduition committed Dec 10, 2023
1 parent ed42680 commit 9a1eb3b
Showing 1 changed file with 84 additions and 45 deletions.
129 changes: 84 additions & 45 deletions src/mempool.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::Result;
use anyhow::{Context, Result};

use std::collections::{BTreeSet, HashMap, HashSet};
use std::convert::TryFrom;
Expand Down Expand Up @@ -36,6 +36,63 @@ pub(crate) struct Mempool {
count: Gauge,
}

/// An update to [`Mempool`]'s internal state. This can be fetched
/// asynchronously using [`MempoolSyncUpdate::poll`], and applied
/// using [`Mempool::apply_sync_update`].
pub(crate) struct MempoolSyncUpdate {
new_entries: Vec<Entry>,
removed_entries: HashSet<Txid>,
}

impl MempoolSyncUpdate {
/// Poll the bitcoin node and compute a [`MempoolSyncUpdate`] based on the given set of
/// `old_txids` which are already cached.
pub fn poll(
daemon: &Daemon,
old_txids: HashSet<Txid>,
exit_flag: &ExitFlag,
) -> Result<MempoolSyncUpdate> {
let txids = daemon.get_mempool_txids()?;
debug!("loading {} mempool transactions", txids.len());

let new_txids = HashSet::<Txid>::from_iter(txids);

let to_add = &new_txids - &old_txids;
let to_remove = &old_txids - &new_txids;

let to_add: Vec<Txid> = to_add.into_iter().collect();
let mut new_entries = Vec::with_capacity(to_add.len());

for chunk in to_add.chunks(100) {
exit_flag.poll().context("mempool update interrupted")?;
let chunk_entries: Vec<Entry> = chunk
.par_iter()
.filter_map(|txid| -> Option<Entry> {
// skip missing mempool entries
let tx = daemon.get_transaction(txid, None).ok()?;
let entry = daemon.get_mempool_entry(txid).ok()?;

Some(Entry {
txid: *txid,
tx,
vsize: entry.vsize,
fee: entry.fees.base,
has_unconfirmed_inputs: !entry.depends.is_empty(),
})
})
.collect();

new_entries.extend(chunk_entries);
}

let update = MempoolSyncUpdate {
new_entries,
removed_entries: to_remove,
};
Ok(update)
}
}

// Smallest possible txid
fn txid_min() -> Txid {
Txid::all_zeros()
Expand Down Expand Up @@ -96,53 +153,17 @@ impl Mempool {
.collect()
}

pub fn sync(&mut self, daemon: &Daemon, exit_flag: &ExitFlag) {
let txids = match daemon.get_mempool_txids() {
Ok(txids) => txids,
Err(e) => {
warn!("mempool sync failed: {}", e);
return;
}
};
debug!("loading {} mempool transactions", txids.len());

let new_txids = HashSet::<Txid>::from_iter(txids);
let old_txids = HashSet::<Txid>::from_iter(self.entries.keys().copied());

let to_add = &new_txids - &old_txids;
let to_remove = &old_txids - &new_txids;
/// Apply a [`MempoolSyncUpdate`] to the mempool state.
pub fn apply_sync_update(&mut self, update: MempoolSyncUpdate) {
let removed = update.removed_entries.len();
let added = update.new_entries.len();

let removed = to_remove.len();
for txid in to_remove {
self.remove_entry(txid);
for txid_to_remove in update.removed_entries {
self.remove_entry(txid_to_remove);
}
let to_add: Vec<Txid> = to_add.into_iter().collect();
let mut added = 0;
for chunk in to_add.chunks(100) {
if exit_flag.poll().is_err() {
info!("interrupted while syncing mempool");
return;
}
let entries: Vec<Entry> = chunk
.par_iter()
.filter_map(|txid| {
// skip missing mempool entries
let tx = daemon.get_transaction(txid, None).ok()?;
let entry = daemon.get_mempool_entry(txid).ok()?;

Some(Entry {
txid: *txid,
tx,
vsize: entry.vsize,
fee: entry.fees.base,
has_unconfirmed_inputs: !entry.depends.is_empty(),
})
})
.collect();
added += entries.len();
for entry in entries {
self.add_entry(entry);
}
for entry in update.new_entries {
self.add_entry(entry);
}

self.update_metrics();
Expand All @@ -165,6 +186,23 @@ impl Mempool {
}
}

pub fn sync(&mut self, daemon: &Daemon, exit_flag: &ExitFlag) {
let old_txids = HashSet::<Txid>::from_iter(self.entries.keys().copied());

let poll_result = MempoolSyncUpdate::poll(daemon, old_txids, exit_flag);

let sync_update = match poll_result {
Ok(sync_update) => sync_update,
Err(e) => {
warn!("mempool sync failed: {}", e);
return;
}
};

self.apply_sync_update(sync_update);
}

/// Add a transaction entry to the mempool and update the fee histogram.
fn add_entry(&mut self, entry: Entry) {
for txi in &entry.tx.input {
self.by_spending.insert((txi.previous_output, entry.txid));
Expand All @@ -182,6 +220,7 @@ impl Mempool {
);
}

/// Remove a transaction entry from the mempool and update the fee histogram.
fn remove_entry(&mut self, txid: Txid) {
let entry = self.entries.remove(&txid).expect("missing tx from mempool");
for txi in entry.tx.input {
Expand Down

0 comments on commit 9a1eb3b

Please sign in to comment.