Skip to content

Commit

Permalink
Error handling in leader failover situation
Browse files Browse the repository at this point in the history
  • Loading branch information
jopemachine committed Feb 1, 2024
1 parent 5dccee2 commit 2b06c3a
Showing 1 changed file with 21 additions and 6 deletions.
27 changes: 21 additions & 6 deletions raftify/src/raft_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -848,8 +848,13 @@ impl<
chan: ResponseSender<LogEntry, FSM>,
) -> Result<()> {
if !self.is_leader() {
// wrong leader send client cluster data
let leader_id = self.get_leader_id();
if leader_id == 0 {
self.logger
.error("There is no leader in the cluster at the time. try later...");
return Ok(());
}

// leader can't be an empty node
let leader_addr = self
.peers
Expand Down Expand Up @@ -908,9 +913,13 @@ impl<
}

if !self.is_leader() {
// wrong leader send client cluster data
// TODO: retry strategy in case of failure
let leader_id = self.get_leader_id();
if leader_id == 0 {
self.logger
.error("There is no leader in the cluster at the time. try later...");
return Ok(());
}

let peers = self.peers.lock().await;
let leader_addr = peers.get(&leader_id).unwrap().addr.to_string();

Expand Down Expand Up @@ -1055,7 +1064,7 @@ impl<
}
LocalRequestMsg::SendMessage { message, chan } => {
self.logger.debug(&format!(
"Node {} received local Raft message, Message: {}",
"Node {} received Raft message from itself, Message: {}",
self.raw_node.raft.id,
format_message(&message)
));
Expand Down Expand Up @@ -1099,6 +1108,12 @@ impl<
ServerRequestMsg::RequestId { raft_addr, chan } => {
if !self.is_leader() {
let leader_id = self.get_leader_id();
if leader_id == 0 {
self.logger
.error("There is no leader in the cluster at the time. try later...");
return Ok(());
}

let peers = self.peers.lock().await;
let leader_addr = peers.get(&leader_id).unwrap().addr.to_string();

Expand All @@ -1115,12 +1130,12 @@ impl<
let reserved_id =
if let Some(existing_node_id) = peers.get_node_id_by_addr(raft_addr) {
self.logger
.info(&format!("Node {} connection restored", existing_node_id));
.info(&format!("Node {} connection restored.", existing_node_id));
existing_node_id
} else {
let reserved = peers.reserve_id();
self.logger.info(&format!(
"Node {} reserved new node id {}",
"Node {} reserved new node_id {}.",
self.get_id(),
reserved
));
Expand Down

0 comments on commit 2b06c3a

Please sign in to comment.