Skip to content

Commit

Permalink
Making keepalive loop use the standard mechnism of sending messages t…
Browse files Browse the repository at this point in the history
…o gain retries and id-checks
  • Loading branch information
zajko committed Jan 29, 2025
1 parent 28e022b commit abe3a1a
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 45 deletions.
2 changes: 1 addition & 1 deletion rpc_sidecar/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub async fn build_rpc_server<'a>(
) -> MaybeRpcServerReturn<'a> {
let (node_client, reconnect_loop, keepalive_loop) =
FramedNodeClient::new(config.node_client.clone(), maybe_network_name).await?;
let node_client: Arc<dyn NodeClient> = Arc::new(node_client);
let node_client: Arc<dyn NodeClient> = node_client;
let mut futures = Vec::new();
let main_server_config = config.main_server;
if main_server_config.enable_server {
Expand Down
61 changes: 17 additions & 44 deletions rpc_sidecar/src/node_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -701,8 +701,8 @@ pub enum Error {
TooManyMismatchedResponses { max: u8 },
#[error("failed to deserialize the original request provided with the response: {0}")]
OriginalRequestDeserialization(String),
#[error("failed to deserialize the envelope of a response: {0}")]
EnvelopeDeserialization(String),
#[error("failed to deserialize the envelope of a response")]
EnvelopeDeserialization,
#[error("failed to deserialize a response: {0}")]
Deserialization(String),
#[error("failed to serialize a request: {0}")]
Expand Down Expand Up @@ -939,7 +939,7 @@ impl FramedNodeClient {
maybe_network_name: Option<String>,
) -> Result<
(
Self,
Arc<Self>,
impl Future<Output = Result<(), AnyhowError>>,
impl Future<Output = Result<(), AnyhowError>>,
),
Expand All @@ -964,21 +964,15 @@ impl FramedNodeClient {
Arc::clone(&shutdown),
Arc::clone(&reconnect),
);

let current_request_id = Arc::new(AtomicU16::new(INITIAL_REQUEST_ID));
let keepalive_loop = Self::keepalive_loop(
config.clone(),
Arc::clone(&stream),
Arc::clone(&current_request_id),
);

let node_client = Self {
let keepalive_timeout = Duration::from_millis(config.keepalive_timeout_ms);
let node_client = Arc::new(Self {
client: Arc::clone(&stream),
reconnect,
shutdown,
config,
current_request_id: AtomicU16::new(INITIAL_REQUEST_ID).into(),
};
});
let keepalive_loop = Self::keepalive_loop(node_client.clone(), keepalive_timeout);

// validate network name
if let Some(network_name) = maybe_network_name {
Expand Down Expand Up @@ -1019,39 +1013,12 @@ impl FramedNodeClient {
}

async fn keepalive_loop(
config: NodeClientConfig,
client: Arc<RwLock<Framed<TcpStream, BinaryMessageCodec>>>,
current_request_id: Arc<AtomicU16>,
client: Arc<dyn NodeClient>,
keepalive_timeout: Duration,
) -> Result<(), AnyhowError> {
let keepalive_timeout = Duration::from_millis(config.keepalive_timeout_ms);

loop {
tokio::time::sleep(keepalive_timeout).await;

let mut client = client.write().await;

let next_id = current_request_id.fetch_add(1, Ordering::Relaxed);
let payload = BinaryMessage::new(
encode_request(&BinaryRequest::KeepAliveRequest, next_id)
.expect("should always serialize a request"),
);

if tokio::time::timeout(
Duration::from_secs(config.message_timeout_secs),
client.send(payload),
)
.await
.is_err()
{
continue;
}

tokio::time::timeout(
Duration::from_secs(config.message_timeout_secs),
client.next(),
)
.await
.ok();
client.send_request(BinaryRequest::KeepAliveRequest).await?;
}
}

Expand Down Expand Up @@ -1090,7 +1057,13 @@ impl FramedNodeClient {
.map_err(|err| Error::RequestFailed(err.to_string()))?
.payload(),
)
.map_err(|err| Error::EnvelopeDeserialization(err.to_string()))?;
.map_err(|err| {
error!(
"Error when deserializing binary port envelope: {}",
err.to_string()
);
Error::EnvelopeDeserialization
})?;
match validate_response(resp, request_id, &self.shutdown) {
Ok(response) => return Ok(response),
Err(err) if matches!(err, Error::RequestResponseIdMismatch { expected, got } if expected > got) =>
Expand Down

0 comments on commit abe3a1a

Please sign in to comment.