diff --git a/crates/floresta-cli/src/main.rs b/crates/floresta-cli/src/main.rs index 63eee4f5..3995a4f6 100644 --- a/crates/floresta-cli/src/main.rs +++ b/crates/floresta-cli/src/main.rs @@ -52,7 +52,6 @@ fn get_req(cmd: &Cli) -> (Vec>, String) { Methods::GetRoots => "getroots", Methods::GetBlock { .. } => "getblock", Methods::GetPeerInfo => "getpeerinfo", - Methods::FindUtxo { .. } => "findtxout", Methods::ListTransactions => "gettransactions", Methods::Stop => "stop", Methods::AddNode { .. } => "addnode", @@ -86,9 +85,6 @@ fn get_req(cmd: &Cli) -> (Vec>, String) { Methods::GetBlock { hash } => vec![arg(hash)], Methods::GetPeerInfo => Vec::new(), Methods::ListTransactions => Vec::new(), - Methods::FindUtxo { height, txid, vout } => { - vec![arg(height), arg(txid), arg(vout)] - } Methods::Stop => Vec::new(), Methods::AddNode { node } => { vec![arg(node)] @@ -137,10 +133,6 @@ pub enum Methods { /// Returns the hash of the block associated with height #[command(name = "getblockhash")] GetBlockHash { height: u32 }, - /// Returns information about a transaction output, assuming it is cached by our watch - /// only wallet - #[command(name = "gettxout")] - GetTxOut { txid: Txid, vout: u32 }, /// Returns the proof that one or more transactions were included in a block #[command(name = "gettxproof")] GetTxProof { @@ -174,11 +166,11 @@ pub enum Methods { /// List all transactions we are watching #[command(name = "listtransactions")] ListTransactions, - /// Finds a TXO by its outpoint and block height. Since we don't have a UTXO set - /// the block height is required to find the UTXO. Note that this command doesn't - /// check if the UTXO is spent or not. - #[command(name = "findutxo")] - FindUtxo { height: u32, txid: Txid, vout: u32 }, + /// Returns the value associated with a UTXO, if it's still not spent. + /// This function only works properly if we have the compact block filters + /// feature enabled + #[command(name = "gettxout")] + GetTxOut { txid: Txid, vout: u32 }, /// Stops the node #[command(name = "stop")] Stop, diff --git a/crates/floresta-compact-filters/src/kv_filter_database.rs b/crates/floresta-compact-filters/src/kv_filter_database.rs index d5bfc060..c01cde4a 100644 --- a/crates/floresta-compact-filters/src/kv_filter_database.rs +++ b/crates/floresta-compact-filters/src/kv_filter_database.rs @@ -1,16 +1,17 @@ use std::path::PathBuf; use bitcoin::util::bip158::BlockFilter; -use kv::{Bucket, Integer, Config}; +use kv::{Bucket, Config, Integer}; use crate::BlockFilterStore; /// Stores the block filters insinde a kv database -pub struct KvFilterStore<'a> { - bucket: Bucket<'a, Integer, Vec>, +#[derive(Clone)] +pub struct KvFilterStore { + bucket: Bucket<'static, Integer, Vec>, } -impl KvFilterStore<'_> { +impl KvFilterStore { /// Creates a new [KvFilterStore] that stores it's content in `datadir`. /// /// If the path does't exist it'll be created. This store uses compression by default, if you @@ -20,15 +21,15 @@ impl KvFilterStore<'_> { let store = kv::Store::new(kv::Config { path: datadir.to_owned(), temporary: false, - use_compression: true, + use_compression: false, flush_every_ms: None, cache_capacity: None, segment_size: None, - }).expect("Could not open store"); + }) + .expect("Could not open store"); let bucket = store.bucket(Some("cfilters")).unwrap(); KvFilterStore { bucket } - } /// Creates a new [KvFilterStore] that stores it's content with a given config pub fn with_config(config: Config) -> Self { @@ -38,7 +39,7 @@ impl KvFilterStore<'_> { } } -impl BlockFilterStore for KvFilterStore<'_> { +impl BlockFilterStore for KvFilterStore { fn get_filter(&self, block_height: u64) -> Option { let value = self .bucket diff --git a/crates/floresta-compact-filters/src/lib.rs b/crates/floresta-compact-filters/src/lib.rs index 4b44d074..c331a68a 100644 --- a/crates/floresta-compact-filters/src/lib.rs +++ b/crates/floresta-compact-filters/src/lib.rs @@ -22,7 +22,7 @@ use log::error; use std::io::Write; /// A database that stores our compact filters -pub trait BlockFilterStore { +pub trait BlockFilterStore: Send + Sync { /// Fetches a block filter fn get_filter(&self, block_height: u64) -> Option; /// Stores a new filter @@ -140,16 +140,20 @@ impl BlockFilterBackend { k1: u64::from_le_bytes(k1), } } + /// Returns a given filter pub fn get_filter(&self, block_height: u32) -> Option { self.storage.get_filter(block_height as u64) } + /// Build and index a given block height pub fn filter_block(&self, block: &Block, block_height: u64) -> Result<(), bip158::Error> { let mut writer = Vec::new(); let mut filter = FilterBuilder::new(&mut writer, FILTER_M, FILTER_P, self.k0, self.k1); + if self.index_inputs { self.write_inputs(&block.txdata, &mut filter); } + if self.index_txids { self.write_txids(&block.txdata, &mut filter); } @@ -158,38 +162,41 @@ impl BlockFilterBackend { filter.finish()?; let filter = BlockFilter::new(writer.as_slice()); - self.storage.put_filter(block_height, filter); + self.storage.put_filter(block_height, filter); Ok(()) } + /// Maches a set of filters against out current set of filters /// /// This function will run over each filter inside the range `[start, end]` and sees /// if at least one query mathes. It'll return a vector of block heights where it matches. /// you should download those blocks and see what if there's anything interesting. pub fn match_any(&self, start: u64, end: u64, query: &[QueryType]) -> Option> { - let mut values = query.iter().map(|filter| filter.as_slice()); - let mut blocks = Vec::new(); + let key = BlockHash::from_inner(self.key); + let values = query + .iter() + .map(|filter| filter.as_slice()) + .collect::>(); for i in start..=end { - if self - .storage - .get_filter(i)? - .match_any(&BlockHash::from_inner(self.key), &mut values) - .ok()? - { - blocks.push(i); + if let Some(result) = self.storage.get_filter(i) { + let result = result.match_any(&key, &mut values.iter().copied()).ok()?; + if result { + blocks.push(i); + } } } - Some(blocks) } + fn write_txids(&self, txs: &Vec, filter: &mut FilterBuilder) { for tx in txs { filter.put(tx.txid().as_inner()); } } + fn write_inputs(&self, txs: &Vec, filter: &mut FilterBuilder) { for tx in txs { tx.input.iter().for_each(|input| { @@ -200,6 +207,7 @@ impl BlockFilterBackend { }) } } + fn write_tx_outs(&self, tx: &Transaction, filter: &mut FilterBuilder) { for output in tx.output.iter() { let hash = floresta_common::get_spk_hash(&output.script_pubkey); @@ -220,6 +228,7 @@ impl BlockFilterBackend { } } } + fn write_outputs(&self, txs: &Vec, filter: &mut FilterBuilder) { for tx in txs { self.write_tx_outs(tx, filter); @@ -322,6 +331,7 @@ impl FilterBackendBuilder { } /// A serialized output that can be queried against our filter +#[derive(Debug)] pub struct QueriableOutpoint(pub(crate) [u8; 36]); impl From for QueriableOutpoint { @@ -334,6 +344,7 @@ impl From for QueriableOutpoint { } /// The type of value we are looking for in a filter. +#[derive(Debug)] pub enum QueryType { /// We are looking for a specific outpoint being spent Input(QueriableOutpoint), @@ -365,6 +376,10 @@ pub struct MemoryBlockFilterStorage { filters: RefCell>, } +#[cfg(test)] +#[doc(hidden)] +unsafe impl Sync for MemoryBlockFilterStorage {} + #[doc(hidden)] #[cfg(test)] impl BlockFilterStore for MemoryBlockFilterStorage { @@ -399,7 +414,7 @@ impl<'a> KvFiltersStore<'a> { } } -impl BlockFilterStore for KvFiltersStore<'_> { +impl<'a> BlockFilterStore for KvFiltersStore<'a> { fn get_filter(&self, block_height: u64) -> Option { self.bucket .get(&block_height.into()) diff --git a/crates/floresta-wire/src/p2p_wire/node.rs b/crates/floresta-wire/src/p2p_wire/node.rs index 9210075e..a8d524bb 100644 --- a/crates/floresta-wire/src/p2p_wire/node.rs +++ b/crates/floresta-wire/src/p2p_wire/node.rs @@ -1096,6 +1096,7 @@ where // Use this node state to Initial Block download let mut ibd = UtreexoNode(self.0, IBDNode::default()); try_and_log!(UtreexoNode::::run(&mut ibd, kill_signal).await); + // Then take the final state and run the node self = UtreexoNode(ibd.0, self.1); @@ -1107,7 +1108,8 @@ where .await; loop { - while let Ok(notification) = timeout(Duration::from_secs(1), self.node_rx.recv()).await + while let Ok(notification) = + timeout(Duration::from_millis(1), self.node_rx.recv()).await { try_and_log!(self.handle_notification(notification).await); } diff --git a/florestad/src/json_rpc/res.rs b/florestad/src/json_rpc/res.rs index a000ee5b..ef930e33 100644 --- a/florestad/src/json_rpc/res.rs +++ b/florestad/src/json_rpc/res.rs @@ -18,6 +18,7 @@ pub struct GetBlockchainInfoRes { pub progress: f32, pub difficulty: u64, } + #[derive(Deserialize, Serialize)] pub struct RawTxJson { pub in_active_chain: bool, @@ -36,12 +37,14 @@ pub struct RawTxJson { pub blocktime: u32, pub time: u32, } + #[derive(Deserialize, Serialize)] pub struct TxOutJson { pub value: u64, pub n: u32, pub script_pub_key: ScriptPubKeyJson, } + #[derive(Deserialize, Serialize)] pub struct ScriptPubKeyJson { pub asm: String, @@ -51,6 +54,7 @@ pub struct ScriptPubKeyJson { pub type_: String, pub address: String, } + #[derive(Deserialize, Serialize)] pub struct TxInJson { pub txid: String, @@ -59,11 +63,13 @@ pub struct TxInJson { pub sequence: u32, pub witness: Vec, } + #[derive(Deserialize, Serialize)] pub struct ScriptSigJson { pub asm: String, pub hex: String, } + #[derive(Deserialize, Serialize)] pub struct BlockJson { pub hash: String, @@ -85,6 +91,7 @@ pub struct BlockJson { pub chainwork: String, pub n_tx: usize, pub previousblockhash: String, + #[serde(skip_serializing_if = "Option::is_none")] pub nextblockhash: Option, } @@ -96,7 +103,10 @@ pub enum Error { Chain, InvalidPort, InvalidAddress, + Node, + NoBlockFilters, } + impl Display for Error { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let msg = match self { @@ -106,10 +116,13 @@ impl Display for Error { Error::Chain => "Chain error", Error::InvalidPort => "Invalid port", Error::InvalidAddress => "Invalid address", + Error::Node => "Node returned an error", + Error::NoBlockFilters => "You don't have block filters enabled, please start florestad with --cfilters to run this RPC" }; write!(f, "{}", msg) } } + impl From for i64 { fn from(val: Error) -> Self { match val { @@ -119,9 +132,12 @@ impl From for i64 { Error::InvalidDescriptor => 4, Error::InvalidPort => 5, Error::InvalidAddress => 6, + Error::Node => 7, + Error::NoBlockFilters => 8, } } } + impl From for ErrorCode { fn from(val: Error) -> Self { let code = val.into(); diff --git a/florestad/src/json_rpc/server.rs b/florestad/src/json_rpc/server.rs index 32e993a9..4fb3c765 100644 --- a/florestad/src/json_rpc/server.rs +++ b/florestad/src/json_rpc/server.rs @@ -9,13 +9,13 @@ use bitcoin::{ hex::{FromHex, ToHex}, Hash, }, - Address, BlockHash, BlockHeader, Network, Script, TxIn, TxOut, Txid, + Address, Block, BlockHash, BlockHeader, Network, OutPoint, Script, TxIn, TxOut, Txid, }; use floresta_chain::{ pruned_utreexo::{BlockchainInterface, UpdatableChainstate}, ChainState, KvChainStore, }; -use floresta_compact_filters::BlockFilterBackend; +use floresta_compact_filters::{BlockFilterBackend, QueryType}; use floresta_watch_only::{kv_database::KvDatabase, AddressCache, CachedTransaction}; use floresta_wire::node_interface::{NodeInterface, NodeMethods, PeerInfo}; use futures::executor::block_on; @@ -39,8 +39,6 @@ pub trait Rpc { fn get_transaction(&self, tx_id: Txid, verbosity: Option) -> Result; #[rpc(name = "gettxproof")] fn get_tx_proof(&self, tx_id: Txid) -> Result>; - #[rpc(name = "gettxout")] - fn get_tx_out(&self, tx_id: Txid, outpoint: usize) -> Result; #[rpc(name = "loaddescriptor")] fn load_descriptor(&self, descriptor: String, rescan: Option) -> Result<()>; #[rpc(name = "rescan")] @@ -55,8 +53,8 @@ pub trait Rpc { fn get_peer_info(&self) -> Result>; #[rpc(name = "getblock")] fn get_block(&self, hash: BlockHash, verbosity: Option) -> Result; - #[rpc(name = "findtxout")] - fn find_tx_out(&self, block_height: u32, tx_id: Txid, outpoint: usize) -> Result; + #[rpc(name = "gettxout", returns = "TxOut")] + fn get_tx_out(&self, tx_id: Txid, outpoint: u32) -> Result; #[rpc(name = "stop")] fn stop(&self) -> Result; #[rpc(name = "addnode")] @@ -72,6 +70,78 @@ pub struct RpcImpl { kill_signal: Arc>, } impl Rpc for RpcImpl { + fn get_tx_out(&self, tx_id: Txid, outpoint: u32) -> Result { + fn has_input(block: &Block, expected_input: OutPoint) -> bool { + block.txdata.iter().any(|tx| { + tx.input + .iter() + .any(|input| input.previous_output == expected_input) + }) + } + // can't proceed without block filters + if self.block_filter_storage.is_none() { + return Err(jsonrpc_core::Error { + code: Error::NoBlockFilters.into(), + message: Error::NoBlockFilters.to_string(), + data: None, + }); + } + // this variable will be set to the UTXO iff (i) it have been created + // (ii) it haven't been spent + let mut txout = None; + let tip = self.chain.get_height().unwrap(); + + if let Some(ref cfilters) = self.block_filter_storage { + let vout = OutPoint { + txid: tx_id, + vout: outpoint, + }; + + let filter_outpoint = QueryType::Input(vout.into()); + let filter_txid = QueryType::Txid(tx_id); + + let candidates = cfilters + .match_any(1, tip as u64, &[filter_outpoint, filter_txid]) + .unwrap(); + + let candidates = candidates + .into_iter() + .flat_map(|height| self.chain.get_block_hash(height as u32)) + .map(|hash| self.node.get_block(hash)); + + for candidate in candidates { + let candidate = match candidate { + Err(e) => { + return Err(jsonrpc_core::Error { + code: Error::Node.into(), + message: format!("error while downloading block {candidate:?}"), + data: Some(jsonrpc_core::Value::String(e.to_string())), + }); + } + Ok(None) => { + return Err(jsonrpc_core::Error { + code: Error::Node.into(), + message: format!("BUG: block {candidate:?} is a match in our filters, but we can't get it?"), + data: None, + }); + } + Ok(Some(candidate)) => candidate, + }; + + if let Some(tx) = candidate.txdata.iter().position(|tx| tx.txid() == tx_id) { + txout = candidate.txdata[tx].output.get(outpoint as usize).cloned(); + } + + if has_input(&candidate, vout) { + txout = None; + } + } + } + match txout { + Some(txout) => Ok(json!({"txout": txout})), + None => Ok(json!({})), + } + } fn get_height(&self) -> Result { Ok(self.chain.get_best_block().unwrap().0) } @@ -84,7 +154,11 @@ impl Rpc for RpcImpl { .content .to_hex()); } - Err(jsonrpc_core::Error { code: 10.into(), message: String::from("You don't have block filters enabled in your node, change this by restarting with -cfilters"), data: None }) + Err(jsonrpc_core::Error { + code: Error::NoBlockFilters.into(), + message: Error::NoBlockFilters.to_string(), + data: None, + }) } fn add_node(&self, node: String) -> Result { let node = node.split(':').collect::>(); @@ -208,14 +282,6 @@ impl Rpc for RpcImpl { Err(Error::Chain.into()) } - fn get_tx_out(&self, tx_id: Txid, outpoint: usize) -> Result { - let tx = block_on(self.wallet.read()).get_transaction(&tx_id); - if let Some(tx) = tx { - return Ok(tx.tx.output[outpoint].clone()); - } - Err(Error::TxNotFound.into()) - } - fn get_tx_proof(&self, tx_id: Txid) -> Result> { if let Some((proof, _)) = block_on(self.wallet.read()).get_merkle_proof(&tx_id) { return Ok(proof); @@ -282,26 +348,6 @@ impl Rpc for RpcImpl { Err(Error::BlockNotFound.into()) } - fn find_tx_out(&self, block_height: u32, tx_id: Txid, outpoint: usize) -> Result { - let block_hash = self - .chain - .get_block_hash(block_height) - .map_err(|_| Error::Chain)?; - let block = self - .node - .get_block(block_hash) - .map_err(|_| jsonrpc_core::Error { - code: 5.into(), - message: "Block not found".into(), - data: None, - })?; - let tx = block.and_then(|block| block.txdata.iter().find(|tx| tx.txid() == tx_id).cloned()); - if let Some(tx) = tx { - return Ok(tx.output[outpoint].clone()); - } - Err(Error::TxNotFound.into()) - } - fn get_peer_info(&self) -> Result> { let peers = self.node.get_peer_info(); if let Ok(peers) = peers { diff --git a/florestad/src/main.rs b/florestad/src/main.rs index b576eb19..d2cb3a51 100644 --- a/florestad/src/main.rs +++ b/florestad/src/main.rs @@ -71,7 +71,6 @@ struct Ctx { #[cfg(feature = "zmq-server")] zmq_address: Option, } - fn main() { // Setup global logger pretty_env_logger::env_logger::Builder::from_env(Env::default().default_filter_or("info")) @@ -159,6 +158,7 @@ fn run_with_ctx(ctx: Ctx) { let mut wallet = load_wallet(&data_dir); wallet.setup().expect("Could not initialize wallet"); debug!("Done loading wallet"); + // Try to add more wallets to watch if needed let result = setup_wallet( get_both_vec(ctx.wallet_xpub, config_file.wallet.xpubs),