diff --git a/crates/ws/src/common.rs b/crates/ws/src/common.rs index dae682f9..32405460 100644 --- a/crates/ws/src/common.rs +++ b/crates/ws/src/common.rs @@ -3,7 +3,7 @@ use std::{net::IpAddr, sync::Arc}; use aquatic_common::access_list::AccessListArcSwap; pub use aquatic_common::ValidUntil; -use aquatic_ws_protocol::{InfoHash, PeerId}; +use aquatic_ws_protocol::common::{InfoHash, PeerId}; #[derive(Copy, Clone, Debug)] pub enum IpVersion { @@ -67,11 +67,10 @@ impl Into for InMessageMeta { } } -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Debug)] pub enum SwarmControlMessage { ConnectionClosed { - info_hash: InfoHash, - peer_id: PeerId, ip_version: IpVersion, + announced_info_hashes: Vec<(InfoHash, PeerId)>, }, } diff --git a/crates/ws/src/workers/socket/connection.rs b/crates/ws/src/workers/socket/connection.rs index fca6537e..82d93ef8 100644 --- a/crates/ws/src/workers/socket/connection.rs +++ b/crates/ws/src/workers/socket/connection.rs @@ -9,7 +9,13 @@ use anyhow::Context; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use aquatic_common::rustls_config::RustlsConfig; use aquatic_common::ServerStartInstant; -use aquatic_ws_protocol::*; +use aquatic_ws_protocol::common::{InfoHash, PeerId, ScrapeAction}; +use aquatic_ws_protocol::incoming::{ + AnnounceEvent, AnnounceRequest, InMessage, ScrapeRequest, ScrapeRequestInfoHashes, +}; +use aquatic_ws_protocol::outgoing::{ + ErrorResponse, ErrorResponseAction, OutMessage, ScrapeResponse, ScrapeStatistics, +}; use arc_swap::ArcSwap; use async_tungstenite::WebSocketStream; use futures::stream::{SplitSink, SplitStream}; @@ -17,7 +23,7 @@ use futures::{AsyncWriteExt, StreamExt}; use futures_lite::future::race; use futures_rustls::TlsAcceptor; use glommio::channels::channel_mesh::Senders; -use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender}; +use glommio::channels::local_channel::{LocalReceiver, LocalSender}; use glommio::net::TcpStream; use glommio::timer::timeout; use glommio::{enclose, prelude::*}; @@ -35,13 +41,6 @@ use crate::workers::socket::calculate_in_message_consumer_index; #[cfg(feature = "metrics")] use crate::workers::socket::{ip_version_to_metrics_str, WORKER_INDEX}; -/// Length of ConnectionReader backpressure channel -/// -/// ConnectionReader awaits a message in a channel before proceeding with -/// reading a request. For each response sent, a message is sent to the -/// channel, up to a maximum of this constant. -const READ_PASS_CHANNEL_LEN: usize = 4; - pub struct ConnectionRunner { pub config: Rc, pub access_list: Arc, @@ -168,17 +167,6 @@ impl ConnectionRunner { let pending_scrape_slab = Rc::new(RefCell::new(Slab::new())); let access_list_cache = create_access_list_cache(&self.access_list); - let (read_pass_sender, read_pass_receiver) = new_bounded(READ_PASS_CHANNEL_LEN); - - for _ in 0..READ_PASS_CHANNEL_LEN { - if let Err(err) = read_pass_sender.try_send(()) { - panic!( - "couldn't add initial entries to read pass channel: {:#}", - err - ) - }; - } - let config = self.config.clone(); let reader_future = enclose!((pending_scrape_slab, clean_up_data) async move { @@ -187,7 +175,6 @@ impl ConnectionRunner { access_list_cache, in_message_senders: self.in_message_senders, out_message_sender: self.out_message_sender, - read_pass_receiver, pending_scrape_slab, out_message_consumer_id: self.out_message_consumer_id, ws_in, @@ -217,7 +204,6 @@ impl ConnectionRunner { let mut writer = ConnectionWriter { config, out_message_receiver: self.out_message_receiver, - read_pass_sender, connection_valid_until: self.connection_valid_until, ws_out, pending_scrape_slab, @@ -238,7 +224,6 @@ struct ConnectionReader { access_list_cache: AccessListCache, in_message_senders: Rc>, out_message_sender: Rc>, - read_pass_receiver: LocalReceiver<()>, pending_scrape_slab: Rc>>, out_message_consumer_id: ConsumerId, ws_in: SplitStream>, @@ -254,11 +239,6 @@ struct ConnectionReader { impl ConnectionReader { async fn run_in_message_loop(&mut self) -> anyhow::Result<()> { loop { - self.read_pass_receiver - .recv() - .await - .ok_or_else(|| anyhow::anyhow!("read pass channel closed"))?; - let message = self .ws_in .next() @@ -496,7 +476,6 @@ impl ConnectionReader { struct ConnectionWriter { config: Rc, out_message_receiver: LocalReceiver<(OutMessageMeta, OutMessage)>, - read_pass_sender: LocalSender<()>, connection_valid_until: Rc>, ws_out: SplitSink, tungstenite::Message>, pending_scrape_slab: Rc>>, @@ -549,10 +528,6 @@ impl ConnectionWriter { } }; - if let Err(GlommioError::Closed(_)) = self.read_pass_sender.try_send(()) { - return Err(anyhow::anyhow!("read pass channel closed")); - } - yield_if_needed().await; } } @@ -630,19 +605,23 @@ impl ConnectionCleanupData { config: &Config, control_message_senders: Rc>, ) { - // Use RefCell::take to avoid issues with Rc borrow across await - let announced_info_hashes = self.announced_info_hashes.take(); + let mut announced_info_hashes = HashMap::new(); + + for (info_hash, peer_id) in self.announced_info_hashes.take().into_iter() { + let consumer_index = calculate_in_message_consumer_index(&config, info_hash); - // Tell swarm workers to remove peer - for (info_hash, peer_id) in announced_info_hashes.into_iter() { + announced_info_hashes + .entry(consumer_index) + .or_insert(Vec::new()) + .push((info_hash, peer_id)); + } + + for (consumer_index, announced_info_hashes) in announced_info_hashes.into_iter() { let message = SwarmControlMessage::ConnectionClosed { - info_hash, - peer_id, ip_version: self.ip_version, + announced_info_hashes, }; - let consumer_index = calculate_in_message_consumer_index(&config, info_hash); - control_message_senders .send_to(consumer_index, message) .await diff --git a/crates/ws/src/workers/socket/mod.rs b/crates/ws/src/workers/socket/mod.rs index eb284ad6..03e17c59 100644 --- a/crates/ws/src/workers/socket/mod.rs +++ b/crates/ws/src/workers/socket/mod.rs @@ -8,7 +8,9 @@ use anyhow::Context; use aquatic_common::privileges::PrivilegeDropper; use aquatic_common::rustls_config::RustlsConfig; use aquatic_common::{PanicSentinel, ServerStartInstant}; -use aquatic_ws_protocol::*; +use aquatic_ws_protocol::common::InfoHash; +use aquatic_ws_protocol::incoming::InMessage; +use aquatic_ws_protocol::outgoing::OutMessage; use arc_swap::ArcSwap; use futures::StreamExt; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role}; diff --git a/crates/ws/src/workers/swarm/mod.rs b/crates/ws/src/workers/swarm/mod.rs index 8fff3377..7788d122 100644 --- a/crates/ws/src/workers/swarm/mod.rs +++ b/crates/ws/src/workers/swarm/mod.rs @@ -4,6 +4,8 @@ use std::cell::RefCell; use std::rc::Rc; use std::time::Duration; +use aquatic_ws_protocol::incoming::InMessage; +use aquatic_ws_protocol::outgoing::OutMessage; use futures::StreamExt; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; use glommio::enclose; @@ -12,7 +14,6 @@ use glommio::timer::TimerActionRepeat; use rand::{rngs::SmallRng, SeedableRng}; use aquatic_common::{PanicSentinel, ServerStartInstant}; -use aquatic_ws_protocol::*; use crate::common::*; use crate::config::Config; @@ -102,13 +103,14 @@ where while let Some(message) = stream.next().await { match message { SwarmControlMessage::ConnectionClosed { - info_hash, - peer_id, ip_version, + announced_info_hashes, } => { - torrents - .borrow_mut() - .handle_connection_closed(info_hash, peer_id, ip_version); + let mut torrents = torrents.borrow_mut(); + + for (info_hash, peer_id) in announced_info_hashes { + torrents.handle_connection_closed(info_hash, peer_id, ip_version); + } } } } @@ -124,25 +126,8 @@ async fn handle_request_stream( S: futures_lite::Stream + ::std::marker::Unpin, { let rng = Rc::new(RefCell::new(SmallRng::from_entropy())); - - let max_peer_age = config.cleaning.max_peer_age; - let peer_valid_until = Rc::new(RefCell::new(ValidUntil::new( - server_start_instant, - max_peer_age, - ))); - - // Periodically update peer_valid_until - TimerActionRepeat::repeat(enclose!((peer_valid_until) move || { - enclose!((peer_valid_until) move || async move { - *peer_valid_until.borrow_mut() = ValidUntil::new(server_start_instant, max_peer_age); - - Some(Duration::from_secs(1)) - })() - })); - let config = &config; let torrents = &torrents; - let peer_valid_until = &peer_valid_until; let rng = &rng; let out_message_senders = &out_message_senders; @@ -159,7 +144,6 @@ async fn handle_request_stream( &mut rng.borrow_mut(), &mut out_messages, server_start_instant, - peer_valid_until.borrow().to_owned(), meta, request, ) diff --git a/crates/ws/src/workers/swarm/storage.rs b/crates/ws/src/workers/swarm/storage.rs index 7ad311db..37cf1538 100644 --- a/crates/ws/src/workers/swarm/storage.rs +++ b/crates/ws/src/workers/swarm/storage.rs @@ -1,6 +1,11 @@ use std::sync::Arc; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; +use aquatic_ws_protocol::incoming::{AnnounceEvent, AnnounceRequest, ScrapeRequest}; +use aquatic_ws_protocol::outgoing::{ + AnnounceResponse, AnswerOutMessage, ErrorResponse, ErrorResponseAction, OfferOutMessage, + OutMessage, ScrapeResponse, ScrapeStatistics, +}; use hashbrown::HashMap; use metrics::Gauge; use rand::rngs::SmallRng; @@ -8,7 +13,7 @@ use rand::rngs::SmallRng; use aquatic_common::{ extract_response_peers, IndexMap, SecondsSinceServerStart, ServerStartInstant, }; -use aquatic_ws_protocol::*; +use aquatic_ws_protocol::common::*; use crate::common::*; use crate::config::Config; @@ -62,7 +67,6 @@ impl TorrentMaps { rng: &mut SmallRng, out_messages: &mut Vec<(OutMessageMeta, OutMessage)>, server_start_instant: ServerStartInstant, - valid_until: ValidUntil, request_sender_meta: InMessageMeta, request: AnnounceRequest, ) { @@ -72,6 +76,8 @@ impl TorrentMaps { self.ipv6.entry(request.info_hash).or_default() }; + let valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age); + // If there is already a peer with this peer_id, check that connection id // is same as that of request sender. Otherwise, ignore request. Since // peers have access to each others peer_id's, they could send requests diff --git a/crates/ws_load_test/src/common.rs b/crates/ws_load_test/src/common.rs index ceecacb5..d1d04c5f 100644 --- a/crates/ws_load_test/src/common.rs +++ b/crates/ws_load_test/src/common.rs @@ -1,9 +1,8 @@ use std::sync::{atomic::AtomicUsize, Arc}; +use aquatic_ws_protocol::common::InfoHash; use rand_distr::Gamma; -pub use aquatic_ws_protocol::*; - #[derive(Default)] pub struct Statistics { pub requests: AtomicUsize, @@ -18,7 +17,7 @@ pub struct Statistics { #[derive(Clone)] pub struct LoadTestState { - pub info_hashes: Arc>, + pub info_hashes: Arc<[InfoHash]>, pub statistics: Arc, pub gamma: Arc>, } diff --git a/crates/ws_load_test/src/main.rs b/crates/ws_load_test/src/main.rs index 479f2aa8..91cd8ff5 100644 --- a/crates/ws_load_test/src/main.rs +++ b/crates/ws_load_test/src/main.rs @@ -4,6 +4,7 @@ use std::time::{Duration, Instant}; use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_for_util_worker}; use aquatic_common::cpu_pinning::WorkerIndex; +use aquatic_ws_protocol::common::InfoHash; use glommio::LocalExecutorBuilder; use rand::prelude::*; use rand_distr::Gamma; @@ -36,10 +37,10 @@ fn run(config: Config) -> ::anyhow::Result<()> { println!("Starting client with config: {:#?}", config); - let mut info_hashes = Vec::with_capacity(config.torrents.number_of_torrents); - let mut rng = SmallRng::from_entropy(); + let mut info_hashes = Vec::with_capacity(config.torrents.number_of_torrents); + for _ in 0..config.torrents.number_of_torrents { info_hashes.push(InfoHash(rng.gen())); } @@ -51,7 +52,7 @@ fn run(config: Config) -> ::anyhow::Result<()> { .unwrap(); let state = LoadTestState { - info_hashes: Arc::new(info_hashes), + info_hashes: Arc::from(info_hashes.into_boxed_slice()), statistics: Arc::new(Statistics::default()), gamma: Arc::new(gamma), }; diff --git a/crates/ws_load_test/src/network.rs b/crates/ws_load_test/src/network.rs index 2a03ad26..55da243b 100644 --- a/crates/ws_load_test/src/network.rs +++ b/crates/ws_load_test/src/network.rs @@ -6,15 +6,30 @@ use std::{ time::Duration, }; -use aquatic_ws_protocol::{InMessage, OfferId, OutMessage, PeerId, RtcAnswer, RtcAnswerType}; +use aquatic_ws_protocol::incoming::{ + AnnounceEvent, AnnounceRequest, AnnounceRequestOffer, InMessage, ScrapeRequestInfoHashes, +}; +use aquatic_ws_protocol::outgoing::OutMessage; +use aquatic_ws_protocol::{ + common::{ + AnnounceAction, InfoHash, OfferId, PeerId, RtcAnswer, RtcAnswerType, RtcOffer, + RtcOfferType, ScrapeAction, + }, + incoming::ScrapeRequest, +}; use async_tungstenite::{client_async, WebSocketStream}; use futures::{SinkExt, StreamExt}; use futures_rustls::{client::TlsStream, TlsConnector}; use glommio::net::TcpStream; use glommio::{prelude::*, timer::TimerActionRepeat}; use rand::{prelude::SmallRng, Rng, SeedableRng}; +use rand_distr::{Distribution, WeightedIndex}; -use crate::{common::LoadTestState, config::Config, utils::create_random_request}; +use crate::{ + common::{LoadTestState, RequestType}, + config::Config, + utils::select_info_hash_index, +}; pub async fn run_socket_thread( config: Config, @@ -22,7 +37,10 @@ pub async fn run_socket_thread( load_test_state: LoadTestState, ) -> anyhow::Result<()> { let config = Rc::new(config); + let rng = Rc::new(RefCell::new(SmallRng::from_entropy())); let num_active_connections = Rc::new(RefCell::new(0usize)); + let connection_creation_interval = + Duration::from_millis(config.connection_creation_interval_ms); TimerActionRepeat::repeat(move || { periodically_open_connections( @@ -30,10 +48,12 @@ pub async fn run_socket_thread( tls_config.clone(), load_test_state.clone(), num_active_connections.clone(), + rng.clone(), + connection_creation_interval, ) - }); - - futures::future::pending::().await; + }) + .join() + .await; Ok(()) } @@ -43,13 +63,19 @@ async fn periodically_open_connections( tls_config: Arc, load_test_state: LoadTestState, num_active_connections: Rc>, + rng: Rc>, + connection_creation_interval: Duration, ) -> Option { - let wait = Duration::from_millis(config.connection_creation_interval_ms); - if *num_active_connections.borrow() < config.num_connections_per_worker { spawn_local(async move { - if let Err(err) = - Connection::run(config, tls_config, load_test_state, num_active_connections).await + if let Err(err) = Connection::run( + config, + tls_config, + load_test_state, + num_active_connections, + rng, + ) + .await { ::log::info!("connection creation error: {:#}", err); } @@ -57,16 +83,15 @@ async fn periodically_open_connections( .detach(); } - Some(wait) + Some(connection_creation_interval) } struct Connection { config: Rc, load_test_state: LoadTestState, - rng: SmallRng, - can_send: bool, + rng: Rc>, peer_id: PeerId, - send_answer: Option<(PeerId, OfferId)>, + can_send_answer: Option<(InfoHash, PeerId, OfferId)>, stream: WebSocketStream>, } @@ -76,9 +101,9 @@ impl Connection { tls_config: Arc, load_test_state: LoadTestState, num_active_connections: Rc>, + rng: Rc>, ) -> anyhow::Result<()> { - let mut rng = SmallRng::from_entropy(); - let peer_id = PeerId(rng.gen()); + let peer_id = PeerId(rng.borrow_mut().gen()); let stream = TcpStream::connect(config.server_address) .await .map_err(|err| anyhow::anyhow!("connect: {:?}", err))?; @@ -99,9 +124,8 @@ impl Connection { load_test_state, rng, stream, - can_send: true, peer_id, - send_answer: None, + can_send_answer: None, }; *num_active_connections.borrow_mut() += 1; @@ -119,48 +143,101 @@ impl Connection { async fn run_connection_loop(&mut self) -> anyhow::Result<()> { loop { - if self.can_send { - let request = create_random_request( - &self.config, - &self.load_test_state, - &mut self.rng, - self.peer_id, - self.send_answer.is_none(), - ); + self.send_message().await?; + self.read_message().await?; + } + } - // If self.send_answer is set and request is announce request, make - // the request an offer answer - let request = if let InMessage::AnnounceRequest(mut r) = request { - if let Some((peer_id, offer_id)) = self.send_answer { - r.answer_to_peer_id = Some(peer_id); - r.answer_offer_id = Some(offer_id); - r.answer = Some(RtcAnswer { - t: RtcAnswerType::Answer, - sdp: "abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-".into() - }); - r.event = None; - r.offers = None; + async fn send_message(&mut self) -> anyhow::Result<()> { + let mut rng = self.rng.borrow_mut(); + + let request = match random_request_type(&self.config, &mut *rng) { + RequestType::Announce => { + let (event, bytes_left) = { + if rng.gen_bool(self.config.torrents.peer_seeder_probability) { + (AnnounceEvent::Completed, 0) + } else { + (AnnounceEvent::Started, 50) } + }; - self.send_answer = None; + const SDP: &str = "abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg"; - InMessage::AnnounceRequest(r) + if let Some((info_hash, peer_id, offer_id)) = self.can_send_answer { + InMessage::AnnounceRequest(AnnounceRequest { + info_hash, + answer_to_peer_id: Some(peer_id), + answer_offer_id: Some(offer_id), + answer: Some(RtcAnswer { + t: RtcAnswerType::Answer, + sdp: SDP.into(), + }), + event: None, + offers: None, + action: AnnounceAction::Announce, + peer_id: self.peer_id, + bytes_left: Some(bytes_left), + numwant: Some(0), + }) } else { - request - }; + let info_hash_index = + select_info_hash_index(&self.config, &self.load_test_state, &mut *rng); + + let mut offers = Vec::with_capacity(self.config.torrents.offers_per_request); + + for _ in 0..self.config.torrents.offers_per_request { + offers.push(AnnounceRequestOffer { + offer_id: OfferId(rng.gen()), + offer: RtcOffer { + t: RtcOfferType::Offer, + sdp: SDP.into(), + }, + }) + } - self.stream.send(request.to_ws_message()).await?; + InMessage::AnnounceRequest(AnnounceRequest { + action: AnnounceAction::Announce, + info_hash: self.load_test_state.info_hashes[info_hash_index], + peer_id: self.peer_id, + bytes_left: Some(bytes_left), + event: Some(event), + numwant: Some(offers.len()), + offers: Some(offers), + answer: None, + answer_to_peer_id: None, + answer_offer_id: None, + }) + } + } + RequestType::Scrape => { + let mut scrape_hashes = Vec::with_capacity(5); - self.load_test_state - .statistics - .requests - .fetch_add(1, Ordering::Relaxed); + for _ in 0..5 { + let info_hash_index = + select_info_hash_index(&self.config, &self.load_test_state, &mut *rng); - self.can_send = false; + scrape_hashes.push(self.load_test_state.info_hashes[info_hash_index]); + } + + InMessage::ScrapeRequest(ScrapeRequest { + action: ScrapeAction::Scrape, + info_hashes: Some(ScrapeRequestInfoHashes::Multiple(scrape_hashes)), + }) } + }; - self.read_message().await?; - } + drop(rng); + + self.can_send_answer = None; + + self.stream.send(request.to_ws_message()).await?; + + self.load_test_state + .statistics + .requests + .fetch_add(1, Ordering::Relaxed); + + Ok(()) } async fn read_message(&mut self) -> anyhow::Result<()> { @@ -190,33 +267,25 @@ impl Connection { .responses_offer .fetch_add(1, Ordering::Relaxed); - self.send_answer = Some((offer.peer_id, offer.offer_id)); - - self.can_send = true; + self.can_send_answer = Some((offer.info_hash, offer.peer_id, offer.offer_id)); } Ok(OutMessage::AnswerOutMessage(_)) => { self.load_test_state .statistics .responses_answer .fetch_add(1, Ordering::Relaxed); - - self.can_send = true; } Ok(OutMessage::AnnounceResponse(_)) => { self.load_test_state .statistics .responses_announce .fetch_add(1, Ordering::Relaxed); - - self.can_send = true; } Ok(OutMessage::ScrapeResponse(_)) => { self.load_test_state .statistics .responses_scrape .fetch_add(1, Ordering::Relaxed); - - self.can_send = true; } Ok(OutMessage::ErrorResponse(response)) => { self.load_test_state @@ -225,8 +294,6 @@ impl Connection { .fetch_add(1, Ordering::Relaxed); ::log::warn!("received error response: {:?}", response.failure_reason); - - self.can_send = true; } Err(err) => { ::log::error!("error deserializing message: {:#}", err); @@ -236,3 +303,16 @@ impl Connection { Ok(()) } } + +pub fn random_request_type(config: &Config, rng: &mut impl Rng) -> RequestType { + let weights = [ + config.torrents.weight_announce as u32, + config.torrents.weight_scrape as u32, + ]; + + let items = [RequestType::Announce, RequestType::Scrape]; + + let dist = WeightedIndex::new(&weights).expect("random request weighted index"); + + items[dist.sample(rng)] +} diff --git a/crates/ws_load_test/src/utils.rs b/crates/ws_load_test/src/utils.rs index bb174438..1c5c5822 100644 --- a/crates/ws_load_test/src/utils.rs +++ b/crates/ws_load_test/src/utils.rs @@ -1,104 +1,13 @@ use std::sync::Arc; -use rand::distributions::WeightedIndex; use rand::prelude::*; use rand_distr::Gamma; use crate::common::*; use crate::config::*; -pub fn create_random_request( - config: &Config, - state: &LoadTestState, - rng: &mut impl Rng, - peer_id: PeerId, - announce_gen_offers: bool, -) -> InMessage { - let weights = [ - config.torrents.weight_announce as u32, - config.torrents.weight_scrape as u32, - ]; - - let items = [RequestType::Announce, RequestType::Scrape]; - - let dist = WeightedIndex::new(&weights).expect("random request weighted index"); - - match items[dist.sample(rng)] { - RequestType::Announce => { - create_announce_request(config, state, rng, peer_id, announce_gen_offers) - } - RequestType::Scrape => create_scrape_request(config, state, rng), - } -} - -#[inline] -fn create_announce_request( - config: &Config, - state: &LoadTestState, - rng: &mut impl Rng, - peer_id: PeerId, - gen_offers: bool, -) -> InMessage { - let (event, bytes_left) = { - if rng.gen_bool(config.torrents.peer_seeder_probability) { - (AnnounceEvent::Completed, 0) - } else { - (AnnounceEvent::Started, 50) - } - }; - - let info_hash_index = select_info_hash_index(config, &state, rng); - - let offers = if gen_offers { - let mut offers = Vec::with_capacity(config.torrents.offers_per_request); - - for _ in 0..config.torrents.offers_per_request { - offers.push(AnnounceRequestOffer { - offer_id: OfferId(rng.gen()), - offer: RtcOffer { - t: RtcOfferType::Offer, - sdp: "abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-".into() - }, - }) - } - - offers - } else { - Vec::new() - }; - - InMessage::AnnounceRequest(AnnounceRequest { - action: AnnounceAction::Announce, - info_hash: state.info_hashes[info_hash_index], - peer_id, - bytes_left: Some(bytes_left), - event: Some(event), - numwant: Some(offers.len()), - offers: Some(offers), - answer: None, - answer_to_peer_id: None, - answer_offer_id: None, - }) -} - -#[inline] -fn create_scrape_request(config: &Config, state: &LoadTestState, rng: &mut impl Rng) -> InMessage { - let mut scrape_hashes = Vec::with_capacity(5); - - for _ in 0..5 { - let info_hash_index = select_info_hash_index(config, &state, rng); - - scrape_hashes.push(state.info_hashes[info_hash_index]); - } - - InMessage::ScrapeRequest(ScrapeRequest { - action: ScrapeAction::Scrape, - info_hashes: Some(ScrapeRequestInfoHashes::Multiple(scrape_hashes)), - }) -} - #[inline] -fn select_info_hash_index(config: &Config, state: &LoadTestState, rng: &mut impl Rng) -> usize { +pub fn select_info_hash_index(config: &Config, state: &LoadTestState, rng: &mut impl Rng) -> usize { gamma_usize(rng, &state.gamma, config.torrents.number_of_torrents - 1) } diff --git a/crates/ws_protocol/benches/bench_deserialize_announce_request.rs b/crates/ws_protocol/benches/bench_deserialize_announce_request.rs index 7b961e7e..c239b162 100644 --- a/crates/ws_protocol/benches/bench_deserialize_announce_request.rs +++ b/crates/ws_protocol/benches/bench_deserialize_announce_request.rs @@ -1,7 +1,10 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; use std::time::Duration; -use aquatic_ws_protocol::*; +use aquatic_ws_protocol::{ + common::*, + incoming::{AnnounceEvent, AnnounceRequest, AnnounceRequestOffer, InMessage}, +}; pub fn bench(c: &mut Criterion) { let info_hash = InfoHash([ diff --git a/crates/ws_protocol/src/lib.rs b/crates/ws_protocol/src/lib.rs index 8a6bed96..c61681a9 100644 --- a/crates/ws_protocol/src/lib.rs +++ b/crates/ws_protocol/src/lib.rs @@ -14,16 +14,22 @@ pub mod common; pub mod incoming; pub mod outgoing; -pub use common::*; -pub use incoming::*; -pub use outgoing::*; - #[cfg(test)] mod tests { use quickcheck::Arbitrary; use quickcheck_macros::quickcheck; - use super::*; + use crate::{ + common::*, + incoming::{ + AnnounceEvent, AnnounceRequest, AnnounceRequestOffer, InMessage, ScrapeRequest, + ScrapeRequestInfoHashes, + }, + outgoing::{ + AnnounceResponse, AnswerOutMessage, OfferOutMessage, OutMessage, ScrapeResponse, + ScrapeStatistics, + }, + }; fn arbitrary_20_bytes(g: &mut quickcheck::Gen) -> [u8; 20] { let mut bytes = [0u8; 20];