Skip to content

Commit

Permalink
Merge pull request #349 from teonite/rpc_graceful_reconnect
Browse files Browse the repository at this point in the history
RPC client graceful reconnect
  • Loading branch information
wojcik91 authored Nov 12, 2024
2 parents fca0147 + 7ebd64f commit a630bc4
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 25 deletions.
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
/target
/test_output
/test_output
# direnv-related files
.direnv/
.envrc

102 changes: 78 additions & 24 deletions rpc_sidecar/src/node_client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{encode_request, NodeClientConfig, SUPPORTED_PROTOCOL_VERSION};
use crate::{config::MaxAttempts, encode_request, NodeClientConfig, SUPPORTED_PROTOCOL_VERSION};
use anyhow::Error as AnyhowError;
use async_trait::async_trait;
use futures::{Future, SinkExt, StreamExt};
Expand Down Expand Up @@ -822,7 +822,9 @@ impl FramedNodeClient {
pub async fn new(
config: NodeClientConfig,
) -> Result<(Self, impl Future<Output = Result<(), AnyhowError>>), AnyhowError> {
let stream = Arc::new(RwLock::new(Self::connect_with_retries(&config).await?));
let stream = Arc::new(RwLock::new(
Self::connect_with_retries(&config, None).await?,
));
let shutdown = Notify::<Shutdown>::new();
let reconnect = Notify::<Reconnect>::new();

Expand Down Expand Up @@ -873,7 +875,7 @@ impl FramedNodeClient {

async fn send_request_internal(
&self,
req: BinaryRequest,
req: &BinaryRequest,
client: &mut RwLockWriteGuard<'_, Framed<TcpStream, BinaryMessageCodec>>,
) -> Result<BinaryResponseAndRequest, Error> {
let (request_id, payload) = self.generate_payload(req);
Expand Down Expand Up @@ -926,20 +928,26 @@ impl FramedNodeClient {
})
}

fn generate_payload(&self, req: BinaryRequest) -> (u16, BinaryMessage) {
fn generate_payload(&self, req: &BinaryRequest) -> (u16, BinaryMessage) {
let next_id = self.next_id();
(
next_id,
BinaryMessage::new(
encode_request(&req, next_id).expect("should always serialize a request"),
encode_request(req, next_id).expect("should always serialize a request"),
),
)
}

async fn connect_with_retries(
config: &NodeClientConfig,
maybe_max_attempts_override: Option<&MaxAttempts>,
) -> Result<Framed<TcpStream, BinaryMessageCodec>, AnyhowError> {
let mut wait = config.exponential_backoff.initial_delay_ms;
let max_attempts = if let Some(attempts) = maybe_max_attempts_override {
attempts
} else {
&config.exponential_backoff.max_attempts
};
let mut current_attempt = 1;
loop {
match TcpStream::connect(config.address).await {
Expand All @@ -950,19 +958,15 @@ impl FramedNodeClient {
))
}
Err(err) => {
warn!(%err, "failed to connect to the node, waiting {wait}ms before retrying");
current_attempt += 1;
if !config
.exponential_backoff
.max_attempts
.can_attempt(current_attempt)
{
if !max_attempts.can_attempt(current_attempt) {
anyhow::bail!(
"Couldn't connect to node {} after {} attempts",
config.address,
current_attempt - 1
);
}
warn!(%err, "failed to connect to the node, waiting {wait}ms before retrying");
tokio::time::sleep(Duration::from_millis(wait)).await;
wait = (wait * config.exponential_backoff.coefficient)
.min(config.exponential_backoff.max_delay_ms);
Expand All @@ -971,17 +975,30 @@ impl FramedNodeClient {
}
}

async fn reconnect(
async fn reconnect_internal(
config: &NodeClientConfig,
maybe_max_attempts_override: Option<&MaxAttempts>,
) -> Result<Framed<TcpStream, BinaryMessageCodec>, AnyhowError> {
let disconnected_start = Instant::now();
inc_disconnect();
error!("node connection closed, will attempt to reconnect");
let stream = Self::connect_with_retries(config).await?;
let stream = Self::connect_with_retries(config, maybe_max_attempts_override).await?;
info!("connection with the node has been re-established");
observe_reconnect_time(disconnected_start.elapsed());
Ok(stream)
}

/// Re-establishes the connection to the node using the retry policy taken
/// from `config`.
///
/// Delegates to `reconnect_internal` with no max-attempts override, so the
/// attempt limit is `config.exponential_backoff.max_attempts`.
///
/// # Errors
///
/// Propagates the error returned by `reconnect_internal` when the connection
/// could not be established.
async fn reconnect(
config: &NodeClientConfig,
) -> Result<Framed<TcpStream, BinaryMessageCodec>, AnyhowError> {
Self::reconnect_internal(config, None).await
}

/// Re-establishes the connection to the node, overriding the configured
/// attempt limit with `MaxAttempts::Finite(1)`.
///
/// NOTE(review): presumably this results in exactly one connection attempt
/// with no backoff sleep — confirm against `MaxAttempts::can_attempt`.
///
/// # Errors
///
/// Propagates the error returned by `reconnect_internal` when the connection
/// could not be established.
async fn reconnect_without_retries(
config: &NodeClientConfig,
) -> Result<Framed<TcpStream, BinaryMessageCodec>, AnyhowError> {
// The override replaces `config.exponential_backoff.max_attempts` for this call only.
Self::reconnect_internal(config, Some(&MaxAttempts::Finite(1))).await
}
}

#[async_trait]
Expand All @@ -1005,15 +1022,32 @@ impl NodeClient for FramedNodeClient {
Err(err) => return Err(Error::RequestFailed(err.to_string())),
};

let result = self.send_request_internal(req, &mut client).await;
let result = self.send_request_internal(&req, &mut client).await;
if let Err(err) = &result {
warn!(
addr = %self.config.address,
err = display_error(&err),
"binary port client handler error"
);
// attempt to reconnect once in case the node was restarted and connection broke
client.close().await.ok();
self.reconnect.notify_one()
match Self::reconnect_without_retries(&self.config).await {
Ok(new_client) => {
*client = new_client;
return self.send_request_internal(&req, &mut client).await;
}
Err(err) => {
warn!(
%err,
addr = %self.config.address,
"binary port client failed to reconnect"
);
// schedule the standard reconnect process with multiple retries
// and fail the current request with a "disconnected" error
self.reconnect.notify_one();
return Err(Error::RequestFailed("disconnected".to_owned()));
}
}
}
result
}
Expand Down Expand Up @@ -1304,27 +1338,29 @@ mod tests {
async fn given_client_should_reconnect_to_restarted_node_and_do_request() {
let port = get_port();
let mut rng = TestRng::new();
let shutdown = Arc::new(tokio::sync::Notify::new());
let shutdown_server = Arc::new(tokio::sync::Notify::new());
let mock_server_handle = start_mock_binary_port_responding_with_stored_value(
port,
Some(INITIAL_REQUEST_ID),
None,
Arc::clone(&shutdown),
Arc::clone(&shutdown_server),
)
.await;

let config = NodeClientConfig::new_with_port(port);
let (c, reconnect_loop) = FramedNodeClient::new(config).await.unwrap();

let scenario = async {
// Request id = 0
// Request id = 1
assert!(query_global_state_for_string_value(&mut rng, &c)
.await
.is_ok());

shutdown.notify_one();
// shutdown node
shutdown_server.notify_one();
let _ = mock_server_handle.await;

// Request id = 1
// Request id = 2
let err = query_global_state_for_string_value(&mut rng, &c)
.await
.unwrap_err();
Expand All @@ -1333,17 +1369,35 @@ mod tests {
Error::RequestFailed(e) if e == "disconnected"
));

let _mock_server_handle = start_mock_binary_port_responding_with_stored_value(
// restart node
let mock_server_handle = start_mock_binary_port_responding_with_stored_value(
port,
Some(INITIAL_REQUEST_ID + 2),
None,
Arc::clone(&shutdown),
Arc::clone(&shutdown_server),
)
.await;

// wait for reconnect loop to do its business
tokio::time::sleep(Duration::from_secs(2)).await;

// Request id = 2
// Request id = 3
assert!(query_global_state_for_string_value(&mut rng, &c)
.await
.is_ok());

// restart node between requests
shutdown_server.notify_one();
let _ = mock_server_handle.await;
let _mock_server_handle = start_mock_binary_port_responding_with_stored_value(
port,
Some(INITIAL_REQUEST_ID + 4),
None,
Arc::clone(&shutdown_server),
)
.await;

// Request id = 4 & 5 (retry)
assert!(query_global_state_for_string_value(&mut rng, &c)
.await
.is_ok());
Expand All @@ -1366,7 +1420,7 @@ mod tests {

let generated_ids: Vec<_> = (INITIAL_REQUEST_ID..INITIAL_REQUEST_ID + 10)
.map(|_| {
let (_, binary_message) = c.generate_payload(get_dummy_request());
let (_, binary_message) = c.generate_payload(&get_dummy_request());
let header = BinaryRequestHeader::from_bytes(binary_message.payload())
.unwrap()
.0;
Expand Down

0 comments on commit a630bc4

Please sign in to comment.