Skip to content

Commit

Permalink
Merge #1180: Save transactions in database
Browse files Browse the repository at this point in the history
bf1e90e sqlite: merge two migration tests (jp1ac4)
e883675 sqlite: add a unit test for migration between v4 and v5 (Antoine Poinsot)
af5fddf commands: use database for TxGetter (jp1ac4)
a86d12d commands: get wallet transactions from db (jp1ac4)
afa6a51 poller: save transactions in database (jp1ac4)
ba4c819 sqlite: separate DB migration from constructor (Antoine Poinsot)
50e7ffa lib: setup the connection to bitcoind before the connection to SQLite. (Antoine Poinsot)

Pull request description:

  This is the first step of #56 (comment).

  The poller will now save transactions in our own database. These transactions are selected based on the deposit and spend transactions of coins. Only the txid and transaction itself are saved, with the corresponding block height and time taken from the coins table.

  In a couple of follow-up commits, I've replaced some RPC calls to bitcoind with DB queries.

ACKs for top commit:
  darosior:
    re-ACK bf1e90e

Tree-SHA512: a1d0a6381efe307655b94a3ff257c58e4d921e98a7fa79e5c9f80016c19df761b10266d4122cb290b78424c5e2acefc163683fcfc948950e3c838e39ba31ba57
  • Loading branch information
darosior committed Aug 2, 2024
2 parents b947968 + bf1e90e commit 90df14c
Show file tree
Hide file tree
Showing 8 changed files with 1,039 additions and 316 deletions.
36 changes: 36 additions & 0 deletions src/bitcoin/poller/looper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,40 @@ fn update_coins(
}
}

// Add new deposit and spend transactions to the database.
fn add_txs_to_db(
bit: &impl BitcoinInterface,
db_conn: &mut Box<dyn DatabaseConnection>,
updated_coins: &UpdatedCoins,
) {
let curr_txids: HashSet<_> = db_conn.list_saved_txids().into_iter().collect();
let mut new_txids = HashSet::new();
// First get all newly received coins that have not expired.
new_txids.extend(updated_coins.received.iter().filter_map(|c| {
if !updated_coins.expired.contains(&c.outpoint) {
Some(c.outpoint.txid)
} else {
None
}
}));

// Add spend txid for new & existing coins.
new_txids.extend(updated_coins.spending.iter().map(|(_, txid)| txid));

// Remove those txids we already have.
let missing_txids = new_txids.difference(&curr_txids);
log::debug!("Missing txids: {:?}", missing_txids);

// Now retrieve txs.
let txs: Vec<_> = missing_txids
.map(|txid| bit.wallet_transaction(txid).map(|(tx, _)| tx))
.collect::<Option<Vec<_>>>()
.expect("we must retrieve all txs");
if !txs.is_empty() {
db_conn.new_txs(&txs);
}
}

#[derive(Debug, Clone, Copy)]
enum TipUpdate {
// The best block is still the same as in the previous poll.
Expand Down Expand Up @@ -233,6 +267,8 @@ fn updates(
return updates(db_conn, bit, descs, secp);
}

// Transactions must be added to the DB before coins due to foreign key constraints.
add_txs_to_db(bit, db_conn, &updated_coins);
// The chain tip did not change since we started our updates. Record them and the latest tip.
// Having the tip in database means that, as far as the chain is concerned, we've got all
// updates up to this block. But not more.
Expand Down
150 changes: 77 additions & 73 deletions src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,27 +155,33 @@ impl fmt::Display for RbfErrorInfo {
}
}

/// A wallet transaction getter which fetches the transaction from our Bitcoin backend with a cache
/// A wallet transaction getter which fetches the transaction from our database backend with a cache
/// to avoid needless redundant calls. Note the cache holds an Option<> so we also avoid redundant
/// calls when the txid isn't known by our Bitcoin backend.
struct BitcoindTxGetter<'a> {
bitcoind: &'a sync::Arc<sync::Mutex<dyn BitcoinInterface>>,
/// calls when the txid isn't known by our database backend.
struct DbTxGetter<'a> {
db: &'a sync::Arc<sync::Mutex<dyn DatabaseInterface>>,
cache: HashMap<bitcoin::Txid, Option<bitcoin::Transaction>>,
}

impl<'a> BitcoindTxGetter<'a> {
pub fn new(bitcoind: &'a sync::Arc<sync::Mutex<dyn BitcoinInterface>>) -> Self {
impl<'a> DbTxGetter<'a> {
pub fn new(db: &'a sync::Arc<sync::Mutex<dyn DatabaseInterface>>) -> Self {
Self {
bitcoind,
db,
cache: HashMap::new(),
}
}
}

impl<'a> TxGetter for BitcoindTxGetter<'a> {
impl<'a> TxGetter for DbTxGetter<'a> {
fn get_tx(&mut self, txid: &bitcoin::Txid) -> Option<bitcoin::Transaction> {
if let hash_map::Entry::Vacant(entry) = self.cache.entry(*txid) {
entry.insert(self.bitcoind.wallet_transaction(txid).map(|wtx| wtx.0));
let tx = self
.db
.connection()
.list_wallet_transactions(&[*txid])
.pop()
.map(|(tx, _, _)| tx);
entry.insert(tx);
}
self.cache.get(txid).cloned().flatten()
}
Expand Down Expand Up @@ -457,7 +463,7 @@ impl DaemonControl {
return Err(CommandError::InvalidFeerate(feerate_vb));
}
let mut db_conn = self.db.connection();
let mut tx_getter = BitcoindTxGetter::new(&self.bitcoin);
let mut tx_getter = DbTxGetter::new(&self.db);

// Prepare the destination addresses.
let mut destinations_checked = Vec::with_capacity(destinations.len());
Expand Down Expand Up @@ -754,7 +760,7 @@ impl DaemonControl {
feerate_vb: Option<u64>,
) -> Result<CreateSpendResult, CommandError> {
let mut db_conn = self.db.connection();
let mut tx_getter = BitcoindTxGetter::new(&self.bitcoin);
let mut tx_getter = DbTxGetter::new(&self.db);

if is_cancel && feerate_vb.is_some() {
return Err(CommandError::RbfError(RbfErrorInfo::SuperfluousFeerate));
Expand Down Expand Up @@ -1015,39 +1021,19 @@ impl DaemonControl {
limit: u64,
) -> ListTransactionsResult {
let mut db_conn = self.db.connection();
// Note the result could in principle be retrieved in a single database query.
let txids = db_conn.list_txids(start, end, limit);
let transactions = txids
.iter()
.filter_map(|txid| {
// TODO: batch those calls to the Bitcoin backend
// so it can in turn optimize its queries.
self.bitcoin
.wallet_transaction(txid)
.map(|(tx, block)| TransactionInfo {
tx,
height: block.map(|b| b.height),
time: block.map(|b| b.time),
})
})
.collect();
ListTransactionsResult { transactions }
self.list_transactions(&txids)
}

/// list_transactions retrieves the transactions with the given txids.
pub fn list_transactions(&self, txids: &[bitcoin::Txid]) -> ListTransactionsResult {
let transactions = txids
.iter()
.filter_map(|txid| {
// TODO: batch those calls to the Bitcoin backend
// so it can in turn optimize its queries.
self.bitcoin
.wallet_transaction(txid)
.map(|(tx, block)| TransactionInfo {
tx,
height: block.map(|b| b.height),
time: block.map(|b| b.time),
})
})
let transactions = self
.db
.connection()
.list_wallet_transactions(txids)
.into_iter()
.map(|(tx, height, time)| TransactionInfo { tx, height, time })
.collect();
ListTransactionsResult { transactions }
}
Expand All @@ -1068,7 +1054,7 @@ impl DaemonControl {
if feerate_vb < 1 {
return Err(CommandError::InvalidFeerate(feerate_vb));
}
let mut tx_getter = BitcoindTxGetter::new(&self.bitcoin);
let mut tx_getter = DbTxGetter::new(&self.db);
let mut db_conn = self.db.connection();
let sweep_addr = self.spend_addr(&mut db_conn, self.validate_address(address)?);

Expand Down Expand Up @@ -1422,25 +1408,17 @@ mod tests {

#[test]
fn create_spend() {
let dummy_op = bitcoin::OutPoint::from_str(
"3753a1d74c0af8dd0a0f3b763c14faf3bd9ed03cbdf33337a074fb0e9f6c7810:0",
)
.unwrap();
let mut dummy_bitcoind = DummyBitcoind::new();
dummy_bitcoind.txs.insert(
dummy_op.txid,
(
bitcoin::Transaction {
version: TxVersion::TWO,
lock_time: absolute::LockTime::Blocks(absolute::Height::ZERO),
input: vec![],
output: vec![],
},
None,
),
);
let ms = DummyLiana::new(dummy_bitcoind, DummyDatabase::new());
let dummy_tx = bitcoin::Transaction {
version: TxVersion::TWO,
lock_time: absolute::LockTime::Blocks(absolute::Height::ZERO),
input: vec![],
output: vec![],
};
let dummy_op = bitcoin::OutPoint::new(dummy_tx.txid(), 0);
let ms = DummyLiana::new(DummyBitcoind::new(), DummyDatabase::new());
let control = &ms.control();
let mut db_conn = control.db().lock().unwrap().connection();
db_conn.new_txs(&[dummy_tx]);

// Arguments sanity checking
let dummy_addr =
Expand Down Expand Up @@ -1471,7 +1449,6 @@ mod tests {
control.create_spend(&destinations, &[dummy_op], 1, None),
Err(CommandError::UnknownOutpoint(dummy_op))
);
let mut db_conn = control.db().lock().unwrap().connection();
db_conn.new_unspent_coins(&[Coin {
outpoint: dummy_op,
is_immature: false,
Expand Down Expand Up @@ -2305,8 +2282,8 @@ mod tests {
},
]);

let mut btc = DummyBitcoind::new();
btc.txs.insert(
let mut txs_map = HashMap::new();
txs_map.insert(
deposit1.txid(),
(
deposit1.clone(),
Expand All @@ -2320,7 +2297,7 @@ mod tests {
}),
),
);
btc.txs.insert(
txs_map.insert(
deposit2.txid(),
(
deposit2.clone(),
Expand All @@ -2334,7 +2311,7 @@ mod tests {
}),
),
);
btc.txs.insert(
txs_map.insert(
spend_tx.txid(),
(
spend_tx.clone(),
Expand All @@ -2348,7 +2325,7 @@ mod tests {
}),
),
);
btc.txs.insert(
txs_map.insert(
deposit3.txid(),
(
deposit3.clone(),
Expand All @@ -2363,11 +2340,15 @@ mod tests {
),
);

let ms = DummyLiana::new(btc, db);
let ms = DummyLiana::new(DummyBitcoind::new(), db);

let control = &ms.control();
let mut db_conn = control.db.connection();
let txs: Vec<_> = txs_map.values().map(|(tx, _)| tx.clone()).collect();
db_conn.new_txs(&txs);

let transactions = control.list_confirmed_transactions(0, 4, 10).transactions;
let mut transactions = control.list_confirmed_transactions(0, 4, 10).transactions;
transactions.sort_by(|tx1, tx2| tx2.height.cmp(&tx1.height));
assert_eq!(transactions.len(), 4);

assert_eq!(transactions[0].time, Some(4));
Expand All @@ -2382,7 +2363,8 @@ mod tests {
assert_eq!(transactions[3].time, Some(1));
assert_eq!(transactions[3].tx, deposit1);

let transactions = control.list_confirmed_transactions(2, 3, 10).transactions;
let mut transactions = control.list_confirmed_transactions(2, 3, 10).transactions;
transactions.sort_by(|tx1, tx2| tx2.height.cmp(&tx1.height));
assert_eq!(transactions.len(), 2);

assert_eq!(transactions[0].time, Some(3));
Expand Down Expand Up @@ -2451,8 +2433,8 @@ mod tests {
}],
};

let mut btc = DummyBitcoind::new();
btc.txs.insert(
let mut txs_map = HashMap::new();
txs_map.insert(
tx1.txid(),
(
tx1.clone(),
Expand All @@ -2466,7 +2448,7 @@ mod tests {
}),
),
);
btc.txs.insert(
txs_map.insert(
tx2.txid(),
(
tx2.clone(),
Expand All @@ -2480,7 +2462,7 @@ mod tests {
}),
),
);
btc.txs.insert(
txs_map.insert(
tx3.txid(),
(
tx3.clone(),
Expand All @@ -2495,9 +2477,31 @@ mod tests {
),
);

let ms = DummyLiana::new(btc, DummyDatabase::new());

let ms = DummyLiana::new(DummyBitcoind::new(), DummyDatabase::new());
let control = &ms.control();
let mut db_conn = control.db.connection();
let txs: Vec<_> = txs_map.values().map(|(tx, _)| tx.clone()).collect();
db_conn.new_txs(&txs);
// We need coins in the DB in order to get the block info for the transactions.
for (txid, (_tx, block)) in txs_map {
// Insert more than one coin per transaction to check that the command does not
// return duplicate transactions.
for vout in 0..4 {
db_conn.new_unspent_coins(&[Coin {
outpoint: bitcoin::OutPoint::new(txid, vout),
is_immature: false,
block_info: block.map(|b| BlockInfo {
height: b.height,
time: b.time,
}),
amount: bitcoin::Amount::from_sat(100_000),
derivation_index: bip32::ChildNumber::from(13),
is_change: false,
spend_txid: None,
spend_block: None,
}]);
}
}

let transactions = control.list_transactions(&[tx1.txid()]).transactions;
assert_eq!(transactions.len(), 1);
Expand Down
36 changes: 36 additions & 0 deletions src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,18 @@ pub trait DatabaseConnection {

/// Retrieve a limited list of txids that where deposited or spent between the start and end timestamps (inclusive bounds)
fn list_txids(&mut self, start: u32, end: u32, limit: u64) -> Vec<bitcoin::Txid>;

/// Retrieves all txids from the transactions table whether or not they are referenced by a coin.
fn list_saved_txids(&mut self) -> Vec<bitcoin::Txid>;

/// Store transactions in database, ignoring any that already exist.
fn new_txs(&mut self, txs: &[bitcoin::Transaction]);

/// Retrieve a list of transactions and their corresponding block heights and times.
fn list_wallet_transactions(
&mut self,
txids: &[bitcoin::Txid],
) -> Vec<(bitcoin::Transaction, Option<i32>, Option<u32>)>;
}

impl DatabaseConnection for SqliteConn {
Expand Down Expand Up @@ -310,6 +322,30 @@ impl DatabaseConnection for SqliteConn {
fn list_txids(&mut self, start: u32, end: u32, limit: u64) -> Vec<bitcoin::Txid> {
self.db_list_txids(start, end, limit)
}

fn list_saved_txids(&mut self) -> Vec<bitcoin::Txid> {
self.db_list_saved_txids()
}

fn new_txs<'a>(&mut self, txs: &[bitcoin::Transaction]) {
self.new_txs(txs)
}

fn list_wallet_transactions(
&mut self,
txids: &[bitcoin::Txid],
) -> Vec<(bitcoin::Transaction, Option<i32>, Option<u32>)> {
self.list_wallet_transactions(txids)
.into_iter()
.map(|wtx| {
(
wtx.transaction,
wtx.block_info.map(|b| b.height),
wtx.block_info.map(|b| b.time),
)
})
.collect()
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
Expand Down
Loading

0 comments on commit 90df14c

Please sign in to comment.