Skip to content

Commit

Permalink
Adds a silent-retry support for factory messages, such that if a work…
Browse files Browse the repository at this point in the history
…er dies, the message can be silently retried up to a number of retries, without the caller being notified or having the message dropped.

Also: Add debug implementations everywhere and more build deny rules
  • Loading branch information
slawlor committed Oct 16, 2024
1 parent 9a9a938 commit 9466957
Show file tree
Hide file tree
Showing 17 changed files with 579 additions and 42 deletions.
6 changes: 4 additions & 2 deletions ractor/src/actor/actor_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl ActorPortSet {
/// Returns [Ok(`TState`)] when the future completes without
/// signal interruption, [Err(Signal)] in the event the
/// signal interrupts the async work.
pub async fn run_with_signal<TState>(
pub(crate) async fn run_with_signal<TState>(
&mut self,
future: impl std::future::Future<Output = TState>,
) -> Result<TState, Signal>
Expand Down Expand Up @@ -148,7 +148,9 @@ impl ActorPortSet {
///
/// Returns [Ok(ActorPortMessage)] on a successful message reception, [MessagingErr]
/// in the event any of the channels is closed.
pub async fn listen_in_priority(&mut self) -> Result<ActorPortMessage, MessagingErr<()>> {
pub(crate) async fn listen_in_priority(
&mut self,
) -> Result<ActorPortMessage, MessagingErr<()>> {
#[cfg(feature = "async-std")]
{
crate::concurrency::select! {
Expand Down
35 changes: 22 additions & 13 deletions ractor/src/actor/actor_properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub(crate) struct ActorProperties {
}

impl ActorProperties {
pub fn new<TActor>(
pub(crate) fn new<TActor>(
name: Option<ActorName>,
) -> (
Self,
Expand All @@ -57,7 +57,7 @@ impl ActorProperties {
Self::new_remote::<TActor>(name, crate::actor::actor_id::get_new_local_id())
}

pub fn new_remote<TActor>(
pub(crate) fn new_remote<TActor>(
name: Option<ActorName>,
id: ActorId,
) -> (
Expand Down Expand Up @@ -96,7 +96,7 @@ impl ActorProperties {
)
}

pub fn get_status(&self) -> ActorStatus {
pub(crate) fn get_status(&self) -> ActorStatus {
match self.status.load(Ordering::SeqCst) {
0u8 => ActorStatus::Unstarted,
1u8 => ActorStatus::Starting,
Expand All @@ -108,11 +108,11 @@ impl ActorProperties {
}
}

pub fn set_status(&self, status: ActorStatus) {
pub(crate) fn set_status(&self, status: ActorStatus) {
self.status.store(status as u8, Ordering::SeqCst);
}

pub fn send_signal(&self, signal: Signal) -> Result<(), MessagingErr<()>> {
pub(crate) fn send_signal(&self, signal: Signal) -> Result<(), MessagingErr<()>> {
self.signal
.lock()
.unwrap()
Expand All @@ -122,14 +122,17 @@ impl ActorProperties {
})
}

pub fn send_supervisor_evt(
pub(crate) fn send_supervisor_evt(
&self,
message: SupervisionEvent,
) -> Result<(), MessagingErr<SupervisionEvent>> {
self.supervision.send(message).map_err(|e| e.into())
}

pub fn send_message<TMessage>(&self, message: TMessage) -> Result<(), MessagingErr<TMessage>>
pub(crate) fn send_message<TMessage>(
&self,
message: TMessage,
) -> Result<(), MessagingErr<TMessage>>
where
TMessage: Message,
{
Expand All @@ -156,7 +159,7 @@ impl ActorProperties {
})
}

pub fn drain(&self) -> Result<(), MessagingErr<()>> {
pub(crate) fn drain(&self) -> Result<(), MessagingErr<()>> {
let _ = self
.status
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |f| {
Expand All @@ -172,7 +175,7 @@ impl ActorProperties {
}

#[cfg(feature = "cluster")]
pub fn send_serialized(
pub(crate) fn send_serialized(
&self,
message: SerializedMessage,
) -> Result<(), MessagingErr<SerializedMessage>> {
Expand All @@ -189,7 +192,10 @@ impl ActorProperties {
})
}

pub fn send_stop(&self, reason: Option<String>) -> Result<(), MessagingErr<StopMessage>> {
pub(crate) fn send_stop(
&self,
reason: Option<String>,
) -> Result<(), MessagingErr<StopMessage>> {
let msg = reason.map(StopMessage::Reason).unwrap_or(StopMessage::Stop);
self.stop
.lock()
Expand All @@ -201,7 +207,7 @@ impl ActorProperties {
}

/// Send the stop signal, threading in a OneShot sender which notifies when the shutdown is completed
pub async fn send_stop_and_wait(
pub(crate) async fn send_stop_and_wait(
&self,
reason: Option<String>,
) -> Result<(), MessagingErr<StopMessage>> {
Expand All @@ -212,15 +218,18 @@ impl ActorProperties {
}

/// Send the kill signal, threading in a OneShot sender which notifies when the shutdown is completed
pub async fn send_signal_and_wait(&self, signal: Signal) -> Result<(), MessagingErr<()>> {
pub(crate) async fn send_signal_and_wait(
&self,
signal: Signal,
) -> Result<(), MessagingErr<()>> {
// first bind the wait handler
let rx = self.wait_handler.notified();
let _ = self.send_signal(signal);
rx.await;
Ok(())
}

pub fn notify_stop_listener(&self) {
pub(crate) fn notify_stop_listener(&self) {
self.wait_handler.notify_waiters();
}
}
6 changes: 6 additions & 0 deletions ractor/src/actor/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ pub struct BoxedState {
pub msg: Option<Box<dyn Any + Send>>,
}

impl Debug for BoxedState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "BoxedState")
}

Check warning on line 30 in ractor/src/actor/messages.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/actor/messages.rs#L28-L30

Added lines #L28 - L30 were not covered by tests
}

impl BoxedState {
/// Create a new [BoxedState] from a strongly-typed message
pub fn new<T>(msg: T) -> Self
Expand Down
11 changes: 11 additions & 0 deletions ractor/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
//! log to `stderr` for tracing. You can additionally setup a [panic hook](https://doc.rust-lang.org/std/panic/fn.set_hook.html)
//! to do things like capturing backtraces on the unwinding panic.
use std::fmt::Debug;
#[cfg(not(feature = "async-trait"))]
use std::future::Future;
use std::panic::AssertUnwindSafe;
Expand Down Expand Up @@ -478,6 +479,16 @@ where
name: Option<String>,
}

impl<TActor: Actor> Debug for ActorRuntime<TActor> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if let Some(name) = self.name.as_ref() {
write!(f, "ActorRuntime('{}' - {})", name, self.id)

Check warning on line 485 in ractor/src/actor/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/actor/mod.rs#L483-L485

Added lines #L483 - L485 were not covered by tests
} else {
write!(f, "ActorRuntime({})", self.id)

Check warning on line 487 in ractor/src/actor/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/actor/mod.rs#L487

Added line #L487 was not covered by tests
}
}

Check warning on line 489 in ractor/src/actor/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/actor/mod.rs#L489

Added line #L489 was not covered by tests
}

impl<TActor> ActorRuntime<TActor>
where
TActor: Actor,
Expand Down
22 changes: 11 additions & 11 deletions ractor/src/actor/supervision.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,37 +21,37 @@ use super::{actor_cell::ActorCell, messages::SupervisionEvent};
use crate::ActorId;

/// A supervision tree
#[derive(Default)]
pub struct SupervisionTree {
#[derive(Default, Debug)]
pub(crate) struct SupervisionTree {
children: Arc<Mutex<HashMap<ActorId, ActorCell>>>,
supervisor: Arc<Mutex<Option<ActorCell>>>,
}

impl SupervisionTree {
/// Push a child into the tere
pub fn insert_child(&self, child: ActorCell) {
pub(crate) fn insert_child(&self, child: ActorCell) {
self.children.lock().unwrap().insert(child.get_id(), child);
}

/// Remove a specific actor from the supervision tree (e.g. actor died)
pub fn remove_child(&self, child: ActorId) {
pub(crate) fn remove_child(&self, child: ActorId) {
self.children.lock().unwrap().remove(&child);
}

/// Push a parent into the tere
pub fn set_supervisor(&self, parent: ActorCell) {
pub(crate) fn set_supervisor(&self, parent: ActorCell) {
*(self.supervisor.lock().unwrap()) = Some(parent);
}

/// Remove a specific actor from the supervision tree (e.g. actor died)
pub fn clear_supervisor(&self) {
pub(crate) fn clear_supervisor(&self) {
*(self.supervisor.lock().unwrap()) = None;
}

/// Terminate all your supervised children and unlink them
/// from the supervision tree since the supervisor is shutting down
/// and can't deal with superivison events anyways
pub fn terminate_all_children(&self) {
pub(crate) fn terminate_all_children(&self) {
let mut guard = self.children.lock().unwrap();
let cells = guard.iter().map(|(_, a)| a.clone()).collect::<Vec<_>>();
guard.clear();
Expand All @@ -64,7 +64,7 @@ impl SupervisionTree {
}

/// Determine if the specified actor is a parent of this actor
pub fn is_child_of(&self, id: ActorId) -> bool {
pub(crate) fn is_child_of(&self, id: ActorId) -> bool {
if let Some(parent) = &*(self.supervisor.lock().unwrap()) {
parent.get_id() == id
} else {
Expand All @@ -73,21 +73,21 @@ impl SupervisionTree {
}

/// Send a notification to the supervisor.
pub fn notify_supervisor(&self, evt: SupervisionEvent) {
pub(crate) fn notify_supervisor(&self, evt: SupervisionEvent) {
if let Some(parent) = &*(self.supervisor.lock().unwrap()) {
let _ = parent.send_supervisor_evt(evt);
}
}

/// Retrieve the number of supervised children
#[cfg(test)]
pub fn get_num_children(&self) -> usize {
pub(crate) fn get_num_children(&self) -> usize {
self.children.lock().unwrap().len()
}

/// Retrieve the number of supervised children
#[cfg(test)]
pub fn get_num_parents(&self) -> usize {
pub(crate) fn get_num_parents(&self) -> usize {
usize::from(self.supervisor.lock().unwrap().is_some())
}
}
24 changes: 23 additions & 1 deletion ractor/src/factory/discard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use super::JobKey;
use crate::Message;

/// The discard mode of a factory
#[derive(Eq, PartialEq, Clone, Copy)]
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub enum DiscardMode {
/// Discard oldest incoming jobs under backpressure
Oldest,
Expand All @@ -28,6 +28,7 @@ pub enum DiscardMode {
/// workers. The workers "think" it's static, but the factory handles the dynamics.
/// This way the factory can keep the [DynamicDiscardHandler] as a single, uncloned
/// instance. It also moves NUM_WORKER calculations to 1.
#[derive(Debug)]
pub(crate) enum WorkerDiscardSettings {
None,
Static { limit: usize, mode: DiscardMode },
Expand Down Expand Up @@ -84,6 +85,26 @@ pub enum DiscardSettings {
},
}

impl std::fmt::Debug for DiscardSettings {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {

Check warning on line 90 in ractor/src/factory/discard.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/factory/discard.rs#L89-L90

Added lines #L89 - L90 were not covered by tests
DiscardSettings::None => {
write!(f, "DiscardSettings::None")

Check warning on line 92 in ractor/src/factory/discard.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/factory/discard.rs#L92

Added line #L92 was not covered by tests
}
DiscardSettings::Static { limit, mode } => f
.debug_struct("DiscardSettings::Static")
.field("limit", limit)
.field("mode", mode)
.finish(),
DiscardSettings::Dynamic { limit, mode, .. } => f
.debug_struct("DiscardSettings::Dynamic")
.field("limit", limit)
.field("mode", mode)
.finish(),

Check warning on line 103 in ractor/src/factory/discard.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/factory/discard.rs#L94-L103

Added lines #L94 - L103 were not covered by tests
}
}

Check warning on line 105 in ractor/src/factory/discard.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/factory/discard.rs#L105

Added line #L105 was not covered by tests
}

impl DiscardSettings {
pub(crate) fn get_worker_settings(&self) -> WorkerDiscardSettings {
match &self {
Expand Down Expand Up @@ -128,6 +149,7 @@ pub trait DynamicDiscardController: Send + Sync + 'static {
}

/// Reason for discarding a job
#[derive(Debug)]
pub enum DiscardReason {
/// The job TTLd
TtlExpired,
Expand Down
Loading

0 comments on commit 9466957

Please sign in to comment.