Skip to content

Commit

Permalink
feat(wire): add v2 and v1 transport handler
Browse files Browse the repository at this point in the history
  • Loading branch information
nyonson committed Feb 25, 2025
1 parent 31f532c commit 9b6b614
Show file tree
Hide file tree
Showing 10 changed files with 600 additions and 334 deletions.
543 changes: 326 additions & 217 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions crates/floresta-electrum/src/electrum_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1057,6 +1057,7 @@ mod test {
backfill: false,
filter_start_height: None,
user_agent: "floresta".to_string(),
allow_v1_fallback: true,
};

let chain_provider: UtreexoNode<Arc<ChainState<KvChainStore>>, RunningNode> =
Expand Down
1 change: 1 addition & 0 deletions crates/floresta-wire/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ floresta-common = { path = "../floresta-common" }
oneshot = "0.1.5"
ahash = "0.8.11"
metrics = { path = "../../metrics", optional = true }
bip324 = { version = "0.6.0", features = [ "tokio" ] }

[dev-dependencies]
zstd = "0.13.2"
Expand Down
16 changes: 16 additions & 0 deletions crates/floresta-wire/src/p2p_wire/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use floresta_compact_filters::IterableFilterStoreError;
use thiserror::Error;

use super::peer::PeerError;
use super::socks::Socks5Error;
use super::transport::TransportError;
use crate::node::NodeRequest;

#[derive(Error, Debug)]
Expand Down Expand Up @@ -41,9 +43,14 @@ pub enum WireError {
PoisonedLock,
#[error("We couldn't parse the provided address due to: {0}")]
InvalidAddress(AddrParseError),
#[error("Transport error: {0}")]
Transport(TransportError),
#[error("Socks error")]
Socks(Socks5Error),
}

impl_error_from!(WireError, PeerError, PeerError);
impl_error_from!(WireError, Socks5Error, Socks);
impl_error_from!(WireError, BlockchainError, Blockchain);
impl_error_from!(
WireError,
Expand All @@ -64,6 +71,15 @@ impl From<io::Error> for WireError {
}
}

impl From<TransportError> for WireError {
fn from(e: TransportError) -> Self {
match e {
TransportError::Io(io) => WireError::Io(io),
other => WireError::Transport(other),
}
}
}

#[derive(Debug, Clone)]
pub enum AddrParseError {
InvalidIpv6,
Expand Down
5 changes: 5 additions & 0 deletions crates/floresta-wire/src/p2p_wire/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ pub struct UtreexoNodeConfig {
pub filter_start_height: Option<i32>,
/// The user agent that we will advertise to our peers. Defaults to `floresta:<version>`.
pub user_agent: String,
/// Whether to allow fallback to v1 transport if v2 connection fails.
/// Defaults to true.
pub allow_v1_fallback: bool,
}

impl Default for UtreexoNodeConfig {
Expand All @@ -82,6 +85,7 @@ impl Default for UtreexoNodeConfig {
assume_utreexo: None,
filter_start_height: None,
user_agent: format!("floresta:{}", env!("CARGO_PKG_VERSION")),
allow_v1_fallback: true,
}
}
}
Expand All @@ -100,3 +104,4 @@ pub mod sync_node;
#[cfg(test)]
#[doc(hidden)]
pub mod tests;
pub mod transport;
21 changes: 12 additions & 9 deletions crates/floresta-wire/src/p2p_wire/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,15 @@ use super::node_context::NodeContext;
use super::node_interface::NodeInterface;
use super::node_interface::PeerInfo;
use super::node_interface::UserRequest;
use super::peer::create_tcp_stream_actor;
use super::peer::create_actors;
use super::peer::Peer;
use super::peer::PeerMessages;
use super::peer::Version;
use super::running_node::RunningNode;
use super::socks::Socks5Addr;
use super::socks::Socks5Error;
use super::socks::Socks5StreamBuilder;
use super::transport;
use super::UtreexoNodeConfig;
use crate::node_context::PeerId;

Expand Down Expand Up @@ -1017,9 +1018,11 @@ where

stream.set_nodelay(true)?;
let (reader, writer) = tokio::io::split(stream);
let (transport_reader, transport_writer) =
transport::new(reader, writer, network, true).await?;

let (cancellation_sender, cancellation_receiver) = tokio::sync::oneshot::channel();
let (actor_receiver, actor) = create_tcp_stream_actor(reader);
let (actor_receiver, actor) = create_actors(transport_reader);
tokio::spawn(async move {
tokio::select! {
_ = cancellation_receiver => {}
Expand All @@ -1031,13 +1034,12 @@ where
Peer::<WriteHalf>::create_peer(
peer_id_count,
mempool,
network,
node_tx.clone(),
requests_rx,
peer_id,
kind,
actor_receiver,
writer,
transport_writer,
user_agent,
cancellation_sender,
)
Expand All @@ -1058,7 +1060,7 @@ where
requests_rx: UnboundedReceiver<NodeRequest>,
peer_id_count: u32,
user_agent: String,
) -> Result<(), Socks5Error> {
) -> Result<(), WireError> {
let addr = match address.get_address() {
AddrV2::Cjdns(addr) => Socks5Addr::Ipv6(addr),
AddrV2::I2p(addr) => Socks5Addr::Domain(addr.into()),
Expand All @@ -1067,16 +1069,18 @@ where
AddrV2::TorV2(addr) => Socks5Addr::Domain(addr.into()),
AddrV2::TorV3(addr) => Socks5Addr::Domain(addr.into()),
AddrV2::Unknown(_, _) => {
return Err(Socks5Error::InvalidAddress);
return Err(WireError::Socks(Socks5Error::InvalidAddress));
}
};

let proxy = TcpStream::connect(proxy).await?;
let stream = Socks5StreamBuilder::connect(proxy, addr, address.get_port()).await?;
let (reader, writer) = tokio::io::split(stream);
let (transport_reader, transport_writer) =
transport::new(reader, writer, network, true).await?;

let (cancellation_sender, cancellation_receiver) = tokio::sync::oneshot::channel();
let (actor_receiver, actor) = create_tcp_stream_actor(reader);
let (actor_receiver, actor) = create_actors(transport_reader);
tokio::spawn(async move {
tokio::select! {
_ = cancellation_receiver => {}
Expand All @@ -1087,13 +1091,12 @@ where
Peer::<WriteHalf>::create_peer(
peer_id_count,
mempool,
network,
node_tx,
requests_rx,
peer_id,
kind,
actor_receiver,
writer,
transport_writer,
user_agent,
cancellation_sender,
)
Expand Down
Loading

0 comments on commit 9b6b614

Please sign in to comment.