Skip to content

Commit

Permalink
feat(wire): add v2 and v1 transport handler
Browse files Browse the repository at this point in the history
  • Loading branch information
nyonson committed Mar 4, 2025
1 parent d0e0708 commit bc3b68e
Show file tree
Hide file tree
Showing 13 changed files with 770 additions and 347 deletions.
520 changes: 312 additions & 208 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions crates/floresta-electrum/src/electrum_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1057,6 +1057,7 @@ mod test {
backfill: false,
filter_start_height: None,
user_agent: "floresta".to_string(),
allow_v1_fallback: true,
};

let chain_provider: UtreexoNode<Arc<ChainState<KvChainStore>>, RunningNode> =
Expand Down
1 change: 1 addition & 0 deletions crates/floresta-wire/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ floresta-common = { path = "../floresta-common" }
oneshot = "0.1.5"
ahash = "0.8.11"
metrics = { path = "../../metrics", optional = true }
bip324 = { version = "0.6.0", features = [ "tokio" ] }

[dev-dependencies]
zstd = "0.13.3"
Expand Down
12 changes: 12 additions & 0 deletions crates/floresta-wire/src/p2p_wire/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use floresta_compact_filters::IterableFilterStoreError;
use thiserror::Error;

use super::peer::PeerError;
use super::transport::TransportError;
use crate::node::NodeRequest;

#[derive(Error, Debug)]
Expand Down Expand Up @@ -41,6 +42,8 @@ pub enum WireError {
PoisonedLock,
#[error("We couldn't parse the provided address due to: {0}")]
InvalidAddress(AddrParseError),
#[error("Transport error: {0}")]
Transport(TransportError),
}

impl_error_from!(WireError, PeerError, PeerError);
Expand All @@ -64,6 +67,15 @@ impl From<io::Error> for WireError {
}
}

impl From<TransportError> for WireError {
fn from(e: TransportError) -> Self {
match e {
TransportError::Io(io) => WireError::Io(io),
other => WireError::Transport(other),
}
}
}

#[derive(Debug, Clone)]
pub enum AddrParseError {
InvalidIpv6,
Expand Down
5 changes: 5 additions & 0 deletions crates/floresta-wire/src/p2p_wire/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ pub struct UtreexoNodeConfig {
pub filter_start_height: Option<i32>,
/// The user agent that we will advertise to our peers. Defaults to `floresta:<version>`.
pub user_agent: String,
/// Whether to allow fallback to v1 transport if v2 connection fails.
/// Defaults to true.
pub allow_v1_fallback: bool,
}

impl Default for UtreexoNodeConfig {
Expand All @@ -82,6 +85,7 @@ impl Default for UtreexoNodeConfig {
assume_utreexo: None,
filter_start_height: None,
user_agent: format!("floresta:{}", env!("CARGO_PKG_VERSION")),
allow_v1_fallback: true,
}
}
}
Expand All @@ -100,3 +104,4 @@ pub mod sync_node;
#[cfg(test)]
#[doc(hidden)]
pub mod tests;
pub mod transport;
44 changes: 15 additions & 29 deletions crates/floresta-wire/src/p2p_wire/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use log::warn;
use serde::Deserialize;
use serde::Serialize;
use tokio::net::tcp::WriteHalf;
use tokio::net::TcpStream;
use tokio::spawn;
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::mpsc::UnboundedReceiver;
Expand All @@ -53,14 +52,13 @@ use super::node_context::NodeContext;
use super::node_interface::NodeInterface;
use super::node_interface::PeerInfo;
use super::node_interface::UserRequest;
use super::peer::create_tcp_stream_actor;
use super::peer::create_actors;
use super::peer::Peer;
use super::peer::PeerMessages;
use super::peer::Version;
use super::running_node::RunningNode;
use super::socks::Socks5Addr;
use super::socks::Socks5Error;
use super::socks::Socks5StreamBuilder;
use super::transport;
use super::UtreexoNodeConfig;
use crate::node_context::PeerId;

Expand Down Expand Up @@ -1011,15 +1009,15 @@ where
network: bitcoin::Network,
node_tx: UnboundedSender<NodeNotification>,
user_agent: String,
allow_v1_fallback: bool,
) -> Result<(), WireError> {
let address = (address.get_net_address(), address.get_port());
let stream = TcpStream::connect(address).await?;

stream.set_nodelay(true)?;
let (reader, writer) = tokio::io::split(stream);
let (transport_reader, transport_writer) =
transport::connect(address, network, allow_v1_fallback).await?;

let (cancellation_sender, cancellation_receiver) = tokio::sync::oneshot::channel();
let (actor_receiver, actor) = create_tcp_stream_actor(reader);
let (actor_receiver, actor) = create_actors(transport_reader);
tokio::spawn(async move {
tokio::select! {
_ = cancellation_receiver => {}
Expand All @@ -1031,13 +1029,12 @@ where
Peer::<WriteHalf>::create_peer(
peer_id_count,
mempool,
network,
node_tx.clone(),
requests_rx,
peer_id,
kind,
actor_receiver,
writer,
transport_writer,
user_agent,
cancellation_sender,
)
Expand All @@ -1058,25 +1055,13 @@ where
requests_rx: UnboundedReceiver<NodeRequest>,
peer_id_count: u32,
user_agent: String,
) -> Result<(), Socks5Error> {
let addr = match address.get_address() {
AddrV2::Cjdns(addr) => Socks5Addr::Ipv6(addr),
AddrV2::I2p(addr) => Socks5Addr::Domain(addr.into()),
AddrV2::Ipv4(addr) => Socks5Addr::Ipv4(addr),
AddrV2::Ipv6(addr) => Socks5Addr::Ipv6(addr),
AddrV2::TorV2(addr) => Socks5Addr::Domain(addr.into()),
AddrV2::TorV3(addr) => Socks5Addr::Domain(addr.into()),
AddrV2::Unknown(_, _) => {
return Err(Socks5Error::InvalidAddress);
}
};

let proxy = TcpStream::connect(proxy).await?;
let stream = Socks5StreamBuilder::connect(proxy, addr, address.get_port()).await?;
let (reader, writer) = tokio::io::split(stream);
allow_v1_fallback: bool,
) -> Result<(), WireError> {
let (transport_reader, transport_writer) =
transport::connect_proxy(proxy, address, network, allow_v1_fallback).await?;

let (cancellation_sender, cancellation_receiver) = tokio::sync::oneshot::channel();
let (actor_receiver, actor) = create_tcp_stream_actor(reader);
let (actor_receiver, actor) = create_actors(transport_reader);
tokio::spawn(async move {
tokio::select! {
_ = cancellation_receiver => {}
Expand All @@ -1087,13 +1072,12 @@ where
Peer::<WriteHalf>::create_peer(
peer_id_count,
mempool,
network,
node_tx,
requests_rx,
peer_id,
kind,
actor_receiver,
writer,
transport_writer,
user_agent,
cancellation_sender,
)
Expand Down Expand Up @@ -1125,6 +1109,7 @@ where
requests_rx,
self.peer_id_count,
self.config.user_agent.clone(),
self.config.allow_v1_fallback,
),
));
} else {
Expand All @@ -1140,6 +1125,7 @@ where
self.network.into(),
self.node_tx.clone(),
self.config.user_agent.clone(),
self.config.allow_v1_fallback,
),
));
}
Expand Down
Loading

0 comments on commit bc3b68e

Please sign in to comment.