diff --git a/Cargo.lock b/Cargo.lock index 8dd6b2d0..3968b40a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -30,9 +30,9 @@ dependencies = [ [[package]] name = "ahash" -version = "0.8.6" +version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91429305e9f0a25f6205c5b8e0d2db09e0708a7a6df0f42212bb56c32c8ac97a" +checksum = "77c3a9648d43b9cd48db467b3f87fdd6e146bcc88ab0180006cef2179fe11d01" dependencies = [ "cfg-if", "getrandom", @@ -112,9 +112,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.76" +version = "1.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59d2a3357dde987206219e78ecfbbb6e8dad06cbb65292758d3270e6254f7355" +checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca" [[package]] name = "aquatic" @@ -134,7 +134,7 @@ dependencies = [ "anyhow", "aquatic_udp", "aquatic_udp_load_test", - "clap 4.4.11", + "clap 4.4.13", "humanize-bytes", "indexmap 2.1.0", "indoc", @@ -152,7 +152,7 @@ dependencies = [ name = "aquatic_common" version = "0.8.0" dependencies = [ - "ahash 0.8.6", + "ahash 0.8.7", "anyhow", "aquatic_toml_config", "arc-swap", @@ -194,8 +194,8 @@ dependencies = [ "libc", "log", "memchr", - "metrics", - "metrics-exporter-prometheus", + "metrics 0.21.1", + "metrics-exporter-prometheus 0.12.2", "mimalloc", "once_cell", "privdrop", @@ -305,9 +305,9 @@ dependencies = [ "io-uring", "libc", "log", - "metrics", - "metrics-exporter-prometheus", - "metrics-util", + "metrics 0.21.1", + "metrics-exporter-prometheus 0.12.2", + "metrics-util 0.15.1", "mimalloc", "mio", "num-format", @@ -376,9 +376,9 @@ dependencies = [ "httparse", "indexmap 2.1.0", "log", - "metrics", - "metrics-exporter-prometheus", - "metrics-util", + "metrics 0.22.0", + "metrics-exporter-prometheus 0.13.0", + "metrics-util 0.16.0", "mimalloc", "privdrop", "quickcheck", @@ -453,9 +453,9 @@ checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" [[package]] name = "async-tungstenite" -version = "0.23.0" +version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1e9efbe14612da0a19fb983059a0b621e9cf6225d7018ecab4f9988215540dc" +checksum = "3609af4bbf701ddaf1f6bb4e6257dff4ff8932327d0e685d3f653724c258b1ac" dependencies = [ "futures-io", "futures-util", @@ -661,9 +661,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.11" +version = "4.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfaff671f6b22ca62406885ece523383b9b64022e341e53e009a62ebc47a45f2" +checksum = "52bdc885e4cacc7f7c9eedc1ef6da641603180c783c41a15c264944deeaab642" dependencies = [ "clap_builder", "clap_derive", @@ -671,9 +671,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.4.11" +version = "4.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a216b506622bb1d316cd51328dce24e07bdff4a6128a47c7e7fad11878d5adbb" +checksum = "fb7fb5e4e979aec3be7791562fcba452f94ad85e954da024396433e0e25a79e9" dependencies = [ "anstream", "anstyle", @@ -690,7 +690,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.43", + "syn 2.0.48", ] [[package]] @@ -755,9 +755,9 @@ checksum = "f7144d30dcf0fafbce74250a3963025d8d52177934239851c917d29f1df280c2" [[package]] name = "cpufeatures" -version = "0.2.11" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce420fe07aecd3e67c5f910618fe65e94158f6dcc0adf44e00d69ce2bdfe0fd0" +checksum = "53fe5e26ff1b7aef8bca9c6080520cfb8d9333c7568e1829cef191a9723e5504" dependencies = [ "libc", ] @@ -806,7 +806,7 @@ dependencies = [ "anes", "cast", "ciborium", - "clap 4.4.11", + "clap 4.4.13", "criterion-plot", "is-terminal", "itertools 0.10.5", @@ -916,9 +916,9 @@ checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5" [[package]] name = "deranged" -version = "0.3.10" +version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8eb30d70a07a3b04884d2677f06bec33509dc67ca60d92949e5535352d3191dc" +checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" dependencies = [ "powerfmt", ] @@ -1153,17 +1153,18 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.43", + "syn 2.0.48", ] [[package]] name = "futures-rustls" -version = "0.24.0" +version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35bd3cf68c183738046838e300353e4716c674dc5e56890de4826801a6622a28" +checksum = "3afda89bce8f65072d24f8b99a2127e229462d8008182ca93f1d5d2e5df8f22f" dependencies = [ "futures-io", "rustls", + "rustls-pki-types", ] [[package]] @@ -1243,7 +1244,7 @@ dependencies = [ "log", "proc-macro2", "quote", - "syn 2.0.43", + "syn 2.0.48", "time", ] @@ -1310,7 +1311,7 @@ version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ff8ae62cd3a9102e5637afc8452c55acf3844001bd5374e0b0bd7b6616c038" dependencies = [ - "ahash 0.8.6", + "ahash 0.8.7", ] [[package]] @@ -1319,7 +1320,7 @@ version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" dependencies = [ - "ahash 0.8.6", + "ahash 0.8.7", "allocator-api2", "serde", ] @@ -1376,6 +1377,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.6" @@ -1383,7 +1395,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http", + "http 0.2.11", "pin-project-lite", ] @@ -1433,7 +1445,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "http", + "http 0.2.11", "http-body", "httparse", "httpdate", @@ -1518,13 +1530,13 @@ checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" [[package]] name = "is-terminal" -version = "0.4.9" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" +checksum = "0bad00257d07be169d870ab665980b06cdb366d792ad690bf2e76876dc503455" dependencies = [ "hermit-abi 0.3.3", "rustix", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -1704,9 +1716,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.6.4" +version = "2.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" +checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" [[package]] name = "memoffset" @@ -1741,11 +1753,21 @@ version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fde3af1a009ed76a778cb84fdef9e7dbbdf5775ae3e4cc1f434a6a307f6f76c5" dependencies = [ - "ahash 0.8.6", + "ahash 0.8.7", "metrics-macros", "portable-atomic", ] +[[package]] +name = "metrics" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77b9e10a211c839210fd7f99954bda26e5f8e26ec686ad68da6a32df7c80e782" +dependencies = [ + "ahash 0.8.7", + "portable-atomic", +] + [[package]] name = "metrics-exporter-prometheus" version = "0.12.2" @@ -1756,9 +1778,26 @@ dependencies = [ "hyper", "indexmap 1.9.3", "ipnet", - "metrics", - "metrics-util", - "quanta", + "metrics 0.21.1", + "metrics-util 0.15.1", + "quanta 0.11.1", + "thiserror", + "tokio", +] + +[[package]] +name = "metrics-exporter-prometheus" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83a4c4718a371ddfb7806378f23617876eea8b82e5ff1324516bcd283249d9ea" +dependencies = [ + "base64", + "hyper", + "indexmap 1.9.3", + "ipnet", + "metrics 0.22.0", + "metrics-util 0.16.0", + "quanta 0.12.2", "thiserror", "tokio", ] @@ -1771,7 +1810,7 @@ checksum = "38b4faf00617defe497754acde3024865bc143d44a86799b24e191ecff91354f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.43", + "syn 2.0.48", ] [[package]] @@ -1785,10 +1824,29 @@ dependencies = [ "crossbeam-utils", "hashbrown 0.13.1", "indexmap 1.9.3", - "metrics", + "metrics 0.21.1", + "num_cpus", + "ordered-float 3.9.2", + "quanta 0.11.1", + "radix_trie", + "sketches-ddsketch 0.2.1", +] + +[[package]] +name = "metrics-util" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2670b8badcc285d486261e2e9f1615b506baff91427b61bd336a472b65bbf5ed" +dependencies = [ + "aho-corasick", + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.13.1", + "indexmap 1.9.3", + "metrics 0.22.0", "num_cpus", - "ordered-float", - "quanta", + "ordered-float 4.2.0", + "quanta 0.12.2", "radix_trie", "sketches-ddsketch 0.2.1", ] @@ -1993,6 +2051,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "ordered-float" +version = "4.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a76df7075c7d4d01fdcb46c912dd17fba5b60c78ea480b475f2b6ab6f666584e" +dependencies = [ + "num-traits", +] + [[package]] name = "os_str_bytes" version = "6.6.1" @@ -2057,7 +2124,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.43", + "syn 2.0.48", ] [[package]] @@ -2170,9 +2237,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.71" +version = "1.0.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75cb1540fadbd5b8fbccc4dddad2734eba435053f725621c070711a14bb5f4b8" +checksum = "95fc56cda0b5c3325f5fbbd7ff9fda9e02bb00bb3dac51252d2f1bfa1cb8cc8c" dependencies = [ "unicode-ident", ] @@ -2187,7 +2254,22 @@ dependencies = [ "libc", "mach2", "once_cell", - "raw-cpuid", + "raw-cpuid 10.7.0", + "wasi", + "web-sys", + "winapi 0.3.9", +] + +[[package]] +name = "quanta" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ca0b7bac0b97248c40bb77288fc52029cf1459c0461ea1b05ee32ccf011de2c" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid 11.0.1", "wasi", "web-sys", "winapi 0.3.9", @@ -2217,9 +2299,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.33" +version = "1.0.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" +checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" dependencies = [ "proc-macro2", ] @@ -2283,6 +2365,15 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "raw-cpuid" +version = "11.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d86a7c4638d42c44551f4791a20e687dbb4c3de1f33c43dd71e355cd429def1" +dependencies = [ + "bitflags 2.4.1", +] + [[package]] name = "rayon" version = "1.8.0" @@ -2312,6 +2403,26 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "ref-cast" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4846d4c50d1721b1a3bef8af76924eef20d5e723647333798c1b519b3a9473f" +dependencies = [ + "ref-cast-impl", +] + +[[package]] +name = "ref-cast-impl" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fddb4f8d99b0a2ebafc65a87a69a7b9875e4b1ae1f00db265d300ef7f28bccc" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "regex" version = "1.10.2" @@ -2385,32 +2496,42 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.10" +version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba" +checksum = "fe6b63262c9fcac8659abfaa96cac103d28166d3ff3eaf8f412e19f3ae9e5a48" dependencies = [ "log", "ring", + "rustls-pki-types", "rustls-webpki", - "sct", + "subtle", + "zeroize", ] [[package]] name = "rustls-pemfile" -version = "1.0.4" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" +checksum = "35e4980fa29e4c4b212ffb3db068a564cbf560e51d3944b7c88bd8bf5bec64f4" dependencies = [ "base64", + "rustls-pki-types", ] +[[package]] +name = "rustls-pki-types" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e9d979b3ce68192e42760c7810125eb6cf2ea10efae545a156063e61f314e2a" + [[package]] name = "rustls-webpki" -version = "0.101.7" +version = "0.102.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +checksum = "ef4ca26037c909dedb327b48c3327d0ba91d3dd3c4e05dad328f210ffb68e95b" dependencies = [ "ring", + "rustls-pki-types", "untrusted", ] @@ -2447,21 +2568,11 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" -[[package]] -name = "sct" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "serde" -version = "1.0.193" +version = "1.0.195" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89" +checksum = "63261df402c67811e9ac6def069e4786148c4563f4b50fd4bf30aa370d626b02" dependencies = [ "serde_derive", ] @@ -2478,29 +2589,29 @@ dependencies = [ [[package]] name = "serde_bytes" -version = "0.11.12" +version = "0.11.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab33ec92f677585af6d88c65593ae2375adde54efdbf16d597f2cbc7a6d368ff" +checksum = "8b8497c313fd43ab992087548117643f6fcd935cbf36f176ffda0aacf9591734" dependencies = [ "serde", ] [[package]] name = "serde_derive" -version = "1.0.193" +version = "1.0.195" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" +checksum = "46fe8f8603d81ba86327b23a2e9cdf49e1255fb94a4c5f297f6ee0547178ea2c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.43", + "syn 2.0.48", ] [[package]] name = "serde_json" -version = "1.0.108" +version = "1.0.111" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b" +checksum = "176e46fa42316f18edd598015a5166857fc835ec732f5215eac6b7bdbf0a84f4" dependencies = [ "itoa", "ryu", @@ -2548,13 +2659,14 @@ dependencies = [ [[package]] name = "simd-json" -version = "0.12.0" +version = "0.13.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0f07a84c7456b901b8dd2c1d44caca8b0fd2c2616206ee5acc9d9da61e8d9ec" +checksum = "0c58001aca67fc467da571f35e7e1dc9c094e91b099cc54bd3cead2962db2432" dependencies = [ "getrandom", "halfbrown", "lexical-core", + "ref-cast", "serde", "serde_json", "simdutf8", @@ -2569,9 +2681,9 @@ checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a" [[package]] name = "simple_logger" -version = "4.3.0" +version = "4.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da0ca6504625ee1aa5fda33913d2005eab98c7a42dd85f116ecce3ff54c9d3ef" +checksum = "8e7e46c8c90251d47d08b28b8a419ffb4aede0f87c2eea95e17d1d5bacbf3ef1" dependencies = [ "colored", "log", @@ -2689,6 +2801,12 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "subtle" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" + [[package]] name = "syn" version = "1.0.109" @@ -2702,9 +2820,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.43" +version = "2.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee659fb5f3d355364e1f3e5bc10fb82068efbf824a1e9d1c9504244a6469ad53" +checksum = "0f3531638e407dfc0814761abb7c00a5b54992b849452a0646b7f65c9f770f3f" dependencies = [ "proc-macro2", "quote", @@ -2713,15 +2831,15 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.8.1" +version = "3.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ef1adac450ad7f4b3c28589471ade84f25f731a7a0fe30d71dfa9f60fd808e5" +checksum = "01ce4141aa927a6d1bd34a041795abd0db1cccba5d5f24b009f694bdf3a1f3fa" dependencies = [ "cfg-if", "fastrand 2.0.1", "redox_syscall", "rustix", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -2742,22 +2860,22 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.52" +version = "1.0.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83a48fd946b02c0a526b2e9481c8e2a17755e47039164a86c4070446e3a4614d" +checksum = "d54378c645627613241d077a3a79db965db602882668f9136ac42af9ecb730ad" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.52" +version = "1.0.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7fbe9b594d6568a6a1443250a7e67d80b74e1e96f6d1715e1e21cc1888291d3" +checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471" dependencies = [ "proc-macro2", "quote", - "syn 2.0.43", + "syn 2.0.48", ] [[package]] @@ -2898,7 +3016,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.43", + "syn 2.0.48", ] [[package]] @@ -2918,14 +3036,14 @@ checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] name = "tungstenite" -version = "0.20.1" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" dependencies = [ "byteorder", "bytes", "data-encoding", - "http", + "http 1.0.0", "httparse", "log", "rand", @@ -2999,9 +3117,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" [[package]] name = "value-trait" -version = "0.6.1" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09a5b6c8ceb01263b969cac48d4a6705134d490ded13d889e52c0cfc80c6945e" +checksum = "ea87257cfcbedcb9444eda79c59fdfea71217e6305afee8ee33f500375c2ac97" dependencies = [ "float-cmp", "halfbrown", @@ -3067,7 +3185,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.43", + "syn 2.0.48", "wasm-bindgen-shared", ] @@ -3089,7 +3207,7 @@ checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" dependencies = [ "proc-macro2", "quote", - "syn 2.0.43", + "syn 2.0.48", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -3287,9 +3405,9 @@ checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" [[package]] name = "winnow" -version = "0.5.31" +version = "0.5.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97a4882e6b134d6c28953a387571f1acdd3496830d5e36c5e3a1075580ea641c" +checksum = "b7520bbdec7211caa7c4e682eb1fbe07abe20cee6756b6e00f537c82c11816aa" dependencies = [ "memchr", ] @@ -3318,5 +3436,11 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.43", + "syn 2.0.48", ] + +[[package]] +name = "zeroize" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "525b4ec142c6b68a2d10f01f7bbf6755599ca3f81ea53b8431b7dd348f5fdb2d" diff --git a/TODO.md b/TODO.md index 8f0b3ab2..79b4e4ef 100644 --- a/TODO.md +++ b/TODO.md @@ -2,6 +2,9 @@ ## High priority +* if peer_clients is on, add task to generate prometheus exports on regular + interval to clean up data + * aquatic_bench * Opentracker "slow to get up to speed", is it due to getting faster once inserts are rarely needed since most ip-port combinations have been sent? diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 10ec9cad..2dafac8f 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -37,5 +37,5 @@ toml = "0.5" # Optional glommio = { version = "0.8", optional = true } hwloc = { version = "0.5", optional = true } -rustls = { version = "0.21", optional = true } -rustls-pemfile = { version = "1", optional = true } +rustls = { version = "0.22", optional = true } +rustls-pemfile = { version = "2", optional = true } diff --git a/crates/common/src/rustls_config.rs b/crates/common/src/rustls_config.rs index e6df59d6..56b83360 100644 --- a/crates/common/src/rustls_config.rs +++ b/crates/common/src/rustls_config.rs @@ -17,10 +17,20 @@ pub fn create_rustls_config( })?; let mut f = BufReader::new(f); - rustls_pemfile::certs(&mut f)? - .into_iter() - .map(|bytes| rustls::Certificate(bytes)) - .collect() + let mut certs = Vec::new(); + + for cert in rustls_pemfile::certs(&mut f) { + match cert { + Ok(cert) => { + certs.push(cert); + } + Err(err) => { + ::log::error!("error parsing certificate: {:#?}", err) + } + } + } + + certs }; let private_key = { @@ -32,16 +42,16 @@ pub fn create_rustls_config( })?; let mut f = BufReader::new(f); - rustls_pemfile::pkcs8_private_keys(&mut f)? - .first() - .map(|bytes| rustls::PrivateKey(bytes.clone())) - .ok_or(anyhow::anyhow!("No private keys in file"))? + let key = rustls_pemfile::pkcs8_private_keys(&mut f) + .next() + .ok_or(anyhow::anyhow!("No private keys in file"))??; + + key }; let tls_config = rustls::ServerConfig::builder() - .with_safe_defaults() .with_no_client_auth() - .with_single_cert(certs, private_key) + .with_single_cert(certs, rustls::pki_types::PrivateKeyDer::Pkcs8(private_key)) .with_context(|| "create rustls config")?; Ok(tls_config) diff --git a/crates/http/Cargo.toml b/crates/http/Cargo.toml index 2660d466..5b98dcbc 100644 --- a/crates/http/Cargo.toml +++ b/crates/http/Cargo.toml @@ -33,7 +33,7 @@ cfg-if = "1" either = "1" futures = "0.3" futures-lite = "1" -futures-rustls = "0.24" +futures-rustls = "0.25" glommio = "0.8" httparse = "1" itoa = "1" @@ -46,7 +46,7 @@ memchr = "2" privdrop = "0.5" once_cell = "1" rand = { version = "0.8", features = ["small_rng"] } -rustls-pemfile = "1" +rustls-pemfile = "2" serde = { version = "1", features = ["derive"] } signal-hook = { version = "0.3" } slotmap = "1" diff --git a/crates/http_load_test/Cargo.toml b/crates/http_load_test/Cargo.toml index b4f44110..52c942f2 100644 --- a/crates/http_load_test/Cargo.toml +++ b/crates/http_load_test/Cargo.toml @@ -21,14 +21,14 @@ aquatic_toml_config.workspace = true anyhow = "1" futures = "0.3" futures-lite = "1" -futures-rustls = "0.24" +futures-rustls = "0.25" hashbrown = "0.14" glommio = "0.8" log = "0.4" mimalloc = { version = "0.1", default-features = false } rand = { version = "0.8", features = ["small_rng"] } rand_distr = "0.4" -rustls = { version = "0.21", default-features = false, features = ["logging", "dangerous_configuration"] } # TLS 1.2 disabled +rustls = { version = "0.22", default-features = false, features = ["logging"] } # TLS 1.2 disabled serde = { version = "1", features = ["derive"] } [dev-dependencies] diff --git a/crates/http_load_test/src/common.rs b/crates/http_load_test/src/common.rs index f187ad55..6d44b124 100644 --- a/crates/http_load_test/src/common.rs +++ b/crates/http_load_test/src/common.rs @@ -4,7 +4,6 @@ use rand_distr::Gamma; pub use aquatic_http_protocol::common::*; pub use aquatic_http_protocol::request::*; -pub use aquatic_http_protocol::response::*; #[derive(PartialEq, Eq, Clone)] pub struct TorrentPeer { diff --git a/crates/http_load_test/src/main.rs b/crates/http_load_test/src/main.rs index 36301249..21b881e1 100644 --- a/crates/http_load_test/src/main.rs +++ b/crates/http_load_test/src/main.rs @@ -180,25 +180,53 @@ fn monitor_statistics(state: LoadTestState, config: &Config) { } } +#[derive(Debug)] struct FakeCertificateVerifier; -impl rustls::client::ServerCertVerifier for FakeCertificateVerifier { +impl rustls::client::danger::ServerCertVerifier for FakeCertificateVerifier { fn verify_server_cert( &self, - _end_entity: &rustls::Certificate, - _intermediates: &[rustls::Certificate], - _server_name: &rustls::ServerName, - _scts: &mut dyn Iterator, + _end_entity: &rustls::pki_types::CertificateDer<'_>, + _intermediates: &[rustls::pki_types::CertificateDer<'_>], + _server_name: &rustls::pki_types::ServerName<'_>, _ocsp_response: &[u8], - _now: std::time::SystemTime, - ) -> Result { - Ok(rustls::client::ServerCertVerified::assertion()) + _now: rustls::pki_types::UnixTime, + ) -> Result { + Ok(rustls::client::danger::ServerCertVerified::assertion()) + } + + fn verify_tls12_signature( + &self, + _message: &[u8], + _cert: &rustls::pki_types::CertificateDer<'_>, + _dss: &rustls::DigitallySignedStruct, + ) -> Result { + Ok(rustls::client::danger::HandshakeSignatureValid::assertion()) + } + + fn verify_tls13_signature( + &self, + _message: &[u8], + _cert: &rustls::pki_types::CertificateDer<'_>, + _dss: &rustls::DigitallySignedStruct, + ) -> Result { + Ok(rustls::client::danger::HandshakeSignatureValid::assertion()) + } + + fn supported_verify_schemes(&self) -> Vec { + vec![ + rustls::SignatureScheme::ECDSA_NISTP384_SHA384, + rustls::SignatureScheme::ECDSA_NISTP256_SHA256, + rustls::SignatureScheme::RSA_PSS_SHA512, + rustls::SignatureScheme::RSA_PSS_SHA384, + rustls::SignatureScheme::RSA_PSS_SHA256, + rustls::SignatureScheme::ED25519, + ] } } fn create_tls_config() -> anyhow::Result> { let mut config = rustls::ClientConfig::builder() - .with_safe_defaults() .with_root_certificates(rustls::RootCertStore::empty()) .with_no_client_auth(); diff --git a/crates/ws/Cargo.toml b/crates/ws/Cargo.toml index 2d7d8c8c..662bc618 100644 --- a/crates/ws/Cargo.toml +++ b/crates/ws/Cargo.toml @@ -18,9 +18,12 @@ name = "aquatic_ws" name = "aquatic_ws" [features] -default = ["prometheus"] +default = ["prometheus", "mimalloc"] prometheus = ["metrics", "metrics-exporter-prometheus"] metrics = ["dep:metrics", "metrics-util"] +# Use mimalloc allocator for much better performance. Requires cmake and a +# C/C++ compiler +mimalloc = ["dep:mimalloc"] [dependencies] aquatic_common = { workspace = true, features = ["rustls", "glommio"] } @@ -29,31 +32,31 @@ aquatic_toml_config.workspace = true aquatic_ws_protocol.workspace = true anyhow = "1" -async-tungstenite = "0.23" +async-tungstenite = "0.24" arc-swap = "1" cfg-if = "1" futures = "0.3" futures-lite = "1" -futures-rustls = "0.24" +futures-rustls = "0.25" glommio = "0.8" hashbrown = { version = "0.14", features = ["serde"] } httparse = "1" indexmap = "2" log = "0.4" -metrics = { version = "0.21", optional = true } -metrics-util = { version = "0.15", optional = true } -metrics-exporter-prometheus = { version = "0.12", optional = true, default-features = false, features = ["http-listener"] } -mimalloc = { version = "0.1", default-features = false } +metrics = { version = "0.22", optional = true } +metrics-util = { version = "0.16", optional = true } +metrics-exporter-prometheus = { version = "0.13", optional = true, default-features = false, features = ["http-listener"] } +mimalloc = { version = "0.1", default-features = false, optional = true } privdrop = "0.5" rand = { version = "0.8", features = ["small_rng"] } -rustls = "0.21" -rustls-pemfile = "1" +rustls = "0.22" +rustls-pemfile = "2" serde = { version = "1", features = ["derive"] } signal-hook = { version = "0.3" } slab = "0.4" slotmap = "1" socket2 = { version = "0.5", features = ["all"] } -tungstenite = "0.20" +tungstenite = "0.21" [dev-dependencies] quickcheck = "1" diff --git a/crates/ws/src/main.rs b/crates/ws/src/main.rs index cb8b58f1..0a39f959 100644 --- a/crates/ws/src/main.rs +++ b/crates/ws/src/main.rs @@ -1,6 +1,7 @@ use aquatic_common::cli::run_app_with_cli_and_config; use aquatic_ws::config::Config; +#[cfg(feature = "mimalloc")] #[global_allocator] static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; diff --git a/crates/ws/src/workers/socket/connection.rs b/crates/ws/src/workers/socket/connection.rs index b4fbc547..fca6537e 100644 --- a/crates/ws/src/workers/socket/connection.rs +++ b/crates/ws/src/workers/socket/connection.rs @@ -9,7 +9,6 @@ use anyhow::Context; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use aquatic_common::rustls_config::RustlsConfig; use aquatic_common::ServerStartInstant; -use aquatic_peer_id::PeerClient; use aquatic_ws_protocol::*; use arc_swap::ArcSwap; use async_tungstenite::WebSocketStream; @@ -18,14 +17,17 @@ use futures::{AsyncWriteExt, StreamExt}; use futures_lite::future::race; use futures_rustls::TlsAcceptor; use glommio::channels::channel_mesh::Senders; -use glommio::channels::local_channel::{LocalReceiver, LocalSender}; +use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender}; use glommio::net::TcpStream; -use glommio::timer::{sleep, timeout}; +use glommio::timer::timeout; use glommio::{enclose, prelude::*}; use hashbrown::hash_map::Entry; use hashbrown::HashMap; use slab::Slab; +#[cfg(feature = "metrics")] +use metrics::{Counter, Gauge}; + use crate::common::*; use crate::config::Config; use crate::workers::socket::calculate_in_message_consumer_index; @@ -33,16 +35,20 @@ use crate::workers::socket::calculate_in_message_consumer_index; #[cfg(feature = "metrics")] use crate::workers::socket::{ip_version_to_metrics_str, WORKER_INDEX}; +/// Length of ConnectionReader backpressure channel +/// +/// ConnectionReader awaits a message in a channel before proceeding with +/// reading a request. For each response sent, a message is sent to the +/// channel, up to a maximum of this constant. +const READ_PASS_CHANNEL_LEN: usize = 4; + pub struct ConnectionRunner { pub config: Rc, pub access_list: Arc, pub in_message_senders: Rc>, - pub tq_prioritized: TaskQueueHandle, - pub tq_regular: TaskQueueHandle, pub connection_valid_until: Rc>, pub out_message_sender: Rc>, pub out_message_receiver: LocalReceiver<(OutMessageMeta, OutMessage)>, - pub close_conn_receiver: LocalReceiver<()>, pub server_start_instant: ServerStartInstant, pub out_message_consumer_id: ConsumerId, pub connection_id: ConnectionId, @@ -54,25 +60,45 @@ impl ConnectionRunner { pub async fn run( self, control_message_senders: Rc>, + close_conn_receiver: LocalReceiver<()>, stream: TcpStream, ) { let clean_up_data = ConnectionCleanupData { announced_info_hashes: Default::default(), ip_version: self.ip_version, opt_peer_client: Default::default(), + #[cfg(feature = "metrics")] + active_connections_gauge: ::metrics::gauge!( + "aquatic_active_connections", + "ip_version" => ip_version_to_metrics_str(self.ip_version), + "worker_index" => WORKER_INDEX.get().to_string(), + ), }; clean_up_data.before_open(); let config = self.config.clone(); + let connection_id = self.connection_id.clone(); - if let Err(err) = self.run_inner(clean_up_data.clone(), stream).await { - ::log::debug!("connection error: {:#}", err); - } + race( + async { + if let Err(err) = self.run_inner(clean_up_data.clone(), stream).await { + ::log::debug!("connection {:?} closed: {:#}", connection_id, err); + } + }, + async { + close_conn_receiver.recv().await; + }, + ) + .await; + + ::log::debug!("connection {:?} starting clean up", connection_id); clean_up_data .after_close(&config, control_message_senders) .await; + + ::log::debug!("connection {:?} finished clean up", connection_id); } async fn run_inner( @@ -81,7 +107,8 @@ impl ConnectionRunner { mut stream: TcpStream, ) -> anyhow::Result<()> { if let Some(tls_config) = self.opt_tls_config.as_ref() { - let tls_acceptor: TlsAcceptor = tls_config.load_full().into(); + let tls_config = tls_config.load_full(); + let tls_acceptor = TlsAcceptor::from(tls_config); let stream = tls_acceptor.accept(stream).await?; @@ -136,65 +163,73 @@ impl ConnectionRunner { ..Default::default() }; let stream = async_tungstenite::accept_async_with_config(stream, Some(ws_config)).await?; - let (ws_out, ws_in) = futures::StreamExt::split(stream); let pending_scrape_slab = Rc::new(RefCell::new(Slab::new())); let access_list_cache = create_access_list_cache(&self.access_list); + let (read_pass_sender, read_pass_receiver) = new_bounded(READ_PASS_CHANNEL_LEN); + + for _ in 0..READ_PASS_CHANNEL_LEN { + if let Err(err) = read_pass_sender.try_send(()) { + panic!( + "couldn't add initial entries to read pass channel: {:#}", + err + ) + }; + } + let config = self.config.clone(); - let reader_handle = spawn_local_into( - enclose!((pending_scrape_slab, clean_up_data) async move { - let mut reader = ConnectionReader { - config: self.config.clone(), - access_list_cache, - in_message_senders: self.in_message_senders, - out_message_sender: self.out_message_sender, - pending_scrape_slab, - out_message_consumer_id: self.out_message_consumer_id, - ws_in, - ip_version: self.ip_version, - connection_id: self.connection_id, - clean_up_data: clean_up_data.clone(), - }; - - reader.run_in_message_loop().await - }), - self.tq_regular, - ) - .unwrap(); - - let writer_handle = spawn_local_into( - async move { - let mut writer = ConnectionWriter { - config, - out_message_receiver: self.out_message_receiver, - connection_valid_until: self.connection_valid_until, - ws_out, - pending_scrape_slab, - server_start_instant: self.server_start_instant, - ip_version: self.ip_version, - clean_up_data, - }; - - writer.run_out_message_loop().await - }, - self.tq_prioritized, - ) - .unwrap(); + let reader_future = enclose!((pending_scrape_slab, clean_up_data) async move { + let mut reader = ConnectionReader { + config: self.config.clone(), + access_list_cache, + in_message_senders: self.in_message_senders, + out_message_sender: self.out_message_sender, + read_pass_receiver, + pending_scrape_slab, + out_message_consumer_id: self.out_message_consumer_id, + ws_in, + ip_version: self.ip_version, + connection_id: self.connection_id, + clean_up_data: clean_up_data.clone(), + #[cfg(feature = "metrics")] + total_announce_requests_counter: ::metrics::counter!( + "aquatic_requests_total", + "type" => "announce", + "ip_version" => ip_version_to_metrics_str(self.ip_version), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ), + #[cfg(feature = "metrics")] + total_scrape_requests_counter: ::metrics::counter!( + "aquatic_requests_total", + "type" => "scrape", + "ip_version" => ip_version_to_metrics_str(self.ip_version), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ) + }; - let close_conn_future = spawn_local_into( - async move { - self.close_conn_receiver.recv().await; + reader.run_in_message_loop().await + }); - Ok(()) - }, - self.tq_prioritized, - ) - .unwrap(); + let writer_future = async move { + let mut writer = ConnectionWriter { + config, + out_message_receiver: self.out_message_receiver, + read_pass_sender, + connection_valid_until: self.connection_valid_until, + ws_out, + pending_scrape_slab, + server_start_instant: self.server_start_instant, + ip_version: self.ip_version, + clean_up_data, + }; - race(close_conn_future, race(reader_handle, writer_handle)).await + writer.run_out_message_loop().await + }; + + race(reader_future, writer_future).await } } @@ -203,22 +238,26 @@ struct ConnectionReader { access_list_cache: AccessListCache, in_message_senders: Rc>, out_message_sender: Rc>, + read_pass_receiver: LocalReceiver<()>, pending_scrape_slab: Rc>>, out_message_consumer_id: ConsumerId, ws_in: SplitStream>, ip_version: IpVersion, connection_id: ConnectionId, clean_up_data: ConnectionCleanupData, + #[cfg(feature = "metrics")] + total_announce_requests_counter: Counter, + #[cfg(feature = "metrics")] + total_scrape_requests_counter: Counter, } impl ConnectionReader { async fn run_in_message_loop(&mut self) -> anyhow::Result<()> { loop { - while self.out_message_sender.is_full() { - sleep(Duration::from_millis(100)).await; - - yield_if_needed().await; - } + self.read_pass_receiver + .recv() + .await + .ok_or_else(|| anyhow::anyhow!("read pass channel closed"))?; let message = self .ws_in @@ -266,12 +305,7 @@ impl ConnectionReader { async fn handle_announce_request(&mut self, request: AnnounceRequest) -> anyhow::Result<()> { #[cfg(feature = "metrics")] - ::metrics::increment_counter!( - "aquatic_requests_total", - "type" => "announce", - "ip_version" => ip_version_to_metrics_str(self.ip_version), - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); + self.total_announce_requests_counter.increment(1); let info_hash = request.info_hash; @@ -311,24 +345,28 @@ impl ConnectionReader { && self.clean_up_data.opt_peer_client.borrow().is_none() { let peer_id = aquatic_peer_id::PeerId(request.peer_id.0); - let client = peer_id.client(); - let prefix = peer_id.first_8_bytes_hex().to_string(); - ::metrics::increment_gauge!( + let peer_client_gauge = ::metrics::gauge!( "aquatic_peer_clients", - 1.0, - "client" => client.to_string(), + "client" => peer_id.client().to_string(), ); - if self.config.metrics.peer_id_prefixes { - ::metrics::increment_gauge!( - "aquatic_peer_id_prefixes", - 1.0, - "prefix_hex" => prefix.to_string(), - ); - } + peer_client_gauge.increment(1.0); + + let opt_peer_id_prefix_gauge = + self.config.metrics.peer_id_prefixes.then(|| { + let g = ::metrics::gauge!( + "aquatic_peer_id_prefixes", + "prefix_hex" => peer_id.first_8_bytes_hex().to_string(), + ); + + g.increment(1.0); - *self.clean_up_data.opt_peer_client.borrow_mut() = Some((client, prefix)); + g + }); + + *self.clean_up_data.opt_peer_client.borrow_mut() = + Some((peer_client_gauge, opt_peer_id_prefix_gauge)); }; } } @@ -366,12 +404,7 @@ impl ConnectionReader { async fn handle_scrape_request(&mut self, request: ScrapeRequest) -> anyhow::Result<()> { #[cfg(feature = "metrics")] - ::metrics::increment_counter!( - "aquatic_requests_total", - "type" => "scrape", - "ip_version" => ip_version_to_metrics_str(self.ip_version), - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); + self.total_scrape_requests_counter.increment(1); let info_hashes = if let Some(info_hashes) = request.info_hashes { info_hashes @@ -463,6 +496,7 @@ impl ConnectionReader { struct ConnectionWriter { config: Rc, out_message_receiver: LocalReceiver<(OutMessageMeta, OutMessage)>, + read_pass_sender: LocalSender<()>, connection_valid_until: Rc>, ws_out: SplitSink, tungstenite::Message>, pending_scrape_slab: Rc>>, @@ -514,6 +548,12 @@ impl ConnectionWriter { self.send_out_message(&out_message).await?; } }; + + if let Err(GlommioError::Closed(_)) = self.read_pass_sender.try_send(()) { + return Err(anyhow::anyhow!("read pass channel closed")); + } + + yield_if_needed().await; } } @@ -544,32 +584,24 @@ impl ConnectionWriter { OutMessage::ErrorResponse(_) => "error", }; - ::metrics::increment_counter!( + ::metrics::counter!( "aquatic_responses_total", "type" => out_message_type, "ip_version" => ip_version_to_metrics_str(self.ip_version), "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); + ) + .increment(1); - if let Some((peer_client, prefix)) = + // As long as connection is still alive, increment peer client + // gauges by zero to prevent them from being removed due to + // idleness + if let Some((peer_client_gauge, opt_peer_id_prefix_gauge)) = self.clean_up_data.opt_peer_client.borrow().as_ref() { - // As long as connection is still alive, increment peer client - // gauges by zero to prevent them from being removed due to - // idleness - - ::metrics::increment_gauge!( - "aquatic_peer_clients", - 0.0, - "client" => peer_client.to_string(), - ); - - if self.config.metrics.peer_id_prefixes { - ::metrics::increment_gauge!( - "aquatic_peer_id_prefixes", - 0.0, - "prefix_hex" => prefix.to_string(), - ); + peer_client_gauge.increment(0.0); + + if let Some(g) = opt_peer_id_prefix_gauge { + g.increment(0.0); } } } @@ -583,18 +615,15 @@ impl ConnectionWriter { struct ConnectionCleanupData { announced_info_hashes: Rc>>, ip_version: IpVersion, - opt_peer_client: Rc>>, + opt_peer_client: Rc)>>>, + #[cfg(feature = "metrics")] + active_connections_gauge: Gauge, } impl ConnectionCleanupData { fn before_open(&self) { #[cfg(feature = "metrics")] - ::metrics::increment_gauge!( - "aquatic_active_connections", - 1.0, - "ip_version" => ip_version_to_metrics_str(self.ip_version), - "worker_index" => WORKER_INDEX.get().to_string(), - ); + self.active_connections_gauge.increment(1.0); } async fn after_close( &self, @@ -621,28 +650,14 @@ impl ConnectionCleanupData { } #[cfg(feature = "metrics")] - { - ::metrics::decrement_gauge!( - "aquatic_active_connections", - 1.0, - "ip_version" => ip_version_to_metrics_str(self.ip_version), - "worker_index" => WORKER_INDEX.get().to_string(), - ); + self.active_connections_gauge.decrement(1.0); - if let Some((peer_client, prefix)) = self.opt_peer_client.borrow().as_ref() { - ::metrics::decrement_gauge!( - "aquatic_peer_clients", - 1.0, - "client" => peer_client.to_string(), - ); - - if config.metrics.peer_id_prefixes { - ::metrics::decrement_gauge!( - "aquatic_peer_id_prefixes", - 1.0, - "prefix_hex" => prefix.to_string(), - ); - } + #[cfg(feature = "metrics")] + if let Some((peer_client_gauge, opt_peer_id_prefix_gauge)) = self.opt_peer_client.take() { + peer_client_gauge.decrement(1.0); + + if let Some(g) = opt_peer_id_prefix_gauge { + g.decrement(1.0); } } } diff --git a/crates/ws/src/workers/socket/mod.rs b/crates/ws/src/workers/socket/mod.rs index 7bc6c721..eb284ad6 100644 --- a/crates/ws/src/workers/socket/mod.rs +++ b/crates/ws/src/workers/socket/mod.rs @@ -173,12 +173,9 @@ pub async fn run_socket_worker( config, access_list, in_message_senders, - tq_prioritized, - tq_regular, connection_valid_until, out_message_sender, out_message_receiver, - close_conn_receiver, server_start_instant, out_message_consumer_id, connection_id, @@ -186,7 +183,7 @@ pub async fn run_socket_worker( ip_version }; - runner.run(control_message_senders, stream).await; + runner.run(control_message_senders, close_conn_receiver, stream).await; connection_handles.borrow_mut().remove(connection_id); }), @@ -254,20 +251,20 @@ async fn clean_connections( let worker_index = WORKER_INDEX.with(|index| index.get()).to_string(); if config.network.address.is_ipv4() || !config.network.only_ipv6 { - ::metrics::increment_gauge!( + ::metrics::gauge!( "aquatic_active_connections", - 0.0, "ip_version" => "4", "worker_index" => worker_index.clone(), - ); + ) + .increment(0.0); } if config.network.address.is_ipv6() { - ::metrics::increment_gauge!( + ::metrics::gauge!( "aquatic_active_connections", - 0.0, "ip_version" => "6", "worker_index" => worker_index, - ); + ) + .increment(0.0); } } @@ -287,10 +284,14 @@ async fn receive_out_messages( match reference.out_message_sender.try_send((meta, out_message)) { Ok(()) => {} Err(GlommioError::Closed(_)) => {} - Err(GlommioError::WouldBlock(_)) => {} + Err(GlommioError::WouldBlock(_)) => { + ::log::debug!( + "couldn't send OutMessage over local channel to Connection, channel full" + ); + } Err(err) => { ::log::debug!( - "Couldn't send out_message from shared channel to local receiver: {:?}", + "couldn't send OutMessage over local channel to Connection: {:?}", err ); } diff --git a/crates/ws/src/workers/swarm/storage.rs b/crates/ws/src/workers/swarm/storage.rs index 1ec9d085..7ad311db 100644 --- a/crates/ws/src/workers/swarm/storage.rs +++ b/crates/ws/src/workers/swarm/storage.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use hashbrown::HashMap; +use metrics::Gauge; use rand::rngs::SmallRng; use aquatic_common::{ @@ -16,10 +17,42 @@ use crate::workers::swarm::WORKER_INDEX; type TorrentMap = IndexMap; type PeerMap = IndexMap; -#[derive(Default)] pub struct TorrentMaps { ipv4: TorrentMap, ipv6: TorrentMap, + peers_gauge_ipv4: Gauge, + peers_gauge_ipv6: Gauge, + torrents_gauge_ipv4: Gauge, + torrents_gauge_ipv6: Gauge, +} + +impl Default for TorrentMaps { + fn default() -> Self { + Self { + ipv4: Default::default(), + ipv6: Default::default(), + peers_gauge_ipv4: ::metrics::gauge!( + "aquatic_peers", + "ip_version" => "4", + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ), + peers_gauge_ipv6: ::metrics::gauge!( + "aquatic_peers", + "ip_version" => "6", + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ), + torrents_gauge_ipv4: ::metrics::gauge!( + "aquatic_torrents", + "ip_version" => "4", + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ), + torrents_gauge_ipv6: ::metrics::gauge!( + "aquatic_torrents", + "ip_version" => "6", + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ), + } + } } impl TorrentMaps { @@ -33,12 +66,11 @@ impl TorrentMaps { request_sender_meta: InMessageMeta, request: AnnounceRequest, ) { - let (torrent_data, ip_version): (&mut TorrentData, &'static str) = - if let IpVersion::V4 = request_sender_meta.ip_version { - (self.ipv4.entry(request.info_hash).or_default(), "4") - } else { - (self.ipv6.entry(request.info_hash).or_default(), "6") - }; + let torrent_data = if let IpVersion::V4 = request_sender_meta.ip_version { + self.ipv4.entry(request.info_hash).or_default() + } else { + self.ipv6.entry(request.info_hash).or_default() + }; // If there is already a peer with this peer_id, check that connection id // is same as that of request sender. Otherwise, ignore request. Since @@ -89,12 +121,10 @@ impl TorrentMaps { } #[cfg(feature = "metrics")] - ::metrics::decrement_gauge!( - "aquatic_peers", - 1.0, - "ip_version" => ip_version, - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); + match request_sender_meta.ip_version { + IpVersion::V4 => self.peers_gauge_ipv4.decrement(1.0), + IpVersion::V6 => self.peers_gauge_ipv6.decrement(1.0), + } return; } @@ -112,12 +142,10 @@ impl TorrentMaps { entry.insert(peer); #[cfg(feature = "metrics")] - ::metrics::increment_gauge!( - "aquatic_peers", - 1.0, - "ip_version" => ip_version, - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); + match request_sender_meta.ip_version { + IpVersion::V4 => self.peers_gauge_ipv4.increment(1.0), + IpVersion::V6 => self.peers_gauge_ipv6.increment(1.0), + } } PeerStatus::Seeding => { torrent_data.num_seeders += 1; @@ -133,12 +161,10 @@ impl TorrentMaps { entry.insert(peer); #[cfg(feature = "metrics")] - ::metrics::increment_gauge!( - "aquatic_peers", - 1.0, - "ip_version" => ip_version, - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); + match request_sender_meta.ip_version { + IpVersion::V4 => self.peers_gauge_ipv4.increment(1.0), + IpVersion::V6 => self.peers_gauge_ipv6.increment(1.0), + } } PeerStatus::Stopped => return, }, @@ -311,8 +337,20 @@ impl TorrentMaps { let mut access_list_cache = create_access_list_cache(access_list); let now = server_start_instant.seconds_elapsed(); - Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv4, now, "4"); - Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv6, now, "6"); + Self::clean_torrent_map( + config, + &mut access_list_cache, + &mut self.ipv4, + now, + &self.peers_gauge_ipv4, + ); + Self::clean_torrent_map( + config, + &mut access_list_cache, + &mut self.ipv6, + now, + &self.peers_gauge_ipv6, + ); } fn clean_torrent_map( @@ -320,7 +358,7 @@ impl TorrentMaps { access_list_cache: &mut AccessListCache, torrent_map: &mut TorrentMap, now: SecondsSinceServerStart, - ip_version: &'static str, + peers_gauge: &Gauge, ) { let mut total_num_peers = 0u64; @@ -357,31 +395,14 @@ impl TorrentMaps { torrent_map.shrink_to_fit(); - let total_num_peers = total_num_peers as f64; - #[cfg(feature = "metrics")] - ::metrics::gauge!( - "aquatic_peers", - total_num_peers, - "ip_version" => ip_version, - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); + peers_gauge.set(total_num_peers as f64) } #[cfg(feature = "metrics")] pub fn update_torrent_count_metrics(&self) { - ::metrics::gauge!( - "aquatic_torrents", - self.ipv4.len() as f64, - "ip_version" => "4", - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); - ::metrics::gauge!( - "aquatic_torrents", - self.ipv6.len() as f64, - "ip_version" => "6", - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); + self.torrents_gauge_ipv4.set(self.ipv4.len() as f64); + self.torrents_gauge_ipv6.set(self.ipv6.len() as f64); } pub fn handle_connection_closed( @@ -397,24 +418,14 @@ impl TorrentMaps { torrent_data.remove_peer(peer_id); #[cfg(feature = "metrics")] - ::metrics::decrement_gauge!( - "aquatic_peers", - 1.0, - "ip_version" => "4", - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); + self.peers_gauge_ipv4.decrement(1.0); } } else { if let Some(torrent_data) = self.ipv6.get_mut(&info_hash) { torrent_data.remove_peer(peer_id); #[cfg(feature = "metrics")] - ::metrics::decrement_gauge!( - "aquatic_peers", - 1.0, - "ip_version" => "6", - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); + self.peers_gauge_ipv6.decrement(1.0); } } } diff --git a/crates/ws_load_test/Cargo.toml b/crates/ws_load_test/Cargo.toml index 2ad2a728..90dbe94f 100644 --- a/crates/ws_load_test/Cargo.toml +++ b/crates/ws_load_test/Cargo.toml @@ -19,18 +19,18 @@ aquatic_toml_config.workspace = true aquatic_ws_protocol.workspace = true anyhow = "1" -async-tungstenite = "0.23" +async-tungstenite = "0.24" futures = "0.3" -futures-rustls = "0.24" +futures-rustls = "0.25" glommio = "0.8" log = "0.4" mimalloc = { version = "0.1", default-features = false } rand = { version = "0.8", features = ["small_rng"] } rand_distr = "0.4" -rustls = { version = "0.21", default-features = false, features = ["dangerous_configuration"] } +rustls = { version = "0.22" } serde = { version = "1", features = ["derive"] } serde_json = "1" -tungstenite = "0.20" +tungstenite = "0.21" [dev-dependencies] quickcheck = "1" diff --git a/crates/ws_load_test/src/main.rs b/crates/ws_load_test/src/main.rs index a3008d5d..479f2aa8 100644 --- a/crates/ws_load_test/src/main.rs +++ b/crates/ws_load_test/src/main.rs @@ -87,25 +87,53 @@ fn run(config: Config) -> ::anyhow::Result<()> { Ok(()) } +#[derive(Debug)] struct FakeCertificateVerifier; -impl rustls::client::ServerCertVerifier for FakeCertificateVerifier { +impl rustls::client::danger::ServerCertVerifier for FakeCertificateVerifier { fn verify_server_cert( &self, - _end_entity: &rustls::Certificate, - _intermediates: &[rustls::Certificate], - _server_name: &rustls::ServerName, - _scts: &mut dyn Iterator, + _end_entity: &rustls::pki_types::CertificateDer<'_>, + _intermediates: &[rustls::pki_types::CertificateDer<'_>], + _server_name: &rustls::pki_types::ServerName<'_>, _ocsp_response: &[u8], - _now: std::time::SystemTime, - ) -> Result { - Ok(rustls::client::ServerCertVerified::assertion()) + _now: rustls::pki_types::UnixTime, + ) -> Result { + Ok(rustls::client::danger::ServerCertVerified::assertion()) + } + + fn verify_tls12_signature( + &self, + _message: &[u8], + _cert: &rustls::pki_types::CertificateDer<'_>, + _dss: &rustls::DigitallySignedStruct, + ) -> Result { + Ok(rustls::client::danger::HandshakeSignatureValid::assertion()) + } + + fn verify_tls13_signature( + &self, + _message: &[u8], + _cert: &rustls::pki_types::CertificateDer<'_>, + _dss: &rustls::DigitallySignedStruct, + ) -> Result { + Ok(rustls::client::danger::HandshakeSignatureValid::assertion()) + } + + fn supported_verify_schemes(&self) -> Vec { + vec![ + rustls::SignatureScheme::ECDSA_NISTP384_SHA384, + rustls::SignatureScheme::ECDSA_NISTP256_SHA256, + rustls::SignatureScheme::RSA_PSS_SHA512, + rustls::SignatureScheme::RSA_PSS_SHA384, + rustls::SignatureScheme::RSA_PSS_SHA256, + rustls::SignatureScheme::ED25519, + ] } } fn create_tls_config() -> anyhow::Result> { let mut config = rustls::ClientConfig::builder() - .with_safe_defaults() .with_root_certificates(rustls::RootCertStore::empty()) .with_no_client_auth(); diff --git a/crates/ws_protocol/Cargo.toml b/crates/ws_protocol/Cargo.toml index 54053a05..df05b2dd 100644 --- a/crates/ws_protocol/Cargo.toml +++ b/crates/ws_protocol/Cargo.toml @@ -24,8 +24,8 @@ anyhow = "1" hashbrown = { version = "0.14", features = ["serde"] } serde = { version = "1", features = ["derive"] } serde_json = "1" -simd-json = "0.12" -tungstenite = "0.20" +simd-json = "0.13" +tungstenite = "0.21" [dev-dependencies] criterion = "0.5"