Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Jakub Zajkowski committed Dec 13, 2023
1 parent f87bfda commit de01a61
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 16 deletions.
33 changes: 18 additions & 15 deletions listener/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ pub use types::{NodeConnectionInterface, SseEvent};
use url::Url;
use version_fetcher::{for_status_endpoint, BuildVersionFetchError, VersionFetcher};

const MAX_CONNECTION_ATTEMPTS_REACHED: &str = "Max connection attempts reached";

pub struct EventListenerBuilder {
pub node: NodeConnectionInterface,
pub max_connection_attempts: usize,
Expand Down Expand Up @@ -109,43 +111,37 @@ impl EventListener {
log_status_for_event_listener(EventListenerStatus::Connecting, self);
let mut current_attempt = 1;
while current_attempt <= self.max_connection_attempts {
if current_attempt > 1 {
sleep(self.delay_between_attempts).await;
}
match self.get_version(current_attempt).await {
GetVersionResult::Ok(Some(protocol_version)) => {
self.node_build_version = protocol_version;
current_attempt = 1
current_attempt = 1 // Restart counter if the nodes version changed
}
GetVersionResult::Retry => {
sleep(self.delay_between_attempts).await;
current_attempt += 1;
if current_attempt >= self.max_connection_attempts {
log_status_for_event_listener(EventListenerStatus::Defunct, self);
break;
}
continue;
}
GetVersionResult::Error(e) => return Err(e),
_ => {}
}
match self
if let ConnectOutcome::ConnectionLost = self
.do_connect(
last_event_id_for_filter.clone(),
last_seen_event_id_sender.clone(),
)
.await?
{
ConnectOutcome::ConnectionLost => {
current_attempt += 1;
warn!(
"Lost connection to node {}, on attempt {}/{}",
self.node.ip_address, current_attempt, self.max_connection_attempts
);
}
ConnectOutcome::SystemReconnect => {}
};
sleep(Duration::from_secs(1)).await;
current_attempt += 1;
warn_connection_lost(self, current_attempt);
}
}
log_status_for_event_listener(EventListenerStatus::Defunct, self);
Err(Error::msg("Max connection attempts reached"))
Err(Error::msg(MAX_CONNECTION_ATTEMPTS_REACHED))
}

async fn do_connect(
Expand Down Expand Up @@ -323,6 +319,13 @@ fn status_endpoint(ip_address: IpAddr, rest_port: u16) -> Result<Url, Error> {
Url::from_str(&status_endpoint_str).map_err(Error::from)
}

fn warn_connection_lost(listener: &EventListener, current_attempt: usize) {
warn!(
"Lost connection to node {}, on attempt {}/{}",
listener.node.ip_address, current_attempt, listener.max_connection_attempts
);
}

#[cfg(test)]
mod tests {
use crate::{
Expand Down
6 changes: 5 additions & 1 deletion listener/src/sse_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,11 @@ pub mod tests {
timeout_after: Duration,
) -> Vec<String> {
let mut data = vec![];
if let Ok(mut receiver) = connection.connect(None).await {
let connection = timeout(Duration::from_secs(5), connection.connect(None)).await;
if connection.is_err() {
panic!("Couln't connect to sse endpoint in 5 seconds");
}
if let Ok(mut receiver) = connection.unwrap() {
while let Ok(res) = timeout(timeout_after, receiver.next()).await {
if let Some(event_res) = res {
if let Ok(event) = event_res {
Expand Down

0 comments on commit de01a61

Please sign in to comment.