Skip to content

Commit

Permalink
Add a new NodeSessionMessage to get the state of initial sync
Browse files Browse the repository at this point in the history
Remote actors and PG memberships are sync after authentication
are done. This new message can be used to query the state of
this initial sync. The session setup is complete once the query
returns true.

Signed-off-by: Kan-Ru Chen <[email protected]>
  • Loading branch information
kanru committed Jan 10, 2025
1 parent 3b7cf3d commit 511ccb9
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 2 deletions.
3 changes: 3 additions & 0 deletions ractor_cluster/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ pub enum NodeSessionMessage {

/// Retrieve whether the session is authenticated or not
GetAuthenticationState(RpcReplyPort<bool>),

/// Retrieve whether the session has finished initial sync
GetInitialSyncState(RpcReplyPort<bool>),
}

/// Node connection mode from the [Erlang](https://www.erlang.org/doc/reference_manual/distributed.html#node-connections)
Expand Down
13 changes: 13 additions & 0 deletions ractor_cluster/src/node/node_session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,9 @@ impl NodeSession {

if let Some(msg) = message.msg {
match msg {
control_protocol::control_message::Msg::InitialSyncDone(_) => {
state.initial_sync_done = true;
}

Check warning on line 460 in ractor_cluster/src/node/node_session/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/node_session/mod.rs#L458-L460

Added lines #L458 - L460 were not covered by tests
control_protocol::control_message::Msg::Spawn(spawned_actors) => {
for net_actor in spawned_actors.actors {
if let Err(spawn_err) = self
Expand Down Expand Up @@ -720,6 +723,11 @@ impl NodeSession {
state.tcp_send_control(control_message);
}
}
state.tcp_send_control(control_protocol::ControlMessage {
msg: Some(control_protocol::control_message::Msg::InitialSyncDone(
control_protocol::InitialSyncDone {},
)),
});

Check warning on line 730 in ractor_cluster/src/node/node_session/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/node_session/mod.rs#L726-L730

Added lines #L726 - L730 were not covered by tests
// TODO: subscribe to the named registry and synchronize it? What happes on a name clash? How would this be handled
// if both sessions had a "node_a" for example? Which resolves, local only?
}
Expand Down Expand Up @@ -759,6 +767,7 @@ pub struct NodeSessionState {
epoch: Instant,
name: Option<auth_protocol::NameMessage>,
auth: AuthenticationState,
initial_sync_done: bool,
remote_actors: HashMap<u64, ActorRef<RemoteActorMessage>>,
}

Expand Down Expand Up @@ -861,6 +870,7 @@ impl Actor for NodeSession {
} else {
AuthenticationState::AsClient(auth::ClientAuthenticationProcess::init())
},
initial_sync_done: false,
remote_actors: HashMap::new(),
peer_addr,
local_addr,
Expand Down Expand Up @@ -947,6 +957,9 @@ impl Actor for NodeSession {
Self::Msg::GetAuthenticationState(reply) => {
let _ = reply.send(state.auth.is_ok());
}
Self::Msg::GetInitialSyncState(reply) => {
let _ = reply.send(state.initial_sync_done);
}

Check warning on line 962 in ractor_cluster/src/node/node_session/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor_cluster/src/node/node_session/mod.rs#L960-L962

Added lines #L960 - L962 were not covered by tests
_ => {
// no-op, ignore
}
Expand Down
6 changes: 6 additions & 0 deletions ractor_cluster/src/node/node_session/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ async fn node_sesison_client_auth_success() {

let mut state = NodeSessionState {
auth: AuthenticationState::AsClient(auth::ClientAuthenticationProcess::init()),
initial_sync_done: false,
local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
name: None,
Expand Down Expand Up @@ -254,6 +255,7 @@ async fn node_session_client_auth_session_state_failures() {

let mut state = NodeSessionState {
auth: AuthenticationState::AsClient(auth::ClientAuthenticationProcess::init()),
initial_sync_done: false,
local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
name: None,
Expand Down Expand Up @@ -384,6 +386,7 @@ async fn node_session_server_auth_success() {
// let addr = SocketAddr::
let mut state = NodeSessionState {
auth: AuthenticationState::AsServer(auth::ServerAuthenticationProcess::init()),
initial_sync_done: false,
local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
name: None,
Expand Down Expand Up @@ -478,6 +481,7 @@ async fn node_session_server_auth_session_state_failures() {

let mut state = NodeSessionState {
auth: AuthenticationState::AsServer(auth::ServerAuthenticationProcess::init()),
initial_sync_done: false,
local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
name: None,
Expand Down Expand Up @@ -627,6 +631,7 @@ async fn node_session_handle_node_msg() {

let mut state = NodeSessionState {
auth: AuthenticationState::AsServer(auth::ServerAuthenticationProcess::Ok([0u8; 32])),
initial_sync_done: false,
local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
name: None,
Expand Down Expand Up @@ -724,6 +729,7 @@ async fn node_session_handle_control() {

let mut state = NodeSessionState {
auth: AuthenticationState::AsClient(auth::ClientAuthenticationProcess::Ok),
initial_sync_done: false,
local_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
peer_addr: SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
name: None,
Expand Down
6 changes: 6 additions & 0 deletions ractor_cluster/src/protocol/control.proto
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ message NodeSessions {
repeated auth.NameMessage sessions = 1;
}

// All state for initial sync has been pushed
message InitialSyncDone {
}

// Control messages between authenticated `node()`s which are dist-connected
message ControlMessage {
// The message payload
Expand All @@ -96,5 +100,7 @@ message ControlMessage {
auth.NameMessage enumerate_node_sessions = 7;
// The list of node sessions on the remote host for transitive connections
NodeSessions node_sessions = 8;
// All state for initial sync has been pushed
InitialSyncDone initial_sync_done = 9;
}
}
4 changes: 2 additions & 2 deletions ractor_cluster_integration_tests/src/tests/pg_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ pub(crate) async fn test(config: PgGroupsConfig) -> i32 {
{
let is_authenticated = ractor::call_t!(
item.actor,
ractor_cluster::NodeSessionMessage::GetAuthenticationState,
ractor_cluster::NodeSessionMessage::GetInitialSyncState,
200
);
match is_authenticated {
Expand All @@ -167,7 +167,7 @@ pub(crate) async fn test(config: PgGroupsConfig) -> i32 {
}
Ok(true) => {
err_code = 0;
tracing::info!("Authentication succeeded. Exiting test");
tracing::info!("Authentication and initial sync succeeded.");
break;
}
}
Expand Down

0 comments on commit 511ccb9

Please sign in to comment.