Skip to content

Commit

Permalink
validate network name on SSE connection
Browse files Browse the repository at this point in the history
  • Loading branch information
Maciej Wójcik committed Nov 14, 2024
1 parent 53e7d1b commit 06c1e68
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 14 deletions.
1 change: 1 addition & 0 deletions event_sidecar/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ fn builder(
ip_address: connection.ip_address,
sse_port: connection.sse_port,
rest_port: connection.rest_port,
network_name: connection.network_name.clone(),
};
let event_listener_builder = EventListenerBuilder {
node: node_interface,
Expand Down
2 changes: 2 additions & 0 deletions event_sidecar/src/testing/testing_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ impl TestingConfig {
ip_address: Option<IpAddr>,
sse_port: Option<u16>,
rest_port: Option<u16>,
network_name: Option<String>,
) -> Port {
let random_port_for_sse = get_port();
let random_port_for_rest = get_port();
Expand All @@ -103,6 +104,7 @@ impl TestingConfig {
connection_timeout_in_seconds: Some(100),
sleep_between_keep_alive_checks_in_seconds: Some(100),
no_message_timeout_in_seconds: Some(100),
network_name,
};
self.event_server_config.connections.push(connection);
random_port_for_sse
Expand Down
71 changes: 62 additions & 9 deletions event_sidecar/src/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ use crate::{
sse_events::{BlockAdded, Fault},
},
utils::tests::{
any_string_contains, build_test_config, build_test_config_with_retries,
build_test_config_without_connections, build_test_config_without_db_storage,
start_nodes_and_wait, start_sidecar, start_sidecar_with_rest_api, stop_nodes_and_wait,
wait_for_n_messages,
any_string_contains, build_test_config, build_test_config_with_network_name,
build_test_config_with_retries, build_test_config_without_connections,
build_test_config_without_db_storage, start_nodes_and_wait, start_sidecar,
start_sidecar_with_rest_api, stop_nodes_and_wait, wait_for_n_messages,
},
};

Expand All @@ -40,7 +40,7 @@ async fn should_not_allow_zero_max_attempts() {

let mut testing_config = prepare_config(&temp_storage_dir, true);

let sse_port_for_node = testing_config.add_connection(None, None, None);
let sse_port_for_node = testing_config.add_connection(None, None, None, None);

testing_config.set_retries_for_node(sse_port_for_node, 0, 0);
let sqlite_database = SqliteDatabase::new_from_config(&testing_config.storage_config)
Expand Down Expand Up @@ -290,7 +290,7 @@ async fn should_fail_to_reconnect() {
node_port_for_sse_connection,
node_port_for_rest_connection,
event_stream_server_port,
) = build_test_config_with_retries(2, 2, true);
) = build_test_config_with_retries(2, 2, true, None);
let (data_of_node, test_rng) = random_n_block_added(30, 0, test_rng);
let mut node_mock = MockNodeBuilder {
version: "2.0.0".to_string(),
Expand Down Expand Up @@ -339,7 +339,7 @@ async fn should_reconnect() {
node_port_for_sse_connection,
node_port_for_rest_connection,
event_stream_server_port,
) = build_test_config_with_retries(10, 1, true);
) = build_test_config_with_retries(10, 1, true, None);
let (data_of_node, test_rng) = random_n_block_added(30, 0, test_rng);
let mut node_mock = MockNodeBuilder {
version: "2.0.0".to_string(),
Expand Down Expand Up @@ -428,7 +428,50 @@ async fn connecting_to_node_prior_to_2_0_0_should_fail() {
}
.build();
start_nodes_and_wait(vec![&mut node_mock]).await;
start_sidecar(testing_config).await;
let sidecar_join = start_sidecar(testing_config).await;
let (join_handle, _) = fetch_data_from_endpoint_with_panic_flag(
"/events?start_from=0",
event_stream_server_port,
false,
)
.await;
sleep(Duration::from_secs(10)).await; //Give some time for sidecar to read data from node (which it actually shouldn't do in this scenario)
stop_nodes_and_wait(vec![&mut node_mock]).await;

let events_received = tokio::join!(join_handle).0.unwrap();
assert_eq!(events_received.len(), 0);

let shutdown_err = sidecar_join
.await
.unwrap()
.expect_err("Sidecar should return an Err message on shutdown");

assert_eq!(
shutdown_err.to_string(),
"Connected node(s) are unavailable"
)
}

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn connecting_to_node_with_wrong_network_name_should_fail() {
let (
testing_config,
_temp_storage_dir,
node_port_for_sse_connection,
node_port_for_rest_connection,
event_stream_server_port,
) = build_test_config_with_network_name("network-1");
let mut node_mock = MockNodeBuilder {
version: "2.0.0".to_string(),
network_name: "not-network-1".to_string(),
data_of_node: sse_server_shutdown_2_0_0_data(),
cache_of_node: None,
sse_port: Some(node_port_for_sse_connection),
rest_port: Some(node_port_for_rest_connection),
}
.build();
start_nodes_and_wait(vec![&mut node_mock]).await;
let sidecar_join = start_sidecar(testing_config).await;
let (join_handle, _) = fetch_data_from_endpoint_with_panic_flag(
"/events?start_from=0",
event_stream_server_port,
Expand All @@ -440,6 +483,16 @@ async fn connecting_to_node_prior_to_2_0_0_should_fail() {

let events_received = tokio::join!(join_handle).0.unwrap();
assert_eq!(events_received.len(), 0);

let shutdown_err = sidecar_join
.await
.unwrap()
.expect_err("Sidecar should return an Err message on shutdown");

assert_eq!(
shutdown_err.to_string(),
"Connected node(s) are unavailable"
)
}

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
Expand Down Expand Up @@ -827,7 +880,7 @@ pub fn build_testing_config_based_on_ports(
let (mut testing_config, temp_storage_dir, event_stream_server_port) =
build_test_config_without_connections(true);
for (sse_port, rest_port) in ports_of_nodes {
testing_config.add_connection(None, Some(sse_port), Some(rest_port));
testing_config.add_connection(None, Some(sse_port), Some(rest_port), None);
testing_config.set_retries_for_node(sse_port, 5, 2);
testing_config.set_allow_partial_connection_for_node(sse_port, true);
}
Expand Down
11 changes: 10 additions & 1 deletion event_sidecar/src/tests/performance_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ async fn performance_check(scenario: Scenario, duration: Duration, acceptable_la

let temp_storage_dir = tempdir().expect("Should have created a temporary storage directory");
let mut testing_config = prepare_config(&temp_storage_dir, true);
testing_config.add_connection(None, None, None);
testing_config.add_connection(None, None, None, None);
let node_port_for_sse_connection = testing_config
.event_server_config
.connections
Expand All @@ -278,6 +278,13 @@ async fn performance_check(scenario: Scenario, duration: Duration, acceptable_la
.first()
.unwrap()
.rest_port;
let network_name = testing_config
.event_server_config
.connections
.first()
.unwrap()
.network_name
.clone();
let (_shutdown_tx, _after_shutdown_rx) =
setup_mock_build_version_server(node_port_for_rest_connection).await;

Expand All @@ -293,6 +300,7 @@ async fn performance_check(scenario: Scenario, duration: Duration, acceptable_la
ip_address,
sse_port: node_port_for_sse_connection,
rest_port: node_port_for_rest_connection,
network_name: network_name.clone(),
};
let (node_event_tx, node_event_rx) = mpsc::channel(100);
let mut node_event_listener = EventListenerBuilder {
Expand Down Expand Up @@ -321,6 +329,7 @@ async fn performance_check(scenario: Scenario, duration: Duration, acceptable_la
ip_address: IpAddr::from_str("127.0.0.1").expect("Couldn't parse IpAddr"),
sse_port: node_port_for_sse_connection,
rest_port: node_port_for_rest_connection,
network_name,
};
let mut sidecar_event_listener = EventListenerBuilder {
node: sidecar_node_interface,
Expand Down
15 changes: 11 additions & 4 deletions event_sidecar/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,11 +274,17 @@ pub mod tests {
}

pub fn build_test_config() -> (TestingConfig, TempDir, u16, u16, u16) {
build_test_config_with_retries(10, 1, true)
build_test_config_with_retries(10, 1, true, None)
}

pub fn build_test_config_without_db_storage() -> (TestingConfig, TempDir, u16, u16, u16) {
build_test_config_with_retries(10, 1, false)
build_test_config_with_retries(10, 1, false, None)
}

pub fn build_test_config_with_network_name(
network_name: &str,
) -> (TestingConfig, TempDir, u16, u16, u16) {
build_test_config_with_retries(10, 1, true, Some(network_name.into()))
}

pub fn build_test_config_without_connections(
Expand All @@ -294,10 +300,11 @@ pub mod tests {
max_attempts: usize,
delay_between_retries: usize,
enable_db_storage: bool,
network_name: Option<String>,
) -> (TestingConfig, TempDir, u16, u16, u16) {
let (mut testing_config, temp_storage_dir, event_stream_server_port) =
build_test_config_without_connections(enable_db_storage);
testing_config.add_connection(None, None, None);
testing_config.add_connection(None, None, None, network_name);
let node_port_for_sse_connection = testing_config
.event_server_config
.connections
Expand Down Expand Up @@ -409,7 +416,7 @@ pub mod tests {
let mut testing_config = prepare_config(&temp_storage_dir, true);
let event_stream_server_port = testing_config.event_stream_server_port();
testing_config.set_storage(StorageConfig::postgres_with_port(context.port));
testing_config.add_connection(None, None, None);
testing_config.add_connection(None, None, None, None);
let node_port_for_sse_connection = testing_config
.event_server_config
.connections
Expand Down
8 changes: 8 additions & 0 deletions listener/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,14 @@ impl EventListener {
let fetch_result = self.version_fetcher.fetch().await;
match fetch_result {
Ok(node_metadata) => {
// check if reveived network name matches optional configuration
if let Some(network_name) = &self.node.network_name {
if *network_name != node_metadata.network_name {
let msg = format!("Network name {network_name} does't match name {} configured for node connection", node_metadata.network_name);
error!("{msg}");
return GetNodeMetadataResult::Error(Error::msg(msg));
}
}
if self.node_metadata != node_metadata {
return GetNodeMetadataResult::Ok(Some(node_metadata));
}
Expand Down
2 changes: 2 additions & 0 deletions listener/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub struct NodeConnectionInterface {
pub ip_address: IpAddr,
pub sse_port: u16,
pub rest_port: u16,
pub network_name: Option<String>,
}

#[cfg(test)]
Expand All @@ -20,6 +21,7 @@ impl Default for NodeConnectionInterface {
ip_address: "127.0.0.1".parse().unwrap(),
sse_port: 100,
rest_port: 200,
network_name: None,
}
}
}
Expand Down

0 comments on commit 06c1e68

Please sign in to comment.