From 511ccb975d16e2f2641692283fc0a3e835e2460b Mon Sep 17 00:00:00 2001 From: Kan-Ru Chen Date: Fri, 10 Jan 2025 21:35:21 +0900 Subject: [PATCH] Add a new NodeSessionMessage to get the state of initial sync Remote actors and PG memberships are sync after authentication are done. This new message can be used to query the state of this initial sync. The session setup is complete once the query returns true. Signed-off-by: Kan-Ru Chen --- ractor_cluster/src/node/mod.rs | 3 +++ ractor_cluster/src/node/node_session/mod.rs | 13 +++++++++++++ ractor_cluster/src/node/node_session/tests.rs | 6 ++++++ ractor_cluster/src/protocol/control.proto | 6 ++++++ .../src/tests/pg_groups.rs | 4 ++-- 5 files changed, 30 insertions(+), 2 deletions(-) diff --git a/ractor_cluster/src/node/mod.rs b/ractor_cluster/src/node/mod.rs index 923e2770..641c2f9a 100644 --- a/ractor_cluster/src/node/mod.rs +++ b/ractor_cluster/src/node/mod.rs @@ -151,6 +151,9 @@ pub enum NodeSessionMessage { /// Retrieve whether the session is authenticated or not GetAuthenticationState(RpcReplyPort), + + /// Retrieve whether the session has finished initial sync + GetInitialSyncState(RpcReplyPort), } /// Node connection mode from the [Erlang](https://www.erlang.org/doc/reference_manual/distributed.html#node-connections) diff --git a/ractor_cluster/src/node/node_session/mod.rs b/ractor_cluster/src/node/node_session/mod.rs index addf35d2..ad5b92ed 100644 --- a/ractor_cluster/src/node/node_session/mod.rs +++ b/ractor_cluster/src/node/node_session/mod.rs @@ -455,6 +455,9 @@ impl NodeSession { if let Some(msg) = message.msg { match msg { + control_protocol::control_message::Msg::InitialSyncDone(_) => { + state.initial_sync_done = true; + } control_protocol::control_message::Msg::Spawn(spawned_actors) => { for net_actor in spawned_actors.actors { if let Err(spawn_err) = self @@ -720,6 +723,11 @@ impl NodeSession { state.tcp_send_control(control_message); } } + state.tcp_send_control(control_protocol::ControlMessage { + msg: Some(control_protocol::control_message::Msg::InitialSyncDone( + control_protocol::InitialSyncDone {}, + )), + }); // TODO: subscribe to the named registry and synchronize it? What happes on a name clash? How would this be handled // if both sessions had a "node_a" for example? Which resolves, local only? } @@ -759,6 +767,7 @@ pub struct NodeSessionState { epoch: Instant, name: Option, auth: AuthenticationState, + initial_sync_done: bool, remote_actors: HashMap>, } @@ -861,6 +870,7 @@ impl Actor for NodeSession { } else { AuthenticationState::AsClient(auth::ClientAuthenticationProcess::init()) }, + initial_sync_done: false, remote_actors: HashMap::new(), peer_addr, local_addr, @@ -947,6 +957,9 @@ impl Actor for NodeSession { Self::Msg::GetAuthenticationState(reply) => { let _ = reply.send(state.auth.is_ok()); } + Self::Msg::GetInitialSyncState(reply) => { + let _ = reply.send(state.initial_sync_done); + } _ => { // no-op, ignore } diff --git a/ractor_cluster/src/node/node_session/tests.rs b/ractor_cluster/src/node/node_session/tests.rs index 30ae7f03..42978445 100644 --- a/ractor_cluster/src/node/node_session/tests.rs +++ b/ractor_cluster/src/node/node_session/tests.rs @@ -109,6 +109,7 @@ async fn node_sesison_client_auth_success() { let mut state = NodeSessionState { auth: AuthenticationState::AsClient(auth::ClientAuthenticationProcess::init()), + initial_sync_done: false, local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), name: None, @@ -254,6 +255,7 @@ async fn node_session_client_auth_session_state_failures() { let mut state = NodeSessionState { auth: AuthenticationState::AsClient(auth::ClientAuthenticationProcess::init()), + initial_sync_done: false, local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), name: None, @@ -384,6 +386,7 @@ async fn node_session_server_auth_success() { // let addr = SocketAddr:: let mut state = NodeSessionState { auth: AuthenticationState::AsServer(auth::ServerAuthenticationProcess::init()), + initial_sync_done: false, local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), name: None, @@ -478,6 +481,7 @@ async fn node_session_server_auth_session_state_failures() { let mut state = NodeSessionState { auth: AuthenticationState::AsServer(auth::ServerAuthenticationProcess::init()), + initial_sync_done: false, local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), name: None, @@ -627,6 +631,7 @@ async fn node_session_handle_node_msg() { let mut state = NodeSessionState { auth: AuthenticationState::AsServer(auth::ServerAuthenticationProcess::Ok([0u8; 32])), + initial_sync_done: false, local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), name: None, @@ -724,6 +729,7 @@ async fn node_session_handle_control() { let mut state = NodeSessionState { auth: AuthenticationState::AsClient(auth::ClientAuthenticationProcess::Ok), + initial_sync_done: false, local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), name: None, diff --git a/ractor_cluster/src/protocol/control.proto b/ractor_cluster/src/protocol/control.proto index dd5804e2..b6c43489 100644 --- a/ractor_cluster/src/protocol/control.proto +++ b/ractor_cluster/src/protocol/control.proto @@ -76,6 +76,10 @@ message NodeSessions { repeated auth.NameMessage sessions = 1; } +// All state for initial sync has been pushed +message InitialSyncDone { +} + // Control messages between authenticated `node()`s which are dist-connected message ControlMessage { // The message payload @@ -96,5 +100,7 @@ message ControlMessage { auth.NameMessage enumerate_node_sessions = 7; // The list of node sessions on the remote host for transitive connections NodeSessions node_sessions = 8; + // All state for initial sync has been pushed + InitialSyncDone initial_sync_done = 9; } } diff --git a/ractor_cluster_integration_tests/src/tests/pg_groups.rs b/ractor_cluster_integration_tests/src/tests/pg_groups.rs index 7908d35a..552c8c1c 100644 --- a/ractor_cluster_integration_tests/src/tests/pg_groups.rs +++ b/ractor_cluster_integration_tests/src/tests/pg_groups.rs @@ -154,7 +154,7 @@ pub(crate) async fn test(config: PgGroupsConfig) -> i32 { { let is_authenticated = ractor::call_t!( item.actor, - ractor_cluster::NodeSessionMessage::GetAuthenticationState, + ractor_cluster::NodeSessionMessage::GetInitialSyncState, 200 ); match is_authenticated { @@ -167,7 +167,7 @@ pub(crate) async fn test(config: PgGroupsConfig) -> i32 { } Ok(true) => { err_code = 0; - tracing::info!("Authentication succeeded. Exiting test"); + tracing::info!("Authentication and initial sync succeeded."); break; } }