Skip to content

Commit

Permalink
instrumented macro usage
Browse files Browse the repository at this point in the history
  • Loading branch information
mariusz-reichert authored and m-reichert committed Jan 28, 2025
1 parent 1b48fcb commit 7fce6ba
Show file tree
Hide file tree
Showing 11 changed files with 107 additions and 108 deletions.
68 changes: 34 additions & 34 deletions src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use bitcoin::consensus::encode::{deserialize, serialize_hex};
#[cfg(feature = "liquid")]
use elements::encode::{deserialize, serialize_hex};

use tracing::instrument;
use instrumented_macro::instrumented;

use crate::chain::{Block, BlockHash, BlockHeader, Network, Transaction, Txid};
use crate::metrics::{HistogramOpts, HistogramVec, Metrics};
Expand All @@ -44,7 +44,7 @@ lazy_static! {
const MAX_ATTEMPTS: u32 = 5;
const RETRY_WAIT_DURATION: Duration = Duration::from_secs(1);

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
fn parse_hash<T>(value: &Value) -> Result<T>
where
T: FromStr,
Expand All @@ -58,7 +58,7 @@ where
.chain_err(|| format!("non-hex value: {}", value))?)
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
fn header_from_value(value: Value) -> Result<BlockHeader> {
let header_hex = value
.as_str()
Expand Down Expand Up @@ -153,7 +153,7 @@ struct Connection {
signal: Waiter,
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
fn tcp_connect(addr: SocketAddr, signal: &Waiter) -> Result<TcpStream> {
loop {
match TcpStream::connect_timeout(&addr, *DAEMON_CONNECTION_TIMEOUT) {
Expand All @@ -176,7 +176,7 @@ fn tcp_connect(addr: SocketAddr, signal: &Waiter) -> Result<TcpStream> {
}

impl Connection {
#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
fn new(
addr: SocketAddr,
cookie_getter: Arc<dyn CookieGetter>,
Expand All @@ -196,12 +196,12 @@ impl Connection {
})
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
fn reconnect(&self) -> Result<Connection> {
Connection::new(self.addr, self.cookie_getter.clone(), self.signal.clone())
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
fn send(&mut self, request: &str) -> Result<()> {
let cookie = &self.cookie_getter.get()?;
let msg = format!(
Expand All @@ -215,7 +215,7 @@ impl Connection {
})
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
fn recv(&mut self) -> Result<String> {
// TODO: use proper HTTP parser.
let mut in_header = true;
Expand Down Expand Up @@ -381,7 +381,7 @@ impl Daemon {
Ok(daemon)
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
pub fn reconnect(&self) -> Result<Daemon> {
Ok(Daemon {
daemon_dir: self.daemon_dir.clone(),
Expand All @@ -396,7 +396,7 @@ impl Daemon {
})
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
pub fn list_blk_files(&self) -> Result<Vec<PathBuf>> {
let path = self.blocks_dir.join("blk*.dat");
debug!("listing block files at {:?}", path);
Expand Down Expand Up @@ -432,7 +432,7 @@ impl Daemon {
self.network.magic()
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
fn call_jsonrpc(&self, method: &str, request: &Value) -> Result<Value> {
let mut conn = self.conn.lock().unwrap();
let timer = self.latency.with_label_values(&[method]).start_timer();
Expand All @@ -450,7 +450,7 @@ impl Daemon {
Ok(result)
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!(), method = %method))]
#[instrumented(method = %method)]
fn handle_request(&self, method: &str, params: &Value) -> Result<Value> {
let id = self.message_id.next();
let req = json!({"method": method, "params": params, "id": id});
Expand All @@ -473,12 +473,12 @@ impl Daemon {
}
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
fn request(&self, method: &str, params: Value) -> Result<Value> {
self.retry_request(method, &params)
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
fn retry_reconnect(&self) -> Daemon {
// XXX add a max reconnection attempts limit?
loop {
Expand All @@ -493,14 +493,14 @@ impl Daemon {

// Send requests in parallel over multiple RPC connections as individual JSON-RPC requests (with no JSON-RPC batching),
// buffering the replies into a vector. If any of the requests fail, processing is terminated and an Err is returned.
#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
fn requests(&self, method: &str, params_list: Vec<Value>) -> Result<Vec<Value>> {
self.requests_iter(method, params_list).collect()
}

// Send requests in parallel over multiple RPC connections, iterating over the results without buffering them.
// Errors are included in the iterator and do not terminate other pending requests.
#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
fn requests_iter<'a>(
&'a self,
method: &'a str,
Expand All @@ -523,29 +523,29 @@ impl Daemon {

// bitcoind JSONRPC API:

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
pub fn getblockchaininfo(&self) -> Result<BlockchainInfo> {
let info: Value = self.request("getblockchaininfo", json!([]))?;
Ok(from_value(info).chain_err(|| "invalid blockchain info")?)
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
fn getnetworkinfo(&self) -> Result<NetworkInfo> {
let info: Value = self.request("getnetworkinfo", json!([]))?;
Ok(from_value(info).chain_err(|| "invalid network info")?)
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
pub fn getbestblockhash(&self) -> Result<BlockHash> {
parse_hash(&self.request("getbestblockhash", json!([]))?)
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
pub fn getblockheader(&self, blockhash: &BlockHash) -> Result<BlockHeader> {
header_from_value(self.request("getblockheader", json!([blockhash, /*verbose=*/ false]))?)
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
pub fn getblockheaders(&self, heights: &[usize]) -> Result<Vec<BlockHeader>> {
let heights: Vec<Value> = heights.iter().map(|height| json!([height])).collect();
let params_list: Vec<Value> = self
Expand All @@ -560,20 +560,20 @@ impl Daemon {
Ok(result)
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
pub fn getblock(&self, blockhash: &BlockHash) -> Result<Block> {
let block =
block_from_value(self.request("getblock", json!([blockhash, /*verbose=*/ false]))?)?;
assert_eq!(block.block_hash(), *blockhash);
Ok(block)
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
pub fn getblock_raw(&self, blockhash: &BlockHash, verbose: u32) -> Result<Value> {
self.request("getblock", json!([blockhash, verbose]))
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
pub fn getblocks(&self, blockhashes: &[BlockHash]) -> Result<Vec<Block>> {
let params_list: Vec<Value> = blockhashes
.iter()
Expand Down Expand Up @@ -610,7 +610,7 @@ impl Daemon {

/// Fetch the given transactions in parallel over multiple threads and RPC connections,
/// ignoring any missing ones and returning whatever is available.
#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
pub fn gettransactions_available(&self, txids: &[&Txid]) -> Result<Vec<(Txid, Transaction)>> {
const RPC_INVALID_ADDRESS_OR_KEY: i64 = -5;

Expand All @@ -635,7 +635,7 @@ impl Daemon {
.collect()
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
pub fn gettransaction_raw(
&self,
txid: &Txid,
Expand All @@ -645,24 +645,24 @@ impl Daemon {
self.request("getrawtransaction", json!([txid, verbose, blockhash]))
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
pub fn getmempooltx(&self, txhash: &Txid) -> Result<Transaction> {
let value = self.request("getrawtransaction", json!([txhash, /*verbose=*/ false]))?;
tx_from_value(value)
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
pub fn getmempooltxids(&self) -> Result<HashSet<Txid>> {
let res = self.request("getrawmempool", json!([/*verbose=*/ false]))?;
Ok(serde_json::from_value(res).chain_err(|| "invalid getrawmempool reply")?)
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
pub fn broadcast(&self, tx: &Transaction) -> Result<Txid> {
self.broadcast_raw(&serialize_hex(tx))
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
pub fn broadcast_raw(&self, txhex: &str) -> Result<Txid> {
let txid = self.request("sendrawtransaction", json!([txhex]))?;
Ok(
Expand All @@ -674,7 +674,7 @@ impl Daemon {
// Get estimated feerates for the provided confirmation targets using a batch RPC request
// Missing estimates are logged but do not cause a failure, whatever is available is returned
#[allow(clippy::float_cmp)]
#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
pub fn estimatesmartfee_batch(&self, conf_targets: &[u16]) -> Result<HashMap<u16, f64>> {
let params_list: Vec<Value> = conf_targets
.iter()
Expand Down Expand Up @@ -709,7 +709,7 @@ impl Daemon {
.collect())
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
fn get_all_headers(&self, tip: &BlockHash) -> Result<Vec<BlockHeader>> {
let info: Value = self.request("getblockheader", json!([tip]))?;
let tip_height = info
Expand Down Expand Up @@ -737,7 +737,7 @@ impl Daemon {
}

// Returns a list of BlockHeaders in ascending height (i.e. the tip is last).
#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
pub fn get_new_headers(
&self,
indexed_headers: &HeaderList,
Expand Down Expand Up @@ -770,7 +770,7 @@ impl Daemon {
Ok(new_headers)
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
pub fn get_relayfee(&self) -> Result<f64> {
let relayfee = self.getnetworkinfo()?.relayfee;

Expand Down
18 changes: 9 additions & 9 deletions src/electrum/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crypto::sha2::Sha256;
use error_chain::ChainedError;
use serde_json::{from_str, Value};

use tracing::instrument;
use instrumented_macro::instrumented;

#[cfg(not(feature = "liquid"))]
use bitcoin::consensus::encode::serialize_hex;
Expand Down Expand Up @@ -71,7 +71,7 @@ fn bool_from_value_or(val: Option<&Value>, name: &str, default: bool) -> Result<
}

// TODO: implement caching and delta updates
#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
fn get_status_hash(txs: Vec<(Txid, Option<BlockId>)>, query: &Query) -> Option<FullHash> {
if txs.is_empty() {
None
Expand Down Expand Up @@ -264,7 +264,7 @@ impl Connection {
}))
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
fn blockchain_estimatefee(&self, params: &[Value]) -> Result<Value> {
let conf_target = usize_from_value(params.get(0), "blocks_count")?;
let fee_rate = self
Expand Down Expand Up @@ -392,7 +392,7 @@ impl Connection {
Ok(json!(rawtx.to_lower_hex_string()))
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
fn blockchain_transaction_get_merkle(&self, params: &[Value]) -> Result<Value> {
let txid = Txid::from(hash_from_value(params.get(0)).chain_err(|| "bad tx_hash")?);
let height = usize_from_value(params.get(1), "height")?;
Expand Down Expand Up @@ -430,7 +430,7 @@ impl Connection {
}))
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!(), method = %method))]
#[instrumented(method = %method)]
fn handle_command(&mut self, method: &str, params: &[Value], id: &Value) -> Result<Value> {
let timer = self
.stats
Expand Down Expand Up @@ -487,7 +487,7 @@ impl Connection {
})
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
fn update_subscriptions(&mut self) -> Result<Vec<Value>> {
let timer = self
.stats
Expand Down Expand Up @@ -545,7 +545,7 @@ impl Connection {
Ok(())
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
fn handle_replies(&mut self, receiver: Receiver<Message>) -> Result<()> {
let empty_params = json!([]);
loop {
Expand Down Expand Up @@ -610,7 +610,7 @@ impl Connection {
}
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
fn parse_requests(mut reader: BufReader<TcpStream>, tx: &SyncSender<Message>) -> Result<()> {
loop {
let mut line = Vec::<u8>::new();
Expand Down Expand Up @@ -673,7 +673,7 @@ impl Connection {
}
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
#[instrumented]
fn get_history(
query: &Query,
scripthash: &[u8],
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ extern crate log;
extern crate serde_derive;
#[macro_use]
extern crate serde_json;

#[macro_use]
extern crate lazy_static;

Expand Down
Loading

0 comments on commit 7fce6ba

Please sign in to comment.