From 157f7b020abfd33deaaaf6ba420b0159d9de1ca6 Mon Sep 17 00:00:00 2001 From: Davidson Souza Date: Wed, 8 Nov 2023 18:31:17 -0300 Subject: [PATCH] feature: allow jsonrpc finding utxos This commit update the gettxout (and removes findtxout) from our json rpc that now uses compact block filters to find any given utxo. We also use the same filters to figure if the TXO is spent or not. --- crates/floresta-cli/src/main.rs | 18 +-- .../src/kv_filter_database.rs | 17 +-- crates/floresta-compact-filters/src/lib.rs | 41 +++++-- crates/floresta-wire/src/p2p_wire/node.rs | 4 +- florestad/src/json_rpc/res.rs | 16 +++ florestad/src/json_rpc/server.rs | 116 ++++++++++++------ florestad/src/main.rs | 2 +- 7 files changed, 143 insertions(+), 71 deletions(-) diff --git a/crates/floresta-cli/src/main.rs b/crates/floresta-cli/src/main.rs index 63eee4f5..3995a4f6 100644 --- a/crates/floresta-cli/src/main.rs +++ b/crates/floresta-cli/src/main.rs @@ -52,7 +52,6 @@ fn get_req(cmd: &Cli) -> (Vec>, String) { Methods::GetRoots => "getroots", Methods::GetBlock { .. } => "getblock", Methods::GetPeerInfo => "getpeerinfo", - Methods::FindUtxo { .. } => "findtxout", Methods::ListTransactions => "gettransactions", Methods::Stop => "stop", Methods::AddNode { .. } => "addnode", @@ -86,9 +85,6 @@ fn get_req(cmd: &Cli) -> (Vec>, String) { Methods::GetBlock { hash } => vec![arg(hash)], Methods::GetPeerInfo => Vec::new(), Methods::ListTransactions => Vec::new(), - Methods::FindUtxo { height, txid, vout } => { - vec![arg(height), arg(txid), arg(vout)] - } Methods::Stop => Vec::new(), Methods::AddNode { node } => { vec![arg(node)] @@ -137,10 +133,6 @@ pub enum Methods { /// Returns the hash of the block associated with height #[command(name = "getblockhash")] GetBlockHash { height: u32 }, - /// Returns information about a transaction output, assuming it is cached by our watch - /// only wallet - #[command(name = "gettxout")] - GetTxOut { txid: Txid, vout: u32 }, /// Returns the proof that one or more transactions were included in a block #[command(name = "gettxproof")] GetTxProof { @@ -174,11 +166,11 @@ pub enum Methods { /// List all transactions we are watching #[command(name = "listtransactions")] ListTransactions, - /// Finds a TXO by its outpoint and block height. Since we don't have a UTXO set - /// the block height is required to find the UTXO. Note that this command doesn't - /// check if the UTXO is spent or not. - #[command(name = "findutxo")] - FindUtxo { height: u32, txid: Txid, vout: u32 }, + /// Returns the value associated with a UTXO, if it's still not spent. + /// This function only works properly if we have the compact block filters + /// feature enabled + #[command(name = "gettxout")] + GetTxOut { txid: Txid, vout: u32 }, /// Stops the node #[command(name = "stop")] Stop, diff --git a/crates/floresta-compact-filters/src/kv_filter_database.rs b/crates/floresta-compact-filters/src/kv_filter_database.rs index d5bfc060..c01cde4a 100644 --- a/crates/floresta-compact-filters/src/kv_filter_database.rs +++ b/crates/floresta-compact-filters/src/kv_filter_database.rs @@ -1,16 +1,17 @@ use std::path::PathBuf; use bitcoin::util::bip158::BlockFilter; -use kv::{Bucket, Integer, Config}; +use kv::{Bucket, Config, Integer}; use crate::BlockFilterStore; /// Stores the block filters insinde a kv database -pub struct KvFilterStore<'a> { - bucket: Bucket<'a, Integer, Vec>, +#[derive(Clone)] +pub struct KvFilterStore { + bucket: Bucket<'static, Integer, Vec>, } -impl KvFilterStore<'_> { +impl KvFilterStore { /// Creates a new [KvFilterStore] that stores it's content in `datadir`. /// /// If the path does't exist it'll be created. This store uses compression by default, if you @@ -20,15 +21,15 @@ impl KvFilterStore<'_> { let store = kv::Store::new(kv::Config { path: datadir.to_owned(), temporary: false, - use_compression: true, + use_compression: false, flush_every_ms: None, cache_capacity: None, segment_size: None, - }).expect("Could not open store"); + }) + .expect("Could not open store"); let bucket = store.bucket(Some("cfilters")).unwrap(); KvFilterStore { bucket } - } /// Creates a new [KvFilterStore] that stores it's content with a given config pub fn with_config(config: Config) -> Self { @@ -38,7 +39,7 @@ impl KvFilterStore<'_> { } } -impl BlockFilterStore for KvFilterStore<'_> { +impl BlockFilterStore for KvFilterStore { fn get_filter(&self, block_height: u64) -> Option { let value = self .bucket diff --git a/crates/floresta-compact-filters/src/lib.rs b/crates/floresta-compact-filters/src/lib.rs index 4b44d074..c331a68a 100644 --- a/crates/floresta-compact-filters/src/lib.rs +++ b/crates/floresta-compact-filters/src/lib.rs @@ -22,7 +22,7 @@ use log::error; use std::io::Write; /// A database that stores our compact filters -pub trait BlockFilterStore { +pub trait BlockFilterStore: Send + Sync { /// Fetches a block filter fn get_filter(&self, block_height: u64) -> Option; /// Stores a new filter @@ -140,16 +140,20 @@ impl BlockFilterBackend { k1: u64::from_le_bytes(k1), } } + /// Returns a given filter pub fn get_filter(&self, block_height: u32) -> Option { self.storage.get_filter(block_height as u64) } + /// Build and index a given block height pub fn filter_block(&self, block: &Block, block_height: u64) -> Result<(), bip158::Error> { let mut writer = Vec::new(); let mut filter = FilterBuilder::new(&mut writer, FILTER_M, FILTER_P, self.k0, self.k1); + if self.index_inputs { self.write_inputs(&block.txdata, &mut filter); } + if self.index_txids { self.write_txids(&block.txdata, &mut filter); } @@ -158,38 +162,41 @@ impl BlockFilterBackend { filter.finish()?; let filter = BlockFilter::new(writer.as_slice()); - self.storage.put_filter(block_height, filter); + self.storage.put_filter(block_height, filter); Ok(()) } + /// Maches a set of filters against out current set of filters /// /// This function will run over each filter inside the range `[start, end]` and sees /// if at least one query mathes. It'll return a vector of block heights where it matches. /// you should download those blocks and see what if there's anything interesting. pub fn match_any(&self, start: u64, end: u64, query: &[QueryType]) -> Option> { - let mut values = query.iter().map(|filter| filter.as_slice()); - let mut blocks = Vec::new(); + let key = BlockHash::from_inner(self.key); + let values = query + .iter() + .map(|filter| filter.as_slice()) + .collect::>(); for i in start..=end { - if self - .storage - .get_filter(i)? - .match_any(&BlockHash::from_inner(self.key), &mut values) - .ok()? - { - blocks.push(i); + if let Some(result) = self.storage.get_filter(i) { + let result = result.match_any(&key, &mut values.iter().copied()).ok()?; + if result { + blocks.push(i); + } } } - Some(blocks) } + fn write_txids(&self, txs: &Vec, filter: &mut FilterBuilder) { for tx in txs { filter.put(tx.txid().as_inner()); } } + fn write_inputs(&self, txs: &Vec, filter: &mut FilterBuilder) { for tx in txs { tx.input.iter().for_each(|input| { @@ -200,6 +207,7 @@ impl BlockFilterBackend { }) } } + fn write_tx_outs(&self, tx: &Transaction, filter: &mut FilterBuilder) { for output in tx.output.iter() { let hash = floresta_common::get_spk_hash(&output.script_pubkey); @@ -220,6 +228,7 @@ impl BlockFilterBackend { } } } + fn write_outputs(&self, txs: &Vec, filter: &mut FilterBuilder) { for tx in txs { self.write_tx_outs(tx, filter); @@ -322,6 +331,7 @@ impl FilterBackendBuilder { } /// A serialized output that can be queried against our filter +#[derive(Debug)] pub struct QueriableOutpoint(pub(crate) [u8; 36]); impl From for QueriableOutpoint { @@ -334,6 +344,7 @@ impl From for QueriableOutpoint { } /// The type of value we are looking for in a filter. +#[derive(Debug)] pub enum QueryType { /// We are looking for a specific outpoint being spent Input(QueriableOutpoint), @@ -365,6 +376,10 @@ pub struct MemoryBlockFilterStorage { filters: RefCell>, } +#[cfg(test)] +#[doc(hidden)] +unsafe impl Sync for MemoryBlockFilterStorage {} + #[doc(hidden)] #[cfg(test)] impl BlockFilterStore for MemoryBlockFilterStorage { @@ -399,7 +414,7 @@ impl<'a> KvFiltersStore<'a> { } } -impl BlockFilterStore for KvFiltersStore<'_> { +impl<'a> BlockFilterStore for KvFiltersStore<'a> { fn get_filter(&self, block_height: u64) -> Option { self.bucket .get(&block_height.into()) diff --git a/crates/floresta-wire/src/p2p_wire/node.rs b/crates/floresta-wire/src/p2p_wire/node.rs index 9210075e..a8d524bb 100644 --- a/crates/floresta-wire/src/p2p_wire/node.rs +++ b/crates/floresta-wire/src/p2p_wire/node.rs @@ -1096,6 +1096,7 @@ where // Use this node state to Initial Block download let mut ibd = UtreexoNode(self.0, IBDNode::default()); try_and_log!(UtreexoNode::::run(&mut ibd, kill_signal).await); + // Then take the final state and run the node self = UtreexoNode(ibd.0, self.1); @@ -1107,7 +1108,8 @@ where .await; loop { - while let Ok(notification) = timeout(Duration::from_secs(1), self.node_rx.recv()).await + while let Ok(notification) = + timeout(Duration::from_millis(1), self.node_rx.recv()).await { try_and_log!(self.handle_notification(notification).await); } diff --git a/florestad/src/json_rpc/res.rs b/florestad/src/json_rpc/res.rs index a000ee5b..ef930e33 100644 --- a/florestad/src/json_rpc/res.rs +++ b/florestad/src/json_rpc/res.rs @@ -18,6 +18,7 @@ pub struct GetBlockchainInfoRes { pub progress: f32, pub difficulty: u64, } + #[derive(Deserialize, Serialize)] pub struct RawTxJson { pub in_active_chain: bool, @@ -36,12 +37,14 @@ pub struct RawTxJson { pub blocktime: u32, pub time: u32, } + #[derive(Deserialize, Serialize)] pub struct TxOutJson { pub value: u64, pub n: u32, pub script_pub_key: ScriptPubKeyJson, } + #[derive(Deserialize, Serialize)] pub struct ScriptPubKeyJson { pub asm: String, @@ -51,6 +54,7 @@ pub struct ScriptPubKeyJson { pub type_: String, pub address: String, } + #[derive(Deserialize, Serialize)] pub struct TxInJson { pub txid: String, @@ -59,11 +63,13 @@ pub struct TxInJson { pub sequence: u32, pub witness: Vec, } + #[derive(Deserialize, Serialize)] pub struct ScriptSigJson { pub asm: String, pub hex: String, } + #[derive(Deserialize, Serialize)] pub struct BlockJson { pub hash: String, @@ -85,6 +91,7 @@ pub struct BlockJson { pub chainwork: String, pub n_tx: usize, pub previousblockhash: String, + #[serde(skip_serializing_if = "Option::is_none")] pub nextblockhash: Option, } @@ -96,7 +103,10 @@ pub enum Error { Chain, InvalidPort, InvalidAddress, + Node, + NoBlockFilters, } + impl Display for Error { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let msg = match self { @@ -106,10 +116,13 @@ impl Display for Error { Error::Chain => "Chain error", Error::InvalidPort => "Invalid port", Error::InvalidAddress => "Invalid address", + Error::Node => "Node returned an error", + Error::NoBlockFilters => "You don't have block filters enabled, please start florestad with --cfilters to run this RPC" }; write!(f, "{}", msg) } } + impl From for i64 { fn from(val: Error) -> Self { match val { @@ -119,9 +132,12 @@ impl From for i64 { Error::InvalidDescriptor => 4, Error::InvalidPort => 5, Error::InvalidAddress => 6, + Error::Node => 7, + Error::NoBlockFilters => 8, } } } + impl From for ErrorCode { fn from(val: Error) -> Self { let code = val.into(); diff --git a/florestad/src/json_rpc/server.rs b/florestad/src/json_rpc/server.rs index 32e993a9..4fb3c765 100644 --- a/florestad/src/json_rpc/server.rs +++ b/florestad/src/json_rpc/server.rs @@ -9,13 +9,13 @@ use bitcoin::{ hex::{FromHex, ToHex}, Hash, }, - Address, BlockHash, BlockHeader, Network, Script, TxIn, TxOut, Txid, + Address, Block, BlockHash, BlockHeader, Network, OutPoint, Script, TxIn, TxOut, Txid, }; use floresta_chain::{ pruned_utreexo::{BlockchainInterface, UpdatableChainstate}, ChainState, KvChainStore, }; -use floresta_compact_filters::BlockFilterBackend; +use floresta_compact_filters::{BlockFilterBackend, QueryType}; use floresta_watch_only::{kv_database::KvDatabase, AddressCache, CachedTransaction}; use floresta_wire::node_interface::{NodeInterface, NodeMethods, PeerInfo}; use futures::executor::block_on; @@ -39,8 +39,6 @@ pub trait Rpc { fn get_transaction(&self, tx_id: Txid, verbosity: Option) -> Result; #[rpc(name = "gettxproof")] fn get_tx_proof(&self, tx_id: Txid) -> Result>; - #[rpc(name = "gettxout")] - fn get_tx_out(&self, tx_id: Txid, outpoint: usize) -> Result; #[rpc(name = "loaddescriptor")] fn load_descriptor(&self, descriptor: String, rescan: Option) -> Result<()>; #[rpc(name = "rescan")] @@ -55,8 +53,8 @@ pub trait Rpc { fn get_peer_info(&self) -> Result>; #[rpc(name = "getblock")] fn get_block(&self, hash: BlockHash, verbosity: Option) -> Result; - #[rpc(name = "findtxout")] - fn find_tx_out(&self, block_height: u32, tx_id: Txid, outpoint: usize) -> Result; + #[rpc(name = "gettxout", returns = "TxOut")] + fn get_tx_out(&self, tx_id: Txid, outpoint: u32) -> Result; #[rpc(name = "stop")] fn stop(&self) -> Result; #[rpc(name = "addnode")] @@ -72,6 +70,78 @@ pub struct RpcImpl { kill_signal: Arc>, } impl Rpc for RpcImpl { + fn get_tx_out(&self, tx_id: Txid, outpoint: u32) -> Result { + fn has_input(block: &Block, expected_input: OutPoint) -> bool { + block.txdata.iter().any(|tx| { + tx.input + .iter() + .any(|input| input.previous_output == expected_input) + }) + } + // can't proceed without block filters + if self.block_filter_storage.is_none() { + return Err(jsonrpc_core::Error { + code: Error::NoBlockFilters.into(), + message: Error::NoBlockFilters.to_string(), + data: None, + }); + } + // this variable will be set to the UTXO iff (i) it have been created + // (ii) it haven't been spent + let mut txout = None; + let tip = self.chain.get_height().unwrap(); + + if let Some(ref cfilters) = self.block_filter_storage { + let vout = OutPoint { + txid: tx_id, + vout: outpoint, + }; + + let filter_outpoint = QueryType::Input(vout.into()); + let filter_txid = QueryType::Txid(tx_id); + + let candidates = cfilters + .match_any(1, tip as u64, &[filter_outpoint, filter_txid]) + .unwrap(); + + let candidates = candidates + .into_iter() + .flat_map(|height| self.chain.get_block_hash(height as u32)) + .map(|hash| self.node.get_block(hash)); + + for candidate in candidates { + let candidate = match candidate { + Err(e) => { + return Err(jsonrpc_core::Error { + code: Error::Node.into(), + message: format!("error while downloading block {candidate:?}"), + data: Some(jsonrpc_core::Value::String(e.to_string())), + }); + } + Ok(None) => { + return Err(jsonrpc_core::Error { + code: Error::Node.into(), + message: format!("BUG: block {candidate:?} is a match in our filters, but we can't get it?"), + data: None, + }); + } + Ok(Some(candidate)) => candidate, + }; + + if let Some(tx) = candidate.txdata.iter().position(|tx| tx.txid() == tx_id) { + txout = candidate.txdata[tx].output.get(outpoint as usize).cloned(); + } + + if has_input(&candidate, vout) { + txout = None; + } + } + } + match txout { + Some(txout) => Ok(json!({"txout": txout})), + None => Ok(json!({})), + } + } fn get_height(&self) -> Result { Ok(self.chain.get_best_block().unwrap().0) } @@ -84,7 +154,11 @@ impl Rpc for RpcImpl { .content .to_hex()); } - Err(jsonrpc_core::Error { code: 10.into(), message: String::from("You don't have block filters enabled in your node, change this by restarting with -cfilters"), data: None }) + Err(jsonrpc_core::Error { + code: Error::NoBlockFilters.into(), + message: Error::NoBlockFilters.to_string(), + data: None, + }) } fn add_node(&self, node: String) -> Result { let node = node.split(':').collect::>(); @@ -208,14 +282,6 @@ impl Rpc for RpcImpl { Err(Error::Chain.into()) } - fn get_tx_out(&self, tx_id: Txid, outpoint: usize) -> Result { - let tx = block_on(self.wallet.read()).get_transaction(&tx_id); - if let Some(tx) = tx { - return Ok(tx.tx.output[outpoint].clone()); - } - Err(Error::TxNotFound.into()) - } - fn get_tx_proof(&self, tx_id: Txid) -> Result> { if let Some((proof, _)) = block_on(self.wallet.read()).get_merkle_proof(&tx_id) { return Ok(proof); @@ -282,26 +348,6 @@ impl Rpc for RpcImpl { Err(Error::BlockNotFound.into()) } - fn find_tx_out(&self, block_height: u32, tx_id: Txid, outpoint: usize) -> Result { - let block_hash = self - .chain - .get_block_hash(block_height) - .map_err(|_| Error::Chain)?; - let block = self - .node - .get_block(block_hash) - .map_err(|_| jsonrpc_core::Error { - code: 5.into(), - message: "Block not found".into(), - data: None, - })?; - let tx = block.and_then(|block| block.txdata.iter().find(|tx| tx.txid() == tx_id).cloned()); - if let Some(tx) = tx { - return Ok(tx.output[outpoint].clone()); - } - Err(Error::TxNotFound.into()) - } - fn get_peer_info(&self) -> Result> { let peers = self.node.get_peer_info(); if let Ok(peers) = peers { diff --git a/florestad/src/main.rs b/florestad/src/main.rs index b576eb19..d2cb3a51 100644 --- a/florestad/src/main.rs +++ b/florestad/src/main.rs @@ -71,7 +71,6 @@ struct Ctx { #[cfg(feature = "zmq-server")] zmq_address: Option, } - fn main() { // Setup global logger pretty_env_logger::env_logger::Builder::from_env(Env::default().default_filter_or("info")) @@ -159,6 +158,7 @@ fn run_with_ctx(ctx: Ctx) { let mut wallet = load_wallet(&data_dir); wallet.setup().expect("Could not initialize wallet"); debug!("Done loading wallet"); + // Try to add more wallets to watch if needed let result = setup_wallet( get_both_vec(ctx.wallet_xpub, config_file.wallet.xpubs),