From 8fa9b3be61fd9ed72ab3e37faac275879100afec Mon Sep 17 00:00:00 2001 From: Jakub Zajkowski Date: Mon, 27 Jan 2025 13:11:17 +0100 Subject: [PATCH 1/4] Changing error messages in case of node disconnection; cleaning up unused properties; removing `request_limit` and the logic attached to that since we don't actually handle multiple in-flight requests; Removing the possibility to define "infinite" as a valid retry amount in node client connector since it can lead to deadlocks --- Cargo.lock | 24 +- README.md | 4 +- metrics/src/rpc.rs | 39 ++- .../example_configs/EXAMPLE_NCTL_CONFIG.toml | 4 +- .../EXAMPLE_NCTL_POSTGRES_CONFIG.toml | 4 +- .../example_configs/EXAMPLE_NODE_CONFIG.toml | 4 +- .../default_debian_config.toml | 6 +- .../default_rpc_only_config.toml | 2 +- .../default_sse_only_config.toml | 2 +- rpc_sidecar/src/config.rs | 234 +----------------- rpc_sidecar/src/lib.rs | 2 +- rpc_sidecar/src/node_client.rs | 121 +++++---- sidecar/src/config.rs | 13 +- sidecar/src/run.rs | 27 +- 14 files changed, 167 insertions(+), 319 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7c517db9..f1971d1a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -493,7 +493,7 @@ dependencies = [ [[package]] name = "casper-binary-port" version = "1.0.0" -source = "git+https://github.com/casper-network/casper-node.git?branch=dev#b1540891bdbb25d586a209afa4e2f687d397cc66" +source = "git+https://github.com/casper-network/casper-node.git?branch=dev#83ab415edd779746477f77c5d6daf1ec9525f965" dependencies = [ "bincode", "bytes", @@ -699,7 +699,7 @@ dependencies = [ [[package]] name = "casper-types" version = "5.0.0" -source = "git+https://github.com/casper-network/casper-node.git?branch=dev#b1540891bdbb25d586a209afa4e2f687d397cc66" +source = "git+https://github.com/casper-network/casper-node.git?branch=dev#83ab415edd779746477f77c5d6daf1ec9525f965" dependencies = [ "base16", "base64 0.13.1", @@ -865,9 +865,9 @@ checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" 
[[package]] name = "cpufeatures" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16b80225097f2e5ae4e7179dd2266824648f3e2f49d9134d584b76389d31c4c3" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" dependencies = [ "libc", ] @@ -3048,9 +3048,9 @@ dependencies = [ [[package]] name = "native-tls" -version = "0.2.12" +version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8614eb2c83d59d1c8cc974dd3f920198647674a0a035e1af1fa58707e317466" +checksum = "0dab59f8e050d5df8e4dd87d9206fb6f65a483e20ac9fda365ade4fab353196c" dependencies = [ "libc", "log", @@ -3240,9 +3240,9 @@ checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" [[package]] name = "openssl" -version = "0.10.68" +version = "0.10.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6174bc48f102d208783c2c84bf931bb75927a617866870de8a4ea85597f871f5" +checksum = "f5e534d133a060a3c19daec1eb3e98ec6f4685978834f2dbadfe2ec215bab64e" dependencies = [ "bitflags 2.8.0", "cfg-if", @@ -3266,9 +3266,9 @@ dependencies = [ [[package]] name = "openssl-probe" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" [[package]] name = "openssl-sys" @@ -5325,9 +5325,9 @@ checksum = "7eec5d1121208364f6793f7d2e222bf75a915c19557537745b195b253dd64217" [[package]] name = "unicode-ident" -version = "1.0.14" +version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83" +checksum = "11cd88e12b17c6494200a9c1b683a04fcac9573ed74cd1b62aeb2727c5592243" [[package]] name = "unicode-normalization" diff --git a/README.md b/README.md index 3dff2da9..95f7cb32 
100644 --- a/README.md +++ b/README.md @@ -208,8 +208,8 @@ address = '0.0.0.0:28101' max_message_size_bytes = 4_194_304 request_limit = 3 request_buffer_size = 16 -message_timeout_secs = 30 -client_access_timeout_secs = 2 +message_timeout_secs = 10 +client_access_timeout_secs = 10 [rpc_server.speculative_exec_server] enable_server = true diff --git a/metrics/src/rpc.rs b/metrics/src/rpc.rs index f2334e38..9c32cd58 100644 --- a/metrics/src/rpc.rs +++ b/metrics/src/rpc.rs @@ -8,8 +8,8 @@ const RESPONSE_SIZE_BUCKETS: &[f64; 8] = &[ 5e+2_f64, 1e+3_f64, 2e+3_f64, 5e+3_f64, 5e+4_f64, 5e+5_f64, 5e+6_f64, 5e+7_f64, ]; -const RESPONSE_TIME_MS_BUCKETS: &[f64; 8] = &[ - 1_f64, 5_f64, 10_f64, 30_f64, 50_f64, 100_f64, 200_f64, 300_f64, +const RESPONSE_TIME_MS_BUCKETS: &[f64; 9] = &[ + 1_f64, 5_f64, 10_f64, 30_f64, 50_f64, 100_f64, 300_f64, 1000_f64, 3000_f64, ]; static ENDPOINT_CALLS: Lazy = Lazy::new(|| { @@ -24,6 +24,21 @@ static ENDPOINT_CALLS: Lazy = Lazy::new(|| { counter }); +static TIMEOUT_COUNTERS: Lazy = Lazy::new(|| { + let counter = IntCounterVec::new( + Opts::new( + "rpc_server_timeout_counts", + "Counters for how many of the requests failed due to internal timeout", + ), + &["timer"], + ) + .unwrap(); + REGISTRY + .register(Box::new(counter.clone())) + .expect("cannot register metric"); + counter +}); + static RESPONSE_TIMES_MS: Lazy = Lazy::new(|| { let histogram = HistogramVec::new( HistogramOpts { @@ -56,6 +71,18 @@ static RECONNECT_TIMES_MS: Lazy = Lazy::new(|| { histogram }); +static MISMATCHED_IDS: Lazy = Lazy::new(|| { + let counter = IntGauge::new( + "rpc_server_mismatched_ids", + "Number of mismatched id events observed in responses from binary port", + ) + .expect("rpc_server_mismatched_ids metric can't be created"); + REGISTRY + .register(Box::new(counter.clone())) + .expect("cannot register metric"); + counter +}); + static DISCONNECT_EVENTS: Lazy = Lazy::new(|| { let counter = IntGauge::new( "rpc_server_disconnects", @@ -108,3 +135,11 @@ pub fn 
register_request_size(method: &str, payload_size: f64) { .with_label_values(&[method]) .observe(payload_size); } + +pub fn register_timeout(timer_name: &str) { + TIMEOUT_COUNTERS.with_label_values(&[timer_name]).inc(); +} + +pub fn register_mismatched_id() { + MISMATCHED_IDS.inc(); +} diff --git a/resources/example_configs/EXAMPLE_NCTL_CONFIG.toml b/resources/example_configs/EXAMPLE_NCTL_CONFIG.toml index 1d7400c8..3d6e10f2 100644 --- a/resources/example_configs/EXAMPLE_NCTL_CONFIG.toml +++ b/resources/example_configs/EXAMPLE_NCTL_CONFIG.toml @@ -18,10 +18,8 @@ cors_origin = "" ip_address = "0.0.0.0" port = 28102 max_message_size_bytes = 4194304 -request_limit = 3 -request_buffer_size = 16 message_timeout_secs = 30 -client_access_timeout_secs = 2 +client_access_timeout_secs = 10 keepalive_timeout_ms = 10_000 [rpc_server.node_client.exponential_backoff] diff --git a/resources/example_configs/EXAMPLE_NCTL_POSTGRES_CONFIG.toml b/resources/example_configs/EXAMPLE_NCTL_POSTGRES_CONFIG.toml index f8413f01..3d89e8c3 100644 --- a/resources/example_configs/EXAMPLE_NCTL_POSTGRES_CONFIG.toml +++ b/resources/example_configs/EXAMPLE_NCTL_POSTGRES_CONFIG.toml @@ -18,10 +18,8 @@ cors_origin = "" ip_address = "0.0.0.0" port = 28102 max_message_size_bytes = 4194304 -request_limit = 3 -request_buffer_size = 16 message_timeout_secs = 30 -client_access_timeout_secs = 2 +client_access_timeout_secs = 10 keepalive_timeout_ms = 10_000 [rpc_server.node_client.exponential_backoff] diff --git a/resources/example_configs/EXAMPLE_NODE_CONFIG.toml b/resources/example_configs/EXAMPLE_NODE_CONFIG.toml index 5637c0b3..b54d440f 100644 --- a/resources/example_configs/EXAMPLE_NODE_CONFIG.toml +++ b/resources/example_configs/EXAMPLE_NODE_CONFIG.toml @@ -18,10 +18,8 @@ cors_origin = "" ip_address = "3.20.57.210" port = 7777 max_message_size_bytes = 4194304 -request_limit = 10 -request_buffer_size = 50 message_timeout_secs = 60 -client_access_timeout_secs = 60 +client_access_timeout_secs = 10 
keepalive_timeout_ms = 10_000 [rpc_server.node_client.exponential_backoff] diff --git a/resources/example_configs/default_debian_config.toml b/resources/example_configs/default_debian_config.toml index c0639b98..4e436a93 100644 --- a/resources/example_configs/default_debian_config.toml +++ b/resources/example_configs/default_debian_config.toml @@ -71,14 +71,10 @@ ip_address = '127.0.0.1' port = 7779 # Maximum size of a message in bytes. max_message_size_bytes = 4_194_304 -# Maximum number of in-flight node requests. -request_limit = 3 -# Number of node requests that can be buffered. -request_buffer_size = 16 # Timeout for a node request in seconds. message_timeout_secs = 30 # Timeout specifying how long to wait for binary port client to be available. -client_access_timeout_secs = 2 +client_access_timeout_secs = 10 # The amount of time in milliseconds to wait between sending keepalive requests. keepalive_timeout_ms = 10_000 diff --git a/resources/example_configs/default_rpc_only_config.toml b/resources/example_configs/default_rpc_only_config.toml index 63e53aa0..5b6ce3ed 100644 --- a/resources/example_configs/default_rpc_only_config.toml +++ b/resources/example_configs/default_rpc_only_config.toml @@ -78,7 +78,7 @@ request_buffer_size = 16 # Timeout for a node request in seconds. message_timeout_secs = 30 # Timeout specifying how long to wait for binary port client to be available. -client_access_timeout_secs = 2 +client_access_timeout_secs = 10 # The amount of time in milliseconds to wait between sending keepalive requests. 
keepalive_timeout_ms = 10_000 diff --git a/resources/example_configs/default_sse_only_config.toml b/resources/example_configs/default_sse_only_config.toml index b477133a..5ab0b882 100644 --- a/resources/example_configs/default_sse_only_config.toml +++ b/resources/example_configs/default_sse_only_config.toml @@ -32,7 +32,7 @@ port = 18888 max_concurrent_requests = 50 max_requests_per_second = 50 -[admin_server] +[admin_api_server] enable_server = true port = 18887 max_concurrent_requests = 1 diff --git a/rpc_sidecar/src/config.rs b/rpc_sidecar/src/config.rs index bda13d46..85a77feb 100644 --- a/rpc_sidecar/src/config.rs +++ b/rpc_sidecar/src/config.rs @@ -1,7 +1,4 @@ -use std::{ - convert::{TryFrom, TryInto}, - net::{IpAddr, Ipv4Addr}, -}; +use std::net::{IpAddr, Ipv4Addr}; use datasize::DataSize; use serde::Deserialize; @@ -22,32 +19,6 @@ const DEFAULT_MAX_BODY_BYTES: u32 = 2_621_440; /// Default CORS origin. const DEFAULT_CORS_ORIGIN: &str = ""; -#[derive(Clone, Debug, Deserialize, PartialEq, Eq)] -// Disallow unknown fields to ensure config files and command-line overrides contain valid keys. 
-#[serde(deny_unknown_fields)] -pub struct RpcServerConfigTarget { - pub main_server: RpcConfig, - pub speculative_exec_server: Option, - pub node_client: NodeClientConfigTarget, -} - -impl TryFrom for RpcServerConfig { - type Error = FieldParseError; - fn try_from(value: RpcServerConfigTarget) -> Result { - let node_client = value.node_client.try_into().map_err(|e: FieldParseError| { - FieldParseError::ParseError { - field_name: "node_client", - error: e.to_string(), - } - })?; - Ok(RpcServerConfig { - main_server: value.main_server, - speculative_exec_server: value.speculative_exec_server, - node_client, - }) - } -} - #[derive(Error, Debug)] pub enum FieldParseError { #[error("failed to parse field {} with error: {}", .field_name, .error)] @@ -116,10 +87,6 @@ const DEFAULT_MAX_PAYLOAD_SIZE: u32 = 4 * 1024 * 1024; const DEFAULT_MESSAGE_TIMEOUT_SECS: u64 = 30; /// Default timeout for client access. const DEFAULT_CLIENT_ACCESS_TIMEOUT_SECS: u64 = 10; -/// Default request limit. -const DEFAULT_NODE_REQUEST_LIMIT: u16 = 3; -/// Default request buffer size. -const DEFAULT_REQUEST_BUFFER_SIZE: usize = 16; /// Default exponential backoff base delay. const DEFAULT_EXPONENTIAL_BACKOFF_BASE_MS: u64 = 1000; /// Default exponential backoff maximum delay. @@ -128,6 +95,8 @@ const DEFAULT_EXPONENTIAL_BACKOFF_MAX_MS: u64 = 64_000; const DEFAULT_EXPONENTIAL_BACKOFF_COEFFICIENT: u64 = 2; /// Default keep alive timeout milliseconds. const DEFAULT_KEEPALIVE_TIMEOUT_MS: u64 = 1_000; +/// Default max attempts +const DEFAULT_MAX_ATTEMPTS: u32 = 3; /// Node client configuration. #[derive(Clone, DataSize, Debug, Deserialize, PartialEq, Eq)] @@ -145,10 +114,6 @@ pub struct NodeClientConfig { /// Timeout specifying how long to wait for binary port client to be available. // Access to the client is synchronized. pub client_access_timeout_secs: u64, - /// Maximum number of in-flight node requests. - pub request_limit: u16, - /// Number of node requests that can be buffered. 
- pub request_buffer_size: usize, /// The amount of ms to wait between sending keepalive requests. pub keepalive_timeout_ms: u64, /// Configuration for exponential backoff to be used for re-connects. @@ -161,9 +126,7 @@ impl NodeClientConfig { NodeClientConfig { ip_address: DEFAULT_NODE_CONNECT_IP_ADDRESS, port: DEFAULT_NODE_CONNECT_PORT, - request_limit: DEFAULT_NODE_REQUEST_LIMIT, max_message_size_bytes: DEFAULT_MAX_PAYLOAD_SIZE, - request_buffer_size: DEFAULT_REQUEST_BUFFER_SIZE, message_timeout_secs: DEFAULT_MESSAGE_TIMEOUT_SECS, client_access_timeout_secs: DEFAULT_CLIENT_ACCESS_TIMEOUT_SECS, keepalive_timeout_ms: DEFAULT_KEEPALIVE_TIMEOUT_MS, @@ -171,7 +134,7 @@ impl NodeClientConfig { initial_delay_ms: DEFAULT_EXPONENTIAL_BACKOFF_BASE_MS, max_delay_ms: DEFAULT_EXPONENTIAL_BACKOFF_MAX_MS, coefficient: DEFAULT_EXPONENTIAL_BACKOFF_COEFFICIENT, - max_attempts: MaxAttempts::Infinite, + max_attempts: DEFAULT_MAX_ATTEMPTS, }, } } @@ -183,9 +146,7 @@ impl NodeClientConfig { NodeClientConfig { ip_address: localhost, port, - request_limit: DEFAULT_NODE_REQUEST_LIMIT, max_message_size_bytes: DEFAULT_MAX_PAYLOAD_SIZE, - request_buffer_size: DEFAULT_REQUEST_BUFFER_SIZE, message_timeout_secs: DEFAULT_MESSAGE_TIMEOUT_SECS, client_access_timeout_secs: DEFAULT_CLIENT_ACCESS_TIMEOUT_SECS, keepalive_timeout_ms: DEFAULT_KEEPALIVE_TIMEOUT_MS, @@ -193,7 +154,7 @@ impl NodeClientConfig { initial_delay_ms: DEFAULT_EXPONENTIAL_BACKOFF_BASE_MS, max_delay_ms: DEFAULT_EXPONENTIAL_BACKOFF_MAX_MS, coefficient: DEFAULT_EXPONENTIAL_BACKOFF_COEFFICIENT, - max_attempts: MaxAttempts::Infinite, + max_attempts: DEFAULT_MAX_ATTEMPTS, }, } } @@ -201,14 +162,12 @@ impl NodeClientConfig { /// Creates an instance of `NodeClientConfig` with specified listening port and maximum number /// of reconnection retries. 
#[cfg(any(feature = "testing", test))] - pub fn new_with_port_and_retries(port: u16, num_of_retries: usize) -> Self { + pub fn new_with_port_and_retries(port: u16, num_of_retries: u32) -> Self { let localhost = IpAddr::V4(Ipv4Addr::LOCALHOST); NodeClientConfig { ip_address: localhost, port, - request_limit: DEFAULT_NODE_REQUEST_LIMIT, max_message_size_bytes: DEFAULT_MAX_PAYLOAD_SIZE, - request_buffer_size: DEFAULT_REQUEST_BUFFER_SIZE, message_timeout_secs: DEFAULT_MESSAGE_TIMEOUT_SECS, client_access_timeout_secs: DEFAULT_CLIENT_ACCESS_TIMEOUT_SECS, keepalive_timeout_ms: DEFAULT_KEEPALIVE_TIMEOUT_MS, @@ -216,7 +175,7 @@ impl NodeClientConfig { initial_delay_ms: 500, max_delay_ms: 3000, coefficient: 3, - max_attempts: MaxAttempts::Finite(num_of_retries), + max_attempts: num_of_retries, }, } } @@ -228,57 +187,6 @@ impl Default for NodeClientConfig { } } -/// Node client configuration. -#[derive(Clone, DataSize, Debug, Deserialize, PartialEq, Eq)] -// Disallow unknown fields to ensure config files and command-line overrides contain valid keys. -#[serde(deny_unknown_fields)] -pub struct NodeClientConfigTarget { - /// IP address of the node. - pub ip_address: IpAddr, - /// TCP port of the node - pub port: u16, - /// Maximum size of a message in bytes. - pub max_message_size_bytes: u32, - /// Message transfer timeout in seconds. - pub message_timeout_secs: u64, - /// Timeout specifying how long to wait for binary port client to be available. - // Access to the client is synchronized. - pub client_access_timeout_secs: u64, - /// Maximum number of in-flight node requests. - pub request_limit: u16, - /// Number of node requests that can be buffered. - pub request_buffer_size: usize, - /// The amount of ms to wait between sending keepalive requests. - pub keepalive_timeout_ms: u64, - /// Configuration for exponential backoff to be used for re-connects. 
- pub exponential_backoff: ExponentialBackoffConfigTarget, -} - -impl TryFrom for NodeClientConfig { - type Error = FieldParseError; - fn try_from(value: NodeClientConfigTarget) -> Result { - let exponential_backoff = - value - .exponential_backoff - .try_into() - .map_err(|e: FieldParseError| FieldParseError::ParseError { - field_name: "exponential_backoff", - error: e.to_string(), - })?; - Ok(NodeClientConfig { - ip_address: value.ip_address, - port: value.port, - request_limit: value.request_limit, - max_message_size_bytes: value.max_message_size_bytes, - request_buffer_size: value.request_buffer_size, - client_access_timeout_secs: value.client_access_timeout_secs, - message_timeout_secs: value.message_timeout_secs, - keepalive_timeout_ms: value.keepalive_timeout_ms, - exponential_backoff, - }) - } -} - /// Exponential backoff configuration for re-connects. #[derive(Clone, DataSize, Debug, Deserialize, PartialEq, Eq)] // Disallow unknown fields to ensure config files and command-line overrides contain valid keys. @@ -291,131 +199,5 @@ pub struct ExponentialBackoffConfig { /// The multiplier to apply to the previous delay to get the next delay. pub coefficient: u64, /// Maximum number of connection attempts. - pub max_attempts: MaxAttempts, -} - -/// Exponential backoff configuration for re-connects. -#[derive(Clone, DataSize, Debug, Deserialize, PartialEq, Eq)] -// Disallow unknown fields to ensure config files and command-line overrides contain valid keys. -#[serde(deny_unknown_fields)] -pub struct ExponentialBackoffConfigTarget { - /// Initial wait time before the first re-connect attempt. - pub initial_delay_ms: u64, - /// Maximum wait time between re-connect attempts. - pub max_delay_ms: u64, - /// The multiplier to apply to the previous delay to get the next delay. - pub coefficient: u64, - /// Maximum number of re-connect attempts. 
- pub max_attempts: MaxAttemptsTarget, -} - -impl TryFrom for ExponentialBackoffConfig { - type Error = FieldParseError; - fn try_from(value: ExponentialBackoffConfigTarget) -> Result { - let max_attempts = value - .max_attempts - .try_into() - .map_err(|e: MaxAttemptsError| FieldParseError::ParseError { - field_name: "max_attempts", - error: e.to_string(), - })?; - Ok(ExponentialBackoffConfig { - initial_delay_ms: value.initial_delay_ms, - max_delay_ms: value.max_delay_ms, - coefficient: value.coefficient, - max_attempts, - }) - } -} - -#[derive(Clone, DataSize, Debug, Deserialize, PartialEq, Eq)] -pub enum MaxAttempts { - Infinite, - Finite(usize), -} - -impl MaxAttempts { - pub fn can_attempt(&self, current_attempt: usize) -> bool { - match self { - MaxAttempts::Infinite => true, - MaxAttempts::Finite(max_attempts) => *max_attempts >= current_attempt, - } - } -} - -#[derive(Clone, DataSize, Debug, Deserialize, PartialEq, Eq)] -#[serde(untagged)] -pub enum MaxAttemptsTarget { - StringBased(String), - UsizeBased(usize), -} - -impl TryFrom for MaxAttempts { - type Error = MaxAttemptsError; - fn try_from(value: MaxAttemptsTarget) -> Result { - match value { - MaxAttemptsTarget::StringBased(s) => { - if s == "infinite" { - Ok(MaxAttempts::Infinite) - } else { - Err(MaxAttemptsError::UnexpectedValue(s)) - } - } - MaxAttemptsTarget::UsizeBased(u) => { - if u == 0 { - Err(MaxAttemptsError::UnexpectedValue(u.to_string())) - } else { - Ok(MaxAttempts::Finite(u)) - } - } - } - } -} - -#[derive(Error, Debug)] -pub enum MaxAttemptsError { - #[error("Max attempts must be either 'infinite' or a integer > 0. 
Got: {}", .0)] - UnexpectedValue(String), -} - -#[cfg(test)] -mod tests { - use super::*; - #[test] - fn test_should_deserialize_infinite() { - let json = r#""infinite""#.to_string(); - let deserialized: MaxAttempts = serde_json::from_str::(&json) - .unwrap() - .try_into() - .unwrap(); - assert_eq!(deserialized, MaxAttempts::Infinite); - } - - #[test] - fn test_should_deserialize_finite() { - let json = r#"125"#.to_string(); - let deserialized: MaxAttempts = serde_json::from_str::(&json) - .unwrap() - .try_into() - .unwrap(); - assert_eq!(deserialized, MaxAttempts::Finite(125)); - } - - #[test] - fn test_should_fail_on_other_inputs() { - assert_failing_deserialization(r#""x""#); - assert_failing_deserialization(r#""infiniteee""#); - assert_failing_deserialization(r#""infinite ""#); - assert_failing_deserialization(r#"" infinite""#); - let deserialized = serde_json::from_str::(r#"-1"#); - assert!(deserialized.is_err()); - } - - fn assert_failing_deserialization(input: &str) { - let deserialized: Result = - serde_json::from_str::(input) - .unwrap() - .try_into(); - assert!(deserialized.is_err(), "input = {}", input); - } + pub max_attempts: u32, } diff --git a/rpc_sidecar/src/lib.rs b/rpc_sidecar/src/lib.rs index 3149cad5..acd5d983 100644 --- a/rpc_sidecar/src/lib.rs +++ b/rpc_sidecar/src/lib.rs @@ -11,7 +11,7 @@ use anyhow::Error; use casper_binary_port::{BinaryRequest, BinaryRequestHeader}; use casper_types::bytesrepr::ToBytes; use casper_types::{bytesrepr, ProtocolVersion}; -pub use config::{FieldParseError, RpcServerConfig, RpcServerConfigTarget}; +pub use config::{FieldParseError, RpcServerConfig}; pub use config::{NodeClientConfig, RpcConfig}; use futures::future::BoxFuture; use futures::FutureExt; diff --git a/rpc_sidecar/src/node_client.rs b/rpc_sidecar/src/node_client.rs index 29c067ef..fda90839 100644 --- a/rpc_sidecar/src/node_client.rs +++ b/rpc_sidecar/src/node_client.rs @@ -1,12 +1,16 @@ -use crate::{config::MaxAttempts, encode_request, 
NodeClientConfig, SUPPORTED_PROTOCOL_VERSION}; +use crate::{ + config::ExponentialBackoffConfig, encode_request, NodeClientConfig, SUPPORTED_PROTOCOL_VERSION, +}; use anyhow::Error as AnyhowError; use async_trait::async_trait; use futures::{Future, SinkExt, StreamExt}; -use metrics::rpc::{inc_disconnect, observe_reconnect_time}; +use metrics::rpc::{ + inc_disconnect, observe_reconnect_time, register_mismatched_id, register_timeout, +}; use serde::de::DeserializeOwned; use std::{ convert::{TryFrom, TryInto}, - net::SocketAddr, + net::{IpAddr, SocketAddr}, sync::{ atomic::{AtomicU16, Ordering}, Arc, @@ -39,7 +43,7 @@ use std::{ }; use tokio::{ net::TcpStream, - sync::{futures::Notified, RwLock, RwLockWriteGuard, Semaphore}, + sync::{futures::Notified, RwLock, RwLockWriteGuard}, }; use tracing::{error, field, info, warn}; @@ -765,6 +769,8 @@ pub enum Error { GasPriceToleranceTooLow, #[error("received v1 transaction for speculative execution")] ReceivedV1Transaction, + #[error("connection to node lost")] + ConnectionLost, } impl Error { @@ -924,7 +930,6 @@ pub struct FramedNodeClient { reconnect: Arc>, shutdown: Arc>, config: NodeClientConfig, - request_limit: Semaphore, current_request_id: Arc, } @@ -941,7 +946,13 @@ impl FramedNodeClient { AnyhowError, > { let stream = Arc::new(RwLock::new( - Self::connect_with_retries(&config, None).await?, + Self::connect_with_retries( + &config.ip_address, + config.port, + &config.exponential_backoff, + config.max_message_size_bytes, + ) + .await?, )); let shutdown = Notify::::new(); @@ -963,7 +974,6 @@ impl FramedNodeClient { let node_client = Self { client: Arc::clone(&stream), - request_limit: Semaphore::new(config.request_limit as usize), reconnect, shutdown, config, @@ -1059,6 +1069,7 @@ impl FramedNodeClient { .await .map_err(|_| Error::RequestFailed("timeout".to_owned()))? 
{ + register_timeout("sending_payload"); return Err(Error::RequestFailed(err.to_string())); }; @@ -1069,7 +1080,8 @@ impl FramedNodeClient { ) .await else { - return Err(Error::RequestFailed("timeout".to_owned())); + register_timeout("receiving_response"); + return Err(Error::ConnectionLost); }; if let Some(response) = maybe_response { @@ -1086,12 +1098,16 @@ impl FramedNodeClient { // If our expected ID is greater than the one we received, it means we can // try to recover from the situation by reading more responses from the stream. warn!(%err, "received a response with an outdated id, trying another response"); + register_mismatched_id(); continue; } - Err(err) => return Err(err), + Err(err) => { + register_mismatched_id(); + return Err(err); + } } } else { - return Err(Error::RequestFailed("disconnected".to_owned())); + return Err(Error::ConnectionLost); } } @@ -1111,28 +1127,26 @@ impl FramedNodeClient { } async fn connect_with_retries( - config: &NodeClientConfig, - maybe_max_attempts_override: Option<&MaxAttempts>, + ip_address: &IpAddr, + port: u16, + backoff_config: &ExponentialBackoffConfig, + max_message_size_bytes: u32, ) -> Result, AnyhowError> { - let mut wait = config.exponential_backoff.initial_delay_ms; - let max_attempts = if let Some(attempts) = maybe_max_attempts_override { - attempts - } else { - &config.exponential_backoff.max_attempts - }; - let tcp_socket = SocketAddr::new(config.ip_address, config.port); + let mut wait = backoff_config.initial_delay_ms; + let max_attempts = &backoff_config.max_attempts; + let tcp_socket = SocketAddr::new(*ip_address, port); let mut current_attempt = 1; loop { match TcpStream::connect(tcp_socket).await { Ok(stream) => { return Ok(Framed::new( stream, - BinaryMessageCodec::new(config.max_message_size_bytes), + BinaryMessageCodec::new(max_message_size_bytes), )) } Err(err) => { current_attempt += 1; - if !max_attempts.can_attempt(current_attempt) { + if *max_attempts < current_attempt { anyhow::bail!( 
"Couldn't connect to node {} after {} attempts", tcp_socket, @@ -1141,8 +1155,7 @@ impl FramedNodeClient { } warn!(%err, "failed to connect to node {tcp_socket}, waiting {wait}ms before retrying"); tokio::time::sleep(Duration::from_millis(wait)).await; - wait = (wait * config.exponential_backoff.coefficient) - .min(config.exponential_backoff.max_delay_ms); + wait = (wait * backoff_config.coefficient).min(backoff_config.max_delay_ms); } }; } @@ -1150,12 +1163,17 @@ impl FramedNodeClient { async fn reconnect_internal( config: &NodeClientConfig, - maybe_max_attempts_override: Option<&MaxAttempts>, ) -> Result, AnyhowError> { let disconnected_start = Instant::now(); inc_disconnect(); error!("node connection closed, will attempt to reconnect"); - let stream = Self::connect_with_retries(config, maybe_max_attempts_override).await?; + let stream = Self::connect_with_retries( + &config.ip_address, + config.port, + &config.exponential_backoff, + config.max_message_size_bytes, + ) + .await?; info!("connection with the node has been re-established"); observe_reconnect_time(disconnected_start.elapsed()); Ok(stream) @@ -1164,25 +1182,13 @@ impl FramedNodeClient { async fn reconnect( config: &NodeClientConfig, ) -> Result, AnyhowError> { - Self::reconnect_internal(config, None).await - } - - async fn reconnect_without_retries( - config: &NodeClientConfig, - ) -> Result, AnyhowError> { - Self::reconnect_internal(config, Some(&MaxAttempts::Finite(1))).await + Self::reconnect_internal(config).await } } #[async_trait] impl NodeClient for FramedNodeClient { async fn send_request(&self, req: BinaryRequest) -> Result { - let _permit = self - .request_limit - .acquire() - .await - .map_err(|err| Error::RequestFailed(err.to_string()))?; - // TODO: Use queue instead of individual timeouts. Currently it is possible to go pass the // semaphore and the immediately wait for the client to become available. 
let mut client = match tokio::time::timeout( @@ -1192,7 +1198,10 @@ impl NodeClient for FramedNodeClient { .await { Ok(client) => client, - Err(err) => return Err(Error::RequestFailed(err.to_string())), + Err(_) => { + register_timeout("acquiring_client"); + return Err(Error::ConnectionLost); + } }; let result = self.send_request_internal(&req, &mut client).await; @@ -1204,12 +1213,24 @@ impl NodeClient for FramedNodeClient { ); // attempt to reconnect once in case the node was restarted and connection broke client.close().await.ok(); - match Self::reconnect_without_retries(&self.config).await { - Ok(new_client) => { + let ip_address = &self.config.ip_address; + + match tokio::time::timeout( + Duration::from_secs(self.config.client_access_timeout_secs), + Self::connect_with_retries( + ip_address, + self.config.port, + &self.config.exponential_backoff, + self.config.max_message_size_bytes, + ), + ) + .await + { + Ok(Ok(new_client)) => { *client = new_client; return self.send_request_internal(&req, &mut client).await; } - Err(err) => { + Ok(Err(err)) => { warn!( %err, addr = %self.config.ip_address, @@ -1218,7 +1239,16 @@ impl NodeClient for FramedNodeClient { // schedule standard reconnect process with multiple retries // and return a response self.reconnect.notify_one(); - return Err(Error::RequestFailed("disconnected".to_owned())); + return Err(Error::ConnectionLost); + } + Err(_) => { + warn!( + %err, + addr = %self.config.ip_address, + "failed to reestablish connection in timely fashion" + ); + register_timeout("reacquiring_connection"); + return Err(Error::ConnectionLost); } } } @@ -1585,10 +1615,7 @@ mod tests { let err = query_global_state_for_string_value(&mut rng, &c) .await .unwrap_err(); - assert!(matches!( - err, - Error::RequestFailed(e) if e == "disconnected" - )); + assert!(matches!(err, Error::ConnectionLost)); // restart node let mock_server_handle = start_mock_binary_port_responding_with_stored_value( diff --git a/sidecar/src/config.rs 
b/sidecar/src/config.rs index 4af83a9f..fe958de7 100644 --- a/sidecar/src/config.rs +++ b/sidecar/src/config.rs @@ -3,7 +3,7 @@ use casper_event_sidecar::{ AdminApiServerConfig, DatabaseConfigError, RestApiServerConfig, SseEventServerConfig, StorageConfig, StorageConfigSerdeTarget, }; -use casper_rpc_sidecar::{FieldParseError, RpcServerConfig, RpcServerConfigTarget}; +use casper_rpc_sidecar::{FieldParseError, RpcServerConfig}; use serde::Deserialize; use thiserror::Error; @@ -16,7 +16,7 @@ pub struct SidecarConfigTarget { rest_api_server: Option, admin_api_server: Option, sse_server: Option, - rpc_server: Option, + rpc_server: Option, } #[derive(Clone, Debug, Deserialize, PartialEq, Eq)] @@ -114,15 +114,12 @@ impl TryFrom for SidecarConfig { .map(|target| target.try_into().map(Some)) .unwrap_or(Ok(None)); let storage_config = storage_config_res?; - let rpc_server_config_res: Option> = - value.rpc_server.map(|target| target.try_into()); - let rpc_server_config = invert(rpc_server_config_res)?; Ok(SidecarConfig { max_thread_count: value.max_thread_count, max_blocking_thread_count: value.max_blocking_thread_count, network_name: value.network_name, sse_server: sse_server_config, - rpc_server: rpc_server_config, + rpc_server: value.rpc_server, storage: storage_config, rest_api_server: value.rest_api_server, admin_api_server: value.admin_api_server, @@ -130,10 +127,6 @@ impl TryFrom for SidecarConfig { } } -fn invert(x: Option>) -> Result, E> { - x.map_or(Ok(None), |v| v.map(Some)) -} - #[derive(Error, Debug)] pub enum ConfigReadError { #[error("failed to read sidecar configuration. 
Underlying reason: {}", .error)] diff --git a/sidecar/src/run.rs b/sidecar/src/run.rs index c44ac177..609a6241 100644 --- a/sidecar/src/run.rs +++ b/sidecar/src/run.rs @@ -2,10 +2,15 @@ use crate::component::*; use crate::config::SidecarConfig; use anyhow::{anyhow, Context, Error}; use casper_event_sidecar::LazyDatabaseWrapper; -use std::process::ExitCode; -use tokio::signal::unix::{signal, SignalKind}; +use std::{process::ExitCode, time::Duration}; +use tokio::{ + signal::unix::{signal, SignalKind}, + time::timeout, +}; use tracing::{error, info}; +const MAX_COMPONENT_STARTUP_TIMEOUT_SECS: u64 = 120; + pub async fn run(config: SidecarConfig) -> Result { let maybe_database = config .storage @@ -50,8 +55,24 @@ async fn do_run( components: Vec>, ) -> Result { let mut component_futures = Vec::new(); + let max_startup_duration = Duration::from_secs(MAX_COMPONENT_STARTUP_TIMEOUT_SECS); for component in components.iter() { - let maybe_future = component.prepare_component_task(&config).await?; + let component_name = component.name(); + let component_startup_res = timeout( + max_startup_duration, + component.prepare_component_task(&config), + ) + .await; + if component_startup_res.is_err() { + return Err(ComponentError::Initialization { + component_name: component_name.clone(), + internal_error: anyhow!( + "Failed to start component {component_name} in {MAX_COMPONENT_STARTUP_TIMEOUT_SECS} [s]" + ), + }); + } + + let maybe_future = component_startup_res.unwrap()?; if let Some(future) = maybe_future { component_futures.push(future); } From 28e022b6e61f4a38ef8909af844902251a7e6769 Mon Sep 17 00:00:00 2001 From: Jakub Zajkowski Date: Wed, 29 Jan 2025 09:54:12 +0100 Subject: [PATCH 2/4] Aligning message_timeout_secs --- resources/example_configs/EXAMPLE_NCTL_CONFIG.toml | 2 +- resources/example_configs/EXAMPLE_NCTL_POSTGRES_CONFIG.toml | 2 +- resources/example_configs/EXAMPLE_NODE_CONFIG.toml | 2 +- resources/example_configs/default_debian_config.toml | 2 +- 
resources/example_configs/default_rpc_only_config.toml | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/resources/example_configs/EXAMPLE_NCTL_CONFIG.toml b/resources/example_configs/EXAMPLE_NCTL_CONFIG.toml index 3d6e10f2..5797319c 100644 --- a/resources/example_configs/EXAMPLE_NCTL_CONFIG.toml +++ b/resources/example_configs/EXAMPLE_NCTL_CONFIG.toml @@ -18,7 +18,7 @@ cors_origin = "" ip_address = "0.0.0.0" port = 28102 max_message_size_bytes = 4194304 -message_timeout_secs = 30 +message_timeout_secs = 10 client_access_timeout_secs = 10 keepalive_timeout_ms = 10_000 diff --git a/resources/example_configs/EXAMPLE_NCTL_POSTGRES_CONFIG.toml b/resources/example_configs/EXAMPLE_NCTL_POSTGRES_CONFIG.toml index 3d89e8c3..2c55c06c 100644 --- a/resources/example_configs/EXAMPLE_NCTL_POSTGRES_CONFIG.toml +++ b/resources/example_configs/EXAMPLE_NCTL_POSTGRES_CONFIG.toml @@ -18,7 +18,7 @@ cors_origin = "" ip_address = "0.0.0.0" port = 28102 max_message_size_bytes = 4194304 -message_timeout_secs = 30 +message_timeout_secs = 10 client_access_timeout_secs = 10 keepalive_timeout_ms = 10_000 diff --git a/resources/example_configs/EXAMPLE_NODE_CONFIG.toml b/resources/example_configs/EXAMPLE_NODE_CONFIG.toml index b54d440f..a1801798 100644 --- a/resources/example_configs/EXAMPLE_NODE_CONFIG.toml +++ b/resources/example_configs/EXAMPLE_NODE_CONFIG.toml @@ -18,7 +18,7 @@ cors_origin = "" ip_address = "3.20.57.210" port = 7777 max_message_size_bytes = 4194304 -message_timeout_secs = 60 +message_timeout_secs = 10 client_access_timeout_secs = 10 keepalive_timeout_ms = 10_000 diff --git a/resources/example_configs/default_debian_config.toml b/resources/example_configs/default_debian_config.toml index 4e436a93..5270aab4 100644 --- a/resources/example_configs/default_debian_config.toml +++ b/resources/example_configs/default_debian_config.toml @@ -72,7 +72,7 @@ port = 7779 # Maximum size of a message in bytes. 
max_message_size_bytes = 4_194_304 # Timeout for a node request in seconds. -message_timeout_secs = 30 +message_timeout_secs = 10 # Timeout specifying how long to wait for binary port client to be available. client_access_timeout_secs = 10 # The amount of time in milliseconds to wait between sending keepalive requests. diff --git a/resources/example_configs/default_rpc_only_config.toml b/resources/example_configs/default_rpc_only_config.toml index 5b6ce3ed..f33add55 100644 --- a/resources/example_configs/default_rpc_only_config.toml +++ b/resources/example_configs/default_rpc_only_config.toml @@ -76,7 +76,7 @@ request_limit = 3 # Number of node requests that can be buffered. request_buffer_size = 16 # Timeout for a node request in seconds. -message_timeout_secs = 30 +message_timeout_secs = 10 # Timeout specifying how long to wait for binary port client to be available. client_access_timeout_secs = 10 # The amount of time in milliseconds to wait between sending keepalive requests. From abe3a1ac3d8f0f03d7ae7538f74531d588fc44ae Mon Sep 17 00:00:00 2001 From: Jakub Zajkowski Date: Wed, 29 Jan 2025 14:17:46 +0100 Subject: [PATCH 3/4] Making keepalive loop use the standard mechanism of sending messages to gain retries and id-checks --- rpc_sidecar/src/lib.rs | 2 +- rpc_sidecar/src/node_client.rs | 61 ++++++++++------------------------ 2 files changed, 18 insertions(+), 45 deletions(-) diff --git a/rpc_sidecar/src/lib.rs b/rpc_sidecar/src/lib.rs index acd5d983..b238a103 100644 --- a/rpc_sidecar/src/lib.rs +++ b/rpc_sidecar/src/lib.rs @@ -40,7 +40,7 @@ pub async fn build_rpc_server<'a>( ) -> MaybeRpcServerReturn<'a> { let (node_client, reconnect_loop, keepalive_loop) = FramedNodeClient::new(config.node_client.clone(), maybe_network_name).await?; - let node_client: Arc = Arc::new(node_client); + let node_client: Arc = node_client; let mut futures = Vec::new(); let main_server_config = config.main_server; if main_server_config.enable_server { diff --git
a/rpc_sidecar/src/node_client.rs b/rpc_sidecar/src/node_client.rs index fda90839..68e7a95c 100644 --- a/rpc_sidecar/src/node_client.rs +++ b/rpc_sidecar/src/node_client.rs @@ -701,8 +701,8 @@ pub enum Error { TooManyMismatchedResponses { max: u8 }, #[error("failed to deserialize the original request provided with the response: {0}")] OriginalRequestDeserialization(String), - #[error("failed to deserialize the envelope of a response: {0}")] - EnvelopeDeserialization(String), + #[error("failed to deserialize the envelope of a response")] + EnvelopeDeserialization, #[error("failed to deserialize a response: {0}")] Deserialization(String), #[error("failed to serialize a request: {0}")] @@ -939,7 +939,7 @@ impl FramedNodeClient { maybe_network_name: Option, ) -> Result< ( - Self, + Arc, impl Future>, impl Future>, ), @@ -964,21 +964,15 @@ impl FramedNodeClient { Arc::clone(&shutdown), Arc::clone(&reconnect), ); - - let current_request_id = Arc::new(AtomicU16::new(INITIAL_REQUEST_ID)); - let keepalive_loop = Self::keepalive_loop( - config.clone(), - Arc::clone(&stream), - Arc::clone(¤t_request_id), - ); - - let node_client = Self { + let keepalive_timeout = Duration::from_millis(config.keepalive_timeout_ms); + let node_client = Arc::new(Self { client: Arc::clone(&stream), reconnect, shutdown, config, current_request_id: AtomicU16::new(INITIAL_REQUEST_ID).into(), - }; + }); + let keepalive_loop = Self::keepalive_loop(node_client.clone(), keepalive_timeout); // validate network name if let Some(network_name) = maybe_network_name { @@ -1019,39 +1013,12 @@ impl FramedNodeClient { } async fn keepalive_loop( - config: NodeClientConfig, - client: Arc>>, - current_request_id: Arc, + client: Arc, + keepalive_timeout: Duration, ) -> Result<(), AnyhowError> { - let keepalive_timeout = Duration::from_millis(config.keepalive_timeout_ms); - loop { tokio::time::sleep(keepalive_timeout).await; - - let mut client = client.write().await; - - let next_id = current_request_id.fetch_add(1, 
Ordering::Relaxed); - let payload = BinaryMessage::new( - encode_request(&BinaryRequest::KeepAliveRequest, next_id) - .expect("should always serialize a request"), - ); - - if tokio::time::timeout( - Duration::from_secs(config.message_timeout_secs), - client.send(payload), - ) - .await - .is_err() - { - continue; - } - - tokio::time::timeout( - Duration::from_secs(config.message_timeout_secs), - client.next(), - ) - .await - .ok(); + client.send_request(BinaryRequest::KeepAliveRequest).await?; } } @@ -1090,7 +1057,13 @@ impl FramedNodeClient { .map_err(|err| Error::RequestFailed(err.to_string()))? .payload(), ) - .map_err(|err| Error::EnvelopeDeserialization(err.to_string()))?; + .map_err(|err| { + error!( + "Error when deserializing binary port envelope: {}", + err.to_string() + ); + Error::EnvelopeDeserialization + })?; match validate_response(resp, request_id, &self.shutdown) { Ok(response) => return Ok(response), Err(err) if matches!(err, Error::RequestResponseIdMismatch { expected, got } if expected > got) => From 9f7de56b821c316119b3569e849e4f28799f5efe Mon Sep 17 00:00:00 2001 From: Jakub Zajkowski Date: Wed, 29 Jan 2025 14:24:10 +0100 Subject: [PATCH 4/4] Applying CR suggestions --- metrics/src/rpc.rs | 2 +- rpc_sidecar/src/config.rs | 6 +++--- rpc_sidecar/src/lib.rs | 3 +-- rpc_sidecar/src/node_client.rs | 9 ++++----- sidecar/src/run.rs | 2 +- 5 files changed, 10 insertions(+), 12 deletions(-) diff --git a/metrics/src/rpc.rs b/metrics/src/rpc.rs index 9c32cd58..df3690b0 100644 --- a/metrics/src/rpc.rs +++ b/metrics/src/rpc.rs @@ -74,7 +74,7 @@ static RECONNECT_TIMES_MS: Lazy = Lazy::new(|| { static MISMATCHED_IDS: Lazy = Lazy::new(|| { let counter = IntGauge::new( "rpc_server_mismatched_ids", - "Number of mismathced id events observed in responses from binary port", + "Number of mismatched ID events observed in responses from binary port", ) .expect("rpc_server_mismatched_ids metric can't be created"); REGISTRY diff --git a/rpc_sidecar/src/config.rs 
b/rpc_sidecar/src/config.rs index 85a77feb..7ac5f730 100644 --- a/rpc_sidecar/src/config.rs +++ b/rpc_sidecar/src/config.rs @@ -96,7 +96,7 @@ const DEFAULT_EXPONENTIAL_BACKOFF_COEFFICIENT: u64 = 2; /// Default keep alive timeout milliseconds. const DEFAULT_KEEPALIVE_TIMEOUT_MS: u64 = 1_000; /// Default max attempts -const DEFAULT_MAX_ATTEMPTS: u32 = 3; +const DEFAULT_EXPONENTIAL_BACKOFF_MAX_ATTEMPTS: u32 = 3; /// Node client configuration. #[derive(Clone, DataSize, Debug, Deserialize, PartialEq, Eq)] @@ -134,7 +134,7 @@ impl NodeClientConfig { initial_delay_ms: DEFAULT_EXPONENTIAL_BACKOFF_BASE_MS, max_delay_ms: DEFAULT_EXPONENTIAL_BACKOFF_MAX_MS, coefficient: DEFAULT_EXPONENTIAL_BACKOFF_COEFFICIENT, - max_attempts: DEFAULT_MAX_ATTEMPTS, + max_attempts: DEFAULT_EXPONENTIAL_BACKOFF_MAX_ATTEMPTS, }, } } @@ -154,7 +154,7 @@ impl NodeClientConfig { initial_delay_ms: DEFAULT_EXPONENTIAL_BACKOFF_BASE_MS, max_delay_ms: DEFAULT_EXPONENTIAL_BACKOFF_MAX_MS, coefficient: DEFAULT_EXPONENTIAL_BACKOFF_COEFFICIENT, - max_attempts: DEFAULT_MAX_ATTEMPTS, + max_attempts: DEFAULT_EXPONENTIAL_BACKOFF_MAX_ATTEMPTS, }, } } diff --git a/rpc_sidecar/src/lib.rs b/rpc_sidecar/src/lib.rs index b238a103..2909f3e4 100644 --- a/rpc_sidecar/src/lib.rs +++ b/rpc_sidecar/src/lib.rs @@ -11,8 +11,7 @@ use anyhow::Error; use casper_binary_port::{BinaryRequest, BinaryRequestHeader}; use casper_types::bytesrepr::ToBytes; use casper_types::{bytesrepr, ProtocolVersion}; -pub use config::{FieldParseError, RpcServerConfig}; -pub use config::{NodeClientConfig, RpcConfig}; +pub use config::{FieldParseError, NodeClientConfig, RpcConfig, RpcServerConfig}; use futures::future::BoxFuture; use futures::FutureExt; pub use http_server::run as run_rpc_server; diff --git a/rpc_sidecar/src/node_client.rs b/rpc_sidecar/src/node_client.rs index 68e7a95c..a8d81e76 100644 --- a/rpc_sidecar/src/node_client.rs +++ b/rpc_sidecar/src/node_client.rs @@ -1034,9 +1034,10 @@ impl FramedNodeClient { client.send(payload), ) .await - 
.map_err(|_| Error::RequestFailed("timeout".to_owned()))? - { + .map_err(|_| { register_timeout("sending_payload"); + Error::RequestFailed("timeout".to_owned()) + })? { return Err(Error::RequestFailed(err.to_string())); }; @@ -1162,8 +1163,6 @@ impl FramedNodeClient { #[async_trait] impl NodeClient for FramedNodeClient { async fn send_request(&self, req: BinaryRequest) -> Result { - // TODO: Use queue instead of individual timeouts. Currently it is possible to go pass the - // semaphore and the immediately wait for the client to become available. let mut client = match tokio::time::timeout( Duration::from_secs(self.config.client_access_timeout_secs), self.client.write(), @@ -1184,7 +1183,7 @@ impl NodeClient for FramedNodeClient { err = display_error(&err), "binary port client handler error" ); - // attempt to reconnect once in case the node was restarted and connection broke + // attempt to reconnect in case the node was restarted and connection broke client.close().await.ok(); let ip_address = &self.config.ip_address; diff --git a/sidecar/src/run.rs b/sidecar/src/run.rs index 609a6241..46063edd 100644 --- a/sidecar/src/run.rs +++ b/sidecar/src/run.rs @@ -9,7 +9,7 @@ use tokio::{ }; use tracing::{error, info}; -const MAX_COMPONENT_STARTUP_TIMEOUT_SECS: u64 = 120; +const MAX_COMPONENT_STARTUP_TIMEOUT_SECS: u64 = 30; pub async fn run(config: SidecarConfig) -> Result { let maybe_database = config