From 06c1e680af6a164253aed4e9789e5780098101a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20W=C3=B3jcik?= Date: Thu, 14 Nov 2024 12:11:32 +0100 Subject: [PATCH] validate network name on SSE connection --- event_sidecar/src/lib.rs | 1 + event_sidecar/src/testing/testing_config.rs | 2 + event_sidecar/src/tests/integration_tests.rs | 71 +++++++++++++++++--- event_sidecar/src/tests/performance_tests.rs | 11 ++- event_sidecar/src/utils.rs | 15 +++-- listener/src/lib.rs | 8 +++ listener/src/types.rs | 2 + 7 files changed, 96 insertions(+), 14 deletions(-) diff --git a/event_sidecar/src/lib.rs b/event_sidecar/src/lib.rs index b804a688..0570d7ea 100644 --- a/event_sidecar/src/lib.rs +++ b/event_sidecar/src/lib.rs @@ -259,6 +259,7 @@ fn builder( ip_address: connection.ip_address, sse_port: connection.sse_port, rest_port: connection.rest_port, + network_name: connection.network_name.clone(), }; let event_listener_builder = EventListenerBuilder { node: node_interface, diff --git a/event_sidecar/src/testing/testing_config.rs b/event_sidecar/src/testing/testing_config.rs index 9ed3d579..e1875834 100644 --- a/event_sidecar/src/testing/testing_config.rs +++ b/event_sidecar/src/testing/testing_config.rs @@ -89,6 +89,7 @@ impl TestingConfig { ip_address: Option, sse_port: Option, rest_port: Option, + network_name: Option, ) -> Port { let random_port_for_sse = get_port(); let random_port_for_rest = get_port(); @@ -103,6 +104,7 @@ impl TestingConfig { connection_timeout_in_seconds: Some(100), sleep_between_keep_alive_checks_in_seconds: Some(100), no_message_timeout_in_seconds: Some(100), + network_name, }; self.event_server_config.connections.push(connection); random_port_for_sse diff --git a/event_sidecar/src/tests/integration_tests.rs b/event_sidecar/src/tests/integration_tests.rs index d2dcf229..2d33ec7c 100644 --- a/event_sidecar/src/tests/integration_tests.rs +++ b/event_sidecar/src/tests/integration_tests.rs @@ -27,10 +27,10 @@ use crate::{ sse_events::{BlockAdded, Fault}, }, utils::tests::{ - any_string_contains, build_test_config, build_test_config_with_retries, - build_test_config_without_connections, build_test_config_without_db_storage, - start_nodes_and_wait, start_sidecar, start_sidecar_with_rest_api, stop_nodes_and_wait, - wait_for_n_messages, + any_string_contains, build_test_config, build_test_config_with_network_name, + build_test_config_with_retries, build_test_config_without_connections, + build_test_config_without_db_storage, start_nodes_and_wait, start_sidecar, + start_sidecar_with_rest_api, stop_nodes_and_wait, wait_for_n_messages, }, }; @@ -40,7 +40,7 @@ async fn should_not_allow_zero_max_attempts() { let mut testing_config = prepare_config(&temp_storage_dir, true); - let sse_port_for_node = testing_config.add_connection(None, None, None); + let sse_port_for_node = testing_config.add_connection(None, None, None, None); testing_config.set_retries_for_node(sse_port_for_node, 0, 0); let sqlite_database = SqliteDatabase::new_from_config(&testing_config.storage_config) @@ -290,7 +290,7 @@ async fn should_fail_to_reconnect() { node_port_for_sse_connection, node_port_for_rest_connection, event_stream_server_port, - ) = build_test_config_with_retries(2, 2, true); + ) = build_test_config_with_retries(2, 2, true, None); let (data_of_node, test_rng) = random_n_block_added(30, 0, test_rng); let mut node_mock = MockNodeBuilder { version: "2.0.0".to_string(), @@ -339,7 +339,7 @@ async fn should_reconnect() { node_port_for_sse_connection, node_port_for_rest_connection, event_stream_server_port, - ) = build_test_config_with_retries(10, 1, true); + ) = build_test_config_with_retries(10, 1, true, None); let (data_of_node, test_rng) = random_n_block_added(30, 0, test_rng); let mut node_mock = MockNodeBuilder { version: "2.0.0".to_string(), @@ -428,7 +428,50 @@ async fn connecting_to_node_prior_to_2_0_0_should_fail() { } .build(); start_nodes_and_wait(vec![&mut node_mock]).await; - start_sidecar(testing_config).await; + let sidecar_join = start_sidecar(testing_config).await; + let (join_handle, _) = fetch_data_from_endpoint_with_panic_flag( + "/events?start_from=0", + event_stream_server_port, + false, + ) + .await; + sleep(Duration::from_secs(10)).await; //Give some time for sidecar to read data from node (which it actually shouldn't do in this scenario) + stop_nodes_and_wait(vec![&mut node_mock]).await; + + let events_received = tokio::join!(join_handle).0.unwrap(); + assert_eq!(events_received.len(), 0); + + let shutdown_err = sidecar_join + .await + .unwrap() + .expect_err("Sidecar should return an Err message on shutdown"); + + assert_eq!( + shutdown_err.to_string(), + "Connected node(s) are unavailable" + ) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 5)] +async fn connecting_to_node_with_wrong_network_name_should_fail() { + let ( + testing_config, + _temp_storage_dir, + node_port_for_sse_connection, + node_port_for_rest_connection, + event_stream_server_port, + ) = build_test_config_with_network_name("network-1"); + let mut node_mock = MockNodeBuilder { + version: "2.0.0".to_string(), + network_name: "not-network-1".to_string(), + data_of_node: sse_server_shutdown_2_0_0_data(), + cache_of_node: None, + sse_port: Some(node_port_for_sse_connection), + rest_port: Some(node_port_for_rest_connection), + } + .build(); + start_nodes_and_wait(vec![&mut node_mock]).await; + let sidecar_join = start_sidecar(testing_config).await; let (join_handle, _) = fetch_data_from_endpoint_with_panic_flag( "/events?start_from=0", event_stream_server_port, @@ -440,6 +483,16 @@ async fn connecting_to_node_prior_to_2_0_0_should_fail() { let events_received = tokio::join!(join_handle).0.unwrap(); assert_eq!(events_received.len(), 0); + + let shutdown_err = sidecar_join + .await + .unwrap() + .expect_err("Sidecar should return an Err message on shutdown"); + + assert_eq!( + shutdown_err.to_string(), + "Connected node(s) are unavailable" + ) } #[tokio::test(flavor = "multi_thread", worker_threads = 5)] @@ -827,7 +880,7 @@ pub fn build_testing_config_based_on_ports( let (mut testing_config, temp_storage_dir, event_stream_server_port) = build_test_config_without_connections(true); for (sse_port, rest_port) in ports_of_nodes { - testing_config.add_connection(None, Some(sse_port), Some(rest_port)); + testing_config.add_connection(None, Some(sse_port), Some(rest_port), None); testing_config.set_retries_for_node(sse_port, 5, 2); testing_config.set_allow_partial_connection_for_node(sse_port, true); } diff --git a/event_sidecar/src/tests/performance_tests.rs b/event_sidecar/src/tests/performance_tests.rs index 4db84eae..eb0f89e5 100644 --- a/event_sidecar/src/tests/performance_tests.rs +++ b/event_sidecar/src/tests/performance_tests.rs @@ -265,7 +265,7 @@ async fn performance_check(scenario: Scenario, duration: Duration, acceptable_la let temp_storage_dir = tempdir().expect("Should have created a temporary storage directory"); let mut testing_config = prepare_config(&temp_storage_dir, true); - testing_config.add_connection(None, None, None); + testing_config.add_connection(None, None, None, None); let node_port_for_sse_connection = testing_config .event_server_config .connections @@ -278,6 +278,13 @@ async fn performance_check(scenario: Scenario, duration: Duration, acceptable_la .first() .unwrap() .rest_port; + let network_name = testing_config + .event_server_config + .connections + .first() + .unwrap() + .network_name + .clone(); let (_shutdown_tx, _after_shutdown_rx) = setup_mock_build_version_server(node_port_for_rest_connection).await; @@ -293,6 +300,7 @@ async fn performance_check(scenario: Scenario, duration: Duration, acceptable_la ip_address, sse_port: node_port_for_sse_connection, rest_port: node_port_for_rest_connection, + network_name: network_name.clone(), }; let (node_event_tx, node_event_rx) = mpsc::channel(100); let mut node_event_listener = EventListenerBuilder { @@ -321,6 +329,7 @@ async fn performance_check(scenario: Scenario, duration: Duration, acceptable_la ip_address: IpAddr::from_str("127.0.0.1").expect("Couldn't parse IpAddr"), sse_port: node_port_for_sse_connection, rest_port: node_port_for_rest_connection, + network_name, }; let mut sidecar_event_listener = EventListenerBuilder { node: sidecar_node_interface, diff --git a/event_sidecar/src/utils.rs b/event_sidecar/src/utils.rs index 5d847f82..84aaea57 100644 --- a/event_sidecar/src/utils.rs +++ b/event_sidecar/src/utils.rs @@ -274,11 +274,17 @@ pub mod tests { } pub fn build_test_config() -> (TestingConfig, TempDir, u16, u16, u16) { - build_test_config_with_retries(10, 1, true) + build_test_config_with_retries(10, 1, true, None) } pub fn build_test_config_without_db_storage() -> (TestingConfig, TempDir, u16, u16, u16) { - build_test_config_with_retries(10, 1, false) + build_test_config_with_retries(10, 1, false, None) + } + + pub fn build_test_config_with_network_name( + network_name: &str, + ) -> (TestingConfig, TempDir, u16, u16, u16) { + build_test_config_with_retries(10, 1, true, Some(network_name.into())) } pub fn build_test_config_without_connections( @@ -294,10 +300,11 @@ pub mod tests { max_attempts: usize, delay_between_retries: usize, enable_db_storage: bool, + network_name: Option, ) -> (TestingConfig, TempDir, u16, u16, u16) { let (mut testing_config, temp_storage_dir, event_stream_server_port) = build_test_config_without_connections(enable_db_storage); - testing_config.add_connection(None, None, None); + testing_config.add_connection(None, None, None, network_name); let node_port_for_sse_connection = testing_config .event_server_config .connections @@ -409,7 +416,7 @@ pub mod tests { let mut testing_config = prepare_config(&temp_storage_dir, true); let event_stream_server_port = testing_config.event_stream_server_port(); testing_config.set_storage(StorageConfig::postgres_with_port(context.port)); - testing_config.add_connection(None, None, None); + testing_config.add_connection(None, None, None, None); let node_port_for_sse_connection = testing_config .event_server_config .connections diff --git a/listener/src/lib.rs b/listener/src/lib.rs index b63135fa..49b47f2a 100644 --- a/listener/src/lib.rs +++ b/listener/src/lib.rs @@ -250,6 +250,14 @@ impl EventListener { let fetch_result = self.version_fetcher.fetch().await; match fetch_result { Ok(node_metadata) => { + // check if reveived network name matches optional configuration + if let Some(network_name) = &self.node.network_name { + if *network_name != node_metadata.network_name { + let msg = format!("Network name {network_name} does't match name {} configured for node connection", node_metadata.network_name); + error!("{msg}"); + return GetNodeMetadataResult::Error(Error::msg(msg)); + } + } if self.node_metadata != node_metadata { return GetNodeMetadataResult::Ok(Some(node_metadata)); } diff --git a/listener/src/types.rs b/listener/src/types.rs index 85609f3f..5b26645c 100644 --- a/listener/src/types.rs +++ b/listener/src/types.rs @@ -11,6 +11,7 @@ pub struct NodeConnectionInterface { pub ip_address: IpAddr, pub sse_port: u16, pub rest_port: u16, + pub network_name: Option, } #[cfg(test)] @@ -20,6 +21,7 @@ impl Default for NodeConnectionInterface { ip_address: "127.0.0.1".parse().unwrap(), sse_port: 100, rest_port: 200, + network_name: None, } } }