From 94e3af2463d0ced6654917930a38cbf7bba21561 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 16 Jan 2025 22:53:56 +0100 Subject: [PATCH] http: use separate sockets for IPv4 and IPv6 (#221) --- .../actions/test-file-transfers/entrypoint.sh | 4 +- crates/http/README.md | 4 +- crates/http/src/common.rs | 1 + crates/http/src/config.rs | 51 +-- crates/http/src/lib.rs | 24 +- crates/http/src/workers/socket/mod.rs | 290 +++++++++++------- 6 files changed, 244 insertions(+), 130 deletions(-) diff --git a/.github/actions/test-file-transfers/entrypoint.sh b/.github/actions/test-file-transfers/entrypoint.sh index 868c88c8..7e97f39e 100755 --- a/.github/actions/test-file-transfers/entrypoint.sh +++ b/.github/actions/test-file-transfers/entrypoint.sh @@ -66,14 +66,14 @@ address_ipv4 = '127.0.0.1:3000'" > udp.toml echo "log_level = 'debug' [network] -address = '127.0.0.1:3004'" > http.toml +address_ipv4 = '127.0.0.1:3004'" > http.toml ./target/debug/aquatic http -c http.toml > "$HOME/http.log" 2>&1 & # HTTP with TLS echo "log_level = 'debug' [network] -address = '127.0.0.1:3001' +address_ipv4 = '127.0.0.1:3001' enable_tls = true tls_certificate_path = './server.crt' tls_private_key_path = './key.pk8' diff --git a/crates/http/README.md b/crates/http/README.md index 8fa800e0..2f17319f 100644 --- a/crates/http/README.md +++ b/crates/http/README.md @@ -47,8 +47,8 @@ Generate the configuration file: ./target/release/aquatic_http -p > "aquatic-http-config.toml" ``` -Make necessary adjustments to the file. You will likely want to adjust `address` -(listening address) under the `network` section. +Make necessary adjustments to the file. You will likely want to adjust +listening addresses under the `network` section. To run over TLS, configure certificate and private key files. diff --git a/crates/http/src/common.rs b/crates/http/src/common.rs index 9c326d6f..01b851aa 100644 --- a/crates/http/src/common.rs +++ b/crates/http/src/common.rs @@ -12,6 +12,7 @@ use aquatic_http_protocol::{ use glommio::channels::shared_channel::SharedSender; use slotmap::new_key_type; +#[allow(dead_code)] #[derive(Copy, Clone, Debug)] pub struct ConsumerId(pub usize); diff --git a/crates/http/src/config.rs b/crates/http/src/config.rs index 644844bb..cad61e34 100644 --- a/crates/http/src/config.rs +++ b/crates/http/src/config.rs @@ -1,4 +1,7 @@ -use std::{net::SocketAddr, path::PathBuf}; +use std::{ + net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, + path::PathBuf, +}; use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig}; use aquatic_toml_config::TomlConfig; @@ -70,25 +73,24 @@ impl aquatic_common::cli::Config for Config { #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] #[serde(default, deny_unknown_fields)] pub struct NetworkConfig { - /// Bind to this address + /// Use IPv4 + pub use_ipv4: bool, + /// Use IPv6 + pub use_ipv6: bool, + /// IPv4 address and port /// - /// When providing an IPv4 style address, only IPv4 traffic will be - /// handled. Examples: - /// - "0.0.0.0:3000" binds to port 3000 on all network interfaces - /// - "127.0.0.1:3000" binds to port 3000 on the loopback interface - /// (localhost) + /// Examples: + /// - Use 0.0.0.0:3000 to bind to all interfaces on port 3000 + /// - Use 127.0.0.1:3000 to bind to the loopback interface (localhost) on + /// port 3000 + pub address_ipv4: SocketAddrV4, + /// IPv6 address and port /// - /// When it comes to IPv6-style addresses, behaviour is more complex and - /// differs between operating systems. On Linux, to accept both IPv4 and - /// IPv6 traffic on any interface, use "[::]:3000". Set the "only_ipv6" - /// flag below to limit traffic to IPv6. To bind to the loopback interface - /// and only accept IPv6 packets, use "[::1]:3000" and set the only_ipv6 - /// flag. Receiving both IPv4 and IPv6 traffic on loopback is currently - /// not supported. For other operating systems, please refer to their - /// respective documentation. - pub address: SocketAddr, - /// Only allow access over IPv6 - pub only_ipv6: bool, + /// Examples: + /// - Use [::]:3000 to bind to all interfaces on port 3000 + /// - Use [::1]:3000 to bind to the loopback interface (localhost) on + /// port 3000 + pub address_ipv6: SocketAddrV6, /// Maximum number of pending TCP connections pub tcp_backlog: i32, /// Enable TLS @@ -125,21 +127,30 @@ pub struct NetworkConfig { /// header. Works with typical multi-IP setups (e.g., "X-Forwarded-For") /// as well as for single-IP setups (e.g., nginx "X-Real-IP") pub reverse_proxy_ip_header_format: ReverseProxyPeerIpHeaderFormat, + /// Set flag on IPv6 socket to only accept IPv6 traffic. + /// + /// This should typically be set to true unless your OS does not support + /// double-stack sockets (that is, sockets that receive both IPv4 and IPv6 + /// packets). + pub set_only_ipv6: bool, } impl Default for NetworkConfig { fn default() -> Self { Self { - address: SocketAddr::from(([0, 0, 0, 0], 3000)), + use_ipv4: true, + use_ipv6: true, + address_ipv4: SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 3000), + address_ipv6: SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 3000, 0, 0), enable_tls: false, tls_certificate_path: "".into(), tls_private_key_path: "".into(), - only_ipv6: false, tcp_backlog: 1024, keep_alive: true, runs_behind_reverse_proxy: false, reverse_proxy_ip_header_name: "X-Forwarded-For".into(), reverse_proxy_ip_header_format: Default::default(), + set_only_ipv6: true, } } } diff --git a/crates/http/src/lib.rs b/crates/http/src/lib.rs index a7300d08..f376c9c5 100644 --- a/crates/http/src/lib.rs +++ b/crates/http/src/lib.rs @@ -27,6 +27,12 @@ const SHARED_CHANNEL_SIZE: usize = 1024; pub fn run(config: Config) -> ::anyhow::Result<()> { let mut signals = Signals::new([SIGUSR1])?; + if !(config.network.use_ipv4 || config.network.use_ipv6) { + return Result::Err(anyhow::anyhow!( + "Both use_ipv4 and use_ipv6 can not be set to false" + )); + } + let state = State::default(); update_access_list(&config.access_list, &state.access_list)?; @@ -35,7 +41,14 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { config.socket_workers + config.swarm_workers, SHARED_CHANNEL_SIZE, ); - let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); + + let num_sockets_per_worker = + if config.network.use_ipv4 { 1 } else { 0 } + if config.network.use_ipv6 { 1 } else { 0 }; + + let priv_dropper = PrivilegeDropper::new( + config.privileges.clone(), + config.socket_workers * num_sockets_per_worker, + ); let opt_tls_config = if config.network.enable_tls { Some(Arc::new(ArcSwap::from_pointee(create_rustls_config( @@ -55,7 +68,12 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let state = state.clone(); let opt_tls_config = opt_tls_config.clone(); let request_mesh_builder = request_mesh_builder.clone(); - let priv_dropper = priv_dropper.clone(); + + let mut priv_droppers = Vec::new(); + + for _ in 0..num_sockets_per_worker { + priv_droppers.push(priv_dropper.clone()); + } let handle = Builder::new() .name(format!("socket-{:02}", i + 1)) @@ -68,7 +86,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { state, opt_tls_config, request_mesh_builder, - priv_dropper, + priv_droppers, server_start_instant, i, )) diff --git a/crates/http/src/workers/socket/mod.rs b/crates/http/src/workers/socket/mod.rs index b5ae5264..9ac6ac37 100644 --- a/crates/http/src/workers/socket/mod.rs +++ b/crates/http/src/workers/socket/mod.rs @@ -2,21 +2,23 @@ mod connection; mod request; use std::cell::RefCell; +use std::net::SocketAddr; use std::os::unix::prelude::{FromRawFd, IntoRawFd}; use std::rc::Rc; use std::sync::Arc; use std::time::Duration; use anyhow::Context; +use aquatic_common::access_list::AccessList; use aquatic_common::privileges::PrivilegeDropper; use aquatic_common::rustls_config::RustlsConfig; use aquatic_common::{CanonicalSocketAddr, ServerStartInstant}; -use arc_swap::ArcSwap; +use arc_swap::{ArcSwap, ArcSwapAny}; use futures_lite::future::race; use futures_lite::StreamExt; -use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role}; -use glommio::channels::local_channel::{new_bounded, LocalSender}; -use glommio::net::TcpListener; +use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; +use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender}; +use glommio::net::{TcpListener, TcpStream}; use glommio::timer::TimerActionRepeat; use glommio::{enclose, prelude::*}; use slotmap::HopSlotMap; @@ -36,14 +38,43 @@ pub async fn run_socket_worker( state: State, opt_tls_config: Option>>, request_mesh_builder: MeshBuilder, - priv_dropper: PrivilegeDropper, + mut priv_droppers: Vec, server_start_instant: ServerStartInstant, worker_index: usize, ) -> anyhow::Result<()> { let config = Rc::new(config); - let access_list = state.access_list; - let listener = create_tcp_listener(&config, priv_dropper).context("create tcp listener")?; + let tcp_listeners = { + let opt_listener_ipv4 = if config.network.use_ipv4 { + let priv_dropper = priv_droppers + .pop() + .ok_or(anyhow::anyhow!("no enough priv droppers"))?; + let socket = + create_tcp_listener(&config, priv_dropper, config.network.address_ipv4.into()) + .context("create tcp listener")?; + + Some(socket) + } else { + None + }; + let opt_listener_ipv6 = if config.network.use_ipv6 { + let priv_dropper = priv_droppers + .pop() + .ok_or(anyhow::anyhow!("no enough priv droppers"))?; + let socket = + create_tcp_listener(&config, priv_dropper, config.network.address_ipv6.into()) + .context("create tcp listener")?; + + Some(socket) + } else { + None + }; + + [opt_listener_ipv4, opt_listener_ipv6] + .into_iter() + .flatten() + .collect::>() + }; let (request_senders, _) = request_mesh_builder .join(Role::Producer) @@ -61,94 +92,138 @@ pub async fn run_socket_worker( ) })); - let mut incoming = listener.incoming(); - - while let Some(stream) = incoming.next().await { - match stream { - Ok(stream) => { - let (close_conn_sender, close_conn_receiver) = new_bounded(1); - - let valid_until = Rc::new(RefCell::new(ValidUntil::new( - server_start_instant, - config.cleaning.max_connection_idle, - ))); - - let connection_id = connection_handles.borrow_mut().insert(ConnectionHandle { - close_conn_sender, - valid_until: valid_until.clone(), - }); - - spawn_local(enclose!( - ( - config, - access_list, - request_senders, - opt_tls_config, - connection_handles, - valid_until - ) - async move { - #[cfg(feature = "metrics")] - let active_connections_gauge = ::metrics::gauge!( - "aquatic_active_connections", - "worker_index" => worker_index.to_string(), - ); - - #[cfg(feature = "metrics")] - active_connections_gauge.increment(1.0); - - let f1 = async { run_connection( - config, - access_list, - request_senders, - server_start_instant, - opt_tls_config, - valid_until.clone(), - stream, - worker_index, - ).await - }; - let f2 = async { - close_conn_receiver.recv().await; - - Err(ConnectionError::Inactive) - }; - - let result = race(f1, f2).await; - - #[cfg(feature = "metrics")] - active_connections_gauge.decrement(1.0); - - match result { - Ok(()) => (), - Err(err@( - ConnectionError::ResponseBufferWrite(_) | - ConnectionError::ResponseBufferFull | - ConnectionError::ScrapeChannelError(_) | - ConnectionError::ResponseSenderClosed - )) => { - ::log::error!("connection closed: {:#}", err); - } - Err(err@ConnectionError::RequestBufferFull) => { - ::log::info!("connection closed: {:#}", err); - } - Err(err) => { - ::log::debug!("connection closed: {:#}", err); - } - } - - connection_handles.borrow_mut().remove(connection_id); - } - )) - .detach(); + let tasks = tcp_listeners + .into_iter() + .map(|tcp_listener| { + let listener_state = ListenerState { + config: config.clone(), + access_list: state.access_list.clone(), + opt_tls_config: opt_tls_config.clone(), + server_start_instant, + connection_handles: connection_handles.clone(), + request_senders: request_senders.clone(), + worker_index, + }; + + spawn_local(listener_state.accept_connections(tcp_listener)) + }) + .collect::>(); + + for task in tasks { + task.await; + } + + Ok(()) +} + +#[derive(Clone)] +struct ListenerState { + config: Rc, + access_list: Arc>>, + opt_tls_config: Option>>, + server_start_instant: ServerStartInstant, + connection_handles: Rc>>, + request_senders: Rc>, + worker_index: usize, +} + +impl ListenerState { + async fn accept_connections(self, listener: TcpListener) { + let mut incoming = listener.incoming(); + + while let Some(stream) = incoming.next().await { + match stream { + Ok(stream) => { + let (close_conn_sender, close_conn_receiver) = new_bounded(1); + + let valid_until = Rc::new(RefCell::new(ValidUntil::new( + self.server_start_instant, + self.config.cleaning.max_connection_idle, + ))); + + let connection_id = + self.connection_handles + .borrow_mut() + .insert(ConnectionHandle { + close_conn_sender, + valid_until: valid_until.clone(), + }); + + spawn_local(self.clone().handle_connection( + close_conn_receiver, + valid_until, + connection_id, + stream, + )) + .detach(); + } + Err(err) => { + ::log::error!("accept connection: {:?}", err); + } + } + } + } + + async fn handle_connection( + self, + close_conn_receiver: LocalReceiver<()>, + valid_until: Rc>, + connection_id: ConnectionId, + stream: TcpStream, + ) { + #[cfg(feature = "metrics")] + let active_connections_gauge = ::metrics::gauge!( + "aquatic_active_connections", + "worker_index" => self.worker_index.to_string(), + ); + + #[cfg(feature = "metrics")] + active_connections_gauge.increment(1.0); + + let f1 = async { + run_connection( + self.config, + self.access_list, + self.request_senders, + self.server_start_instant, + self.opt_tls_config, + valid_until.clone(), + stream, + self.worker_index, + ) + .await + }; + let f2 = async { + close_conn_receiver.recv().await; + + Err(ConnectionError::Inactive) + }; + + let result = race(f1, f2).await; + + #[cfg(feature = "metrics")] + active_connections_gauge.decrement(1.0); + + match result { + Ok(()) => (), + Err( + err @ (ConnectionError::ResponseBufferWrite(_) + | ConnectionError::ResponseBufferFull + | ConnectionError::ScrapeChannelError(_) + | ConnectionError::ResponseSenderClosed), + ) => { + ::log::error!("connection closed: {:#}", err); + } + Err(err @ ConnectionError::RequestBufferFull) => { + ::log::info!("connection closed: {:#}", err); } Err(err) => { - ::log::error!("accept connection: {:?}", err); + ::log::debug!("connection closed: {:#}", err); } } - } - Ok(()) + self.connection_handles.borrow_mut().remove(connection_id); + } } async fn clean_connections( @@ -176,32 +251,41 @@ async fn clean_connections( fn create_tcp_listener( config: &Config, priv_dropper: PrivilegeDropper, + address: SocketAddr, ) -> anyhow::Result { - let domain = if config.network.address.is_ipv4() { - socket2::Domain::IPV4 + let socket = if address.is_ipv4() { + socket2::Socket::new( + socket2::Domain::IPV4, + socket2::Type::STREAM, + Some(socket2::Protocol::TCP), + )? } else { - socket2::Domain::IPV6 - }; - - let socket = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP))?; + let socket = socket2::Socket::new( + socket2::Domain::IPV6, + socket2::Type::STREAM, + Some(socket2::Protocol::TCP), + )?; + + if config.network.set_only_ipv6 { + socket + .set_only_v6(true) + .with_context(|| "socket: set only ipv6")?; + } - if config.network.only_ipv6 { socket - .set_only_v6(true) - .with_context(|| "socket: set only ipv6")?; - } + }; socket .set_reuse_port(true) .with_context(|| "socket: set reuse port")?; socket - .bind(&config.network.address.into()) - .with_context(|| format!("socket: bind to {}", config.network.address))?; + .bind(&address.into()) + .with_context(|| format!("socket: bind to {}", address))?; socket .listen(config.network.tcp_backlog) - .with_context(|| format!("socket: listen on {}", config.network.address))?; + .with_context(|| format!("socket: listen on {}", address))?; priv_dropper.after_socket_creation()?;