From b527af7195738de2a12b036edc19c2b6a9b8f11a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 1 Jan 2024 13:32:28 +0100 Subject: [PATCH 1/9] udp: distribute announce swarm responses among socket workers They don't have to be sent from the same worker that received the request, so we can decrease performance loss from underutilized threads this way. --- crates/udp/src/common.rs | 23 ++++- crates/udp/src/workers/swarm/mod.rs | 125 +++++++++++++++------------- 2 files changed, 91 insertions(+), 57 deletions(-) diff --git a/crates/udp/src/common.rs b/crates/udp/src/common.rs index 439a4974..8cc322c2 100644 --- a/crates/udp/src/common.rs +++ b/crates/udp/src/common.rs @@ -171,13 +171,17 @@ impl ConnectedRequestSender { pub struct ConnectedResponseSender { senders: Vec>, + to_any_last_index_picked: usize, } impl ConnectedResponseSender { pub fn new( senders: Vec>, ) -> Self { - Self { senders } + Self { + senders, + to_any_last_index_picked: 0, + } } pub fn try_send_ref_to( @@ -193,6 +197,23 @@ impl ConnectedResponseSender { ) -> Result, thingbuf::mpsc::errors::Closed> { self.senders[index.0].send_ref() } + + pub fn send_ref_to_any( + &mut self, + ) -> Result, thingbuf::mpsc::errors::Closed> { + let start = self.to_any_last_index_picked + 1; + + for i in (start..start + self.senders.len()).map(|i| i % self.senders.len()) { + if let Ok(sender) = self.senders[i].try_send_ref() { + self.to_any_last_index_picked = i; + + return Ok(sender); + } + } + + self.to_any_last_index_picked = start % self.senders.len(); + self.send_ref_to(SocketWorkerIndex(self.to_any_last_index_picked)) + } } pub type ConnectedResponseReceiver = diff --git a/crates/udp/src/workers/swarm/mod.rs b/crates/udp/src/workers/swarm/mod.rs index 43ee480d..ea551f32 100644 --- a/crates/udp/src/workers/swarm/mod.rs +++ b/crates/udp/src/workers/swarm/mod.rs @@ -23,7 +23,7 @@ pub fn run_swarm_worker( state: State, server_start_instant: ServerStartInstant, request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>, - response_sender: ConnectedResponseSender, + mut response_sender: ConnectedResponseSender, statistics_sender: Sender, worker_index: SwarmWorkerIndex, ) { @@ -43,65 +43,78 @@ pub fn run_swarm_worker( loop { if let Ok((sender_index, request, src)) = request_receiver.recv_timeout(timeout) { - // It is OK to block here as long as we don't do blocking sends - // in socket workers, which could cause a deadlock - match response_sender.send_ref_to(sender_index) { - Ok(mut send_ref) => { + // It is OK to block here as long as we don't also do blocking + // sends in socket workers (doing both could cause a deadlock) + match (request, src.get().ip()) { + (ConnectedRequest::Announce(request), IpAddr::V4(ip)) => { + // It doesn't matter which socket worker receives announce responses + let mut send_ref = response_sender + .send_ref_to_any() + .expect("swarm response channel is closed"); + send_ref.addr = src; + send_ref.kind = ConnectedResponseKind::AnnounceIpv4; + + torrents + .ipv4 + .0 + .entry(request.info_hash) + .or_default() + .announce( + &config, + &statistics_sender, + &mut rng, + &request, + ip.into(), + peer_valid_until, + &mut send_ref.announce_ipv4, + ); + } + (ConnectedRequest::Announce(request), IpAddr::V6(ip)) => { + // It doesn't matter which socket worker receives announce responses + let mut send_ref = response_sender + .send_ref_to_any() + .expect("swarm response channel is closed"); - match (request, src.get().ip()) { - (ConnectedRequest::Announce(request), IpAddr::V4(ip)) => { - send_ref.kind = ConnectedResponseKind::AnnounceIpv4; - - torrents - .ipv4 - .0 - .entry(request.info_hash) - .or_default() - .announce( - &config, - &statistics_sender, - &mut rng, - &request, - ip.into(), - peer_valid_until, - &mut send_ref.announce_ipv4, - ); - } - (ConnectedRequest::Announce(request), IpAddr::V6(ip)) => { - send_ref.kind = ConnectedResponseKind::AnnounceIpv6; - - torrents - .ipv6 - .0 - .entry(request.info_hash) - .or_default() - .announce( - &config, - &statistics_sender, - &mut rng, - &request, - ip.into(), - peer_valid_until, - &mut send_ref.announce_ipv6, - ); - } - (ConnectedRequest::Scrape(request), IpAddr::V4(_)) => { - send_ref.kind = ConnectedResponseKind::Scrape; - - torrents.ipv4.scrape(request, &mut send_ref.scrape); - } - (ConnectedRequest::Scrape(request), IpAddr::V6(_)) => { - send_ref.kind = ConnectedResponseKind::Scrape; - - torrents.ipv6.scrape(request, &mut send_ref.scrape); - } - }; + send_ref.addr = src; + send_ref.kind = ConnectedResponseKind::AnnounceIpv6; + + torrents + .ipv6 + .0 + .entry(request.info_hash) + .or_default() + .announce( + &config, + &statistics_sender, + &mut rng, + &request, + ip.into(), + peer_valid_until, + &mut send_ref.announce_ipv6, + ); } - Err(_) => { - panic!("swarm response channel closed"); + (ConnectedRequest::Scrape(request), IpAddr::V4(_)) => { + let mut send_ref = response_sender + .send_ref_to(sender_index) + .expect("swarm response channel is closed"); + + send_ref.addr = src; + send_ref.kind = ConnectedResponseKind::Scrape; + + torrents.ipv4.scrape(request, &mut send_ref.scrape); } - } + (ConnectedRequest::Scrape(request), IpAddr::V6(_)) => { + let mut send_ref = response_sender + .send_ref_to(sender_index) + .expect("swarm response channel is closed"); + + send_ref.addr = src; + send_ref.kind = ConnectedResponseKind::Scrape; + + torrents.ipv6.scrape(request, &mut send_ref.scrape); + } + }; } // Run periodic tasks From 29e243ac81dee651e31ad7edc76db863f9e9c64a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 1 Jan 2024 15:44:39 +0100 Subject: [PATCH 2/9] bencher: move html gen into own module --- crates/bencher/src/html.rs | 80 +++++++++++++++++++++++++++++ crates/bencher/src/main.rs | 1 + crates/bencher/src/set.rs | 100 +++++-------------------------------- 3 files changed, 93 insertions(+), 88 deletions(-) create mode 100644 crates/bencher/src/html.rs diff --git a/crates/bencher/src/html.rs b/crates/bencher/src/html.rs new file mode 100644 index 00000000..e848741e --- /dev/null +++ b/crates/bencher/src/html.rs @@ -0,0 +1,80 @@ +use indexmap::{IndexMap, IndexSet}; +use itertools::Itertools; + +use crate::set::TrackerCoreCountResults; + +pub fn html_best_results(results: &[TrackerCoreCountResults]) -> String { + let mut all_implementation_names = IndexSet::new(); + + for core_count_results in results { + all_implementation_names.extend( + core_count_results + .implementations + .iter() + .map(|r| r.name.clone()), + ); + } + + let mut data_rows = Vec::new(); + + for core_count_results in results { + let best_results = core_count_results + .implementations + .iter() + .map(|implementation| (implementation.name.clone(), implementation.best_result())) + .collect::>(); + + let best_results_for_all_implementations = all_implementation_names + .iter() + .map(|name| best_results.get(name).cloned().flatten()) + .collect::>(); + + let data_row = format!( + " + + {} + {} + + ", + core_count_results.core_count, + best_results_for_all_implementations + .into_iter() + .map(|result| { + if let Some(r) = result { + format!( + r#"{}"#, + r.tracker_info, + r.tracker_process_stats.avg_cpu_utilization, + r.average_responses, + ) + } else { + "-".to_string() + } + }) + .join("\n"), + ); + + data_rows.push(data_row); + } + + format!( + " + + + + + {} + + + + {} + +
CPU cores
+ ", + all_implementation_names + .iter() + .map(|name| format!("{name}")) + .join("\n"), + data_rows.join("\n") + ) +} diff --git a/crates/bencher/src/main.rs b/crates/bencher/src/main.rs index 2ed2be81..5494d5c8 100644 --- a/crates/bencher/src/main.rs +++ b/crates/bencher/src/main.rs @@ -1,4 +1,5 @@ pub mod common; +pub mod html; pub mod protocols; pub mod run; pub mod set; diff --git a/crates/bencher/src/set.rs b/crates/bencher/src/set.rs index 597b8fa5..837aae28 100644 --- a/crates/bencher/src/set.rs +++ b/crates/bencher/src/set.rs @@ -1,10 +1,10 @@ use std::rc::Rc; -use indexmap::{IndexMap, IndexSet}; -use itertools::Itertools; +use indexmap::IndexMap; use crate::{ common::{CpuDirection, CpuMode, TaskSetCpuList}, + html::html_best_results, run::{ProcessRunner, ProcessStats, RunConfig}, }; @@ -106,21 +106,21 @@ pub fn run_sets( }) .collect::>(); - html_summary(&results); + html_best_results(&results); } pub struct TrackerCoreCountResults { - core_count: usize, - implementations: Vec, + pub core_count: usize, + pub implementations: Vec, } pub struct ImplementationResults { - name: String, - configurations: Vec, + pub name: String, + pub configurations: Vec, } impl ImplementationResults { - fn best_result(&self) -> Option { + pub fn best_result(&self) -> Option { self.configurations .iter() .filter_map(|c| c.best_result()) @@ -135,7 +135,7 @@ impl ImplementationResults { } pub struct TrackerConfigurationResults { - load_tests: Vec, + pub load_tests: Vec, } impl TrackerConfigurationResults { @@ -227,89 +227,13 @@ impl LoadTestRunResults { #[derive(Clone)] pub struct LoadTestRunResultsSuccess { - average_responses: f32, + pub average_responses: f32, // tracker_keys: IndexMap, - tracker_info: String, - tracker_process_stats: ProcessStats, + pub tracker_info: String, + pub tracker_process_stats: ProcessStats, // load_test_keys: IndexMap, } pub struct LoadTestRunResultsFailure { // load_test_keys: IndexMap, } - -pub fn html_summary(results: &[TrackerCoreCountResults]) { - let mut all_implementation_names = IndexSet::new(); - - for core_count_results in results { - all_implementation_names.extend( - core_count_results - .implementations - .iter() - .map(|r| r.name.clone()), - ); - } - - let mut data_rows = Vec::new(); - - for core_count_results in results { - let best_results = core_count_results - .implementations - .iter() - .map(|implementation| (implementation.name.clone(), implementation.best_result())) - .collect::>(); - - let best_results_for_all_implementations = all_implementation_names - .iter() - .map(|name| best_results.get(name).cloned().flatten()) - .collect::>(); - - let data_row = format!( - " - - {} - {} - - ", - core_count_results.core_count, - best_results_for_all_implementations - .into_iter() - .map(|result| { - if let Some(r) = result { - format!( - r#"{}"#, - r.tracker_info, - r.tracker_process_stats.avg_cpu_utilization, - r.average_responses, - ) - } else { - "-".to_string() - } - }) - .join("\n"), - ); - - data_rows.push(data_row); - } - - println!( - " - - - - - {} - - - - {} - -
CPU cores
- ", - all_implementation_names - .iter() - .map(|name| format!("{name}")) - .join("\n"), - data_rows.join("\n") - ) -} From 4db1fe75f2b82e1a88dfa17f3d1acde6deac63fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 1 Jan 2024 16:42:27 +0100 Subject: [PATCH 3/9] bencher: print html table with full results too, use num_format --- Cargo.lock | 1 + crates/bencher/Cargo.toml | 1 + crates/bencher/src/html.rs | 133 ++++++++++++++++++++++++++++++++++++- crates/bencher/src/run.rs | 9 ++- crates/bencher/src/set.rs | 39 +++++++---- 5 files changed, 164 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 65a00cd9..84d76370 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -139,6 +139,7 @@ dependencies = [ "indoc", "itertools 0.12.0", "nonblock", + "num-format", "once_cell", "regex", "serde", diff --git a/crates/bencher/Cargo.toml b/crates/bencher/Cargo.toml index bdcdaf65..5fd56312 100644 --- a/crates/bencher/Cargo.toml +++ b/crates/bencher/Cargo.toml @@ -27,6 +27,7 @@ clap = { version = "4", features = ["derive"] } indexmap = "2" indoc = "2" itertools = "0.12" +num-format = "0.4" nonblock = "0.2" once_cell = "1" regex = "1" diff --git a/crates/bencher/src/html.rs b/crates/bencher/src/html.rs index e848741e..e87802a1 100644 --- a/crates/bencher/src/html.rs +++ b/crates/bencher/src/html.rs @@ -1,7 +1,9 @@ use indexmap::{IndexMap, IndexSet}; +use indoc::formatdoc; use itertools::Itertools; +use num_format::{Locale, ToFormattedString}; -use crate::set::TrackerCoreCountResults; +use crate::set::{LoadTestRunResults, TrackerCoreCountResults}; pub fn html_best_results(results: &[TrackerCoreCountResults]) -> String { let mut all_implementation_names = IndexSet::new(); @@ -45,7 +47,7 @@ pub fn html_best_results(results: &[TrackerCoreCountResults]) -> String { r#"{}"#, r.tracker_info, r.tracker_process_stats.avg_cpu_utilization, - r.average_responses, + r.average_responses.to_formatted_string(&Locale::en), ) } else { "-".to_string() @@ -59,6 +61,7 @@ pub fn html_best_results(results: &[TrackerCoreCountResults]) -> String { format!( " +

Best results

@@ -78,3 +81,129 @@ pub fn html_best_results(results: &[TrackerCoreCountResults]) -> String { data_rows.join("\n") ) } + +pub fn html_all_runs(all_results: &[TrackerCoreCountResults]) -> String { + let mut all_implementation_names = IndexSet::new(); + + for core_count_results in all_results { + all_implementation_names.extend( + core_count_results + .implementations + .iter() + .map(|r| r.name.clone()), + ); + } + + struct R { + core_count: usize, + avg_responses: Option, + tracker_keys: IndexMap, + tracker_vcpus: String, + load_test_keys: IndexMap, + load_test_vcpus: String, + } + + let mut output = String::new(); + + let mut results_by_implementation: IndexMap> = Default::default(); + + for implementation_name in all_implementation_names { + let results = results_by_implementation + .entry(implementation_name.clone()) + .or_default(); + + let mut tracker_key_names: IndexSet = Default::default(); + let mut load_test_key_names: IndexSet = Default::default(); + + for r in all_results { + for i in r + .implementations + .iter() + .filter(|i| i.name == implementation_name) + { + for c in i.configurations.iter() { + for l in c.load_tests.iter() { + match l { + LoadTestRunResults::Success(l) => { + tracker_key_names.extend(l.tracker_keys.keys().cloned()); + load_test_key_names.extend(l.load_test_keys.keys().cloned()); + + results.push(R { + core_count: r.core_count, + avg_responses: Some(l.average_responses), + tracker_keys: l.tracker_keys.clone(), + tracker_vcpus: l.tracker_vcpus.as_cpu_list(), + load_test_keys: l.load_test_keys.clone(), + load_test_vcpus: l.load_test_vcpus.as_cpu_list(), + }) + } + LoadTestRunResults::Failure(l) => { + tracker_key_names.extend(l.tracker_keys.keys().cloned()); + load_test_key_names.extend(l.load_test_keys.keys().cloned()); + + results.push(R { + core_count: r.core_count, + avg_responses: None, + tracker_keys: l.tracker_keys.clone(), + tracker_vcpus: l.tracker_vcpus.as_cpu_list(), + load_test_keys: l.load_test_keys.clone(), + load_test_vcpus: l.load_test_vcpus.as_cpu_list(), + }) + } + } + } + } + } + } + + output.push_str(&formatdoc! { + " +

Results for {}

+
+ + + + + {} + {} + + + + + + {} + +
CoresResponsesTracker vCPUsLoad test vCPUs
+ ", + implementation_name, + tracker_key_names.iter().map(|name| format!("Tracker {}", name)).join("\n"), + load_test_key_names.iter().map(|name| format!("Load test {}", name)).join("\n"), + results.into_iter().map(|r| { + formatdoc! { + " + + {} + {} + {} + {} + {} + {} + + ", + r.core_count, + r.avg_responses.map(|v| v.to_formatted_string(&Locale::en)).unwrap_or_else(|| "-".to_string()), + tracker_key_names.iter().map(|name| { + format!("{}", r.tracker_keys.get(name).cloned().unwrap_or_else(|| "-".to_string())) + }).join("\n"), + load_test_key_names.iter().map(|name| { + format!("{}", r.load_test_keys.get(name).cloned().unwrap_or_else(|| "-".to_string())) + }).join("\n"), + r.tracker_vcpus, + r.load_test_vcpus, + } + }).join("\n") + }); + } + + output +} diff --git a/crates/bencher/src/run.rs b/crates/bencher/src/run.rs index 09573e07..293eb8ca 100644 --- a/crates/bencher/src/run.rs +++ b/crates/bencher/src/run.rs @@ -163,9 +163,8 @@ impl RunConfig { }; let avg_responses = { - static RE: Lazy = Lazy::new(|| { - Regex::new(r"Average responses per second: ([0-9]+\.?[0-9]+)").unwrap() - }); + static RE: Lazy = + Lazy::new(|| Regex::new(r"Average responses per second: ([0-9]+\.?)").unwrap()); let opt_avg_responses = RE .captures_iter(&load_test_stdout) @@ -175,7 +174,7 @@ impl RunConfig { avg_responses.to_string() }) - .and_then(|v| v.parse::().ok()); + .and_then(|v| v.parse::().ok()); if let Some(avg_responses) = opt_avg_responses { avg_responses @@ -199,7 +198,7 @@ impl RunConfig { pub struct RunSuccessResults { pub tracker_process_stats: ProcessStats, - pub avg_responses: f32, + pub avg_responses: u64, } #[derive(Debug)] diff --git a/crates/bencher/src/set.rs b/crates/bencher/src/set.rs index 837aae28..3aee777c 100644 --- a/crates/bencher/src/set.rs +++ b/crates/bencher/src/set.rs @@ -1,10 +1,11 @@ use std::rc::Rc; use indexmap::IndexMap; +use num_format::{Locale, ToFormattedString}; use crate::{ common::{CpuDirection, CpuMode, TaskSetCpuList}, - html::html_best_results, + html::{html_all_runs, html_best_results}, run::{ProcessRunner, ProcessStats, RunConfig}, }; @@ -106,7 +107,8 @@ pub fn run_sets( }) .collect::>(); - html_best_results(&results); + println!("{}", html_all_runs(&results)); + println!("{}", html_best_results(&results)); } pub struct TrackerCoreCountResults { @@ -185,18 +187,21 @@ impl LoadTestRunResults { ); let load_test_runner = load_test_gen(workers); - // let load_test_keys = load_test_runner.keys(); + let load_test_keys = load_test_runner.keys(); let run_config = RunConfig { tracker_runner: tracker_process.clone(), tracker_vcpus: tracker_vcpus.clone(), load_test_runner, - load_test_vcpus, + load_test_vcpus: load_test_vcpus.clone(), }; match run_config.run(command) { Ok(r) => { - println!("- Average responses per second: {}", r.avg_responses); + println!( + "- Average responses per second: {}", + r.avg_responses.to_formatted_string(&Locale::en) + ); println!( "- Average tracker CPU utilization: {}%", r.tracker_process_stats.avg_cpu_utilization, @@ -208,17 +213,22 @@ impl LoadTestRunResults { LoadTestRunResults::Success(LoadTestRunResultsSuccess { average_responses: r.avg_responses, - // tracker_keys: tracker_process.keys(), + tracker_keys: tracker_process.keys(), tracker_info: tracker_process.info(), tracker_process_stats: r.tracker_process_stats, - // load_test_keys, + tracker_vcpus, + load_test_keys, + load_test_vcpus, }) } Err(results) => { println!("\nRun failed:\n{:#}\n", results); LoadTestRunResults::Failure(LoadTestRunResultsFailure { - // load_test_keys + tracker_keys: tracker_process.keys(), + tracker_vcpus, + load_test_keys, + load_test_vcpus, }) } } @@ -227,13 +237,18 @@ impl LoadTestRunResults { #[derive(Clone)] pub struct LoadTestRunResultsSuccess { - pub average_responses: f32, - // tracker_keys: IndexMap, + pub average_responses: u64, + pub tracker_keys: IndexMap, pub tracker_info: String, pub tracker_process_stats: ProcessStats, - // load_test_keys: IndexMap, + pub tracker_vcpus: TaskSetCpuList, + pub load_test_keys: IndexMap, + pub load_test_vcpus: TaskSetCpuList, } pub struct LoadTestRunResultsFailure { - // load_test_keys: IndexMap, + pub tracker_keys: IndexMap, + pub tracker_vcpus: TaskSetCpuList, + pub load_test_keys: IndexMap, + pub load_test_vcpus: TaskSetCpuList, } From 00f53e307be1a9638846fbb9c44532bbf40b7429 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 1 Jan 2024 17:00:24 +0100 Subject: [PATCH 4/9] bencher: improve html output --- crates/bencher/src/html.rs | 65 ++++++++++++++++++++++++-------------- 1 file changed, 42 insertions(+), 23 deletions(-) diff --git a/crates/bencher/src/html.rs b/crates/bencher/src/html.rs index e87802a1..a232bf9d 100644 --- a/crates/bencher/src/html.rs +++ b/crates/bencher/src/html.rs @@ -3,7 +3,10 @@ use indoc::formatdoc; use itertools::Itertools; use num_format::{Locale, ToFormattedString}; -use crate::set::{LoadTestRunResults, TrackerCoreCountResults}; +use crate::{ + run::ProcessStats, + set::{LoadTestRunResults, TrackerCoreCountResults}, +}; pub fn html_best_results(results: &[TrackerCoreCountResults]) -> String { let mut all_implementation_names = IndexSet::new(); @@ -99,6 +102,7 @@ pub fn html_all_runs(all_results: &[TrackerCoreCountResults]) -> String { avg_responses: Option, tracker_keys: IndexMap, tracker_vcpus: String, + tracker_stats: Option, load_test_keys: IndexMap, load_test_vcpus: String, } @@ -133,6 +137,7 @@ pub fn html_all_runs(all_results: &[TrackerCoreCountResults]) -> String { avg_responses: Some(l.average_responses), tracker_keys: l.tracker_keys.clone(), tracker_vcpus: l.tracker_vcpus.as_cpu_list(), + tracker_stats: Some(l.tracker_process_stats), load_test_keys: l.load_test_keys.clone(), load_test_vcpus: l.load_test_vcpus.as_cpu_list(), }) @@ -146,6 +151,7 @@ pub fn html_all_runs(all_results: &[TrackerCoreCountResults]) -> String { avg_responses: None, tracker_keys: l.tracker_keys.clone(), tracker_vcpus: l.tracker_vcpus.as_cpu_list(), + tracker_stats: None, load_test_keys: l.load_test_keys.clone(), load_test_vcpus: l.load_test_vcpus.as_cpu_list(), }) @@ -158,48 +164,61 @@ pub fn html_all_runs(all_results: &[TrackerCoreCountResults]) -> String { output.push_str(&formatdoc! { " -

Results for {}

+

Results for {implementation}

- {} - {} + {tracker_key_names} + + + {load_test_key_names} - {} + {body}
Cores ResponsesTracker avg CPUTracker peak RSS Tracker vCPUsLoad test vCPUs
", - implementation_name, - tracker_key_names.iter().map(|name| format!("Tracker {}", name)).join("\n"), - load_test_key_names.iter().map(|name| format!("Load test {}", name)).join("\n"), - results.into_iter().map(|r| { + implementation = implementation_name, + tracker_key_names = tracker_key_names.iter() + .map(|name| format!("{}", name)) + .join("\n"), + load_test_key_names = load_test_key_names.iter() + .map(|name| format!("Load test {}", name)) + .join("\n"), + body = results.into_iter().map(|r| { formatdoc! { " - {} - {} - {} - {} - {} - {} + {cores} + {avg_responses} + {tracker_key_values} + {cpu}% + {mem} kB + {tracker_vcpus} + {load_test_key_values} + {load_test_vcpus} ", - r.core_count, - r.avg_responses.map(|v| v.to_formatted_string(&Locale::en)).unwrap_or_else(|| "-".to_string()), - tracker_key_names.iter().map(|name| { - format!("{}", r.tracker_keys.get(name).cloned().unwrap_or_else(|| "-".to_string())) + cores = r.core_count, + avg_responses = r.avg_responses.map(|v| v.to_formatted_string(&Locale::en)) + .unwrap_or_else(|| "-".to_string()), + tracker_key_values = tracker_key_names.iter().map(|name| { + format!("{}", r.tracker_keys.get(name).cloned().unwrap_or_else(|| "-".to_string())) }).join("\n"), - load_test_key_names.iter().map(|name| { - format!("{}", r.load_test_keys.get(name).cloned().unwrap_or_else(|| "-".to_string())) + cpu = r.tracker_stats.map(|stats| stats.avg_cpu_utilization.to_string()) + .unwrap_or_else(|| "-".to_string()), + mem = r.tracker_stats.map(|stats| stats.peak_rss_kb.to_string()) + .unwrap_or_else(|| "-".to_string()), + tracker_vcpus = r.tracker_vcpus, + load_test_key_values = load_test_key_names.iter().map(|name| { + format!("{}", r.load_test_keys.get(name).cloned().unwrap_or_else(|| "-".to_string())) }).join("\n"), - r.tracker_vcpus, - r.load_test_vcpus, + load_test_vcpus = r.load_test_vcpus, } }).join("\n") }); From d944f944643c2f3ad6866d13c3240487ee12722f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 1 Jan 2024 17:10:22 +0100 Subject: [PATCH 5/9] bencher: tweak udp sets --- crates/bencher/src/protocols/udp.rs | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/crates/bencher/src/protocols/udp.rs b/crates/bencher/src/protocols/udp.rs index fce0ecdd..52d0a722 100644 --- a/crates/bencher/src/protocols/udp.rs +++ b/crates/bencher/src/protocols/udp.rs @@ -161,54 +161,48 @@ impl UdpCommand { implementations: indexmap! { UdpTracker::Aquatic => vec![ AquaticUdpRunner::new(7, 1), - AquaticUdpRunner::new(14, 1), AquaticUdpRunner::new(6, 2), AquaticUdpRunner::new(12, 2), + AquaticUdpRunner::new(5, 3), ], UdpTracker::OpenTracker => vec![ OpenTrackerUdpRunner::new(8), OpenTrackerUdpRunner::new(16), ], }, - load_test_runs: simple_load_test_runs(cpu_mode, &[4, 8, 12]), + load_test_runs: simple_load_test_runs(cpu_mode, &[4, 8, 12, 16]), }, 12 => SetConfig { implementations: indexmap! { UdpTracker::Aquatic => vec![ - AquaticUdpRunner::new(11, 1), - AquaticUdpRunner::new(22, 1), AquaticUdpRunner::new(10, 2), - AquaticUdpRunner::new(20, 2), AquaticUdpRunner::new(9, 3), - AquaticUdpRunner::new(18, 3), AquaticUdpRunner::new(8, 4), AquaticUdpRunner::new(16, 4), + AquaticUdpRunner::new(9, 5), ], UdpTracker::OpenTracker => vec![ OpenTrackerUdpRunner::new(12), OpenTrackerUdpRunner::new(24), ], }, - load_test_runs: simple_load_test_runs(cpu_mode, &[8, 12, 16]), + load_test_runs: simple_load_test_runs(cpu_mode, &[8, 12, 16, 24]), }, 16 => SetConfig { implementations: indexmap! { UdpTracker::Aquatic => vec![ - AquaticUdpRunner::new(14, 2), - AquaticUdpRunner::new(28, 2), AquaticUdpRunner::new(13, 3), - AquaticUdpRunner::new(26, 3), AquaticUdpRunner::new(12, 4), - AquaticUdpRunner::new(24, 4), AquaticUdpRunner::new(11, 5), - AquaticUdpRunner::new(22, 5), + AquaticUdpRunner::new(10, 6), + AquaticUdpRunner::new(20, 6), ], UdpTracker::OpenTracker => vec![ OpenTrackerUdpRunner::new(16), OpenTrackerUdpRunner::new(32), ], }, - load_test_runs: simple_load_test_runs(cpu_mode, &[8, 12, 16]), + load_test_runs: simple_load_test_runs(cpu_mode, &[8, 12, 16, 24]), }, } } From efd357a3ece5b9ccb4d3fcf759c714fa749a5ab7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 1 Jan 2024 17:11:02 +0100 Subject: [PATCH 6/9] bencher: fix avg responses extraction bug --- crates/bencher/src/run.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bencher/src/run.rs b/crates/bencher/src/run.rs index 293eb8ca..800f6e8d 100644 --- a/crates/bencher/src/run.rs +++ b/crates/bencher/src/run.rs @@ -164,7 +164,7 @@ impl RunConfig { let avg_responses = { static RE: Lazy = - Lazy::new(|| Regex::new(r"Average responses per second: ([0-9]+\.?)").unwrap()); + Lazy::new(|| Regex::new(r"Average responses per second: ([0-9]+)").unwrap()); let opt_avg_responses = RE .captures_iter(&load_test_stdout) From d1f9f88850ad6abe85084250055d7b27624afaff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 1 Jan 2024 17:16:23 +0100 Subject: [PATCH 7/9] bencher: tweak udp sets --- crates/bencher/src/protocols/udp.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/bencher/src/protocols/udp.rs b/crates/bencher/src/protocols/udp.rs index 52d0a722..cf5e0239 100644 --- a/crates/bencher/src/protocols/udp.rs +++ b/crates/bencher/src/protocols/udp.rs @@ -89,7 +89,7 @@ impl UdpCommand { ChihayaUdpRunner::new(), ], }, - load_test_runs: simple_load_test_runs(cpu_mode, &[1, 2, 4, 6]), + load_test_runs: simple_load_test_runs(cpu_mode, &[1, 2, 4, 6, 8]), }, 2 => SetConfig { implementations: indexmap! { @@ -106,7 +106,7 @@ impl UdpCommand { ChihayaUdpRunner::new(), ], }, - load_test_runs: simple_load_test_runs(cpu_mode, &[1, 2, 4, 6]), + load_test_runs: simple_load_test_runs(cpu_mode, &[1, 2, 4, 6, 8]), }, 3 => SetConfig { implementations: indexmap! { @@ -196,6 +196,7 @@ impl UdpCommand { AquaticUdpRunner::new(11, 5), AquaticUdpRunner::new(10, 6), AquaticUdpRunner::new(20, 6), + AquaticUdpRunner::new(9, 7), ], UdpTracker::OpenTracker => vec![ OpenTrackerUdpRunner::new(16), From 3d9a35d3765bd67463b30a1d36d5e5d2b86d5d36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 1 Jan 2024 17:22:36 +0100 Subject: [PATCH 8/9] Update CHANGELOG --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f46af1ea..08f2c3fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ #### Added * Add `aquatic_peer_id` crate with peer client information logic +* Add `aquatic_bencher` crate for automated benchmarking of aquatic and other + BitTorrent trackers ### aquatic_udp @@ -22,6 +24,8 @@ * Reuse allocations in swarm response channel * Remove config key `network.poll_event_capacity` * Harden ConnectionValidator to make IP spoofing even more costly +* Distribute announce responses from swarm workers over socket workers to + decrease performance loss due to underutilized threads ### aquatic_http From 98ce4ca01988243a8425feb206a8cc24b42a393c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 1 Jan 2024 17:32:55 +0100 Subject: [PATCH 9/9] bencher: pretty-print RSS --- Cargo.lock | 21 +++++++++++++++++++++ crates/bencher/Cargo.toml | 1 + crates/bencher/src/html.rs | 6 ++++-- crates/bencher/src/run.rs | 9 ++++++--- crates/bencher/src/set.rs | 5 +++-- 5 files changed, 35 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 84d76370..8dd6b2d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -135,6 +135,7 @@ dependencies = [ "aquatic_udp", "aquatic_udp_load_test", "clap 4.4.11", + "humanize-bytes", "indexmap 2.1.0", "indoc", "itertools 0.12.0", @@ -1398,6 +1399,15 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humanize-bytes" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c8f5a0ffae64f844e5e311016d1d8184dd496c7136af420f665a877ac2f0681" +dependencies = [ + "smartstring", +] + [[package]] name = "hwloc" version = "0.5.0" @@ -2605,6 +2615,17 @@ version = "1.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970" +[[package]] +name = "smartstring" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fb72c633efbaa2dd666986505016c32c3044395ceaf881518399d2f4127ee29" +dependencies = [ + "autocfg", + "static_assertions", + "version_check", +] + [[package]] name = "snafu" version = "0.7.5" diff --git a/crates/bencher/Cargo.toml b/crates/bencher/Cargo.toml index 5fd56312..2be6137a 100644 --- a/crates/bencher/Cargo.toml +++ b/crates/bencher/Cargo.toml @@ -24,6 +24,7 @@ aquatic_udp_load_test = { optional = true, workspace = true } anyhow = "1" clap = { version = "4", features = ["derive"] } +humanize-bytes = "1" indexmap = "2" indoc = "2" itertools = "0.12" diff --git a/crates/bencher/src/html.rs b/crates/bencher/src/html.rs index a232bf9d..15a0b936 100644 --- a/crates/bencher/src/html.rs +++ b/crates/bencher/src/html.rs @@ -1,3 +1,4 @@ +use humanize_bytes::humanize_bytes_binary; use indexmap::{IndexMap, IndexSet}; use indoc::formatdoc; use itertools::Itertools; @@ -198,7 +199,7 @@ pub fn html_all_runs(all_results: &[TrackerCoreCountResults]) -> String { {avg_responses} {tracker_key_values} {cpu}% - {mem} kB + {mem} {tracker_vcpus} {load_test_key_values} {load_test_vcpus} @@ -212,7 +213,8 @@ pub fn html_all_runs(all_results: &[TrackerCoreCountResults]) -> String { }).join("\n"), cpu = r.tracker_stats.map(|stats| stats.avg_cpu_utilization.to_string()) .unwrap_or_else(|| "-".to_string()), - mem = r.tracker_stats.map(|stats| stats.peak_rss_kb.to_string()) + mem = r.tracker_stats + .map(|stats| humanize_bytes_binary!(stats.peak_rss_bytes).to_string()) .unwrap_or_else(|| "-".to_string()), tracker_vcpus = r.tracker_vcpus, load_test_key_values = load_test_key_names.iter().map(|name| { diff --git a/crates/bencher/src/run.rs b/crates/bencher/src/run.rs index 800f6e8d..d3b02107 100644 --- a/crates/bencher/src/run.rs +++ b/crates/bencher/src/run.rs @@ -315,7 +315,7 @@ impl std::fmt::Display for RunErrorResults { #[derive(Debug, Clone, Copy)] pub struct ProcessStats { pub avg_cpu_utilization: f32, - pub peak_rss_kb: f32, + pub peak_rss_bytes: u64, } impl FromStr for ProcessStats { @@ -324,9 +324,12 @@ impl FromStr for ProcessStats { fn from_str(s: &str) -> Result { let mut parts = s.trim().split_whitespace(); + let avg_cpu_utilization = parts.next().ok_or(())?.parse().map_err(|_| ())?; + let peak_rss_kb: f32 = parts.next().ok_or(())?.parse().map_err(|_| ())?; + Ok(Self { - avg_cpu_utilization: parts.next().ok_or(())?.parse().map_err(|_| ())?, - peak_rss_kb: parts.next().ok_or(())?.parse().map_err(|_| ())?, + avg_cpu_utilization, + peak_rss_bytes: (peak_rss_kb * 1000.0) as u64, }) } } diff --git a/crates/bencher/src/set.rs b/crates/bencher/src/set.rs index 3aee777c..d4f7f803 100644 --- a/crates/bencher/src/set.rs +++ b/crates/bencher/src/set.rs @@ -1,5 +1,6 @@ use std::rc::Rc; +use humanize_bytes::humanize_bytes_binary; use indexmap::IndexMap; use num_format::{Locale, ToFormattedString}; @@ -207,8 +208,8 @@ impl LoadTestRunResults { r.tracker_process_stats.avg_cpu_utilization, ); println!( - "- Peak tracker RSS: {} kB", - r.tracker_process_stats.peak_rss_kb + "- Peak tracker RSS: {}", + humanize_bytes_binary!(r.tracker_process_stats.peak_rss_bytes) ); LoadTestRunResults::Success(LoadTestRunResultsSuccess {