diff --git a/CHANGELOG.md b/CHANGELOG.md index f46af1ea..08f2c3fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ #### Added * Add `aquatic_peer_id` crate with peer client information logic +* Add `aquatic_bencher` crate for automated benchmarking of aquatic and other + BitTorrent trackers ### aquatic_udp @@ -22,6 +24,8 @@ * Reuse allocations in swarm response channel * Remove config key `network.poll_event_capacity` * Harden ConnectionValidator to make IP spoofing even more costly +* Distribute announce responses from swarm workers over socket workers to + decrease performance loss due to underutilized threads ### aquatic_http diff --git a/Cargo.lock b/Cargo.lock index 65a00cd9..8dd6b2d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -135,10 +135,12 @@ dependencies = [ "aquatic_udp", "aquatic_udp_load_test", "clap 4.4.11", + "humanize-bytes", "indexmap 2.1.0", "indoc", "itertools 0.12.0", "nonblock", + "num-format", "once_cell", "regex", "serde", @@ -1397,6 +1399,15 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humanize-bytes" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c8f5a0ffae64f844e5e311016d1d8184dd496c7136af420f665a877ac2f0681" +dependencies = [ + "smartstring", +] + [[package]] name = "hwloc" version = "0.5.0" @@ -2604,6 +2615,17 @@ version = "1.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970" +[[package]] +name = "smartstring" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fb72c633efbaa2dd666986505016c32c3044395ceaf881518399d2f4127ee29" +dependencies = [ + "autocfg", + "static_assertions", + "version_check", +] + [[package]] name = "snafu" version = "0.7.5" diff --git a/crates/bencher/Cargo.toml b/crates/bencher/Cargo.toml index bdcdaf65..2be6137a 100644 --- a/crates/bencher/Cargo.toml +++ b/crates/bencher/Cargo.toml @@ -24,9 +24,11 @@ aquatic_udp_load_test = { optional = true, workspace = true } anyhow = "1" clap = { version = "4", features = ["derive"] } +humanize-bytes = "1" indexmap = "2" indoc = "2" itertools = "0.12" +num-format = "0.4" nonblock = "0.2" once_cell = "1" regex = "1" diff --git a/crates/bencher/src/html.rs b/crates/bencher/src/html.rs new file mode 100644 index 00000000..15a0b936 --- /dev/null +++ b/crates/bencher/src/html.rs @@ -0,0 +1,230 @@ +use humanize_bytes::humanize_bytes_binary; +use indexmap::{IndexMap, IndexSet}; +use indoc::formatdoc; +use itertools::Itertools; +use num_format::{Locale, ToFormattedString}; + +use crate::{ + run::ProcessStats, + set::{LoadTestRunResults, TrackerCoreCountResults}, +}; + +pub fn html_best_results(results: &[TrackerCoreCountResults]) -> String { + let mut all_implementation_names = IndexSet::new(); + + for core_count_results in results { + all_implementation_names.extend( + core_count_results + .implementations + .iter() + .map(|r| r.name.clone()), + ); + } + + let mut data_rows = Vec::new(); + + for core_count_results in results { + let best_results = core_count_results + .implementations + .iter() + .map(|implementation| (implementation.name.clone(), implementation.best_result())) + .collect::>(); + + let best_results_for_all_implementations = all_implementation_names + .iter() + .map(|name| best_results.get(name).cloned().flatten()) + .collect::>(); + + let data_row = format!( + " + + {} + {} + + ", + core_count_results.core_count, + best_results_for_all_implementations + .into_iter() + .map(|result| { + if let Some(r) = result { + format!( + r#"{}"#, + r.tracker_info, + r.tracker_process_stats.avg_cpu_utilization, + r.average_responses.to_formatted_string(&Locale::en), + ) + } else { + "-".to_string() + } + }) + .join("\n"), + ); + + data_rows.push(data_row); + } + + format!( + " +

Best results

+ + + + + {} + + + + {} + +
CPU cores
+ ", + all_implementation_names + .iter() + .map(|name| format!("{name}")) + .join("\n"), + data_rows.join("\n") + ) +} + +pub fn html_all_runs(all_results: &[TrackerCoreCountResults]) -> String { + let mut all_implementation_names = IndexSet::new(); + + for core_count_results in all_results { + all_implementation_names.extend( + core_count_results + .implementations + .iter() + .map(|r| r.name.clone()), + ); + } + + struct R { + core_count: usize, + avg_responses: Option, + tracker_keys: IndexMap, + tracker_vcpus: String, + tracker_stats: Option, + load_test_keys: IndexMap, + load_test_vcpus: String, + } + + let mut output = String::new(); + + let mut results_by_implementation: IndexMap> = Default::default(); + + for implementation_name in all_implementation_names { + let results = results_by_implementation + .entry(implementation_name.clone()) + .or_default(); + + let mut tracker_key_names: IndexSet = Default::default(); + let mut load_test_key_names: IndexSet = Default::default(); + + for r in all_results { + for i in r + .implementations + .iter() + .filter(|i| i.name == implementation_name) + { + for c in i.configurations.iter() { + for l in c.load_tests.iter() { + match l { + LoadTestRunResults::Success(l) => { + tracker_key_names.extend(l.tracker_keys.keys().cloned()); + load_test_key_names.extend(l.load_test_keys.keys().cloned()); + + results.push(R { + core_count: r.core_count, + avg_responses: Some(l.average_responses), + tracker_keys: l.tracker_keys.clone(), + tracker_vcpus: l.tracker_vcpus.as_cpu_list(), + tracker_stats: Some(l.tracker_process_stats), + load_test_keys: l.load_test_keys.clone(), + load_test_vcpus: l.load_test_vcpus.as_cpu_list(), + }) + } + LoadTestRunResults::Failure(l) => { + tracker_key_names.extend(l.tracker_keys.keys().cloned()); + load_test_key_names.extend(l.load_test_keys.keys().cloned()); + + results.push(R { + core_count: r.core_count, + avg_responses: None, + tracker_keys: l.tracker_keys.clone(), + tracker_vcpus: l.tracker_vcpus.as_cpu_list(), + tracker_stats: None, + load_test_keys: l.load_test_keys.clone(), + load_test_vcpus: l.load_test_vcpus.as_cpu_list(), + }) + } + } + } + } + } + } + + output.push_str(&formatdoc! { + " +

Results for {implementation}

+ + + + + + {tracker_key_names} + + + + {load_test_key_names} + + + + + {body} + +
CoresResponsesTracker avg CPUTracker peak RSSTracker vCPUsLoad test vCPUs
+ ", + implementation = implementation_name, + tracker_key_names = tracker_key_names.iter() + .map(|name| format!("{}", name)) + .join("\n"), + load_test_key_names = load_test_key_names.iter() + .map(|name| format!("Load test {}", name)) + .join("\n"), + body = results.into_iter().map(|r| { + formatdoc! { + " + + {cores} + {avg_responses} + {tracker_key_values} + {cpu}% + {mem} + {tracker_vcpus} + {load_test_key_values} + {load_test_vcpus} + + ", + cores = r.core_count, + avg_responses = r.avg_responses.map(|v| v.to_formatted_string(&Locale::en)) + .unwrap_or_else(|| "-".to_string()), + tracker_key_values = tracker_key_names.iter().map(|name| { + format!("{}", r.tracker_keys.get(name).cloned().unwrap_or_else(|| "-".to_string())) + }).join("\n"), + cpu = r.tracker_stats.map(|stats| stats.avg_cpu_utilization.to_string()) + .unwrap_or_else(|| "-".to_string()), + mem = r.tracker_stats + .map(|stats| humanize_bytes_binary!(stats.peak_rss_bytes).to_string()) + .unwrap_or_else(|| "-".to_string()), + tracker_vcpus = r.tracker_vcpus, + load_test_key_values = load_test_key_names.iter().map(|name| { + format!("{}", r.load_test_keys.get(name).cloned().unwrap_or_else(|| "-".to_string())) + }).join("\n"), + load_test_vcpus = r.load_test_vcpus, + } + }).join("\n") + }); + } + + output +} diff --git a/crates/bencher/src/main.rs b/crates/bencher/src/main.rs index 2ed2be81..5494d5c8 100644 --- a/crates/bencher/src/main.rs +++ b/crates/bencher/src/main.rs @@ -1,4 +1,5 @@ pub mod common; +pub mod html; pub mod protocols; pub mod run; pub mod set; diff --git a/crates/bencher/src/protocols/udp.rs b/crates/bencher/src/protocols/udp.rs index fce0ecdd..cf5e0239 100644 --- a/crates/bencher/src/protocols/udp.rs +++ b/crates/bencher/src/protocols/udp.rs @@ -89,7 +89,7 @@ impl UdpCommand { ChihayaUdpRunner::new(), ], }, - load_test_runs: simple_load_test_runs(cpu_mode, &[1, 2, 4, 6]), + load_test_runs: simple_load_test_runs(cpu_mode, &[1, 2, 4, 6, 8]), }, 2 => SetConfig { implementations: indexmap! { @@ -106,7 +106,7 @@ impl UdpCommand { ChihayaUdpRunner::new(), ], }, - load_test_runs: simple_load_test_runs(cpu_mode, &[1, 2, 4, 6]), + load_test_runs: simple_load_test_runs(cpu_mode, &[1, 2, 4, 6, 8]), }, 3 => SetConfig { implementations: indexmap! { @@ -161,54 +161,49 @@ impl UdpCommand { implementations: indexmap! { UdpTracker::Aquatic => vec![ AquaticUdpRunner::new(7, 1), - AquaticUdpRunner::new(14, 1), AquaticUdpRunner::new(6, 2), AquaticUdpRunner::new(12, 2), + AquaticUdpRunner::new(5, 3), ], UdpTracker::OpenTracker => vec![ OpenTrackerUdpRunner::new(8), OpenTrackerUdpRunner::new(16), ], }, - load_test_runs: simple_load_test_runs(cpu_mode, &[4, 8, 12]), + load_test_runs: simple_load_test_runs(cpu_mode, &[4, 8, 12, 16]), }, 12 => SetConfig { implementations: indexmap! { UdpTracker::Aquatic => vec![ - AquaticUdpRunner::new(11, 1), - AquaticUdpRunner::new(22, 1), AquaticUdpRunner::new(10, 2), - AquaticUdpRunner::new(20, 2), AquaticUdpRunner::new(9, 3), - AquaticUdpRunner::new(18, 3), AquaticUdpRunner::new(8, 4), AquaticUdpRunner::new(16, 4), + AquaticUdpRunner::new(9, 5), ], UdpTracker::OpenTracker => vec![ OpenTrackerUdpRunner::new(12), OpenTrackerUdpRunner::new(24), ], }, - load_test_runs: simple_load_test_runs(cpu_mode, &[8, 12, 16]), + load_test_runs: simple_load_test_runs(cpu_mode, &[8, 12, 16, 24]), }, 16 => SetConfig { implementations: indexmap! { UdpTracker::Aquatic => vec![ - AquaticUdpRunner::new(14, 2), - AquaticUdpRunner::new(28, 2), AquaticUdpRunner::new(13, 3), - AquaticUdpRunner::new(26, 3), AquaticUdpRunner::new(12, 4), - AquaticUdpRunner::new(24, 4), AquaticUdpRunner::new(11, 5), - AquaticUdpRunner::new(22, 5), + AquaticUdpRunner::new(10, 6), + AquaticUdpRunner::new(20, 6), + AquaticUdpRunner::new(9, 7), ], UdpTracker::OpenTracker => vec![ OpenTrackerUdpRunner::new(16), OpenTrackerUdpRunner::new(32), ], }, - load_test_runs: simple_load_test_runs(cpu_mode, &[8, 12, 16]), + load_test_runs: simple_load_test_runs(cpu_mode, &[8, 12, 16, 24]), }, } } diff --git a/crates/bencher/src/run.rs b/crates/bencher/src/run.rs index 09573e07..d3b02107 100644 --- a/crates/bencher/src/run.rs +++ b/crates/bencher/src/run.rs @@ -163,9 +163,8 @@ impl RunConfig { }; let avg_responses = { - static RE: Lazy = Lazy::new(|| { - Regex::new(r"Average responses per second: ([0-9]+\.?[0-9]+)").unwrap() - }); + static RE: Lazy = + Lazy::new(|| Regex::new(r"Average responses per second: ([0-9]+)").unwrap()); let opt_avg_responses = RE .captures_iter(&load_test_stdout) @@ -175,7 +174,7 @@ impl RunConfig { avg_responses.to_string() }) - .and_then(|v| v.parse::().ok()); + .and_then(|v| v.parse::().ok()); if let Some(avg_responses) = opt_avg_responses { avg_responses @@ -199,7 +198,7 @@ impl RunConfig { pub struct RunSuccessResults { pub tracker_process_stats: ProcessStats, - pub avg_responses: f32, + pub avg_responses: u64, } #[derive(Debug)] @@ -316,7 +315,7 @@ impl std::fmt::Display for RunErrorResults { #[derive(Debug, Clone, Copy)] pub struct ProcessStats { pub avg_cpu_utilization: f32, - pub peak_rss_kb: f32, + pub peak_rss_bytes: u64, } impl FromStr for ProcessStats { @@ -325,9 +324,12 @@ impl FromStr for ProcessStats { fn from_str(s: &str) -> Result { let mut parts = s.trim().split_whitespace(); + let avg_cpu_utilization = parts.next().ok_or(())?.parse().map_err(|_| ())?; + let peak_rss_kb: f32 = parts.next().ok_or(())?.parse().map_err(|_| ())?; + Ok(Self { - avg_cpu_utilization: parts.next().ok_or(())?.parse().map_err(|_| ())?, - peak_rss_kb: parts.next().ok_or(())?.parse().map_err(|_| ())?, + avg_cpu_utilization, + peak_rss_bytes: (peak_rss_kb * 1000.0) as u64, }) } } diff --git a/crates/bencher/src/set.rs b/crates/bencher/src/set.rs index 597b8fa5..d4f7f803 100644 --- a/crates/bencher/src/set.rs +++ b/crates/bencher/src/set.rs @@ -1,10 +1,12 @@ use std::rc::Rc; -use indexmap::{IndexMap, IndexSet}; -use itertools::Itertools; +use humanize_bytes::humanize_bytes_binary; +use indexmap::IndexMap; +use num_format::{Locale, ToFormattedString}; use crate::{ common::{CpuDirection, CpuMode, TaskSetCpuList}, + html::{html_all_runs, html_best_results}, run::{ProcessRunner, ProcessStats, RunConfig}, }; @@ -106,21 +108,22 @@ pub fn run_sets( }) .collect::>(); - html_summary(&results); + println!("{}", html_all_runs(&results)); + println!("{}", html_best_results(&results)); } pub struct TrackerCoreCountResults { - core_count: usize, - implementations: Vec, + pub core_count: usize, + pub implementations: Vec, } pub struct ImplementationResults { - name: String, - configurations: Vec, + pub name: String, + pub configurations: Vec, } impl ImplementationResults { - fn best_result(&self) -> Option { + pub fn best_result(&self) -> Option { self.configurations .iter() .filter_map(|c| c.best_result()) @@ -135,7 +138,7 @@ impl ImplementationResults { } pub struct TrackerConfigurationResults { - load_tests: Vec, + pub load_tests: Vec, } impl TrackerConfigurationResults { @@ -185,40 +188,48 @@ impl LoadTestRunResults { ); let load_test_runner = load_test_gen(workers); - // let load_test_keys = load_test_runner.keys(); + let load_test_keys = load_test_runner.keys(); let run_config = RunConfig { tracker_runner: tracker_process.clone(), tracker_vcpus: tracker_vcpus.clone(), load_test_runner, - load_test_vcpus, + load_test_vcpus: load_test_vcpus.clone(), }; match run_config.run(command) { Ok(r) => { - println!("- Average responses per second: {}", r.avg_responses); + println!( + "- Average responses per second: {}", + r.avg_responses.to_formatted_string(&Locale::en) + ); println!( "- Average tracker CPU utilization: {}%", r.tracker_process_stats.avg_cpu_utilization, ); println!( - "- Peak tracker RSS: {} kB", - r.tracker_process_stats.peak_rss_kb + "- Peak tracker RSS: {}", + humanize_bytes_binary!(r.tracker_process_stats.peak_rss_bytes) ); LoadTestRunResults::Success(LoadTestRunResultsSuccess { average_responses: r.avg_responses, - // tracker_keys: tracker_process.keys(), + tracker_keys: tracker_process.keys(), tracker_info: tracker_process.info(), tracker_process_stats: r.tracker_process_stats, - // load_test_keys, + tracker_vcpus, + load_test_keys, + load_test_vcpus, }) } Err(results) => { println!("\nRun failed:\n{:#}\n", results); LoadTestRunResults::Failure(LoadTestRunResultsFailure { - // load_test_keys + tracker_keys: tracker_process.keys(), + tracker_vcpus, + load_test_keys, + load_test_vcpus, }) } } @@ -227,89 +238,18 @@ impl LoadTestRunResults { #[derive(Clone)] pub struct LoadTestRunResultsSuccess { - average_responses: f32, - // tracker_keys: IndexMap, - tracker_info: String, - tracker_process_stats: ProcessStats, - // load_test_keys: IndexMap, + pub average_responses: u64, + pub tracker_keys: IndexMap, + pub tracker_info: String, + pub tracker_process_stats: ProcessStats, + pub tracker_vcpus: TaskSetCpuList, + pub load_test_keys: IndexMap, + pub load_test_vcpus: TaskSetCpuList, } pub struct LoadTestRunResultsFailure { - // load_test_keys: IndexMap, -} - -pub fn html_summary(results: &[TrackerCoreCountResults]) { - let mut all_implementation_names = IndexSet::new(); - - for core_count_results in results { - all_implementation_names.extend( - core_count_results - .implementations - .iter() - .map(|r| r.name.clone()), - ); - } - - let mut data_rows = Vec::new(); - - for core_count_results in results { - let best_results = core_count_results - .implementations - .iter() - .map(|implementation| (implementation.name.clone(), implementation.best_result())) - .collect::>(); - - let best_results_for_all_implementations = all_implementation_names - .iter() - .map(|name| best_results.get(name).cloned().flatten()) - .collect::>(); - - let data_row = format!( - " - - {} - {} - - ", - core_count_results.core_count, - best_results_for_all_implementations - .into_iter() - .map(|result| { - if let Some(r) = result { - format!( - r#"{}"#, - r.tracker_info, - r.tracker_process_stats.avg_cpu_utilization, - r.average_responses, - ) - } else { - "-".to_string() - } - }) - .join("\n"), - ); - - data_rows.push(data_row); - } - - println!( - " - - - - - {} - - - - {} - -
CPU cores
- ", - all_implementation_names - .iter() - .map(|name| format!("{name}")) - .join("\n"), - data_rows.join("\n") - ) + pub tracker_keys: IndexMap, + pub tracker_vcpus: TaskSetCpuList, + pub load_test_keys: IndexMap, + pub load_test_vcpus: TaskSetCpuList, } diff --git a/crates/udp/src/common.rs b/crates/udp/src/common.rs index 439a4974..8cc322c2 100644 --- a/crates/udp/src/common.rs +++ b/crates/udp/src/common.rs @@ -171,13 +171,17 @@ impl ConnectedRequestSender { pub struct ConnectedResponseSender { senders: Vec>, + to_any_last_index_picked: usize, } impl ConnectedResponseSender { pub fn new( senders: Vec>, ) -> Self { - Self { senders } + Self { + senders, + to_any_last_index_picked: 0, + } } pub fn try_send_ref_to( @@ -193,6 +197,23 @@ impl ConnectedResponseSender { ) -> Result, thingbuf::mpsc::errors::Closed> { self.senders[index.0].send_ref() } + + pub fn send_ref_to_any( + &mut self, + ) -> Result, thingbuf::mpsc::errors::Closed> { + let start = self.to_any_last_index_picked + 1; + + for i in (start..start + self.senders.len()).map(|i| i % self.senders.len()) { + if let Ok(sender) = self.senders[i].try_send_ref() { + self.to_any_last_index_picked = i; + + return Ok(sender); + } + } + + self.to_any_last_index_picked = start % self.senders.len(); + self.send_ref_to(SocketWorkerIndex(self.to_any_last_index_picked)) + } } pub type ConnectedResponseReceiver = diff --git a/crates/udp/src/workers/swarm/mod.rs b/crates/udp/src/workers/swarm/mod.rs index 43ee480d..ea551f32 100644 --- a/crates/udp/src/workers/swarm/mod.rs +++ b/crates/udp/src/workers/swarm/mod.rs @@ -23,7 +23,7 @@ pub fn run_swarm_worker( state: State, server_start_instant: ServerStartInstant, request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>, - response_sender: ConnectedResponseSender, + mut response_sender: ConnectedResponseSender, statistics_sender: Sender, worker_index: SwarmWorkerIndex, ) { @@ -43,65 +43,78 @@ pub fn run_swarm_worker( loop { if let Ok((sender_index, request, src)) = request_receiver.recv_timeout(timeout) { - // It is OK to block here as long as we don't do blocking sends - // in socket workers, which could cause a deadlock - match response_sender.send_ref_to(sender_index) { - Ok(mut send_ref) => { + // It is OK to block here as long as we don't also do blocking + // sends in socket workers (doing both could cause a deadlock) + match (request, src.get().ip()) { + (ConnectedRequest::Announce(request), IpAddr::V4(ip)) => { + // It doesn't matter which socket worker receives announce responses + let mut send_ref = response_sender + .send_ref_to_any() + .expect("swarm response channel is closed"); + send_ref.addr = src; + send_ref.kind = ConnectedResponseKind::AnnounceIpv4; + + torrents + .ipv4 + .0 + .entry(request.info_hash) + .or_default() + .announce( + &config, + &statistics_sender, + &mut rng, + &request, + ip.into(), + peer_valid_until, + &mut send_ref.announce_ipv4, + ); + } + (ConnectedRequest::Announce(request), IpAddr::V6(ip)) => { + // It doesn't matter which socket worker receives announce responses + let mut send_ref = response_sender + .send_ref_to_any() + .expect("swarm response channel is closed"); - match (request, src.get().ip()) { - (ConnectedRequest::Announce(request), IpAddr::V4(ip)) => { - send_ref.kind = ConnectedResponseKind::AnnounceIpv4; - - torrents - .ipv4 - .0 - .entry(request.info_hash) - .or_default() - .announce( - &config, - &statistics_sender, - &mut rng, - &request, - ip.into(), - peer_valid_until, - &mut send_ref.announce_ipv4, - ); - } - (ConnectedRequest::Announce(request), IpAddr::V6(ip)) => { - send_ref.kind = ConnectedResponseKind::AnnounceIpv6; - - torrents - .ipv6 - .0 - .entry(request.info_hash) - .or_default() - .announce( - &config, - &statistics_sender, - &mut rng, - &request, - ip.into(), - peer_valid_until, - &mut send_ref.announce_ipv6, - ); - } - (ConnectedRequest::Scrape(request), IpAddr::V4(_)) => { - send_ref.kind = ConnectedResponseKind::Scrape; - - torrents.ipv4.scrape(request, &mut send_ref.scrape); - } - (ConnectedRequest::Scrape(request), IpAddr::V6(_)) => { - send_ref.kind = ConnectedResponseKind::Scrape; - - torrents.ipv6.scrape(request, &mut send_ref.scrape); - } - }; + send_ref.addr = src; + send_ref.kind = ConnectedResponseKind::AnnounceIpv6; + + torrents + .ipv6 + .0 + .entry(request.info_hash) + .or_default() + .announce( + &config, + &statistics_sender, + &mut rng, + &request, + ip.into(), + peer_valid_until, + &mut send_ref.announce_ipv6, + ); } - Err(_) => { - panic!("swarm response channel closed"); + (ConnectedRequest::Scrape(request), IpAddr::V4(_)) => { + let mut send_ref = response_sender + .send_ref_to(sender_index) + .expect("swarm response channel is closed"); + + send_ref.addr = src; + send_ref.kind = ConnectedResponseKind::Scrape; + + torrents.ipv4.scrape(request, &mut send_ref.scrape); } - } + (ConnectedRequest::Scrape(request), IpAddr::V6(_)) => { + let mut send_ref = response_sender + .send_ref_to(sender_index) + .expect("swarm response channel is closed"); + + send_ref.addr = src; + send_ref.kind = ConnectedResponseKind::Scrape; + + torrents.ipv6.scrape(request, &mut send_ref.scrape); + } + }; } // Run periodic tasks