diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d21c994b..dcdb4f4a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -66,6 +66,8 @@ jobs: run: cargo test --verbose --profile "test-fast" --workspace - name: Run tests (aquatic_udp with io_uring) run: cargo test --verbose --profile "test-fast" -p aquatic_udp --features "io-uring" + - name: Run tests (aquatic_udp_protocol with serde) + run: cargo test --verbose --profile "test-fast" -p aquatic_udp_protocol --features "serde" test-file-transfers: runs-on: ubuntu-latest diff --git a/Cargo.lock b/Cargo.lock index 53a4b601..7f2ca661 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -56,6 +56,21 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anes" version = "0.1.6" @@ -262,6 +277,7 @@ dependencies = [ "quickcheck", "regex", "serde", + "serde_with", "zerocopy", ] @@ -349,10 +365,14 @@ version = "0.8.0" dependencies = [ "aquatic_peer_id", "byteorder", + "cfg-if", "either", "pretty_assertions", "quickcheck", "quickcheck_macros", + "serde", + "serde_json", + "serde_with", "zerocopy", ] @@ -620,6 +640,19 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "num-traits", + "serde", + "windows-targets 0.48.5", +] + [[package]] name = "ciborium" version = "0.2.1" @@ -753,6 +786,12 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7144d30dcf0fafbce74250a3963025d8d52177934239851c917d29f1df280c2" +[[package]] +name = "core-foundation-sys" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" + [[package]] name = "cpufeatures" version = "0.2.12" @@ -899,6 +938,41 @@ dependencies = [ "typenum", ] +[[package]] +name = "darling" +version = "0.20.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0209d94da627ab5605dcccf08bb18afa5009cfbef48d8a8b7d7bdbc79be25c5e" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.20.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "177e3443818124b357d8e76f53be906d60937f0d3a90773a664fa63fa253e621" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn 2.0.48", +] + +[[package]] +name = "darling_macro" +version = "0.20.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" +dependencies = [ + "darling_core", + "quote", + "syn 2.0.48", +] + [[package]] name = "data-encoding" version = "2.5.0" @@ -912,6 +986,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" dependencies = [ "powerfmt", + "serde", ] [[package]] @@ -1449,6 +1524,35 @@ dependencies = [ "want", ] +[[package]] +name = "iana-time-zone" +version = "0.1.59" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6a67363e2aa4443928ce15e57ebae94fd8949958fd1223c4cfc0cd473ad7539" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "0.5.0" @@ -1467,6 +1571,7 @@ checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" dependencies = [ "autocfg", "hashbrown 0.12.3", + "serde", ] [[package]] @@ -1477,6 +1582,7 @@ checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f" dependencies = [ "equivalent", "hashbrown 0.14.3", + "serde", ] [[package]] @@ -2582,6 +2688,35 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64cd236ccc1b7a29e7e2739f27c0b2dd199804abc4290e32f59f3b68d6405c23" +dependencies = [ + "base64", + "chrono", + "hex", + "indexmap 1.9.3", + "indexmap 2.1.0", + "serde", + "serde_json", + "serde_with_macros", + "time", +] + +[[package]] +name = "serde_with_macros" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93634eb5f75a2323b16de4748022ac4297f9e76b6dced2be287a099f41b5e788" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "sha1" version = "0.10.6" @@ -3216,6 +3351,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.0", +] + [[package]] name = "windows-sys" version = "0.48.0" diff --git a/crates/peer_id/Cargo.toml b/crates/peer_id/Cargo.toml index 4acc0995..3aad37b9 100644 --- a/crates/peer_id/Cargo.toml +++ b/crates/peer_id/Cargo.toml @@ -20,5 +20,6 @@ compact_str = "0.7" hex = "0.4" regex = "1" serde = { version = "1", features = ["derive"] } +serde_with = { version = "3", features = ["hex"] } quickcheck = { version = "1", optional = true } zerocopy = { version = "0.7", features = ["derive"] } \ No newline at end of file diff --git a/crates/peer_id/src/lib.rs b/crates/peer_id/src/lib.rs index 304e1e5a..4cc1af35 100644 --- a/crates/peer_id/src/lib.rs +++ b/crates/peer_id/src/lib.rs @@ -3,8 +3,10 @@ use std::{borrow::Cow, fmt::Display, sync::OnceLock}; use compact_str::{format_compact, CompactString}; use regex::bytes::Regex; use serde::{Deserialize, Serialize}; +use serde_with::{hex::Hex, serde_as}; use zerocopy::{AsBytes, FromBytes, FromZeroes}; +#[serde_as] #[derive( Debug, Clone, @@ -21,7 +23,8 @@ use zerocopy::{AsBytes, FromBytes, FromZeroes}; FromZeroes, )] #[repr(transparent)] -pub struct PeerId(pub [u8; 20]); +#[serde(transparent)] +pub struct PeerId(#[serde_as(as = "Hex")] pub [u8; 20]); impl PeerId { pub fn client(&self) -> PeerClient { diff --git a/crates/udp/src/common.rs b/crates/udp/src/common.rs index f803ea5e..763b0552 100644 --- a/crates/udp/src/common.rs +++ b/crates/udp/src/common.rs @@ -250,17 +250,23 @@ mod tests { let f = PeerStatus::from_event_and_bytes_left; - assert_eq!(Stopped, f(AnnounceEvent::Stopped, NumberOfBytes::new(0))); - assert_eq!(Stopped, f(AnnounceEvent::Stopped, NumberOfBytes::new(1))); - - assert_eq!(Seeding, f(AnnounceEvent::Started, NumberOfBytes::new(0))); - assert_eq!(Leeching, f(AnnounceEvent::Started, NumberOfBytes::new(1))); - - assert_eq!(Seeding, f(AnnounceEvent::Completed, NumberOfBytes::new(0))); - assert_eq!(Leeching, f(AnnounceEvent::Completed, NumberOfBytes::new(1))); - - assert_eq!(Seeding, f(AnnounceEvent::None, NumberOfBytes::new(0))); - assert_eq!(Leeching, f(AnnounceEvent::None, NumberOfBytes::new(1))); + assert_eq!(Stopped, f(AnnounceEvent::Stopped, NumberOfBytes(0.into()))); + assert_eq!(Stopped, f(AnnounceEvent::Stopped, NumberOfBytes(1.into()))); + + assert_eq!(Seeding, f(AnnounceEvent::Started, NumberOfBytes(0.into()))); + assert_eq!(Leeching, f(AnnounceEvent::Started, NumberOfBytes(1.into()))); + + assert_eq!( + Seeding, + f(AnnounceEvent::Completed, NumberOfBytes(0.into())) + ); + assert_eq!( + Leeching, + f(AnnounceEvent::Completed, NumberOfBytes(1.into())) + ); + + assert_eq!(Seeding, f(AnnounceEvent::None, NumberOfBytes(0.into()))); + assert_eq!(Leeching, f(AnnounceEvent::None, NumberOfBytes(1.into()))); } // Assumes that announce response with maximum amount of ipv6 peers will @@ -273,24 +279,24 @@ mod tests { let peers = ::std::iter::repeat(ResponsePeer { ip_address: Ipv6AddrBytes(Ipv6Addr::new(1, 1, 1, 1, 1, 1, 1, 1).octets()), - port: Port::new(1), + port: Port(1.into()), }) .take(config.protocol.max_response_peers) .collect(); let response = Response::AnnounceIpv6(AnnounceResponse { fixed: AnnounceResponseFixedData { - transaction_id: TransactionId::new(1), - announce_interval: AnnounceInterval::new(1), - seeders: NumberOfPeers::new(1), - leechers: NumberOfPeers::new(1), + transaction_id: TransactionId(1.into()), + announce_interval: AnnounceInterval(1.into()), + seeders: NumberOfPeers(1.into()), + leechers: NumberOfPeers(1.into()), }, peers, }); let mut buf = Vec::new(); - response.write(&mut buf).unwrap(); + response.write_bytes(&mut buf).unwrap(); println!("Buffer len: {}", buf.len()); diff --git a/crates/udp/src/workers/socket/mio.rs b/crates/udp/src/workers/socket/mio.rs index 070e00b3..38a18650 100644 --- a/crates/udp/src/workers/socket/mio.rs +++ b/crates/udp/src/workers/socket/mio.rs @@ -198,7 +198,7 @@ impl SocketWorker { } let src = CanonicalSocketAddr::new(src); - let request_parsable = match Request::from_bytes( + let request_parsable = match Request::parse_bytes( &self.buffer[..bytes_read], self.config.protocol.max_scrape_torrents, ) { @@ -431,7 +431,7 @@ impl SocketWorker { ) { let mut buffer = Cursor::new(&mut buffer[..]); - if let Err(err) = response.write(&mut buffer) { + if let Err(err) = response.write_bytes(&mut buffer) { ::log::error!("failed writing response to buffer: {:#}", err); return; diff --git a/crates/udp/src/workers/socket/storage.rs b/crates/udp/src/workers/socket/storage.rs index 84c11a79..e0d3140b 100644 --- a/crates/udp/src/workers/socket/storage.rs +++ b/crates/udp/src/workers/socket/storage.rs @@ -155,8 +155,8 @@ mod tests { } let request = ScrapeRequest { - transaction_id: TransactionId::new(t), - connection_id: ConnectionId::new(c), + transaction_id: TransactionId(t.into()), + connection_id: ConnectionId(c.into()), info_hashes, }; @@ -191,9 +191,9 @@ mod tests { ( i, TorrentScrapeStatistics { - seeders: NumberOfPeers::new((info_hash.0[0]) as i32), - leechers: NumberOfPeers::new(0), - completed: NumberOfDownloads::new(0), + seeders: NumberOfPeers(((info_hash.0[0]) as i32).into()), + leechers: NumberOfPeers(0.into()), + completed: NumberOfDownloads(0.into()), }, ) }) diff --git a/crates/udp/src/workers/socket/uring/recv_helper.rs b/crates/udp/src/workers/socket/uring/recv_helper.rs index 4a485f63..0aef6d91 100644 --- a/crates/udp/src/workers/socket/uring/recv_helper.rs +++ b/crates/udp/src/workers/socket/uring/recv_helper.rs @@ -138,7 +138,7 @@ impl RecvHelper { let addr = CanonicalSocketAddr::new(addr); - let request = Request::from_bytes(msg.payload_data(), self.max_scrape_torrents) + let request = Request::parse_bytes(msg.payload_data(), self.max_scrape_torrents) .map_err(|err| Error::RequestParseError(err, addr))?; Ok((request, addr)) diff --git a/crates/udp/src/workers/socket/uring/send_buffers.rs b/crates/udp/src/workers/socket/uring/send_buffers.rs index 458d96f6..902e251f 100644 --- a/crates/udp/src/workers/socket/uring/send_buffers.rs +++ b/crates/udp/src/workers/socket/uring/send_buffers.rs @@ -196,7 +196,7 @@ impl SendBuffer { let mut cursor = Cursor::new(&mut self.bytes[..]); - match response.write(&mut cursor) { + match response.write_bytes(&mut cursor) { Ok(()) => { self.iovec.iov_len = cursor.position() as usize; diff --git a/crates/udp/src/workers/socket/validator.rs b/crates/udp/src/workers/socket/validator.rs index c68d1efc..fffebd07 100644 --- a/crates/udp/src/workers/socket/validator.rs +++ b/crates/udp/src/workers/socket/validator.rs @@ -62,7 +62,7 @@ impl ConnectionValidator { connection_id_bytes[..4].copy_from_slice(&elapsed); connection_id_bytes[4..].copy_from_slice(&hash); - ConnectionId::new(i64::from_ne_bytes(connection_id_bytes)) + ConnectionId(i64::from_ne_bytes(connection_id_bytes).into()) } pub fn connection_id_valid( diff --git a/crates/udp/src/workers/swarm/storage.rs b/crates/udp/src/workers/swarm/storage.rs index 0c1dcc20..a11a37e0 100644 --- a/crates/udp/src/workers/swarm/storage.rs +++ b/crates/udp/src/workers/swarm/storage.rs @@ -89,9 +89,9 @@ impl TorrentMap { .get(&info_hash) .map(|torrent_data| torrent_data.scrape_statistics()) .unwrap_or_else(|| TorrentScrapeStatistics { - seeders: NumberOfPeers::new(0), - leechers: NumberOfPeers::new(0), - completed: NumberOfDownloads::new(0), + seeders: NumberOfPeers(0.into()), + leechers: NumberOfPeers(0.into()), + completed: NumberOfDownloads(0.into()), }); (i, stats) @@ -222,11 +222,11 @@ impl TorrentData { let response = AnnounceResponse { fixed: AnnounceResponseFixedData { transaction_id: request.transaction_id, - announce_interval: AnnounceInterval::new( - config.protocol.peer_announce_interval, + announce_interval: AnnounceInterval( + config.protocol.peer_announce_interval.into(), ), - leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)), - seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)), + leechers: NumberOfPeers(leechers.try_into().unwrap_or(i32::MAX).into()), + seeders: NumberOfPeers(seeders.try_into().unwrap_or(i32::MAX).into()), }, peers: peer_map.extract_response_peers(max_num_peers_to_take), }; @@ -248,11 +248,11 @@ impl TorrentData { let response = AnnounceResponse { fixed: AnnounceResponseFixedData { transaction_id: request.transaction_id, - announce_interval: AnnounceInterval::new( - config.protocol.peer_announce_interval, + announce_interval: AnnounceInterval( + config.protocol.peer_announce_interval.into(), ), - leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)), - seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)), + leechers: NumberOfPeers(leechers.try_into().unwrap_or(i32::MAX).into()), + seeders: NumberOfPeers(seeders.try_into().unwrap_or(i32::MAX).into()), }, peers: peer_map.extract_response_peers(rng, max_num_peers_to_take), }; @@ -307,9 +307,9 @@ impl TorrentData { }; TorrentScrapeStatistics { - seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)), - leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)), - completed: NumberOfDownloads::new(0), + seeders: NumberOfPeers(seeders.try_into().unwrap_or(i32::MAX).into()), + leechers: NumberOfPeers(leechers.try_into().unwrap_or(i32::MAX).into()), + completed: NumberOfDownloads(0.into()), } } } diff --git a/crates/udp/tests/common/mod.rs b/crates/udp/tests/common/mod.rs index ee8e365e..4c8c5f69 100644 --- a/crates/udp/tests/common/mod.rs +++ b/crates/udp/tests/common/mod.rs @@ -26,7 +26,7 @@ pub fn run_tracker(config: Config) { pub fn connect(socket: &UdpSocket, tracker_addr: SocketAddr) -> anyhow::Result { let request = Request::Connect(ConnectRequest { - transaction_id: TransactionId::new(0), + transaction_id: TransactionId(0.into()), }); let response = request_and_response(socket, tracker_addr, request)?; @@ -56,17 +56,17 @@ pub fn announce( let request = Request::Announce(AnnounceRequest { connection_id, action_placeholder: Default::default(), - transaction_id: TransactionId::new(0), + transaction_id: TransactionId(0.into()), info_hash, peer_id, - bytes_downloaded: NumberOfBytes::new(0), - bytes_uploaded: NumberOfBytes::new(0), - bytes_left: NumberOfBytes::new(if seeder { 0 } else { 1 }), + bytes_downloaded: NumberOfBytes(0.into()), + bytes_uploaded: NumberOfBytes(0.into()), + bytes_left: NumberOfBytes(if seeder { 0 } else { 1 }.into()), event: AnnounceEvent::Started.into(), ip_address: Ipv4AddrBytes([0; 4]), - key: PeerKey::new(0), - peers_wanted: NumberOfPeers::new(peers_wanted as i32), - port: Port::new(peer_port), + key: PeerKey(0.into()), + peers_wanted: NumberOfPeers((peers_wanted as i32).into()), + port: Port(peer_port.into()), }); request_and_response(socket, tracker_addr, request) @@ -80,7 +80,7 @@ pub fn scrape( ) -> anyhow::Result { let request = Request::Scrape(ScrapeRequest { connection_id, - transaction_id: TransactionId::new(0), + transaction_id: TransactionId(0.into()), info_hashes, }); @@ -104,7 +104,7 @@ pub fn request_and_response( let mut buffer = Cursor::new(&mut buffer[..]); request - .write(&mut buffer) + .write_bytes(&mut buffer) .with_context(|| "write request")?; let bytes_written = buffer.position() as usize; @@ -119,6 +119,6 @@ pub fn request_and_response( .recv_from(&mut buffer) .with_context(|| "recv response")?; - Response::from_bytes(&buffer[..bytes_read], true).with_context(|| "parse response") + Response::parse_bytes(&buffer[..bytes_read], true).with_context(|| "parse response") } } diff --git a/crates/udp/tests/invalid_connection_id.rs b/crates/udp/tests/invalid_connection_id.rs index 11579a33..33314a77 100644 --- a/crates/udp/tests/invalid_connection_id.rs +++ b/crates/udp/tests/invalid_connection_id.rs @@ -41,22 +41,22 @@ fn test_invalid_connection_id() -> anyhow::Result<()> { let announce_request = Request::Announce(AnnounceRequest { connection_id: invalid_connection_id, action_placeholder: Default::default(), - transaction_id: TransactionId::new(0), + transaction_id: TransactionId(0.into()), info_hash: InfoHash([0; 20]), peer_id: PeerId([0; 20]), - bytes_downloaded: NumberOfBytes::new(0), - bytes_uploaded: NumberOfBytes::new(0), - bytes_left: NumberOfBytes::new(0), + bytes_downloaded: NumberOfBytes(0.into()), + bytes_uploaded: NumberOfBytes(0.into()), + bytes_left: NumberOfBytes(0.into()), event: AnnounceEvent::Started.into(), ip_address: Ipv4AddrBytes([0; 4]), - key: PeerKey::new(0), - peers_wanted: NumberOfPeers::new(10), - port: Port::new(1), + key: PeerKey(0.into()), + peers_wanted: NumberOfPeers(10.into()), + port: Port(1.into()), }); let scrape_request = Request::Scrape(ScrapeRequest { connection_id: invalid_connection_id, - transaction_id: TransactionId::new(0), + transaction_id: TransactionId(0.into()), info_hashes: vec![InfoHash([0; 20])], }); @@ -77,7 +77,7 @@ fn no_response( let mut buffer = Cursor::new(&mut buffer[..]); request - .write(&mut buffer) + .write_bytes(&mut buffer) .with_context(|| "write request")?; let bytes_written = buffer.position() as usize; diff --git a/crates/udp_load_test/src/utils.rs b/crates/udp_load_test/src/utils.rs index 6b7f748c..c646c6bd 100644 --- a/crates/udp_load_test/src/utils.rs +++ b/crates/udp_load_test/src/utils.rs @@ -19,7 +19,7 @@ pub fn generate_info_hash() -> InfoHash { } pub fn generate_transaction_id(rng: &mut impl Rng) -> TransactionId { - TransactionId::new(rng.gen()) + TransactionId(rng.gen::().into()) } pub fn create_connect_request(transaction_id: TransactionId) -> Request { diff --git a/crates/udp_load_test/src/worker/mod.rs b/crates/udp_load_test/src/worker/mod.rs index b3be4762..b195988f 100644 --- a/crates/udp_load_test/src/worker/mod.rs +++ b/crates/udp_load_test/src/worker/mod.rs @@ -71,7 +71,7 @@ impl Worker { for _ in events.iter() { while let Ok(amt) = self.socket.recv(&mut self.buffer) { - match Response::from_bytes(&self.buffer[0..amt], self.addr.is_ipv4()) { + match Response::parse_bytes(&self.buffer[0..amt], self.addr.is_ipv4()) { Ok(response) => { if let Some(request) = self.process_response(response) { self.send_request(request); @@ -192,7 +192,7 @@ impl Worker { scrape_hash_indices, connection_id, peer_id: generate_peer_id(), - port: Port::new(self.rng.gen()), + port: Port(self.rng.gen::().into()), } } @@ -232,9 +232,9 @@ impl Worker { .rng .gen_bool(self.config.requests.peer_seeder_probability) { - (AnnounceEvent::Completed, NumberOfBytes::new(0)) + (AnnounceEvent::Completed, NumberOfBytes(0.into())) } else { - (AnnounceEvent::Started, NumberOfBytes::new(50)) + (AnnounceEvent::Started, NumberOfBytes(50.into())) } }; @@ -244,13 +244,13 @@ impl Worker { transaction_id, info_hash: torrent_peer.info_hash, peer_id: torrent_peer.peer_id, - bytes_downloaded: NumberOfBytes::new(50), - bytes_uploaded: NumberOfBytes::new(50), + bytes_downloaded: NumberOfBytes(50.into()), + bytes_uploaded: NumberOfBytes(50.into()), bytes_left, event: event.into(), ip_address: Ipv4AddrBytes([0; 4]), - key: PeerKey::new(0), - peers_wanted: NumberOfPeers::new(self.config.requests.announce_peers_wanted), + key: PeerKey(0.into()), + peers_wanted: NumberOfPeers(self.config.requests.announce_peers_wanted.into()), port: torrent_peer.port, }) .into() @@ -288,7 +288,7 @@ impl Worker { fn send_request(&mut self, request: Request) { let mut cursor = Cursor::new(self.buffer); - match request.write(&mut cursor) { + match request.write_bytes(&mut cursor) { Ok(()) => { let position = cursor.position() as usize; let inner = cursor.get_ref(); diff --git a/crates/udp_protocol/Cargo.toml b/crates/udp_protocol/Cargo.toml index cd4614dd..7206d24b 100644 --- a/crates/udp_protocol/Cargo.toml +++ b/crates/udp_protocol/Cargo.toml @@ -10,14 +10,28 @@ repository.workspace = true readme.workspace = true rust-version.workspace = true +[features] +# Derive serde Serialize and Deserialize for most types. +# +# The implementations leans towards pretty-printing. For instance, it will +# convert info hashes to hex strings. If you want maximum performance, use +# the zerocopy-based write_bytes/parse_bytes methods instead. +serde = ["dep:serde", "dep:serde_with"] + [dependencies] aquatic_peer_id.workspace = true byteorder = "1" +cfg-if = "1" either = "1" zerocopy = { version = "0.7", features = ["derive"] } +# serde feature +serde = { version = "1", optional = true } +serde_with = { version = "3", features = ["hex"], optional = true } + [dev-dependencies] pretty_assertions = "1" +serde_json = "1" quickcheck = "1" quickcheck_macros = "1" diff --git a/crates/udp_protocol/src/common.rs b/crates/udp_protocol/src/common.rs index 6c54cb21..8dd91df8 100644 --- a/crates/udp_protocol/src/common.rs +++ b/crates/udp_protocol/src/common.rs @@ -5,94 +5,59 @@ pub use aquatic_peer_id::{PeerClient, PeerId}; use zerocopy::network_endian::{I32, I64, U16, U32}; use zerocopy::{AsBytes, FromBytes, FromZeroes}; -pub trait Ip: Clone + Copy + Debug + PartialEq + Eq + std::hash::Hash + AsBytes {} - -#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, AsBytes, FromBytes, FromZeroes)] -#[repr(transparent)] -pub struct AnnounceInterval(pub I32); - -impl AnnounceInterval { - pub fn new(v: i32) -> Self { - Self(I32::new(v)) - } -} - -#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, AsBytes, FromBytes, FromZeroes)] -#[repr(transparent)] -pub struct InfoHash(pub [u8; 20]); - -#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, AsBytes, FromBytes, FromZeroes)] -#[repr(transparent)] -pub struct ConnectionId(pub I64); - -impl ConnectionId { - pub fn new(v: i64) -> Self { - Self(I64::new(v)) - } -} - -#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, AsBytes, FromBytes, FromZeroes)] -#[repr(transparent)] -pub struct TransactionId(pub I32); - -impl TransactionId { - pub fn new(v: i32) -> Self { - Self(I32::new(v)) - } +#[cfg(feature = "serde")] +use serde::{Deserialize, Serialize}; +#[cfg(feature = "serde")] +use serde_with::{hex::Hex, serde_as, FromInto}; + +// This mess is necessary because #[cfg_attr] doesn't seem to work on struct fields +macro_rules! zerocopy_newtype { + ($newtype_name:ident, $inner_type:tt, $derive_as:expr) => { + cfg_if::cfg_if! { + if #[cfg(feature = "serde")] { + #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, AsBytes, FromBytes, FromZeroes)] + #[repr(transparent)] + #[serde_as] + #[derive(Serialize, Deserialize)] + #[serde(transparent)] + pub struct $newtype_name( + #[serde_as(as = $derive_as)] + pub $inner_type + ); + } else { + #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, AsBytes, FromBytes, FromZeroes)] + #[repr(transparent)] + pub struct $newtype_name( + pub $inner_type + ); + } + } + }; } -#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, AsBytes, FromBytes, FromZeroes)] -#[repr(transparent)] -pub struct NumberOfBytes(pub I64); +pub trait Ip: Clone + Copy + Debug + PartialEq + Eq + std::hash::Hash + AsBytes {} -impl NumberOfBytes { - pub fn new(v: i64) -> Self { - Self(I64::new(v)) - } -} +zerocopy_newtype!(AnnounceInterval, I32, "FromInto"); -#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, AsBytes, FromBytes, FromZeroes)] -#[repr(transparent)] -pub struct NumberOfPeers(pub I32); +zerocopy_newtype!(ConnectionId, I64, "FromInto"); -impl NumberOfPeers { - pub fn new(v: i32) -> Self { - Self(I32::new(v)) - } -} +zerocopy_newtype!(TransactionId, I32, "FromInto"); -#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, AsBytes, FromBytes, FromZeroes)] -#[repr(transparent)] -pub struct NumberOfDownloads(pub I32); +zerocopy_newtype!(NumberOfBytes, I64, "FromInto"); -impl NumberOfDownloads { - pub fn new(v: i32) -> Self { - Self(I32::new(v)) - } -} +zerocopy_newtype!(NumberOfPeers, I32, "FromInto"); -#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, AsBytes, FromBytes, FromZeroes)] -#[repr(transparent)] -pub struct Port(pub U16); +zerocopy_newtype!(NumberOfDownloads, I32, "FromInto"); -impl Port { - pub fn new(v: u16) -> Self { - Self(U16::new(v)) - } -} +zerocopy_newtype!(Port, U16, "FromInto"); -#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, AsBytes, FromBytes, FromZeroes)] -#[repr(transparent)] -pub struct PeerKey(pub I32); +zerocopy_newtype!(PeerKey, I32, "FromInto"); -impl PeerKey { - pub fn new(v: i32) -> Self { - Self(I32::new(v)) - } -} +zerocopy_newtype!(InfoHash, [u8; 20], "Hex"); #[derive(PartialEq, Eq, Clone, Copy, Debug, Hash, AsBytes, FromBytes, FromZeroes)] #[repr(C, packed)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct ResponsePeer { pub ip_address: I, pub port: Port, @@ -100,6 +65,11 @@ pub struct ResponsePeer { #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, AsBytes, FromBytes, FromZeroes)] #[repr(transparent)] +#[cfg_attr( + feature = "serde", + derive(Serialize, Deserialize), + serde(from = "Ipv4Addr", into = "Ipv4Addr") +)] pub struct Ipv4AddrBytes(pub [u8; 4]); impl Ip for Ipv4AddrBytes {} @@ -118,6 +88,11 @@ impl From for Ipv4AddrBytes { #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, AsBytes, FromBytes, FromZeroes)] #[repr(transparent)] +#[cfg_attr( + feature = "serde", + derive(Serialize, Deserialize), + serde(from = "Ipv6Addr", into = "Ipv6Addr") +)] pub struct Ipv6AddrBytes(pub [u8; 16]); impl Ip for Ipv6AddrBytes {} diff --git a/crates/udp_protocol/src/request.rs b/crates/udp_protocol/src/request.rs index cd2963e4..6dc721fc 100644 --- a/crates/udp_protocol/src/request.rs +++ b/crates/udp_protocol/src/request.rs @@ -5,6 +5,9 @@ use either::Either; use zerocopy::FromZeroes; use zerocopy::{byteorder::network_endian::I32, AsBytes, FromBytes}; +#[cfg(feature = "serde")] +use serde::{Deserialize, Serialize}; + use aquatic_peer_id::PeerId; use super::common::*; @@ -12,6 +15,7 @@ use super::common::*; const PROTOCOL_IDENTIFIER: i64 = 4_497_486_125_440; #[derive(PartialEq, Eq, Clone, Debug)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum Request { Connect(ConnectRequest), Announce(AnnounceRequest), @@ -19,30 +23,15 @@ pub enum Request { } impl Request { - pub fn write(self, bytes: &mut impl Write) -> Result<(), io::Error> { + pub fn write_bytes(self, bytes: &mut impl Write) -> Result<(), io::Error> { match self { - Request::Connect(r) => { - bytes.write_i64::(PROTOCOL_IDENTIFIER)?; - bytes.write_i32::(0)?; - bytes.write_all(r.transaction_id.as_bytes())?; - } - - Request::Announce(r) => { - bytes.write_all(r.as_bytes())?; - } - - Request::Scrape(r) => { - bytes.write_all(r.connection_id.as_bytes())?; - bytes.write_i32::(2)?; - bytes.write_all(r.transaction_id.as_bytes())?; - bytes.write_all((*r.info_hashes.as_slice()).as_bytes())?; - } + Request::Connect(r) => r.write_bytes(bytes), + Request::Announce(r) => r.write_bytes(bytes), + Request::Scrape(r) => r.write_bytes(bytes), } - - Ok(()) } - pub fn from_bytes(bytes: &[u8], max_scrape_torrents: u8) -> Result { + pub fn parse_bytes(bytes: &[u8], max_scrape_torrents: u8) -> Result { let action = bytes .get(8..12) .map(|bytes| I32::from_bytes(bytes.try_into().unwrap())) @@ -146,16 +135,29 @@ impl From for Request { } #[derive(PartialEq, Eq, Clone, Debug)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct ConnectRequest { pub transaction_id: TransactionId, } +impl ConnectRequest { + pub fn write_bytes(&self, bytes: &mut impl Write) -> Result<(), io::Error> { + bytes.write_i64::(PROTOCOL_IDENTIFIER)?; + bytes.write_i32::(0)?; + bytes.write_all(self.transaction_id.as_bytes())?; + + Ok(()) + } +} + #[derive(PartialEq, Eq, Clone, Debug, AsBytes, FromBytes, FromZeroes)] #[repr(C, packed)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct AnnounceRequest { pub connection_id: ConnectionId, /// This field is only present to enable zero-copy serialization and /// deserialization. + #[cfg_attr(feature = "serde", serde(skip))] pub action_placeholder: AnnounceActionPlaceholder, pub transaction_id: TransactionId, pub info_hash: InfoHash, @@ -170,6 +172,12 @@ pub struct AnnounceRequest { pub port: Port, } +impl AnnounceRequest { + pub fn write_bytes(&self, bytes: &mut impl Write) -> Result<(), io::Error> { + bytes.write_all(self.as_bytes()) + } +} + /// Note: Request::from_bytes only creates this struct with value 1 #[derive(PartialEq, Eq, Clone, Copy, Debug, AsBytes, FromBytes, FromZeroes)] #[repr(transparent)] @@ -184,25 +192,30 @@ impl Default for AnnounceActionPlaceholder { /// Note: Request::from_bytes only creates this struct with values 0..=3 #[derive(PartialEq, Eq, Clone, Copy, Debug, AsBytes, FromBytes, FromZeroes)] #[repr(transparent)] +#[cfg_attr( + feature = "serde", + derive(Serialize, Deserialize), + serde(from = "AnnounceEvent", into = "AnnounceEvent") +)] pub struct AnnounceEventBytes(I32); impl From for AnnounceEventBytes { fn from(value: AnnounceEvent) -> Self { - Self(I32::new(match value { - AnnounceEvent::None => 0, - AnnounceEvent::Completed => 1, - AnnounceEvent::Started => 2, - AnnounceEvent::Stopped => 3, - })) + Self(I32::new(value as i32)) } } #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] +#[cfg_attr( + feature = "serde", + derive(Serialize, Deserialize), + serde(rename_all = "lowercase") +)] pub enum AnnounceEvent { + None, + Completed, Started, Stopped, - Completed, - None, } impl From for AnnounceEvent { @@ -217,12 +230,24 @@ impl From for AnnounceEvent { } #[derive(PartialEq, Eq, Clone, Debug)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct ScrapeRequest { pub connection_id: ConnectionId, pub transaction_id: TransactionId, pub info_hashes: Vec, } +impl ScrapeRequest { + pub fn write_bytes(&self, bytes: &mut impl Write) -> Result<(), io::Error> { + bytes.write_all(self.connection_id.as_bytes())?; + bytes.write_i32::(2)?; + bytes.write_all(self.transaction_id.as_bytes())?; + bytes.write_all((*self.info_hashes.as_slice()).as_bytes())?; + + Ok(()) + } +} + #[derive(Debug)] pub enum RequestParseError { Sendable { @@ -299,7 +324,7 @@ mod tests { bytes_left: NumberOfBytes(I64::new(i64::arbitrary(g))), event: AnnounceEvent::arbitrary(g).into(), ip_address: Ipv4AddrBytes::arbitrary(g), - key: PeerKey::new(i32::arbitrary(g)), + key: PeerKey(i32::arbitrary(g).into()), peers_wanted: NumberOfPeers(I32::new(i32::arbitrary(g))), port: Port(U16::new(u16::arbitrary(g))), } @@ -323,8 +348,8 @@ mod tests { fn same_after_conversion(request: Request) -> bool { let mut buf = Vec::new(); - request.clone().write(&mut buf).unwrap(); - let r2 = Request::from_bytes(&buf[..], ::std::u8::MAX).unwrap(); + request.clone().write_bytes(&mut buf).unwrap(); + let r2 = Request::parse_bytes(&buf[..], ::std::u8::MAX).unwrap(); let success = request == r2; @@ -353,4 +378,35 @@ mod tests { TestResult::from_bool(same_after_conversion(request.into())) } + + #[quickcheck] + #[cfg(feature = "serde")] + fn test_serde_connect_request(request: ConnectRequest) -> bool { + let serialized = serde_json::to_string(&request).unwrap(); + let deserialized = serde_json::from_str::(&serialized).unwrap(); + + deserialized == request + } + + #[quickcheck] + #[cfg(feature = "serde")] + fn test_serde_announce_request(request: AnnounceRequest) -> bool { + let serialized = serde_json::to_string(&request).unwrap(); + let deserialized = serde_json::from_str::(&serialized).unwrap(); + + deserialized == request + } + + #[quickcheck] + #[cfg(feature = "serde")] + fn test_serde_scrape_request(request: ScrapeRequest) -> TestResult { + if request.info_hashes.is_empty() { + return TestResult::discard(); + } + + let serialized = serde_json::to_string(&request).unwrap(); + let deserialized = serde_json::from_str::(&serialized).unwrap(); + + TestResult::from_bool(deserialized == request) + } } diff --git a/crates/udp_protocol/src/response.rs b/crates/udp_protocol/src/response.rs index 4e3353f8..4e61adcd 100644 --- a/crates/udp_protocol/src/response.rs +++ b/crates/udp_protocol/src/response.rs @@ -5,9 +5,13 @@ use std::mem::size_of; use byteorder::{NetworkEndian, WriteBytesExt}; use zerocopy::{AsBytes, FromBytes, FromZeroes}; +#[cfg(feature = "serde")] +use serde::{Deserialize, Serialize}; + use super::common::*; #[derive(PartialEq, Eq, Clone, Debug)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum Response { Connect(ConnectResponse), AnnounceIpv4(AnnounceResponse), @@ -18,18 +22,18 @@ pub enum Response { impl Response { #[inline] - pub fn write(&self, bytes: &mut impl Write) -> Result<(), io::Error> { + pub fn write_bytes(&self, bytes: &mut impl Write) -> Result<(), io::Error> { match self { - Response::Connect(r) => r.write(bytes), - Response::AnnounceIpv4(r) => r.write(bytes), - Response::AnnounceIpv6(r) => r.write(bytes), - Response::Scrape(r) => r.write(bytes), - Response::Error(r) => r.write(bytes), + Response::Connect(r) => r.write_bytes(bytes), + Response::AnnounceIpv4(r) => r.write_bytes(bytes), + Response::AnnounceIpv6(r) => r.write_bytes(bytes), + Response::Scrape(r) => r.write_bytes(bytes), + Response::Error(r) => r.write_bytes(bytes), } } #[inline] - pub fn from_bytes(mut bytes: &[u8], ipv4: bool) -> Result { + pub fn parse_bytes(mut bytes: &[u8], ipv4: bool) -> Result { let action = read_i32_ne(&mut bytes)?; match action.get() { @@ -130,6 +134,7 @@ impl From for Response { #[derive(PartialEq, Eq, Clone, Debug, AsBytes, FromBytes, FromZeroes)] #[repr(C, packed)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct ConnectResponse { pub transaction_id: TransactionId, pub connection_id: ConnectionId, @@ -137,7 +142,7 @@ pub struct ConnectResponse { impl ConnectResponse { #[inline] - pub fn write(&self, bytes: &mut impl Write) -> Result<(), io::Error> { + pub fn write_bytes(&self, bytes: &mut impl Write) -> Result<(), io::Error> { bytes.write_i32::(0)?; bytes.write_all(self.as_bytes())?; @@ -146,7 +151,9 @@ impl ConnectResponse { } #[derive(PartialEq, Eq, Clone, Debug)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct AnnounceResponse { + #[cfg_attr(feature = "serde", serde(flatten))] pub fixed: AnnounceResponseFixedData, pub peers: Vec>, } @@ -160,7 +167,7 @@ impl AnnounceResponse { } #[inline] - pub fn write(&self, bytes: &mut impl Write) -> Result<(), io::Error> { + pub fn write_bytes(&self, bytes: &mut impl Write) -> Result<(), io::Error> { bytes.write_i32::(1)?; bytes.write_all(self.fixed.as_bytes())?; bytes.write_all((*self.peers.as_slice()).as_bytes())?; @@ -171,6 +178,7 @@ impl AnnounceResponse { #[derive(PartialEq, Eq, Clone, Debug, AsBytes, FromBytes, FromZeroes)] #[repr(C, packed)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct AnnounceResponseFixedData { pub transaction_id: TransactionId, pub announce_interval: AnnounceInterval, @@ -179,6 +187,7 @@ pub struct AnnounceResponseFixedData { } #[derive(PartialEq, Eq, Clone, Debug)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct ScrapeResponse { pub transaction_id: TransactionId, pub torrent_stats: Vec, @@ -186,7 +195,7 @@ pub struct ScrapeResponse { impl ScrapeResponse { #[inline] - pub fn write(&self, bytes: &mut impl Write) -> Result<(), io::Error> { + pub fn write_bytes(&self, bytes: &mut impl Write) -> Result<(), io::Error> { bytes.write_i32::(2)?; bytes.write_all(self.transaction_id.as_bytes())?; bytes.write_all((*self.torrent_stats.as_slice()).as_bytes())?; @@ -197,6 +206,7 @@ impl ScrapeResponse { #[derive(PartialEq, Eq, Debug, Copy, Clone, AsBytes, FromBytes, FromZeroes)] #[repr(C, packed)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct TorrentScrapeStatistics { pub seeders: NumberOfPeers, pub completed: NumberOfDownloads, @@ -204,6 +214,7 @@ pub struct TorrentScrapeStatistics { } #[derive(PartialEq, Eq, Clone, Debug)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct ErrorResponse { pub transaction_id: TransactionId, pub message: Cow<'static, str>, @@ -211,7 +222,7 @@ pub struct ErrorResponse { impl ErrorResponse { #[inline] - pub fn write(&self, bytes: &mut impl Write) -> Result<(), io::Error> { + pub fn write_bytes(&self, bytes: &mut impl Write) -> Result<(), io::Error> { bytes.write_i32::(3)?; bytes.write_all(self.transaction_id.as_bytes())?; bytes.write_all(self.message.as_bytes())?; @@ -304,8 +315,8 @@ mod tests { fn same_after_conversion(response: Response, ipv4: bool) -> bool { let mut buf = Vec::new(); - response.clone().write(&mut buf).unwrap(); - let r2 = Response::from_bytes(&buf[..], ipv4).unwrap(); + response.clone().write_bytes(&mut buf).unwrap(); + let r2 = Response::parse_bytes(&buf[..], ipv4).unwrap(); let success = response == r2; @@ -339,4 +350,42 @@ mod tests { fn test_scrape_response_convert_identity(response: ScrapeResponse) -> bool { same_after_conversion(response.into(), true) } + + #[quickcheck] + #[cfg(feature = "serde")] + fn test_serde_connect_response(response: ConnectResponse) -> bool { + let serialized = serde_json::to_string(&response).unwrap(); + let deserialized = serde_json::from_str::(&serialized).unwrap(); + + deserialized == response + } + + #[quickcheck] + #[cfg(feature = "serde")] + fn test_serde_announce_response_ipv4(response: AnnounceResponse) -> bool { + let serialized = serde_json::to_string(&response).unwrap(); + let deserialized = + serde_json::from_str::>(&serialized).unwrap(); + + deserialized == response + } + + #[quickcheck] + #[cfg(feature = "serde")] + fn test_serde_announce_response_ipv6(response: AnnounceResponse) -> bool { + let serialized = serde_json::to_string(&response).unwrap(); + let deserialized = + serde_json::from_str::>(&serialized).unwrap(); + + deserialized == response + } + + #[quickcheck] + #[cfg(feature = "serde")] + fn test_serde_scrape_response(response: ScrapeResponse) -> bool { + let serialized = serde_json::to_string(&response).unwrap(); + let deserialized = serde_json::from_str::(&serialized).unwrap(); + + deserialized == response + } }