diff --git a/ractor_cluster/src/node/mod.rs b/ractor_cluster/src/node/mod.rs index 923e2770..641c2f9a 100644 --- a/ractor_cluster/src/node/mod.rs +++ b/ractor_cluster/src/node/mod.rs @@ -151,6 +151,9 @@ pub enum NodeSessionMessage { /// Retrieve whether the session is authenticated or not GetAuthenticationState(RpcReplyPort), + + /// Retrieve whether the session has finished initial sync + GetInitialSyncState(RpcReplyPort), } /// Node connection mode from the [Erlang](https://www.erlang.org/doc/reference_manual/distributed.html#node-connections) diff --git a/ractor_cluster/src/node/node_session/mod.rs b/ractor_cluster/src/node/node_session/mod.rs index addf35d2..ad5b92ed 100644 --- a/ractor_cluster/src/node/node_session/mod.rs +++ b/ractor_cluster/src/node/node_session/mod.rs @@ -455,6 +455,9 @@ impl NodeSession { if let Some(msg) = message.msg { match msg { + control_protocol::control_message::Msg::InitialSyncDone(_) => { + state.initial_sync_done = true; + } control_protocol::control_message::Msg::Spawn(spawned_actors) => { for net_actor in spawned_actors.actors { if let Err(spawn_err) = self @@ -720,6 +723,11 @@ impl NodeSession { state.tcp_send_control(control_message); } } + state.tcp_send_control(control_protocol::ControlMessage { + msg: Some(control_protocol::control_message::Msg::InitialSyncDone( + control_protocol::InitialSyncDone {}, + )), + }); // TODO: subscribe to the named registry and synchronize it? What happes on a name clash? How would this be handled // if both sessions had a "node_a" for example? Which resolves, local only? } @@ -759,6 +767,7 @@ pub struct NodeSessionState { epoch: Instant, name: Option, auth: AuthenticationState, + initial_sync_done: bool, remote_actors: HashMap>, } @@ -861,6 +870,7 @@ impl Actor for NodeSession { } else { AuthenticationState::AsClient(auth::ClientAuthenticationProcess::init()) }, + initial_sync_done: false, remote_actors: HashMap::new(), peer_addr, local_addr, @@ -947,6 +957,9 @@ impl Actor for NodeSession { Self::Msg::GetAuthenticationState(reply) => { let _ = reply.send(state.auth.is_ok()); } + Self::Msg::GetInitialSyncState(reply) => { + let _ = reply.send(state.initial_sync_done); + } _ => { // no-op, ignore } diff --git a/ractor_cluster/src/node/node_session/tests.rs b/ractor_cluster/src/node/node_session/tests.rs index 30ae7f03..42978445 100644 --- a/ractor_cluster/src/node/node_session/tests.rs +++ b/ractor_cluster/src/node/node_session/tests.rs @@ -109,6 +109,7 @@ async fn node_sesison_client_auth_success() { let mut state = NodeSessionState { auth: AuthenticationState::AsClient(auth::ClientAuthenticationProcess::init()), + initial_sync_done: false, local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), name: None, @@ -254,6 +255,7 @@ async fn node_session_client_auth_session_state_failures() { let mut state = NodeSessionState { auth: AuthenticationState::AsClient(auth::ClientAuthenticationProcess::init()), + initial_sync_done: false, local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), name: None, @@ -384,6 +386,7 @@ async fn node_session_server_auth_success() { // let addr = SocketAddr:: let mut state = NodeSessionState { auth: AuthenticationState::AsServer(auth::ServerAuthenticationProcess::init()), + initial_sync_done: false, local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), name: None, @@ -478,6 +481,7 @@ async fn node_session_server_auth_session_state_failures() { let mut state = NodeSessionState { auth: AuthenticationState::AsServer(auth::ServerAuthenticationProcess::init()), + initial_sync_done: false, local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), name: None, @@ -627,6 +631,7 @@ async fn node_session_handle_node_msg() { let mut state = NodeSessionState { auth: AuthenticationState::AsServer(auth::ServerAuthenticationProcess::Ok([0u8; 32])), + initial_sync_done: false, local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), name: None, @@ -724,6 +729,7 @@ async fn node_session_handle_control() { let mut state = NodeSessionState { auth: AuthenticationState::AsClient(auth::ClientAuthenticationProcess::Ok), + initial_sync_done: false, local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), name: None, diff --git a/ractor_cluster/src/protocol/control.proto b/ractor_cluster/src/protocol/control.proto index dd5804e2..b6c43489 100644 --- a/ractor_cluster/src/protocol/control.proto +++ b/ractor_cluster/src/protocol/control.proto @@ -76,6 +76,10 @@ message NodeSessions { repeated auth.NameMessage sessions = 1; } +// All state for initial sync has been pushed +message InitialSyncDone { +} + // Control messages between authenticated `node()`s which are dist-connected message ControlMessage { // The message payload @@ -96,5 +100,7 @@ message ControlMessage { auth.NameMessage enumerate_node_sessions = 7; // The list of node sessions on the remote host for transitive connections NodeSessions node_sessions = 8; + // All state for initial sync has been pushed + InitialSyncDone initial_sync_done = 9; } } diff --git a/ractor_cluster_integration_tests/src/tests/pg_groups.rs b/ractor_cluster_integration_tests/src/tests/pg_groups.rs index 7908d35a..552c8c1c 100644 --- a/ractor_cluster_integration_tests/src/tests/pg_groups.rs +++ b/ractor_cluster_integration_tests/src/tests/pg_groups.rs @@ -154,7 +154,7 @@ pub(crate) async fn test(config: PgGroupsConfig) -> i32 { { let is_authenticated = ractor::call_t!( item.actor, - ractor_cluster::NodeSessionMessage::GetAuthenticationState, + ractor_cluster::NodeSessionMessage::GetInitialSyncState, 200 ); match is_authenticated { @@ -167,7 +167,7 @@ pub(crate) async fn test(config: PgGroupsConfig) -> i32 { } Ok(true) => { err_code = 0; - tracing::info!("Authentication succeeded. Exiting test"); + tracing::info!("Authentication and initial sync succeeded."); break; } }