Instrumented macro refactoring
m-reichert committed Jan 15, 2025
1 parent a579e79 commit 750af6e
Showing 14 changed files with 121 additions and 121 deletions.
20 changes: 10 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
@@ -1,4 +1,4 @@
-workspace = { members = ["instrumented_macro"] }
+workspace = { members = ["electrs_macros"] }
 [package]
 name = "electrs"
 version = "0.4.1"
@@ -77,7 +77,7 @@ tracing = { version = "0.1.40", default-features = false, features = ["attribute
 # optional dependencies for electrum-discovery
 electrum-client = { version = "0.8", optional = true }
 zmq = "0.10.0"
-instrumented_macro = { path = "instrumented_macro" }
+electrs_macros = { path = "electrs_macros" }

 [dev-dependencies]
 bitcoind = { version = "0.34.3", features = ["25_0"] }
instrumented_macro/Cargo.toml → electrs_macros/Cargo.toml

@@ -1,5 +1,5 @@
 [package]
-name = "instrumented_macro"
+name = "electrs_macros"
 version = "0.1.0"
 edition = "2021"
instrumented_macro/src/lib.rs → electrs_macros/src/lib.rs

@@ -3,7 +3,7 @@ use quote::quote;
 use syn::{parse_macro_input, ItemFn};

 #[proc_macro_attribute]
-pub fn instrumented(attr: TokenStream, item: TokenStream) -> TokenStream {
+pub fn trace(attr: TokenStream, item: TokenStream) -> TokenStream {
     let additional_fields = if !attr.is_empty() {
         let attr_tokens: proc_macro2::TokenStream = attr.into();
         quote! {, #attr_tokens }
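
The diff collapses the rest of the macro body, so only the handling of the attribute arguments is visible. For orientation, here is a minimal, self-contained sketch of how an attribute of this shape can be completed, assuming it wraps the annotated function's body in a tracing span and forwards the attribute arguments as extra span fields; the expansion details below are illustrative, not necessarily what electrs ships. (A proc-macro crate's manifest also needs proc-macro = true under [lib], which the truncated Cargo.toml diff above does not show.)

use proc_macro::TokenStream;
use quote::quote;
use syn::{parse_macro_input, ItemFn};

#[proc_macro_attribute]
pub fn trace(attr: TokenStream, item: TokenStream) -> TokenStream {
    // Attribute arguments (e.g. `method = %method`) become extra span fields.
    let additional_fields = if !attr.is_empty() {
        let attr_tokens: proc_macro2::TokenStream = attr.into();
        quote! {, #attr_tokens }
    } else {
        quote! {}
    };

    let func = parse_macro_input!(item as ItemFn);
    let attrs = &func.attrs;
    let vis = &func.vis;
    let sig = &func.sig;
    let name = sig.ident.to_string();
    let block = &func.block;

    // Re-emit the function with its body wrapped in an entered span named
    // after the function itself (sync functions only, for brevity).
    quote! {
        #(#attrs)* #vis #sig {
            let _span = tracing::span!(tracing::Level::TRACE, #name #additional_fields).entered();
            #block
        }
    }
    .into()
}

Entering the span via .entered() keeps it open for the whole synchronous function body; an async function would instead need to instrument its future with the tracing::Instrument combinator.
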
68 changes: 34 additions & 34 deletions src/daemon.rs
@@ -20,7 +20,7 @@ use bitcoin::consensus::encode::{deserialize, serialize_hex};
 #[cfg(feature = "liquid")]
 use elements::encode::{deserialize, serialize_hex};

-use instrumented_macro::instrumented;
+use electrs_macros::trace;

 use crate::chain::{Block, BlockHash, BlockHeader, Network, Transaction, Txid};
 use crate::metrics::{HistogramOpts, HistogramVec, Metrics};
@@ -44,7 +44,7 @@ lazy_static! {
 const MAX_ATTEMPTS: u32 = 5;
 const RETRY_WAIT_DURATION: Duration = Duration::from_secs(1);

-#[instrumented]
+#[trace]
 fn parse_hash<T>(value: &Value) -> Result<T>
 where
     T: FromStr,
@@ -58,7 +58,7 @@ where
         .chain_err(|| format!("non-hex value: {}", value))?)
 }

-#[instrumented]
+#[trace]
 fn header_from_value(value: Value) -> Result<BlockHeader> {
     let header_hex = value
         .as_str()
@@ -153,7 +153,7 @@ struct Connection {
     signal: Waiter,
 }

-#[instrumented]
+#[trace]
 fn tcp_connect(addr: SocketAddr, signal: &Waiter) -> Result<TcpStream> {
     loop {
         match TcpStream::connect_timeout(&addr, *DAEMON_CONNECTION_TIMEOUT) {
@@ -176,7 +176,7 @@ fn tcp_connect(addr: SocketAddr, signal: &Waiter) -> Result<TcpStream> {
 }

 impl Connection {
-    #[instrumented]
+    #[trace]
     fn new(
         addr: SocketAddr,
         cookie_getter: Arc<dyn CookieGetter>,
@@ -196,12 +196,12 @@ impl Connection {
         })
     }

-    #[instrumented]
+    #[trace]
     fn reconnect(&self) -> Result<Connection> {
         Connection::new(self.addr, self.cookie_getter.clone(), self.signal.clone())
     }

-    #[instrumented]
+    #[trace]
     fn send(&mut self, request: &str) -> Result<()> {
         let cookie = &self.cookie_getter.get()?;
         let msg = format!(
@@ -215,7 +215,7 @@ impl Connection {
         })
     }

-    #[instrumented]
+    #[trace]
     fn recv(&mut self) -> Result<String> {
         // TODO: use proper HTTP parser.
         let mut in_header = true;
@@ -381,7 +381,7 @@ impl Daemon {
         Ok(daemon)
     }

-    #[instrumented]
+    #[trace]
     pub fn reconnect(&self) -> Result<Daemon> {
         Ok(Daemon {
             daemon_dir: self.daemon_dir.clone(),
@@ -396,7 +396,7 @@ impl Daemon {
         })
     }

-    #[instrumented]
+    #[trace]
     pub fn list_blk_files(&self) -> Result<Vec<PathBuf>> {
         let path = self.blocks_dir.join("blk*.dat");
         debug!("listing block files at {:?}", path);
@@ -432,7 +432,7 @@ impl Daemon {
         self.network.magic()
     }

-    #[instrumented]
+    #[trace]
     fn call_jsonrpc(&self, method: &str, request: &Value) -> Result<Value> {
         let mut conn = self.conn.lock().unwrap();
         let timer = self.latency.with_label_values(&[method]).start_timer();
@@ -450,7 +450,7 @@ impl Daemon {
         Ok(result)
     }

-    #[instrumented(method = %method)]
+    #[trace(method = %method)]
     fn handle_request(&self, method: &str, params: &Value) -> Result<Value> {
         let id = self.message_id.next();
         let req = json!({"method": method, "params": params, "id": id});
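
One detail worth noting about the #[trace(method = %method)] usage above: because the span is constructed inside the wrapped function body, the attribute arguments can refer to the function's own parameters, and the % sigil records a field using its Display implementation. A hypothetical, runnable usage demo under the sketch's assumed semantics (the handle function and the tracing_subscriber setup are illustrative, not electrs code):

use electrs_macros::trace;

#[trace(method = %method)]
fn handle(method: &str) -> usize {
    // This event fires inside the span the macro opened, so it inherits
    // the `method` field attached there.
    tracing::info!("handling request");
    method.len()
}

fn main() {
    // Print spans and events to stdout so the recorded field is visible.
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::TRACE)
        .init();
    handle("getblockcount");
}
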
@@ -473,12 +473,12 @@ impl Daemon {
         }
     }

-    #[instrumented]
+    #[trace]
     fn request(&self, method: &str, params: Value) -> Result<Value> {
         self.retry_request(method, &params)
     }

-    #[instrumented]
+    #[trace]
     fn retry_reconnect(&self) -> Daemon {
         // XXX add a max reconnection attempts limit?
         loop {
@@ -493,14 +493,14 @@ impl Daemon {

     // Send requests in parallel over multiple RPC connections as individual JSON-RPC requests (with no JSON-RPC batching),
     // buffering the replies into a vector. If any of the requests fail, processing is terminated and an Err is returned.
-    #[instrumented]
+    #[trace]
     fn requests(&self, method: &str, params_list: Vec<Value>) -> Result<Vec<Value>> {
         self.requests_iter(method, params_list).collect()
     }

     // Send requests in parallel over multiple RPC connections, iterating over the results without buffering them.
     // Errors are included in the iterator and do not terminate other pending requests.
-    #[instrumented]
+    #[trace]
     fn requests_iter<'a>(
         &'a self,
         method: &'a str,
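
The two comments above state the contract (parallel fan-out, per-request errors that do not abort the batch) while the implementations stay collapsed in this view. Here is a self-contained sketch of that contract, with a string-based call closure standing in for electrs' pool of JSON-RPC connections; requests_iter_sketch and everything in it is hypothetical, and one thread per request keeps the sketch short where electrs bounds parallelism by its connection pool:

use std::sync::mpsc;
use std::thread;

fn requests_iter_sketch<F>(
    call: F,
    params_list: Vec<String>,
) -> impl Iterator<Item = Result<String, String>>
where
    F: Fn(String) -> Result<String, String> + Send + Clone + 'static,
{
    let (tx, rx) = mpsc::channel();
    for params in params_list {
        let (tx, call) = (tx.clone(), call.clone());
        // One worker per request; a failure is sent through the channel
        // instead of cancelling the other in-flight requests.
        thread::spawn(move || {
            let _ = tx.send(call(params));
        });
    }
    drop(tx); // the iterator ends once every worker has dropped its sender
    rx.into_iter()
}

fn main() {
    let replies = requests_iter_sketch(
        |p| {
            if p == "bad" {
                Err(format!("request {p} failed"))
            } else {
                Ok(format!("reply to {p}"))
            }
        },
        vec!["a".into(), "bad".into(), "b".into()],
    );
    for r in replies {
        println!("{r:?}"); // completion order may vary; Err entries are interleaved
    }
}

Collecting such an iterator into Result<Vec<_>, _>, as requests() does above, stops at the first Err, which is exactly the buffering-plus-early-termination behavior the first comment describes.
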
@@ -523,29 +523,29 @@ impl Daemon {

     // bitcoind JSONRPC API:

-    #[instrumented]
+    #[trace]
     pub fn getblockchaininfo(&self) -> Result<BlockchainInfo> {
         let info: Value = self.request("getblockchaininfo", json!([]))?;
         Ok(from_value(info).chain_err(|| "invalid blockchain info")?)
     }

-    #[instrumented]
+    #[trace]
     fn getnetworkinfo(&self) -> Result<NetworkInfo> {
         let info: Value = self.request("getnetworkinfo", json!([]))?;
         Ok(from_value(info).chain_err(|| "invalid network info")?)
     }

-    #[instrumented]
+    #[trace]
     pub fn getbestblockhash(&self) -> Result<BlockHash> {
         parse_hash(&self.request("getbestblockhash", json!([]))?)
     }

-    #[instrumented]
+    #[trace]
     pub fn getblockheader(&self, blockhash: &BlockHash) -> Result<BlockHeader> {
         header_from_value(self.request("getblockheader", json!([blockhash, /*verbose=*/ false]))?)
     }

-    #[instrumented]
+    #[trace]
     pub fn getblockheaders(&self, heights: &[usize]) -> Result<Vec<BlockHeader>> {
         let heights: Vec<Value> = heights.iter().map(|height| json!([height])).collect();
         let params_list: Vec<Value> = self
@@ -560,20 +560,20 @@ impl Daemon {
         Ok(result)
     }

-    #[instrumented]
+    #[trace]
     pub fn getblock(&self, blockhash: &BlockHash) -> Result<Block> {
         let block =
             block_from_value(self.request("getblock", json!([blockhash, /*verbose=*/ false]))?)?;
         assert_eq!(block.block_hash(), *blockhash);
         Ok(block)
     }

-    #[instrumented]
+    #[trace]
     pub fn getblock_raw(&self, blockhash: &BlockHash, verbose: u32) -> Result<Value> {
         self.request("getblock", json!([blockhash, verbose]))
     }

-    #[instrumented]
+    #[trace]
     pub fn getblocks(&self, blockhashes: &[BlockHash]) -> Result<Vec<Block>> {
         let params_list: Vec<Value> = blockhashes
             .iter()
@@ -610,7 +610,7 @@ impl Daemon {

     /// Fetch the given transactions in parallel over multiple threads and RPC connections,
     /// ignoring any missing ones and returning whatever is available.
-    #[instrumented]
+    #[trace]
     pub fn gettransactions_available(&self, txids: &[&Txid]) -> Result<Vec<(Txid, Transaction)>> {
         const RPC_INVALID_ADDRESS_OR_KEY: i64 = -5;

@@ -635,7 +635,7 @@ impl Daemon {
             .collect()
     }

-    #[instrumented]
+    #[trace]
     pub fn gettransaction_raw(
         &self,
         txid: &Txid,
@@ -645,24 +645,24 @@ impl Daemon {
         self.request("getrawtransaction", json!([txid, verbose, blockhash]))
     }

-    #[instrumented]
+    #[trace]
     pub fn getmempooltx(&self, txhash: &Txid) -> Result<Transaction> {
         let value = self.request("getrawtransaction", json!([txhash, /*verbose=*/ false]))?;
         tx_from_value(value)
     }

-    #[instrumented]
+    #[trace]
     pub fn getmempooltxids(&self) -> Result<HashSet<Txid>> {
         let res = self.request("getrawmempool", json!([/*verbose=*/ false]))?;
         Ok(serde_json::from_value(res).chain_err(|| "invalid getrawmempool reply")?)
     }

-    #[instrumented]
+    #[trace]
     pub fn broadcast(&self, tx: &Transaction) -> Result<Txid> {
         self.broadcast_raw(&serialize_hex(tx))
     }

-    #[instrumented]
+    #[trace]
     pub fn broadcast_raw(&self, txhex: &str) -> Result<Txid> {
         let txid = self.request("sendrawtransaction", json!([txhex]))?;
         Ok(
@@ -674,7 +674,7 @@ impl Daemon {
     // Get estimated feerates for the provided confirmation targets using a batch RPC request
     // Missing estimates are logged but do not cause a failure, whatever is available is returned
     #[allow(clippy::float_cmp)]
-    #[instrumented]
+    #[trace]
     pub fn estimatesmartfee_batch(&self, conf_targets: &[u16]) -> Result<HashMap<u16, f64>> {
         let params_list: Vec<Value> = conf_targets
             .iter()
@@ -709,7 +709,7 @@ impl Daemon {
             .collect())
     }

-    #[instrumented]
+    #[trace]
     fn get_all_headers(&self, tip: &BlockHash) -> Result<Vec<BlockHeader>> {
         let info: Value = self.request("getblockheader", json!([tip]))?;
         let tip_height = info
@@ -737,7 +737,7 @@ impl Daemon {
     }

     // Returns a list of BlockHeaders in ascending height (i.e. the tip is last).
-    #[instrumented]
+    #[trace]
     pub fn get_new_headers(
         &self,
         indexed_headers: &HeaderList,
@@ -770,7 +770,7 @@ impl Daemon {
         Ok(new_headers)
     }

-    #[instrumented]
+    #[trace]
     pub fn get_relayfee(&self) -> Result<f64> {
         let relayfee = self.getnetworkinfo()?.relayfee;

(diffs for the remaining 9 changed files are not shown)
