Implement keepalive checks #368

Merged 6 commits on Nov 26, 2024
1 change: 1 addition & 0 deletions resources/example_configs/EXAMPLE_NCTL_CONFIG.toml
@@ -22,6 +22,7 @@ request_limit = 3
request_buffer_size = 16
message_timeout_secs = 30
client_access_timeout_secs = 2
keepalive_timeout_ms = 10_000

[rpc_server.node_client.exponential_backoff]
initial_delay_ms = 1000

@@ -22,6 +22,7 @@ request_limit = 3
request_buffer_size = 16
message_timeout_secs = 30
client_access_timeout_secs = 2
keepalive_timeout_ms = 10_000

[rpc_server.node_client.exponential_backoff]
initial_delay_ms = 1000
1 change: 1 addition & 0 deletions resources/example_configs/EXAMPLE_NODE_CONFIG.toml
@@ -22,6 +22,7 @@ request_limit = 10
request_buffer_size = 50
message_timeout_secs = 60
client_access_timeout_secs = 60
keepalive_timeout_ms = 10_000

[rpc_server.node_client.exponential_backoff]
initial_delay_ms = 1000
2 changes: 2 additions & 0 deletions resources/example_configs/default_debian_config.toml
@@ -79,6 +79,8 @@ request_buffer_size = 16
message_timeout_secs = 30
# Timeout specifying how long to wait for binary port client to be available.
client_access_timeout_secs = 2
# The amount of time in milliseconds to wait between sending keepalive requests.
keepalive_timeout_ms = 10_000

[rpc_server.node_client.exponential_backoff]
# The initial delay in milliseconds before the first retry.
2 changes: 2 additions & 0 deletions resources/example_configs/default_rpc_only_config.toml
@@ -79,6 +79,8 @@ request_buffer_size = 16
message_timeout_secs = 30
# Timeout specifying how long to wait for binary port client to be available.
client_access_timeout_secs = 2
# The amount of time in milliseconds to wait between sending keepalive requests.
keepalive_timeout_ms = 10_000

[rpc_server.node_client.exponential_backoff]
# The initial delay in milliseconds before the first retry.
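
For illustration, the new keepalive_timeout_ms key parses like the rest of the node client section (the parent of the [rpc_server.node_client.exponential_backoff] block shown above). A minimal sketch, assuming the serde and toml crates and a local stand-in struct rather than the sidecar's own config types:

```rust
use serde::Deserialize;
use std::time::Duration;

// Local stand-in mirroring only the fields this sketch needs.
#[derive(Deserialize)]
struct NodeClientToml {
    client_access_timeout_secs: u64,
    keepalive_timeout_ms: u64,
}

fn main() {
    let fragment = r#"
        client_access_timeout_secs = 2
        keepalive_timeout_ms = 10_000
    "#;
    let cfg: NodeClientToml = toml::from_str(fragment).expect("valid TOML fragment");
    // 10_000 ms between keepalive requests, i.e. one ping every ten seconds.
    assert_eq!(
        Duration::from_millis(cfg.keepalive_timeout_ms),
        Duration::from_secs(10)
    );
}
```
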
10 changes: 10 additions & 0 deletions rpc_sidecar/src/config.rs
@@ -126,6 +126,8 @@ const DEFAULT_EXPONENTIAL_BACKOFF_BASE_MS: u64 = 1000;
const DEFAULT_EXPONENTIAL_BACKOFF_MAX_MS: u64 = 64_000;
/// Default exponential backoff coefficient.
const DEFAULT_EXPONENTIAL_BACKOFF_COEFFICIENT: u64 = 2;
/// Default keepalive timeout in milliseconds.
const DEFAULT_KEEPALIVE_TIMEOUT_MS: u64 = 1_000;

/// Node client configuration.
#[derive(Clone, DataSize, Debug, Deserialize, PartialEq, Eq)]
@@ -147,6 +149,8 @@ pub struct NodeClientConfig {
pub request_limit: u16,
/// Number of node requests that can be buffered.
pub request_buffer_size: usize,
/// The amount of time in milliseconds to wait between sending keepalive requests.
pub keepalive_timeout_ms: u64,
/// Configuration for exponential backoff to be used for re-connects.
pub exponential_backoff: ExponentialBackoffConfig,
}
@@ -162,6 +166,7 @@ impl NodeClientConfig {
request_buffer_size: DEFAULT_REQUEST_BUFFER_SIZE,
message_timeout_secs: DEFAULT_MESSAGE_TIMEOUT_SECS,
client_access_timeout_secs: DEFAULT_CLIENT_ACCESS_TIMEOUT_SECS,
keepalive_timeout_ms: DEFAULT_KEEPALIVE_TIMEOUT_MS,
exponential_backoff: ExponentialBackoffConfig {
initial_delay_ms: DEFAULT_EXPONENTIAL_BACKOFF_BASE_MS,
max_delay_ms: DEFAULT_EXPONENTIAL_BACKOFF_MAX_MS,
@@ -183,6 +188,7 @@ impl NodeClientConfig {
request_buffer_size: DEFAULT_REQUEST_BUFFER_SIZE,
message_timeout_secs: DEFAULT_MESSAGE_TIMEOUT_SECS,
client_access_timeout_secs: DEFAULT_CLIENT_ACCESS_TIMEOUT_SECS,
keepalive_timeout_ms: DEFAULT_KEEPALIVE_TIMEOUT_MS,
exponential_backoff: ExponentialBackoffConfig {
initial_delay_ms: DEFAULT_EXPONENTIAL_BACKOFF_BASE_MS,
max_delay_ms: DEFAULT_EXPONENTIAL_BACKOFF_MAX_MS,
@@ -205,6 +211,7 @@ impl NodeClientConfig {
request_buffer_size: DEFAULT_REQUEST_BUFFER_SIZE,
message_timeout_secs: DEFAULT_MESSAGE_TIMEOUT_SECS,
client_access_timeout_secs: DEFAULT_CLIENT_ACCESS_TIMEOUT_SECS,
keepalive_timeout_ms: DEFAULT_KEEPALIVE_TIMEOUT_MS,
exponential_backoff: ExponentialBackoffConfig {
initial_delay_ms: 500,
max_delay_ms: 3000,
@@ -241,6 +248,8 @@ pub struct NodeClientConfigTarget {
pub request_limit: u16,
/// Number of node requests that can be buffered.
pub request_buffer_size: usize,
/// The amount of time in milliseconds to wait between sending keepalive requests.
pub keepalive_timeout_ms: u64,
/// Configuration for exponential backoff to be used for re-connects.
pub exponential_backoff: ExponentialBackoffConfigTarget,
}
@@ -264,6 +273,7 @@ impl TryFrom<NodeClientConfigTarget> for NodeClientConfig {
request_buffer_size: value.request_buffer_size,
client_access_timeout_secs: value.client_access_timeout_secs,
message_timeout_secs: value.message_timeout_secs,
keepalive_timeout_ms: value.keepalive_timeout_ms,
exponential_backoff,
})
}
7 changes: 6 additions & 1 deletion rpc_sidecar/src/lib.rs
@@ -35,7 +35,8 @@ pub const CLIENT_SHUTDOWN_EXIT_CODE: u8 = 0x3;

pub type MaybeRpcServerReturn<'a> = Result<Option<BoxFuture<'a, Result<ExitCode, Error>>>, Error>;
pub async fn build_rpc_server<'a>(config: RpcServerConfig) -> MaybeRpcServerReturn<'a> {
let (node_client, reconnect_loop) = FramedNodeClient::new(config.node_client.clone()).await?;
let (node_client, reconnect_loop, keepalive_loop) =
FramedNodeClient::new(config.node_client.clone()).await?;
let node_client: Arc<dyn NodeClient> = Arc::new(node_client);
let mut futures = Vec::new();
let main_server_config = config.main_server;
@@ -58,6 +59,10 @@ pub async fn build_rpc_server<'a>(config: RpcServerConfig) -> MaybeRpcServerRetu
.map(|_| Ok(ExitCode::from(CLIENT_SHUTDOWN_EXIT_CODE)))
.boxed();
futures.push(reconnect_loop);
let keepalive_loop = keepalive_loop
.map(|_| Ok(ExitCode::from(CLIENT_SHUTDOWN_EXIT_CODE)))
.boxed();
futures.push(keepalive_loop);
Ok(Some(retype_future_vec(futures).boxed()))
}

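
The keepalive loop above is pushed into the same future set as the reconnect loop, so it runs for as long as the RPC servers do. Below is a rough standalone model of that pattern, a sketch only: it uses futures::future::select_all instead of the crate's retype_future_vec helper, and the loop bodies and names are illustrative.

```rust
use futures::future::{select_all, BoxFuture, FutureExt};

// In this model, whichever background task finishes first yields the exit
// code, mirroring how the reconnect and keepalive futures are mapped to
// CLIENT_SHUTDOWN_EXIT_CODE (0x3) and collected above.
async fn run_background_loops() -> u8 {
    let reconnect_loop: BoxFuture<'static, u8> = async { 0x3u8 }.boxed();
    let keepalive_loop: BoxFuture<'static, u8> = async { 0x3u8 }.boxed();

    let (exit_code, _which, _remaining) =
        select_all(vec![reconnect_loop, keepalive_loop]).await;
    exit_code
}
```
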
71 changes: 62 additions & 9 deletions rpc_sidecar/src/node_client.rs
@@ -817,16 +817,24 @@ pub struct FramedNodeClient {
shutdown: Arc<Notify<Shutdown>>,
config: NodeClientConfig,
request_limit: Semaphore,
current_request_id: AtomicU16,
current_request_id: Arc<AtomicU16>,
}

impl FramedNodeClient {
pub async fn new(
config: NodeClientConfig,
) -> Result<(Self, impl Future<Output = Result<(), AnyhowError>>), AnyhowError> {
) -> Result<
(
Self,
impl Future<Output = Result<(), AnyhowError>>,
impl Future<Output = Result<(), AnyhowError>>,
),
AnyhowError,
> {
let stream = Arc::new(RwLock::new(
Self::connect_with_retries(&config, None).await?,
));

let shutdown = Notify::<Shutdown>::new();
let reconnect = Notify::<Reconnect>::new();

@@ -837,16 +845,24 @@ impl FramedNodeClient {
Arc::clone(&reconnect),
);

let current_request_id = Arc::new(AtomicU16::new(INITIAL_REQUEST_ID));
let keepalive_loop = Self::keepalive_loop(
config.clone(),
Arc::clone(&stream),
Arc::clone(&current_request_id),
);

Ok((
Self {
client: Arc::clone(&stream),
request_limit: Semaphore::new(config.request_limit as usize),
reconnect,
shutdown,
config,
current_request_id: AtomicU16::new(INITIAL_REQUEST_ID),
current_request_id,
},
reconnect_loop,
keepalive_loop,
))
}

@@ -875,6 +891,43 @@ impl FramedNodeClient {
}
}

async fn keepalive_loop(
config: NodeClientConfig,
client: Arc<RwLock<Framed<TcpStream, BinaryMessageCodec>>>,
current_request_id: Arc<AtomicU16>,
) -> Result<(), AnyhowError> {
let keepalive_timeout = Duration::from_millis(config.keepalive_timeout_ms);

loop {
// Wait out the configured keepalive interval before pinging the node.
tokio::time::sleep(keepalive_timeout).await;

let mut client = client.write().await;

// Keepalive requests draw ids from the same shared counter as regular requests.
let next_id = current_request_id.fetch_add(1, Ordering::Relaxed);
let payload = BinaryMessage::new(
encode_request(&BinaryRequest::KeepAliveRequest, next_id)
.expect("should always serialize a request"),
);

// If sending the keepalive times out, skip this round and try again after the next interval.
if tokio::time::timeout(
Duration::from_secs(config.message_timeout_secs),
client.send(payload),
)
.await
.is_err()
{
continue;
}

// Read the keepalive response (or give up after the message timeout) and discard it.
tokio::time::timeout(
Duration::from_secs(config.message_timeout_secs),
client.next(),
)
.await
.ok();
}
}

async fn send_request_internal(
&self,
req: &BinaryRequest,
@@ -1286,7 +1339,7 @@ mod tests {
)
.await;
let config = NodeClientConfig::new_with_port_and_retries(port, 2);
let (c, _) = FramedNodeClient::new(config).await.unwrap();
let (c, _, _) = FramedNodeClient::new(config).await.unwrap();

let res = query_global_state_for_string_value(&mut rng, &c)
.await
@@ -1311,7 +1364,7 @@
.await;
});
let config = NodeClientConfig::new_with_port_and_retries(port, 5);
let (client, _) = FramedNodeClient::new(config).await.unwrap();
let (client, _, _) = FramedNodeClient::new(config).await.unwrap();

let res = query_global_state_for_string_value(&mut rng, &client)
.await
@@ -1351,7 +1404,7 @@ mod tests {
.await;

let config = NodeClientConfig::new_with_port(port);
let (c, reconnect_loop) = FramedNodeClient::new(config).await.unwrap();
let (c, reconnect_loop, _) = FramedNodeClient::new(config).await.unwrap();

let scenario = async {
// Request id = 1
@@ -1419,7 +1472,7 @@ mod tests {
let shutdown = Arc::new(tokio::sync::Notify::new());
let _mock_server_handle =
start_mock_binary_port(port, vec![], 1, Arc::clone(&shutdown)).await;
let (c, _) = FramedNodeClient::new(config).await.unwrap();
let (c, _, _) = FramedNodeClient::new(config).await.unwrap();

let generated_ids: Vec<_> = (INITIAL_REQUEST_ID..INITIAL_REQUEST_ID + 10)
.map(|_| {
@@ -1499,7 +1552,7 @@ mod tests {
)
.await;
let config = NodeClientConfig::new_with_port_and_retries(port, 2);
let (c, _) = FramedNodeClient::new(config).await.unwrap();
let (c, _, _) = FramedNodeClient::new(config).await.unwrap();

let res = query_global_state_for_string_value(&mut rng, &c)
.await
@@ -1523,7 +1576,7 @@
)
.await;
let config = NodeClientConfig::new_with_port_and_retries(port, 2);
let (c, _) = FramedNodeClient::new(config).await.unwrap();
let (c, _, _) = FramedNodeClient::new(config).await.unwrap();

let res = query_global_state_for_string_value(&mut rng, &c)
.await
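
One detail worth calling out from the node_client.rs changes: the request-id counter is now an Arc<AtomicU16>, so the keepalive task and the regular request path draw from a single id sequence. A minimal standalone sketch of that sharing (std only; the wrap-around is a property of fetch_add on atomics, not something added by this PR):

```rust
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::Arc;

fn main() {
    // One counter, cloned into the keepalive task just as in FramedNodeClient::new.
    let counter = Arc::new(AtomicU16::new(u16::MAX));
    let for_keepalive = Arc::clone(&counter);

    // fetch_add returns the previous value and wraps on overflow,
    // so request ids simply cycle through the u16 range.
    assert_eq!(for_keepalive.fetch_add(1, Ordering::Relaxed), u16::MAX);
    assert_eq!(counter.load(Ordering::Relaxed), 0);
}
```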