From 4c831643b168dd2ed5c7b64f1baa688622f0d74e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 8 Jan 2024 18:41:24 +0100 Subject: [PATCH 1/8] ws: remove ineffective backpressure implementation --- crates/ws/src/workers/socket/connection.rs | 33 +--------------------- 1 file changed, 1 insertion(+), 32 deletions(-) diff --git a/crates/ws/src/workers/socket/connection.rs b/crates/ws/src/workers/socket/connection.rs index fca6537e..74908e94 100644 --- a/crates/ws/src/workers/socket/connection.rs +++ b/crates/ws/src/workers/socket/connection.rs @@ -17,7 +17,7 @@ use futures::{AsyncWriteExt, StreamExt}; use futures_lite::future::race; use futures_rustls::TlsAcceptor; use glommio::channels::channel_mesh::Senders; -use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender}; +use glommio::channels::local_channel::{LocalReceiver, LocalSender}; use glommio::net::TcpStream; use glommio::timer::timeout; use glommio::{enclose, prelude::*}; @@ -35,13 +35,6 @@ use crate::workers::socket::calculate_in_message_consumer_index; #[cfg(feature = "metrics")] use crate::workers::socket::{ip_version_to_metrics_str, WORKER_INDEX}; -/// Length of ConnectionReader backpressure channel -/// -/// ConnectionReader awaits a message in a channel before proceeding with -/// reading a request. For each response sent, a message is sent to the -/// channel, up to a maximum of this constant. -const READ_PASS_CHANNEL_LEN: usize = 4; - pub struct ConnectionRunner { pub config: Rc, pub access_list: Arc, @@ -168,17 +161,6 @@ impl ConnectionRunner { let pending_scrape_slab = Rc::new(RefCell::new(Slab::new())); let access_list_cache = create_access_list_cache(&self.access_list); - let (read_pass_sender, read_pass_receiver) = new_bounded(READ_PASS_CHANNEL_LEN); - - for _ in 0..READ_PASS_CHANNEL_LEN { - if let Err(err) = read_pass_sender.try_send(()) { - panic!( - "couldn't add initial entries to read pass channel: {:#}", - err - ) - }; - } - let config = self.config.clone(); let reader_future = enclose!((pending_scrape_slab, clean_up_data) async move { @@ -187,7 +169,6 @@ impl ConnectionRunner { access_list_cache, in_message_senders: self.in_message_senders, out_message_sender: self.out_message_sender, - read_pass_receiver, pending_scrape_slab, out_message_consumer_id: self.out_message_consumer_id, ws_in, @@ -217,7 +198,6 @@ impl ConnectionRunner { let mut writer = ConnectionWriter { config, out_message_receiver: self.out_message_receiver, - read_pass_sender, connection_valid_until: self.connection_valid_until, ws_out, pending_scrape_slab, @@ -238,7 +218,6 @@ struct ConnectionReader { access_list_cache: AccessListCache, in_message_senders: Rc>, out_message_sender: Rc>, - read_pass_receiver: LocalReceiver<()>, pending_scrape_slab: Rc>>, out_message_consumer_id: ConsumerId, ws_in: SplitStream>, @@ -254,11 +233,6 @@ struct ConnectionReader { impl ConnectionReader { async fn run_in_message_loop(&mut self) -> anyhow::Result<()> { loop { - self.read_pass_receiver - .recv() - .await - .ok_or_else(|| anyhow::anyhow!("read pass channel closed"))?; - let message = self .ws_in .next() @@ -496,7 +470,6 @@ impl ConnectionReader { struct ConnectionWriter { config: Rc, out_message_receiver: LocalReceiver<(OutMessageMeta, OutMessage)>, - read_pass_sender: LocalSender<()>, connection_valid_until: Rc>, ws_out: SplitSink, tungstenite::Message>, pending_scrape_slab: Rc>>, @@ -549,10 +522,6 @@ impl ConnectionWriter { } }; - if let Err(GlommioError::Closed(_)) = self.read_pass_sender.try_send(()) { - return Err(anyhow::anyhow!("read pass channel closed")); - } - yield_if_needed().await; } } From 36954e5f487c39b9b5b715049dfc15083764c38f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 8 Jan 2024 18:50:17 +0100 Subject: [PATCH 2/8] ws: SwarmControlMessage::ConnectionClosed: use Vec for info hashes --- crates/ws/src/common.rs | 5 ++--- crates/ws/src/workers/socket/connection.rs | 20 ++++++++++++-------- crates/ws/src/workers/swarm/mod.rs | 11 ++++++----- 3 files changed, 20 insertions(+), 16 deletions(-) diff --git a/crates/ws/src/common.rs b/crates/ws/src/common.rs index dae682f9..8c41fb19 100644 --- a/crates/ws/src/common.rs +++ b/crates/ws/src/common.rs @@ -67,11 +67,10 @@ impl Into for InMessageMeta { } } -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Debug)] pub enum SwarmControlMessage { ConnectionClosed { - info_hash: InfoHash, - peer_id: PeerId, ip_version: IpVersion, + announced_info_hashes: Vec<(InfoHash, PeerId)>, }, } diff --git a/crates/ws/src/workers/socket/connection.rs b/crates/ws/src/workers/socket/connection.rs index 74908e94..bf241865 100644 --- a/crates/ws/src/workers/socket/connection.rs +++ b/crates/ws/src/workers/socket/connection.rs @@ -599,19 +599,23 @@ impl ConnectionCleanupData { config: &Config, control_message_senders: Rc>, ) { - // Use RefCell::take to avoid issues with Rc borrow across await - let announced_info_hashes = self.announced_info_hashes.take(); + let mut announced_info_hashes = HashMap::new(); - // Tell swarm workers to remove peer - for (info_hash, peer_id) in announced_info_hashes.into_iter() { + for (info_hash, peer_id) in self.announced_info_hashes.take().into_iter() { + let consumer_index = calculate_in_message_consumer_index(&config, info_hash); + + announced_info_hashes + .entry(consumer_index) + .or_insert(Vec::new()) + .push((info_hash, peer_id)); + } + + for (consumer_index, announced_info_hashes) in announced_info_hashes.into_iter() { let message = SwarmControlMessage::ConnectionClosed { - info_hash, - peer_id, ip_version: self.ip_version, + announced_info_hashes, }; - let consumer_index = calculate_in_message_consumer_index(&config, info_hash); - control_message_senders .send_to(consumer_index, message) .await diff --git a/crates/ws/src/workers/swarm/mod.rs b/crates/ws/src/workers/swarm/mod.rs index 8fff3377..599ad673 100644 --- a/crates/ws/src/workers/swarm/mod.rs +++ b/crates/ws/src/workers/swarm/mod.rs @@ -102,13 +102,14 @@ where while let Some(message) = stream.next().await { match message { SwarmControlMessage::ConnectionClosed { - info_hash, - peer_id, ip_version, + announced_info_hashes, } => { - torrents - .borrow_mut() - .handle_connection_closed(info_hash, peer_id, ip_version); + let mut torrents = torrents.borrow_mut(); + + for (info_hash, peer_id) in announced_info_hashes { + torrents.handle_connection_closed(info_hash, peer_id, ip_version); + } } } } From d4c95d89c0f8c4a56326912193682c0aa5ec2330 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 8 Jan 2024 18:54:11 +0100 Subject: [PATCH 3/8] ws: swarm: remove peer ValidUntil update task, just create when needed --- crates/ws/src/workers/swarm/mod.rs | 18 ------------------ crates/ws/src/workers/swarm/storage.rs | 3 ++- 2 files changed, 2 insertions(+), 19 deletions(-) diff --git a/crates/ws/src/workers/swarm/mod.rs b/crates/ws/src/workers/swarm/mod.rs index 599ad673..9c1f601d 100644 --- a/crates/ws/src/workers/swarm/mod.rs +++ b/crates/ws/src/workers/swarm/mod.rs @@ -125,25 +125,8 @@ async fn handle_request_stream( S: futures_lite::Stream + ::std::marker::Unpin, { let rng = Rc::new(RefCell::new(SmallRng::from_entropy())); - - let max_peer_age = config.cleaning.max_peer_age; - let peer_valid_until = Rc::new(RefCell::new(ValidUntil::new( - server_start_instant, - max_peer_age, - ))); - - // Periodically update peer_valid_until - TimerActionRepeat::repeat(enclose!((peer_valid_until) move || { - enclose!((peer_valid_until) move || async move { - *peer_valid_until.borrow_mut() = ValidUntil::new(server_start_instant, max_peer_age); - - Some(Duration::from_secs(1)) - })() - })); - let config = &config; let torrents = &torrents; - let peer_valid_until = &peer_valid_until; let rng = &rng; let out_message_senders = &out_message_senders; @@ -160,7 +143,6 @@ async fn handle_request_stream( &mut rng.borrow_mut(), &mut out_messages, server_start_instant, - peer_valid_until.borrow().to_owned(), meta, request, ) diff --git a/crates/ws/src/workers/swarm/storage.rs b/crates/ws/src/workers/swarm/storage.rs index 7ad311db..43c99b3a 100644 --- a/crates/ws/src/workers/swarm/storage.rs +++ b/crates/ws/src/workers/swarm/storage.rs @@ -62,7 +62,6 @@ impl TorrentMaps { rng: &mut SmallRng, out_messages: &mut Vec<(OutMessageMeta, OutMessage)>, server_start_instant: ServerStartInstant, - valid_until: ValidUntil, request_sender_meta: InMessageMeta, request: AnnounceRequest, ) { @@ -72,6 +71,8 @@ impl TorrentMaps { self.ipv6.entry(request.info_hash).or_default() }; + let valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age); + // If there is already a peer with this peer_id, check that connection id // is same as that of request sender. Otherwise, ignore request. Since // peers have access to each others peer_id's, they could send requests From 2279e8390e6045cc00172d04fcd55fe7ec2ea149 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 8 Jan 2024 19:09:17 +0100 Subject: [PATCH 4/8] ws load test: send answers regarding correct info_hash --- crates/ws_load_test/src/network.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/crates/ws_load_test/src/network.rs b/crates/ws_load_test/src/network.rs index 2a03ad26..0ec7d37e 100644 --- a/crates/ws_load_test/src/network.rs +++ b/crates/ws_load_test/src/network.rs @@ -6,7 +6,7 @@ use std::{ time::Duration, }; -use aquatic_ws_protocol::{InMessage, OfferId, OutMessage, PeerId, RtcAnswer, RtcAnswerType}; +use aquatic_ws_protocol::{InMessage, OfferId, OutMessage, PeerId, RtcAnswer, RtcAnswerType, InfoHash}; use async_tungstenite::{client_async, WebSocketStream}; use futures::{SinkExt, StreamExt}; use futures_rustls::{client::TlsStream, TlsConnector}; @@ -66,7 +66,7 @@ struct Connection { rng: SmallRng, can_send: bool, peer_id: PeerId, - send_answer: Option<(PeerId, OfferId)>, + send_answer: Option<(InfoHash, PeerId, OfferId)>, stream: WebSocketStream>, } @@ -131,7 +131,8 @@ impl Connection { // If self.send_answer is set and request is announce request, make // the request an offer answer let request = if let InMessage::AnnounceRequest(mut r) = request { - if let Some((peer_id, offer_id)) = self.send_answer { + if let Some((info_hash, peer_id, offer_id)) = self.send_answer { + r.info_hash = info_hash; r.answer_to_peer_id = Some(peer_id); r.answer_offer_id = Some(offer_id); r.answer = Some(RtcAnswer { @@ -142,13 +143,13 @@ impl Connection { r.offers = None; } - self.send_answer = None; - InMessage::AnnounceRequest(r) } else { request }; + self.send_answer = None; + self.stream.send(request.to_ws_message()).await?; self.load_test_state @@ -190,7 +191,7 @@ impl Connection { .responses_offer .fetch_add(1, Ordering::Relaxed); - self.send_answer = Some((offer.peer_id, offer.offer_id)); + self.send_answer = Some((offer.info_hash, offer.peer_id, offer.offer_id)); self.can_send = true; } From 64926ba46a0d231469465a2c8295024cccb852cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 8 Jan 2024 19:19:43 +0100 Subject: [PATCH 5/8] ws load test: clean up, slight code refactor --- crates/ws_load_test/src/network.rs | 118 ++++++++++++++--------------- 1 file changed, 55 insertions(+), 63 deletions(-) diff --git a/crates/ws_load_test/src/network.rs b/crates/ws_load_test/src/network.rs index 0ec7d37e..7cdbb7ba 100644 --- a/crates/ws_load_test/src/network.rs +++ b/crates/ws_load_test/src/network.rs @@ -6,7 +6,9 @@ use std::{ time::Duration, }; -use aquatic_ws_protocol::{InMessage, OfferId, OutMessage, PeerId, RtcAnswer, RtcAnswerType, InfoHash}; +use aquatic_ws_protocol::{ + InMessage, InfoHash, OfferId, OutMessage, PeerId, RtcAnswer, RtcAnswerType, +}; use async_tungstenite::{client_async, WebSocketStream}; use futures::{SinkExt, StreamExt}; use futures_rustls::{client::TlsStream, TlsConnector}; @@ -23,6 +25,8 @@ pub async fn run_socket_thread( ) -> anyhow::Result<()> { let config = Rc::new(config); let num_active_connections = Rc::new(RefCell::new(0usize)); + let connection_creation_interval = + Duration::from_millis(config.connection_creation_interval_ms); TimerActionRepeat::repeat(move || { periodically_open_connections( @@ -30,12 +34,12 @@ pub async fn run_socket_thread( tls_config.clone(), load_test_state.clone(), num_active_connections.clone(), + connection_creation_interval, ) - }); - - futures::future::pending::().await; - - Ok(()) + }) + .join() + .await + .ok_or_else(|| anyhow::anyhow!("connection opener timer cancelled")) } async fn periodically_open_connections( @@ -43,9 +47,8 @@ async fn periodically_open_connections( tls_config: Arc, load_test_state: LoadTestState, num_active_connections: Rc>, + connection_creation_interval: Duration, ) -> Option { - let wait = Duration::from_millis(config.connection_creation_interval_ms); - if *num_active_connections.borrow() < config.num_connections_per_worker { spawn_local(async move { if let Err(err) = @@ -57,16 +60,15 @@ async fn periodically_open_connections( .detach(); } - Some(wait) + Some(connection_creation_interval) } struct Connection { config: Rc, load_test_state: LoadTestState, rng: SmallRng, - can_send: bool, peer_id: PeerId, - send_answer: Option<(InfoHash, PeerId, OfferId)>, + can_send_answer: Option<(InfoHash, PeerId, OfferId)>, stream: WebSocketStream>, } @@ -99,9 +101,8 @@ impl Connection { load_test_state, rng, stream, - can_send: true, peer_id, - send_answer: None, + can_send_answer: None, }; *num_active_connections.borrow_mut() += 1; @@ -119,49 +120,50 @@ impl Connection { async fn run_connection_loop(&mut self) -> anyhow::Result<()> { loop { - if self.can_send { - let request = create_random_request( - &self.config, - &self.load_test_state, - &mut self.rng, - self.peer_id, - self.send_answer.is_none(), - ); - - // If self.send_answer is set and request is announce request, make - // the request an offer answer - let request = if let InMessage::AnnounceRequest(mut r) = request { - if let Some((info_hash, peer_id, offer_id)) = self.send_answer { - r.info_hash = info_hash; - r.answer_to_peer_id = Some(peer_id); - r.answer_offer_id = Some(offer_id); - r.answer = Some(RtcAnswer { - t: RtcAnswerType::Answer, - sdp: "abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-".into() - }); - r.event = None; - r.offers = None; - } - - InMessage::AnnounceRequest(r) - } else { - request - }; - - self.send_answer = None; - - self.stream.send(request.to_ws_message()).await?; + self.send_message().await?; + self.read_message().await?; + } + } - self.load_test_state - .statistics - .requests - .fetch_add(1, Ordering::Relaxed); + async fn send_message(&mut self) -> anyhow::Result<()> { + let request = create_random_request( + &self.config, + &self.load_test_state, + &mut self.rng, + self.peer_id, + self.can_send_answer.is_none(), + ); - self.can_send = false; + // If self.send_answer is set and request is announce request, make + // the request an offer answer + let request = if let InMessage::AnnounceRequest(mut r) = request { + if let Some((info_hash, peer_id, offer_id)) = self.can_send_answer { + r.info_hash = info_hash; + r.answer_to_peer_id = Some(peer_id); + r.answer_offer_id = Some(offer_id); + r.answer = Some(RtcAnswer { + t: RtcAnswerType::Answer, + sdp: "abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-".into() + }); + r.event = None; + r.offers = None; } - self.read_message().await?; - } + self.can_send_answer = None; + + InMessage::AnnounceRequest(r) + } else { + request + }; + + self.stream.send(request.to_ws_message()).await?; + + self.load_test_state + .statistics + .requests + .fetch_add(1, Ordering::Relaxed); + + Ok(()) } async fn read_message(&mut self) -> anyhow::Result<()> { @@ -191,33 +193,25 @@ impl Connection { .responses_offer .fetch_add(1, Ordering::Relaxed); - self.send_answer = Some((offer.info_hash, offer.peer_id, offer.offer_id)); - - self.can_send = true; + self.can_send_answer = Some((offer.info_hash, offer.peer_id, offer.offer_id)); } Ok(OutMessage::AnswerOutMessage(_)) => { self.load_test_state .statistics .responses_answer .fetch_add(1, Ordering::Relaxed); - - self.can_send = true; } Ok(OutMessage::AnnounceResponse(_)) => { self.load_test_state .statistics .responses_announce .fetch_add(1, Ordering::Relaxed); - - self.can_send = true; } Ok(OutMessage::ScrapeResponse(_)) => { self.load_test_state .statistics .responses_scrape .fetch_add(1, Ordering::Relaxed); - - self.can_send = true; } Ok(OutMessage::ErrorResponse(response)) => { self.load_test_state @@ -226,8 +220,6 @@ impl Connection { .fetch_add(1, Ordering::Relaxed); ::log::warn!("received error response: {:?}", response.failure_reason); - - self.can_send = true; } Err(err) => { ::log::error!("error deserializing message: {:#}", err); From 34167371b09c1e4c628206cff24a3d0debfa3d4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 8 Jan 2024 19:58:02 +0100 Subject: [PATCH 6/8] ws load test: refactor network module --- crates/ws_load_test/src/network.rs | 147 ++++++++++++++++++++++------- crates/ws_load_test/src/utils.rs | 93 +----------------- 2 files changed, 115 insertions(+), 125 deletions(-) diff --git a/crates/ws_load_test/src/network.rs b/crates/ws_load_test/src/network.rs index 7cdbb7ba..ee045fa4 100644 --- a/crates/ws_load_test/src/network.rs +++ b/crates/ws_load_test/src/network.rs @@ -7,7 +7,9 @@ use std::{ }; use aquatic_ws_protocol::{ - InMessage, InfoHash, OfferId, OutMessage, PeerId, RtcAnswer, RtcAnswerType, + AnnounceAction, AnnounceEvent, AnnounceRequest, AnnounceRequestOffer, InMessage, InfoHash, + OfferId, OutMessage, PeerId, RtcAnswer, RtcAnswerType, RtcOffer, RtcOfferType, ScrapeAction, + ScrapeRequest, ScrapeRequestInfoHashes, }; use async_tungstenite::{client_async, WebSocketStream}; use futures::{SinkExt, StreamExt}; @@ -15,8 +17,13 @@ use futures_rustls::{client::TlsStream, TlsConnector}; use glommio::net::TcpStream; use glommio::{prelude::*, timer::TimerActionRepeat}; use rand::{prelude::SmallRng, Rng, SeedableRng}; +use rand_distr::{Distribution, WeightedIndex}; -use crate::{common::LoadTestState, config::Config, utils::create_random_request}; +use crate::{ + common::{LoadTestState, RequestType}, + config::Config, + utils::select_info_hash_index, +}; pub async fn run_socket_thread( config: Config, @@ -24,6 +31,7 @@ pub async fn run_socket_thread( load_test_state: LoadTestState, ) -> anyhow::Result<()> { let config = Rc::new(config); + let rng = Rc::new(RefCell::new(SmallRng::from_entropy())); let num_active_connections = Rc::new(RefCell::new(0usize)); let connection_creation_interval = Duration::from_millis(config.connection_creation_interval_ms); @@ -34,12 +42,14 @@ pub async fn run_socket_thread( tls_config.clone(), load_test_state.clone(), num_active_connections.clone(), + rng.clone(), connection_creation_interval, ) }) .join() - .await - .ok_or_else(|| anyhow::anyhow!("connection opener timer cancelled")) + .await; + + Ok(()) } async fn periodically_open_connections( @@ -47,12 +57,19 @@ async fn periodically_open_connections( tls_config: Arc, load_test_state: LoadTestState, num_active_connections: Rc>, + rng: Rc>, connection_creation_interval: Duration, ) -> Option { if *num_active_connections.borrow() < config.num_connections_per_worker { spawn_local(async move { - if let Err(err) = - Connection::run(config, tls_config, load_test_state, num_active_connections).await + if let Err(err) = Connection::run( + config, + tls_config, + load_test_state, + num_active_connections, + rng, + ) + .await { ::log::info!("connection creation error: {:#}", err); } @@ -66,7 +83,7 @@ async fn periodically_open_connections( struct Connection { config: Rc, load_test_state: LoadTestState, - rng: SmallRng, + rng: Rc>, peer_id: PeerId, can_send_answer: Option<(InfoHash, PeerId, OfferId)>, stream: WebSocketStream>, @@ -78,9 +95,9 @@ impl Connection { tls_config: Arc, load_test_state: LoadTestState, num_active_connections: Rc>, + rng: Rc>, ) -> anyhow::Result<()> { - let mut rng = SmallRng::from_entropy(); - let peer_id = PeerId(rng.gen()); + let peer_id = PeerId(rng.borrow_mut().gen()); let stream = TcpStream::connect(config.server_address) .await .map_err(|err| anyhow::anyhow!("connect: {:?}", err))?; @@ -126,36 +143,87 @@ impl Connection { } async fn send_message(&mut self) -> anyhow::Result<()> { - let request = create_random_request( - &self.config, - &self.load_test_state, - &mut self.rng, - self.peer_id, - self.can_send_answer.is_none(), - ); + let mut rng = self.rng.borrow_mut(); + + let request = match random_request_type(&self.config, &mut *rng) { + RequestType::Announce => { + let (event, bytes_left) = { + if rng.gen_bool(self.config.torrents.peer_seeder_probability) { + (AnnounceEvent::Completed, 0) + } else { + (AnnounceEvent::Started, 50) + } + }; + + const SDP: &str = "abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg"; + + if let Some((info_hash, peer_id, offer_id)) = self.can_send_answer { + InMessage::AnnounceRequest(AnnounceRequest { + info_hash, + answer_to_peer_id: Some(peer_id), + answer_offer_id: Some(offer_id), + answer: Some(RtcAnswer { + t: RtcAnswerType::Answer, + sdp: SDP.into(), + }), + event: None, + offers: None, + action: aquatic_ws_protocol::AnnounceAction::Announce, + peer_id: self.peer_id, + bytes_left: Some(bytes_left), + numwant: Some(0), + }) + } else { + let info_hash_index = + select_info_hash_index(&self.config, &self.load_test_state, &mut *rng); + + let mut offers = Vec::with_capacity(self.config.torrents.offers_per_request); + + for _ in 0..self.config.torrents.offers_per_request { + offers.push(AnnounceRequestOffer { + offer_id: OfferId(rng.gen()), + offer: RtcOffer { + t: RtcOfferType::Offer, + sdp: SDP.into(), + }, + }) + } - // If self.send_answer is set and request is announce request, make - // the request an offer answer - let request = if let InMessage::AnnounceRequest(mut r) = request { - if let Some((info_hash, peer_id, offer_id)) = self.can_send_answer { - r.info_hash = info_hash; - r.answer_to_peer_id = Some(peer_id); - r.answer_offer_id = Some(offer_id); - r.answer = Some(RtcAnswer { - t: RtcAnswerType::Answer, - sdp: "abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-".into() - }); - r.event = None; - r.offers = None; + InMessage::AnnounceRequest(AnnounceRequest { + action: AnnounceAction::Announce, + info_hash: self.load_test_state.info_hashes[info_hash_index], + peer_id: self.peer_id, + bytes_left: Some(bytes_left), + event: Some(event), + numwant: Some(offers.len()), + offers: Some(offers), + answer: None, + answer_to_peer_id: None, + answer_offer_id: None, + }) + } } + RequestType::Scrape => { + let mut scrape_hashes = Vec::with_capacity(5); - self.can_send_answer = None; + for _ in 0..5 { + let info_hash_index = + select_info_hash_index(&self.config, &self.load_test_state, &mut *rng); - InMessage::AnnounceRequest(r) - } else { - request + scrape_hashes.push(self.load_test_state.info_hashes[info_hash_index]); + } + + InMessage::ScrapeRequest(ScrapeRequest { + action: ScrapeAction::Scrape, + info_hashes: Some(ScrapeRequestInfoHashes::Multiple(scrape_hashes)), + }) + } }; + drop(rng); + + self.can_send_answer = None; + self.stream.send(request.to_ws_message()).await?; self.load_test_state @@ -229,3 +297,16 @@ impl Connection { Ok(()) } } + +pub fn random_request_type(config: &Config, rng: &mut impl Rng) -> RequestType { + let weights = [ + config.torrents.weight_announce as u32, + config.torrents.weight_scrape as u32, + ]; + + let items = [RequestType::Announce, RequestType::Scrape]; + + let dist = WeightedIndex::new(&weights).expect("random request weighted index"); + + items[dist.sample(rng)] +} diff --git a/crates/ws_load_test/src/utils.rs b/crates/ws_load_test/src/utils.rs index bb174438..1c5c5822 100644 --- a/crates/ws_load_test/src/utils.rs +++ b/crates/ws_load_test/src/utils.rs @@ -1,104 +1,13 @@ use std::sync::Arc; -use rand::distributions::WeightedIndex; use rand::prelude::*; use rand_distr::Gamma; use crate::common::*; use crate::config::*; -pub fn create_random_request( - config: &Config, - state: &LoadTestState, - rng: &mut impl Rng, - peer_id: PeerId, - announce_gen_offers: bool, -) -> InMessage { - let weights = [ - config.torrents.weight_announce as u32, - config.torrents.weight_scrape as u32, - ]; - - let items = [RequestType::Announce, RequestType::Scrape]; - - let dist = WeightedIndex::new(&weights).expect("random request weighted index"); - - match items[dist.sample(rng)] { - RequestType::Announce => { - create_announce_request(config, state, rng, peer_id, announce_gen_offers) - } - RequestType::Scrape => create_scrape_request(config, state, rng), - } -} - -#[inline] -fn create_announce_request( - config: &Config, - state: &LoadTestState, - rng: &mut impl Rng, - peer_id: PeerId, - gen_offers: bool, -) -> InMessage { - let (event, bytes_left) = { - if rng.gen_bool(config.torrents.peer_seeder_probability) { - (AnnounceEvent::Completed, 0) - } else { - (AnnounceEvent::Started, 50) - } - }; - - let info_hash_index = select_info_hash_index(config, &state, rng); - - let offers = if gen_offers { - let mut offers = Vec::with_capacity(config.torrents.offers_per_request); - - for _ in 0..config.torrents.offers_per_request { - offers.push(AnnounceRequestOffer { - offer_id: OfferId(rng.gen()), - offer: RtcOffer { - t: RtcOfferType::Offer, - sdp: "abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-".into() - }, - }) - } - - offers - } else { - Vec::new() - }; - - InMessage::AnnounceRequest(AnnounceRequest { - action: AnnounceAction::Announce, - info_hash: state.info_hashes[info_hash_index], - peer_id, - bytes_left: Some(bytes_left), - event: Some(event), - numwant: Some(offers.len()), - offers: Some(offers), - answer: None, - answer_to_peer_id: None, - answer_offer_id: None, - }) -} - -#[inline] -fn create_scrape_request(config: &Config, state: &LoadTestState, rng: &mut impl Rng) -> InMessage { - let mut scrape_hashes = Vec::with_capacity(5); - - for _ in 0..5 { - let info_hash_index = select_info_hash_index(config, &state, rng); - - scrape_hashes.push(state.info_hashes[info_hash_index]); - } - - InMessage::ScrapeRequest(ScrapeRequest { - action: ScrapeAction::Scrape, - info_hashes: Some(ScrapeRequestInfoHashes::Multiple(scrape_hashes)), - }) -} - #[inline] -fn select_info_hash_index(config: &Config, state: &LoadTestState, rng: &mut impl Rng) -> usize { +pub fn select_info_hash_index(config: &Config, state: &LoadTestState, rng: &mut impl Rng) -> usize { gamma_usize(rng, &state.gamma, config.torrents.number_of_torrents - 1) } From 27ecccd98442c298d93b45c2c9e6260dc88f2966 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 8 Jan 2024 20:07:15 +0100 Subject: [PATCH 7/8] ws load test: store global info hashes as Arc<[InfoHash]> --- crates/ws_load_test/src/common.rs | 2 +- crates/ws_load_test/src/main.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/ws_load_test/src/common.rs b/crates/ws_load_test/src/common.rs index ceecacb5..1e8266de 100644 --- a/crates/ws_load_test/src/common.rs +++ b/crates/ws_load_test/src/common.rs @@ -18,7 +18,7 @@ pub struct Statistics { #[derive(Clone)] pub struct LoadTestState { - pub info_hashes: Arc>, + pub info_hashes: Arc<[InfoHash]>, pub statistics: Arc, pub gamma: Arc>, } diff --git a/crates/ws_load_test/src/main.rs b/crates/ws_load_test/src/main.rs index 479f2aa8..8be81ccb 100644 --- a/crates/ws_load_test/src/main.rs +++ b/crates/ws_load_test/src/main.rs @@ -36,10 +36,10 @@ fn run(config: Config) -> ::anyhow::Result<()> { println!("Starting client with config: {:#?}", config); - let mut info_hashes = Vec::with_capacity(config.torrents.number_of_torrents); - let mut rng = SmallRng::from_entropy(); + let mut info_hashes = Vec::with_capacity(config.torrents.number_of_torrents); + for _ in 0..config.torrents.number_of_torrents { info_hashes.push(InfoHash(rng.gen())); } @@ -51,7 +51,7 @@ fn run(config: Config) -> ::anyhow::Result<()> { .unwrap(); let state = LoadTestState { - info_hashes: Arc::new(info_hashes), + info_hashes: Arc::from(info_hashes.into_boxed_slice()), statistics: Arc::new(Statistics::default()), gamma: Arc::new(gamma), }; From 0dae7fd533776c7fcb85e43a4ba8632542bb86f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 8 Jan 2024 20:16:46 +0100 Subject: [PATCH 8/8] ws protocol: remove glob exports; adjust dependent crates --- crates/ws/src/common.rs | 2 +- crates/ws/src/workers/socket/connection.rs | 8 +++++++- crates/ws/src/workers/socket/mod.rs | 4 +++- crates/ws/src/workers/swarm/mod.rs | 3 ++- crates/ws/src/workers/swarm/storage.rs | 7 ++++++- crates/ws_load_test/src/common.rs | 3 +-- crates/ws_load_test/src/main.rs | 1 + crates/ws_load_test/src/network.rs | 14 ++++++++++---- .../bench_deserialize_announce_request.rs | 5 ++++- crates/ws_protocol/src/lib.rs | 16 +++++++++++----- 10 files changed, 46 insertions(+), 17 deletions(-) diff --git a/crates/ws/src/common.rs b/crates/ws/src/common.rs index 8c41fb19..32405460 100644 --- a/crates/ws/src/common.rs +++ b/crates/ws/src/common.rs @@ -3,7 +3,7 @@ use std::{net::IpAddr, sync::Arc}; use aquatic_common::access_list::AccessListArcSwap; pub use aquatic_common::ValidUntil; -use aquatic_ws_protocol::{InfoHash, PeerId}; +use aquatic_ws_protocol::common::{InfoHash, PeerId}; #[derive(Copy, Clone, Debug)] pub enum IpVersion { diff --git a/crates/ws/src/workers/socket/connection.rs b/crates/ws/src/workers/socket/connection.rs index bf241865..82d93ef8 100644 --- a/crates/ws/src/workers/socket/connection.rs +++ b/crates/ws/src/workers/socket/connection.rs @@ -9,7 +9,13 @@ use anyhow::Context; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use aquatic_common::rustls_config::RustlsConfig; use aquatic_common::ServerStartInstant; -use aquatic_ws_protocol::*; +use aquatic_ws_protocol::common::{InfoHash, PeerId, ScrapeAction}; +use aquatic_ws_protocol::incoming::{ + AnnounceEvent, AnnounceRequest, InMessage, ScrapeRequest, ScrapeRequestInfoHashes, +}; +use aquatic_ws_protocol::outgoing::{ + ErrorResponse, ErrorResponseAction, OutMessage, ScrapeResponse, ScrapeStatistics, +}; use arc_swap::ArcSwap; use async_tungstenite::WebSocketStream; use futures::stream::{SplitSink, SplitStream}; diff --git a/crates/ws/src/workers/socket/mod.rs b/crates/ws/src/workers/socket/mod.rs index eb284ad6..03e17c59 100644 --- a/crates/ws/src/workers/socket/mod.rs +++ b/crates/ws/src/workers/socket/mod.rs @@ -8,7 +8,9 @@ use anyhow::Context; use aquatic_common::privileges::PrivilegeDropper; use aquatic_common::rustls_config::RustlsConfig; use aquatic_common::{PanicSentinel, ServerStartInstant}; -use aquatic_ws_protocol::*; +use aquatic_ws_protocol::common::InfoHash; +use aquatic_ws_protocol::incoming::InMessage; +use aquatic_ws_protocol::outgoing::OutMessage; use arc_swap::ArcSwap; use futures::StreamExt; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role}; diff --git a/crates/ws/src/workers/swarm/mod.rs b/crates/ws/src/workers/swarm/mod.rs index 9c1f601d..7788d122 100644 --- a/crates/ws/src/workers/swarm/mod.rs +++ b/crates/ws/src/workers/swarm/mod.rs @@ -4,6 +4,8 @@ use std::cell::RefCell; use std::rc::Rc; use std::time::Duration; +use aquatic_ws_protocol::incoming::InMessage; +use aquatic_ws_protocol::outgoing::OutMessage; use futures::StreamExt; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; use glommio::enclose; @@ -12,7 +14,6 @@ use glommio::timer::TimerActionRepeat; use rand::{rngs::SmallRng, SeedableRng}; use aquatic_common::{PanicSentinel, ServerStartInstant}; -use aquatic_ws_protocol::*; use crate::common::*; use crate::config::Config; diff --git a/crates/ws/src/workers/swarm/storage.rs b/crates/ws/src/workers/swarm/storage.rs index 43c99b3a..37cf1538 100644 --- a/crates/ws/src/workers/swarm/storage.rs +++ b/crates/ws/src/workers/swarm/storage.rs @@ -1,6 +1,11 @@ use std::sync::Arc; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; +use aquatic_ws_protocol::incoming::{AnnounceEvent, AnnounceRequest, ScrapeRequest}; +use aquatic_ws_protocol::outgoing::{ + AnnounceResponse, AnswerOutMessage, ErrorResponse, ErrorResponseAction, OfferOutMessage, + OutMessage, ScrapeResponse, ScrapeStatistics, +}; use hashbrown::HashMap; use metrics::Gauge; use rand::rngs::SmallRng; @@ -8,7 +13,7 @@ use rand::rngs::SmallRng; use aquatic_common::{ extract_response_peers, IndexMap, SecondsSinceServerStart, ServerStartInstant, }; -use aquatic_ws_protocol::*; +use aquatic_ws_protocol::common::*; use crate::common::*; use crate::config::Config; diff --git a/crates/ws_load_test/src/common.rs b/crates/ws_load_test/src/common.rs index 1e8266de..d1d04c5f 100644 --- a/crates/ws_load_test/src/common.rs +++ b/crates/ws_load_test/src/common.rs @@ -1,9 +1,8 @@ use std::sync::{atomic::AtomicUsize, Arc}; +use aquatic_ws_protocol::common::InfoHash; use rand_distr::Gamma; -pub use aquatic_ws_protocol::*; - #[derive(Default)] pub struct Statistics { pub requests: AtomicUsize, diff --git a/crates/ws_load_test/src/main.rs b/crates/ws_load_test/src/main.rs index 8be81ccb..91cd8ff5 100644 --- a/crates/ws_load_test/src/main.rs +++ b/crates/ws_load_test/src/main.rs @@ -4,6 +4,7 @@ use std::time::{Duration, Instant}; use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_for_util_worker}; use aquatic_common::cpu_pinning::WorkerIndex; +use aquatic_ws_protocol::common::InfoHash; use glommio::LocalExecutorBuilder; use rand::prelude::*; use rand_distr::Gamma; diff --git a/crates/ws_load_test/src/network.rs b/crates/ws_load_test/src/network.rs index ee045fa4..55da243b 100644 --- a/crates/ws_load_test/src/network.rs +++ b/crates/ws_load_test/src/network.rs @@ -6,10 +6,16 @@ use std::{ time::Duration, }; +use aquatic_ws_protocol::incoming::{ + AnnounceEvent, AnnounceRequest, AnnounceRequestOffer, InMessage, ScrapeRequestInfoHashes, +}; +use aquatic_ws_protocol::outgoing::OutMessage; use aquatic_ws_protocol::{ - AnnounceAction, AnnounceEvent, AnnounceRequest, AnnounceRequestOffer, InMessage, InfoHash, - OfferId, OutMessage, PeerId, RtcAnswer, RtcAnswerType, RtcOffer, RtcOfferType, ScrapeAction, - ScrapeRequest, ScrapeRequestInfoHashes, + common::{ + AnnounceAction, InfoHash, OfferId, PeerId, RtcAnswer, RtcAnswerType, RtcOffer, + RtcOfferType, ScrapeAction, + }, + incoming::ScrapeRequest, }; use async_tungstenite::{client_async, WebSocketStream}; use futures::{SinkExt, StreamExt}; @@ -168,7 +174,7 @@ impl Connection { }), event: None, offers: None, - action: aquatic_ws_protocol::AnnounceAction::Announce, + action: AnnounceAction::Announce, peer_id: self.peer_id, bytes_left: Some(bytes_left), numwant: Some(0), diff --git a/crates/ws_protocol/benches/bench_deserialize_announce_request.rs b/crates/ws_protocol/benches/bench_deserialize_announce_request.rs index 7b961e7e..c239b162 100644 --- a/crates/ws_protocol/benches/bench_deserialize_announce_request.rs +++ b/crates/ws_protocol/benches/bench_deserialize_announce_request.rs @@ -1,7 +1,10 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; use std::time::Duration; -use aquatic_ws_protocol::*; +use aquatic_ws_protocol::{ + common::*, + incoming::{AnnounceEvent, AnnounceRequest, AnnounceRequestOffer, InMessage}, +}; pub fn bench(c: &mut Criterion) { let info_hash = InfoHash([ diff --git a/crates/ws_protocol/src/lib.rs b/crates/ws_protocol/src/lib.rs index 8a6bed96..c61681a9 100644 --- a/crates/ws_protocol/src/lib.rs +++ b/crates/ws_protocol/src/lib.rs @@ -14,16 +14,22 @@ pub mod common; pub mod incoming; pub mod outgoing; -pub use common::*; -pub use incoming::*; -pub use outgoing::*; - #[cfg(test)] mod tests { use quickcheck::Arbitrary; use quickcheck_macros::quickcheck; - use super::*; + use crate::{ + common::*, + incoming::{ + AnnounceEvent, AnnounceRequest, AnnounceRequestOffer, InMessage, ScrapeRequest, + ScrapeRequestInfoHashes, + }, + outgoing::{ + AnnounceResponse, AnswerOutMessage, OfferOutMessage, OutMessage, ScrapeResponse, + ScrapeStatistics, + }, + }; fn arbitrary_20_bytes(g: &mut quickcheck::Gen) -> [u8; 20] { let mut bytes = [0u8; 20];