Skip to content

Commit

Permalink
Merge pull request #171 from greatest-ape/ws-fix-backpressure-2
Browse files Browse the repository at this point in the history
aquatic_ws: remove ineffective backpressure implementation, improve load tester, remove glob exports
  • Loading branch information
greatest-ape authored Jan 8, 2024
2 parents bcd8988 + 0dae7fd commit d8bd964
Show file tree
Hide file tree
Showing 11 changed files with 204 additions and 236 deletions.
7 changes: 3 additions & 4 deletions crates/ws/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{net::IpAddr, sync::Arc};
use aquatic_common::access_list::AccessListArcSwap;

pub use aquatic_common::ValidUntil;
use aquatic_ws_protocol::{InfoHash, PeerId};
use aquatic_ws_protocol::common::{InfoHash, PeerId};

#[derive(Copy, Clone, Debug)]
pub enum IpVersion {
Expand Down Expand Up @@ -67,11 +67,10 @@ impl Into<OutMessageMeta> for InMessageMeta {
}
}

#[derive(Clone, Copy, Debug)]
#[derive(Clone, Debug)]
pub enum SwarmControlMessage {
ConnectionClosed {
info_hash: InfoHash,
peer_id: PeerId,
ip_version: IpVersion,
announced_info_hashes: Vec<(InfoHash, PeerId)>,
},
}
61 changes: 20 additions & 41 deletions crates/ws/src/workers/socket/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,21 @@ use anyhow::Context;
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
use aquatic_common::rustls_config::RustlsConfig;
use aquatic_common::ServerStartInstant;
use aquatic_ws_protocol::*;
use aquatic_ws_protocol::common::{InfoHash, PeerId, ScrapeAction};
use aquatic_ws_protocol::incoming::{
AnnounceEvent, AnnounceRequest, InMessage, ScrapeRequest, ScrapeRequestInfoHashes,
};
use aquatic_ws_protocol::outgoing::{
ErrorResponse, ErrorResponseAction, OutMessage, ScrapeResponse, ScrapeStatistics,
};
use arc_swap::ArcSwap;
use async_tungstenite::WebSocketStream;
use futures::stream::{SplitSink, SplitStream};
use futures::{AsyncWriteExt, StreamExt};
use futures_lite::future::race;
use futures_rustls::TlsAcceptor;
use glommio::channels::channel_mesh::Senders;
use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender};
use glommio::channels::local_channel::{LocalReceiver, LocalSender};
use glommio::net::TcpStream;
use glommio::timer::timeout;
use glommio::{enclose, prelude::*};
Expand All @@ -35,13 +41,6 @@ use crate::workers::socket::calculate_in_message_consumer_index;
#[cfg(feature = "metrics")]
use crate::workers::socket::{ip_version_to_metrics_str, WORKER_INDEX};

/// Length of ConnectionReader backpressure channel
///
/// ConnectionReader awaits a message in a channel before proceeding with
/// reading a request. For each response sent, a message is sent to the
/// channel, up to a maximum of this constant.
const READ_PASS_CHANNEL_LEN: usize = 4;

pub struct ConnectionRunner {
pub config: Rc<Config>,
pub access_list: Arc<AccessListArcSwap>,
Expand Down Expand Up @@ -168,17 +167,6 @@ impl ConnectionRunner {
let pending_scrape_slab = Rc::new(RefCell::new(Slab::new()));
let access_list_cache = create_access_list_cache(&self.access_list);

let (read_pass_sender, read_pass_receiver) = new_bounded(READ_PASS_CHANNEL_LEN);

for _ in 0..READ_PASS_CHANNEL_LEN {
if let Err(err) = read_pass_sender.try_send(()) {
panic!(
"couldn't add initial entries to read pass channel: {:#}",
err
)
};
}

let config = self.config.clone();

let reader_future = enclose!((pending_scrape_slab, clean_up_data) async move {
Expand All @@ -187,7 +175,6 @@ impl ConnectionRunner {
access_list_cache,
in_message_senders: self.in_message_senders,
out_message_sender: self.out_message_sender,
read_pass_receiver,
pending_scrape_slab,
out_message_consumer_id: self.out_message_consumer_id,
ws_in,
Expand Down Expand Up @@ -217,7 +204,6 @@ impl ConnectionRunner {
let mut writer = ConnectionWriter {
config,
out_message_receiver: self.out_message_receiver,
read_pass_sender,
connection_valid_until: self.connection_valid_until,
ws_out,
pending_scrape_slab,
Expand All @@ -238,7 +224,6 @@ struct ConnectionReader<S> {
access_list_cache: AccessListCache,
in_message_senders: Rc<Senders<(InMessageMeta, InMessage)>>,
out_message_sender: Rc<LocalSender<(OutMessageMeta, OutMessage)>>,
read_pass_receiver: LocalReceiver<()>,
pending_scrape_slab: Rc<RefCell<Slab<PendingScrapeResponse>>>,
out_message_consumer_id: ConsumerId,
ws_in: SplitStream<WebSocketStream<S>>,
Expand All @@ -254,11 +239,6 @@ struct ConnectionReader<S> {
impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> {
async fn run_in_message_loop(&mut self) -> anyhow::Result<()> {
loop {
self.read_pass_receiver
.recv()
.await
.ok_or_else(|| anyhow::anyhow!("read pass channel closed"))?;

let message = self
.ws_in
.next()
Expand Down Expand Up @@ -496,7 +476,6 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> {
struct ConnectionWriter<S> {
config: Rc<Config>,
out_message_receiver: LocalReceiver<(OutMessageMeta, OutMessage)>,
read_pass_sender: LocalSender<()>,
connection_valid_until: Rc<RefCell<ValidUntil>>,
ws_out: SplitSink<WebSocketStream<S>, tungstenite::Message>,
pending_scrape_slab: Rc<RefCell<Slab<PendingScrapeResponse>>>,
Expand Down Expand Up @@ -549,10 +528,6 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionWriter<S> {
}
};

if let Err(GlommioError::Closed(_)) = self.read_pass_sender.try_send(()) {
return Err(anyhow::anyhow!("read pass channel closed"));
}

yield_if_needed().await;
}
}
Expand Down Expand Up @@ -630,19 +605,23 @@ impl ConnectionCleanupData {
config: &Config,
control_message_senders: Rc<Senders<SwarmControlMessage>>,
) {
// Use RefCell::take to avoid issues with Rc borrow across await
let announced_info_hashes = self.announced_info_hashes.take();
let mut announced_info_hashes = HashMap::new();

for (info_hash, peer_id) in self.announced_info_hashes.take().into_iter() {
let consumer_index = calculate_in_message_consumer_index(&config, info_hash);

// Tell swarm workers to remove peer
for (info_hash, peer_id) in announced_info_hashes.into_iter() {
announced_info_hashes
.entry(consumer_index)
.or_insert(Vec::new())
.push((info_hash, peer_id));
}

for (consumer_index, announced_info_hashes) in announced_info_hashes.into_iter() {
let message = SwarmControlMessage::ConnectionClosed {
info_hash,
peer_id,
ip_version: self.ip_version,
announced_info_hashes,
};

let consumer_index = calculate_in_message_consumer_index(&config, info_hash);

control_message_senders
.send_to(consumer_index, message)
.await
Expand Down
4 changes: 3 additions & 1 deletion crates/ws/src/workers/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use anyhow::Context;
use aquatic_common::privileges::PrivilegeDropper;
use aquatic_common::rustls_config::RustlsConfig;
use aquatic_common::{PanicSentinel, ServerStartInstant};
use aquatic_ws_protocol::*;
use aquatic_ws_protocol::common::InfoHash;
use aquatic_ws_protocol::incoming::InMessage;
use aquatic_ws_protocol::outgoing::OutMessage;
use arc_swap::ArcSwap;
use futures::StreamExt;
use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role};
Expand Down
32 changes: 8 additions & 24 deletions crates/ws/src/workers/swarm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use std::cell::RefCell;
use std::rc::Rc;
use std::time::Duration;

use aquatic_ws_protocol::incoming::InMessage;
use aquatic_ws_protocol::outgoing::OutMessage;
use futures::StreamExt;
use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders};
use glommio::enclose;
Expand All @@ -12,7 +14,6 @@ use glommio::timer::TimerActionRepeat;
use rand::{rngs::SmallRng, SeedableRng};

use aquatic_common::{PanicSentinel, ServerStartInstant};
use aquatic_ws_protocol::*;

use crate::common::*;
use crate::config::Config;
Expand Down Expand Up @@ -102,13 +103,14 @@ where
while let Some(message) = stream.next().await {
match message {
SwarmControlMessage::ConnectionClosed {
info_hash,
peer_id,
ip_version,
announced_info_hashes,
} => {
torrents
.borrow_mut()
.handle_connection_closed(info_hash, peer_id, ip_version);
let mut torrents = torrents.borrow_mut();

for (info_hash, peer_id) in announced_info_hashes {
torrents.handle_connection_closed(info_hash, peer_id, ip_version);
}
}
}
}
Expand All @@ -124,25 +126,8 @@ async fn handle_request_stream<S>(
S: futures_lite::Stream<Item = (InMessageMeta, InMessage)> + ::std::marker::Unpin,
{
let rng = Rc::new(RefCell::new(SmallRng::from_entropy()));

let max_peer_age = config.cleaning.max_peer_age;
let peer_valid_until = Rc::new(RefCell::new(ValidUntil::new(
server_start_instant,
max_peer_age,
)));

// Periodically update peer_valid_until
TimerActionRepeat::repeat(enclose!((peer_valid_until) move || {
enclose!((peer_valid_until) move || async move {
*peer_valid_until.borrow_mut() = ValidUntil::new(server_start_instant, max_peer_age);

Some(Duration::from_secs(1))
})()
}));

let config = &config;
let torrents = &torrents;
let peer_valid_until = &peer_valid_until;
let rng = &rng;
let out_message_senders = &out_message_senders;

Expand All @@ -159,7 +144,6 @@ async fn handle_request_stream<S>(
&mut rng.borrow_mut(),
&mut out_messages,
server_start_instant,
peer_valid_until.borrow().to_owned(),
meta,
request,
)
Expand Down
10 changes: 8 additions & 2 deletions crates/ws/src/workers/swarm/storage.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
use std::sync::Arc;

use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
use aquatic_ws_protocol::incoming::{AnnounceEvent, AnnounceRequest, ScrapeRequest};
use aquatic_ws_protocol::outgoing::{
AnnounceResponse, AnswerOutMessage, ErrorResponse, ErrorResponseAction, OfferOutMessage,
OutMessage, ScrapeResponse, ScrapeStatistics,
};
use hashbrown::HashMap;
use metrics::Gauge;
use rand::rngs::SmallRng;

use aquatic_common::{
extract_response_peers, IndexMap, SecondsSinceServerStart, ServerStartInstant,
};
use aquatic_ws_protocol::*;
use aquatic_ws_protocol::common::*;

use crate::common::*;
use crate::config::Config;
Expand Down Expand Up @@ -62,7 +67,6 @@ impl TorrentMaps {
rng: &mut SmallRng,
out_messages: &mut Vec<(OutMessageMeta, OutMessage)>,
server_start_instant: ServerStartInstant,
valid_until: ValidUntil,
request_sender_meta: InMessageMeta,
request: AnnounceRequest,
) {
Expand All @@ -72,6 +76,8 @@ impl TorrentMaps {
self.ipv6.entry(request.info_hash).or_default()
};

let valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age);

// If there is already a peer with this peer_id, check that connection id
// is same as that of request sender. Otherwise, ignore request. Since
// peers have access to each others peer_id's, they could send requests
Expand Down
5 changes: 2 additions & 3 deletions crates/ws_load_test/src/common.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use std::sync::{atomic::AtomicUsize, Arc};

use aquatic_ws_protocol::common::InfoHash;
use rand_distr::Gamma;

pub use aquatic_ws_protocol::*;

#[derive(Default)]
pub struct Statistics {
pub requests: AtomicUsize,
Expand All @@ -18,7 +17,7 @@ pub struct Statistics {

#[derive(Clone)]
pub struct LoadTestState {
pub info_hashes: Arc<Vec<InfoHash>>,
pub info_hashes: Arc<[InfoHash]>,
pub statistics: Arc<Statistics>,
pub gamma: Arc<Gamma<f64>>,
}
Expand Down
7 changes: 4 additions & 3 deletions crates/ws_load_test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::time::{Duration, Instant};

use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_for_util_worker};
use aquatic_common::cpu_pinning::WorkerIndex;
use aquatic_ws_protocol::common::InfoHash;
use glommio::LocalExecutorBuilder;
use rand::prelude::*;
use rand_distr::Gamma;
Expand Down Expand Up @@ -36,10 +37,10 @@ fn run(config: Config) -> ::anyhow::Result<()> {

println!("Starting client with config: {:#?}", config);

let mut info_hashes = Vec::with_capacity(config.torrents.number_of_torrents);

let mut rng = SmallRng::from_entropy();

let mut info_hashes = Vec::with_capacity(config.torrents.number_of_torrents);

for _ in 0..config.torrents.number_of_torrents {
info_hashes.push(InfoHash(rng.gen()));
}
Expand All @@ -51,7 +52,7 @@ fn run(config: Config) -> ::anyhow::Result<()> {
.unwrap();

let state = LoadTestState {
info_hashes: Arc::new(info_hashes),
info_hashes: Arc::from(info_hashes.into_boxed_slice()),
statistics: Arc::new(Statistics::default()),
gamma: Arc::new(gamma),
};
Expand Down
Loading

0 comments on commit d8bd964

Please sign in to comment.