From b4268941a2f1ba58ae77cf0eabfb3458f4aa8502 Mon Sep 17 00:00:00 2001 From: Sean Lawlor Date: Tue, 15 Oct 2024 21:46:17 -0400 Subject: [PATCH] Adds a silent-retry support for factory messages, such that if a worker dies, the message can be silently retried up to a number of retries, without the caller being notified or having the message dropped. Also: Add debug implementations everywhere and more build deny rules --- ractor/src/actor/actor_cell.rs | 6 +- ractor/src/actor/actor_properties.rs | 35 ++-- ractor/src/actor/messages.rs | 6 + ractor/src/actor/mod.rs | 11 ++ ractor/src/actor/supervision.rs | 22 +-- ractor/src/factory/discard.rs | 24 ++- ractor/src/factory/factoryimpl.rs | 78 ++++++++- ractor/src/factory/job.rs | 174 ++++++++++++++++++- ractor/src/factory/mod.rs | 3 +- ractor/src/factory/queues.rs | 24 +++ ractor/src/factory/routing.rs | 4 + ractor/src/factory/tests/worker_lifecycle.rs | 129 ++++++++++++++ ractor/src/factory/worker.rs | 20 +++ ractor/src/lib.rs | 18 +- ractor/src/message.rs | 11 ++ ractor/src/pg/mod.rs | 4 +- ractor/src/port/output/mod.rs | 14 +- 17 files changed, 541 insertions(+), 42 deletions(-) diff --git a/ractor/src/actor/actor_cell.rs b/ractor/src/actor/actor_cell.rs index 7533b61a..3edf7486 100644 --- a/ractor/src/actor/actor_cell.rs +++ b/ractor/src/actor/actor_cell.rs @@ -103,7 +103,7 @@ impl ActorPortSet { /// Returns [Ok(`TState`)] when the future completes without /// signal interruption, [Err(Signal)] in the event the /// signal interrupts the async work. - pub async fn run_with_signal( + pub(crate) async fn run_with_signal( &mut self, future: impl std::future::Future, ) -> Result @@ -148,7 +148,9 @@ impl ActorPortSet { /// /// Returns [Ok(ActorPortMessage)] on a successful message reception, [MessagingErr] /// in the event any of the channels is closed. - pub async fn listen_in_priority(&mut self) -> Result> { + pub(crate) async fn listen_in_priority( + &mut self, + ) -> Result> { #[cfg(feature = "async-std")] { crate::concurrency::select! { diff --git a/ractor/src/actor/actor_properties.rs b/ractor/src/actor/actor_properties.rs index f844fec5..0f770514 100644 --- a/ractor/src/actor/actor_properties.rs +++ b/ractor/src/actor/actor_properties.rs @@ -42,7 +42,7 @@ pub(crate) struct ActorProperties { } impl ActorProperties { - pub fn new( + pub(crate) fn new( name: Option, ) -> ( Self, @@ -57,7 +57,7 @@ impl ActorProperties { Self::new_remote::(name, crate::actor::actor_id::get_new_local_id()) } - pub fn new_remote( + pub(crate) fn new_remote( name: Option, id: ActorId, ) -> ( @@ -96,7 +96,7 @@ impl ActorProperties { ) } - pub fn get_status(&self) -> ActorStatus { + pub(crate) fn get_status(&self) -> ActorStatus { match self.status.load(Ordering::SeqCst) { 0u8 => ActorStatus::Unstarted, 1u8 => ActorStatus::Starting, @@ -108,11 +108,11 @@ impl ActorProperties { } } - pub fn set_status(&self, status: ActorStatus) { + pub(crate) fn set_status(&self, status: ActorStatus) { self.status.store(status as u8, Ordering::SeqCst); } - pub fn send_signal(&self, signal: Signal) -> Result<(), MessagingErr<()>> { + pub(crate) fn send_signal(&self, signal: Signal) -> Result<(), MessagingErr<()>> { self.signal .lock() .unwrap() @@ -122,14 +122,17 @@ impl ActorProperties { }) } - pub fn send_supervisor_evt( + pub(crate) fn send_supervisor_evt( &self, message: SupervisionEvent, ) -> Result<(), MessagingErr> { self.supervision.send(message).map_err(|e| e.into()) } - pub fn send_message(&self, message: TMessage) -> Result<(), MessagingErr> + pub(crate) fn send_message( + &self, + message: TMessage, + ) -> Result<(), MessagingErr> where TMessage: Message, { @@ -156,7 +159,7 @@ impl ActorProperties { }) } - pub fn drain(&self) -> Result<(), MessagingErr<()>> { + pub(crate) fn drain(&self) -> Result<(), MessagingErr<()>> { let _ = self .status .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |f| { @@ -172,7 +175,7 @@ impl ActorProperties { } #[cfg(feature = "cluster")] - pub fn send_serialized( + pub(crate) fn send_serialized( &self, message: SerializedMessage, ) -> Result<(), MessagingErr> { @@ -189,7 +192,10 @@ impl ActorProperties { }) } - pub fn send_stop(&self, reason: Option) -> Result<(), MessagingErr> { + pub(crate) fn send_stop( + &self, + reason: Option, + ) -> Result<(), MessagingErr> { let msg = reason.map(StopMessage::Reason).unwrap_or(StopMessage::Stop); self.stop .lock() @@ -201,7 +207,7 @@ impl ActorProperties { } /// Send the stop signal, threading in a OneShot sender which notifies when the shutdown is completed - pub async fn send_stop_and_wait( + pub(crate) async fn send_stop_and_wait( &self, reason: Option, ) -> Result<(), MessagingErr> { @@ -212,7 +218,10 @@ impl ActorProperties { } /// Send the kill signal, threading in a OneShot sender which notifies when the shutdown is completed - pub async fn send_signal_and_wait(&self, signal: Signal) -> Result<(), MessagingErr<()>> { + pub(crate) async fn send_signal_and_wait( + &self, + signal: Signal, + ) -> Result<(), MessagingErr<()>> { // first bind the wait handler let rx = self.wait_handler.notified(); let _ = self.send_signal(signal); @@ -220,7 +229,7 @@ impl ActorProperties { Ok(()) } - pub fn notify_stop_listener(&self) { + pub(crate) fn notify_stop_listener(&self) { self.wait_handler.notify_waiters(); } } diff --git a/ractor/src/actor/messages.rs b/ractor/src/actor/messages.rs index 1d3b2bc4..cd7077ec 100644 --- a/ractor/src/actor/messages.rs +++ b/ractor/src/actor/messages.rs @@ -24,6 +24,12 @@ pub struct BoxedState { pub msg: Option>, } +impl Debug for BoxedState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "BoxedState") + } +} + impl BoxedState { /// Create a new [BoxedState] from a strongly-typed message pub fn new(msg: T) -> Self diff --git a/ractor/src/actor/mod.rs b/ractor/src/actor/mod.rs index 9866b48d..4d7c5cf9 100644 --- a/ractor/src/actor/mod.rs +++ b/ractor/src/actor/mod.rs @@ -50,6 +50,7 @@ //! log to `stderr` for tracing. You can additionally setup a [panic hook](https://doc.rust-lang.org/std/panic/fn.set_hook.html) //! to do things like capturing backtraces on the unwinding panic. +use std::fmt::Debug; #[cfg(not(feature = "async-trait"))] use std::future::Future; use std::panic::AssertUnwindSafe; @@ -478,6 +479,16 @@ where name: Option, } +impl Debug for ActorRuntime { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if let Some(name) = self.name.as_ref() { + write!(f, "ActorRuntime('{}' - {})", name, self.id) + } else { + write!(f, "ActorRuntime({})", self.id) + } + } +} + impl ActorRuntime where TActor: Actor, diff --git a/ractor/src/actor/supervision.rs b/ractor/src/actor/supervision.rs index d956a6d3..a6f36873 100644 --- a/ractor/src/actor/supervision.rs +++ b/ractor/src/actor/supervision.rs @@ -21,37 +21,37 @@ use super::{actor_cell::ActorCell, messages::SupervisionEvent}; use crate::ActorId; /// A supervision tree -#[derive(Default)] -pub struct SupervisionTree { +#[derive(Default, Debug)] +pub(crate) struct SupervisionTree { children: Arc>>, supervisor: Arc>>, } impl SupervisionTree { /// Push a child into the tere - pub fn insert_child(&self, child: ActorCell) { + pub(crate) fn insert_child(&self, child: ActorCell) { self.children.lock().unwrap().insert(child.get_id(), child); } /// Remove a specific actor from the supervision tree (e.g. actor died) - pub fn remove_child(&self, child: ActorId) { + pub(crate) fn remove_child(&self, child: ActorId) { self.children.lock().unwrap().remove(&child); } /// Push a parent into the tere - pub fn set_supervisor(&self, parent: ActorCell) { + pub(crate) fn set_supervisor(&self, parent: ActorCell) { *(self.supervisor.lock().unwrap()) = Some(parent); } /// Remove a specific actor from the supervision tree (e.g. actor died) - pub fn clear_supervisor(&self) { + pub(crate) fn clear_supervisor(&self) { *(self.supervisor.lock().unwrap()) = None; } /// Terminate all your supervised children and unlink them /// from the supervision tree since the supervisor is shutting down /// and can't deal with superivison events anyways - pub fn terminate_all_children(&self) { + pub(crate) fn terminate_all_children(&self) { let mut guard = self.children.lock().unwrap(); let cells = guard.iter().map(|(_, a)| a.clone()).collect::>(); guard.clear(); @@ -64,7 +64,7 @@ impl SupervisionTree { } /// Determine if the specified actor is a parent of this actor - pub fn is_child_of(&self, id: ActorId) -> bool { + pub(crate) fn is_child_of(&self, id: ActorId) -> bool { if let Some(parent) = &*(self.supervisor.lock().unwrap()) { parent.get_id() == id } else { @@ -73,7 +73,7 @@ impl SupervisionTree { } /// Send a notification to the supervisor. - pub fn notify_supervisor(&self, evt: SupervisionEvent) { + pub(crate) fn notify_supervisor(&self, evt: SupervisionEvent) { if let Some(parent) = &*(self.supervisor.lock().unwrap()) { let _ = parent.send_supervisor_evt(evt); } @@ -81,13 +81,13 @@ impl SupervisionTree { /// Retrieve the number of supervised children #[cfg(test)] - pub fn get_num_children(&self) -> usize { + pub(crate) fn get_num_children(&self) -> usize { self.children.lock().unwrap().len() } /// Retrieve the number of supervised children #[cfg(test)] - pub fn get_num_parents(&self) -> usize { + pub(crate) fn get_num_parents(&self) -> usize { usize::from(self.supervisor.lock().unwrap().is_some()) } } diff --git a/ractor/src/factory/discard.rs b/ractor/src/factory/discard.rs index f5a30dd0..49aa73ff 100644 --- a/ractor/src/factory/discard.rs +++ b/ractor/src/factory/discard.rs @@ -9,7 +9,7 @@ use super::JobKey; use crate::Message; /// The discard mode of a factory -#[derive(Eq, PartialEq, Clone, Copy)] +#[derive(Debug, Eq, PartialEq, Clone, Copy)] pub enum DiscardMode { /// Discard oldest incoming jobs under backpressure Oldest, @@ -28,6 +28,7 @@ pub enum DiscardMode { /// workers. The workers "think" it's static, but the factory handles the dynamics. /// This way the factory can keep the [DynamicDiscardHandler] as a single, uncloned /// instance. It also moves NUM_WORKER calculations to 1. +#[derive(Debug)] pub(crate) enum WorkerDiscardSettings { None, Static { limit: usize, mode: DiscardMode }, @@ -84,6 +85,26 @@ pub enum DiscardSettings { }, } +impl std::fmt::Debug for DiscardSettings { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + DiscardSettings::None => { + write!(f, "DiscardSettings::None") + } + DiscardSettings::Static { limit, mode } => f + .debug_struct("DiscardSettings::Static") + .field("limit", limit) + .field("mode", mode) + .finish(), + DiscardSettings::Dynamic { limit, mode, .. } => f + .debug_struct("DiscardSettings::Dynamic") + .field("limit", limit) + .field("mode", mode) + .finish(), + } + } +} + impl DiscardSettings { pub(crate) fn get_worker_settings(&self) -> WorkerDiscardSettings { match &self { @@ -128,6 +149,7 @@ pub trait DynamicDiscardController: Send + Sync + 'static { } /// Reason for discarding a job +#[derive(Debug)] pub enum DiscardReason { /// The job TTLd TtlExpired, diff --git a/ractor/src/factory/factoryimpl.rs b/ractor/src/factory/factoryimpl.rs index 77a3e90b..99344180 100644 --- a/ractor/src/factory/factoryimpl.rs +++ b/ractor/src/factory/factoryimpl.rs @@ -7,6 +7,7 @@ use std::cmp::Ordering; use std::collections::HashMap; +use std::fmt::Debug; use std::marker::PhantomData; use std::sync::Arc; @@ -38,7 +39,7 @@ const PING_FREQUENCY: Duration = Duration::from_millis(150); const PING_FREQUENCY: Duration = Duration::from_millis(10_000); const CALCULATE_FREQUENCY: Duration = Duration::from_millis(100); -#[derive(Eq, PartialEq)] +#[derive(Debug, Eq, PartialEq)] enum DrainState { NotDraining, Draining, @@ -49,6 +50,7 @@ enum DrainState { /// /// This is a placeholder instance which contains all of the type specifications /// for the factories properties +#[derive(Debug)] pub struct Factory where TKey: JobKey, @@ -148,6 +150,30 @@ where pub stats: Option>, } +impl Debug + for FactoryArguments +where + TKey: JobKey, + TMsg: Message, + TWorkerStart: Message, + TWorker: Actor< + Msg = WorkerMessage, + Arguments = WorkerStartContext, + >, + TRouter: Router, + TQueue: Queue, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FactoryArguments") + .field("num_initial_workers", &self.num_initial_workers) + .field("router", &std::any::type_name::()) + .field("queue", &std::any::type_name::()) + .field("discard_settings", &self.discard_settings) + .field("dead_mans_switch", &self.dead_mans_switch) + .finish() + } +} + /// Builder for [FactoryArguments] which can be used to build the /// [Factory]'s startup arguments. pub struct FactoryArgumentsBuilder @@ -176,6 +202,30 @@ where stats: Option>, } +impl Debug + for FactoryArgumentsBuilder +where + TKey: JobKey, + TMsg: Message, + TWorkerStart: Message, + TWorker: Actor< + Msg = WorkerMessage, + Arguments = WorkerStartContext, + >, + TRouter: Router, + TQueue: Queue, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FactoryArgumentsBuilder") + .field("num_initial_workers", &self.num_initial_workers) + .field("router", &std::any::type_name::()) + .field("queue", &std::any::type_name::()) + .field("discard_settings", &self.discard_settings) + .field("dead_mans_switch", &self.dead_mans_switch) + .finish() + } +} + impl FactoryArgumentsBuilder where @@ -373,6 +423,32 @@ where lifecycle_hooks: Option>>, } +impl Debug + for FactoryState +where + TKey: JobKey, + TMsg: Message, + TWorkerStart: Message, + TWorker: Actor< + Msg = WorkerMessage, + Arguments = WorkerStartContext, + >, + TRouter: Router, + TQueue: Queue, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FactoryState") + .field("factory_name", &self.factory_name) + .field("pool_size", &self.pool_size) + .field("router", &std::any::type_name::()) + .field("queue", &std::any::type_name::()) + .field("discard_settings", &self.discard_settings) + .field("dead_mans_switch", &self.dead_mans_switch) + .field("drain_state", &self.drain_state) + .finish() + } +} + impl FactoryState where diff --git a/ractor/src/factory/job.rs b/ractor/src/factory/job.rs index 8225e3f4..ff9c1d08 100644 --- a/ractor/src/factory/job.rs +++ b/ractor/src/factory/job.rs @@ -8,12 +8,14 @@ use std::fmt::Debug; use std::{hash::Hash, time::SystemTime}; -use crate::RpcReplyPort; use crate::{concurrency::Duration, Message}; +use crate::{ActorRef, RpcReplyPort}; #[cfg(feature = "cluster")] use crate::{message::BoxedDowncastErr, BytesConvertable}; +use super::FactoryMessage; + /// Represents a key to a job. Needs to be hashable for routing properties. Additionally needs /// to be serializable for remote factories #[cfg(feature = "cluster")] @@ -129,6 +131,18 @@ where pub accepted: Option>>, } +impl Debug for Job +where + TKey: JobKey, + TMsg: Message, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Job") + .field("options", &self.options) + .finish() + } +} + #[cfg(feature = "cluster")] impl Job where @@ -286,6 +300,164 @@ where } } +/// A retriable job is a job which will automatically be resubmitted to the factory in the event of +/// a factory worker's failure (panic or unhandled error). This wraps the inner message in a struct which +/// captures the drop, and if there's still some retries left, will reschedule the work to the factory. +/// +/// CAVEATS: +/// +/// 1. This is unable to handle front-of-queue loadshedding, and regular loadshed events will +/// cause this message to be retried if there is still retries left and the job isn't expired. +/// 2. Consumable types are not well supported here without some wrapping in Option types, which is +/// because the value is handled everywhere as `&mut ref`` due to the drop implementation requiring that +/// it be so. This means that RPCs using [crate::concurrency::oneshot]s likely won't work without +/// some real painful ergonomics. +/// 3. Upon successful handling of the job, you need to mark it as `completed()` at the end of your +/// handling logic to state that it shouldn't be retried on drop. +pub struct RetriableJob { + /// The key to the retriable job + pub key: TKey, + /// The message, which will be retried until it's completed. + pub message: Option, + /// The remaining number of retries this job can be + /// rescheduled in the factory + pub retries_remaining: Option, + state: Option<(JobOptions, ActorRef>)>, +} + +impl Debug for RetriableJob +where + TKey: JobKey, + TMsg: Message, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RetriableJob") + .field("retries_remaining", &self.retries_remaining) + .finish() + } +} + +#[cfg(feature = "cluster")] +impl Message for RetriableJob {} + +impl Drop for RetriableJob { + fn drop(&mut self) { + if self.retries_remaining == Some(0) { + // no more retries left (None or Some(>0) mean there's still retries left) + return; + } + if self.message.is_none() { + // The payload has been consumed, there's nothing to retry + return; + } + let Some((options, factory)) = self.state.as_ref() else { + // can't do a retry if the factory and options are not available + return; + }; + let job = Self { + key: self.key.clone(), + message: self.message.take(), + retries_remaining: self.retries_remaining.map(|a| a - 1), + state: Some((options.clone(), factory.clone())), + }; + let job = Job { + accepted: None, + key: self.key.clone(), + msg: job, + options: options.clone(), + }; + tracing::info!("Attempting message retry"); + // SAFETY: A silent-drop here is OK should the dispatch to the factory fail. This is + // because if a worker died, it would be a silent drop anyways + _ = factory.cast(FactoryMessage::Dispatch(job)); + } +} + +impl ActorRef>> +where + TKey: JobKey, + TMsg: Message, +{ + /// When you're talking to a factory, which accepts "retriable" jobs, this + /// convenience function sets up the retriable state for you and converts + /// your job to a retriable equivalent. + /// + /// * `job`: The [Job] which will be auto-converted into a [RetriableJob] for you + /// however the `accepted` field, if set, will be dropped as [RetriableJob]s do not + /// support accepted replies + /// * `num_retries`: The number of retries to attempt re-executing the job for if failures + /// occur. [Some(`num`)] will retry up to a specified limit, similar to a TTL, and [None] means + /// to retry forever. + /// + /// Returns the propogated error message from the underlying `cast` operation to the factory + /// [ActorRef]. + #[allow(clippy::type_complexity)] + pub fn submit_retriable_job( + &self, + Job { + key, msg, options, .. + }: Job, + num_retries: Option, + ) -> Result<(), crate::MessagingErr>>> { + let mut retriable = RetriableJob::new(key.clone(), msg, num_retries); + retriable.capture_retry_state(&options, self.clone()); + let job = Job::> { + accepted: None, + key: key.clone(), + msg: retriable, + options, + }; + self.cast(FactoryMessage::Dispatch(job)) + } +} + +impl RetriableJob { + /// Construct a new retriable job with the provided number of retries. If the + /// retries are non, that means retry forever + pub fn new(key: TKey, message: TMessage, num_retries: Option) -> Self { + Self { + key, + message: Some(message), + retries_remaining: num_retries, + state: None, + } + } + + /// This needs to be called prior to sending the job to the factory the first time. This is where the factory's + /// handle and the job's options are captured which are needed to facilitate retries + /// + /// ``` + /// use ractor::factory::*; + /// + /// let key = (); + /// let message = 42; + /// let mut job = Job { + /// key: key.clone(), + /// message: RetriableJob::new(key, message, Some(2)), + /// options: JobOptions::default(), + /// accepted: None, + /// }; + /// job.message.capture_retry_state(&job.options, factory); + /// + /// _ = factory.cast(FactoryMessage::Dispatch(job)); + /// + /// ``` + pub fn capture_retry_state( + &mut self, + options: &JobOptions, + factory: ActorRef>, + ) { + self.state = Some((options.clone(), factory)); + } + + /// Tell this message to not be retried upon being dropped, since it + /// was handled successfully. + pub fn completed(&mut self) { + self.retries_remaining = Some(0); + self.message = None; + } +} + #[cfg(feature = "cluster")] #[cfg(test)] mod tests { diff --git a/ractor/src/factory/mod.rs b/ractor/src/factory/mod.rs index e3d94604..4e32aa39 100644 --- a/ractor/src/factory/mod.rs +++ b/ractor/src/factory/mod.rs @@ -194,7 +194,7 @@ pub use discard::{ DiscardHandler, DiscardMode, DiscardReason, DiscardSettings, DynamicDiscardController, }; pub use factoryimpl::{Factory, FactoryArguments, FactoryArgumentsBuilder}; -pub use job::{Job, JobKey, JobOptions}; +pub use job::{Job, JobKey, JobOptions, RetriableJob}; pub use lifecycle::FactoryLifecycleHooks; pub use worker::{ DeadMansSwitchConfiguration, WorkerBuilder, WorkerCapacityController, WorkerMessage, @@ -212,6 +212,7 @@ pub type WorkerId = usize; /// in-host communication. This means if you're communicating to a factory you would /// send only a serialized [Job] which would automatically be converted to a /// [FactoryMessage::Dispatch(Job)] +#[derive(Debug)] pub enum FactoryMessage where TKey: JobKey, diff --git a/ractor/src/factory/queues.rs b/ractor/src/factory/queues.rs index f52cc522..253b0c1e 100644 --- a/ractor/src/factory/queues.rs +++ b/ractor/src/factory/queues.rs @@ -6,6 +6,7 @@ //! Queue implementations for Factories use std::collections::VecDeque; +use std::fmt::Debug; use std::marker::PhantomData; use std::sync::Arc; @@ -148,6 +149,16 @@ where q: VecDeque>, } +impl Debug for DefaultQueue +where + TKey: JobKey, + TMsg: Message, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "DefaultQueue({} items)", self.q.len()) + } +} + impl Default for DefaultQueue where TKey: JobKey, @@ -229,6 +240,19 @@ where _p: PhantomData TPriority>, } +impl Debug + for PriorityQueue +where + TKey: JobKey, + TMsg: Message, + TPriority: Priority, + TPriorityManager: PriorityManager, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "PriorityQueue({} items)", self.len()) + } +} + impl PriorityQueue where diff --git a/ractor/src/factory/routing.rs b/ractor/src/factory/routing.rs index c243a017..19842cc5 100644 --- a/ractor/src/factory/routing.rs +++ b/ractor/src/factory/routing.rs @@ -25,6 +25,7 @@ where } /// The possible results from a routing operation. +#[derive(Debug)] pub enum RouteResult where TKey: JobKey, @@ -92,6 +93,7 @@ where macro_rules! impl_routing_mode { ($routing_mode: ident, $doc:expr) => { #[doc = $doc] + #[derive(Debug)] pub struct $routing_mode where TKey: JobKey, @@ -256,6 +258,7 @@ where /// Factory will dispatch to the next worker in order. /// /// Workers will have jobs placed into their incoming message queue's +#[derive(Debug)] pub struct RoundRobinRouting where TKey: JobKey, @@ -324,6 +327,7 @@ where /// /// The factory maintains no queue in this scenario, and jobs are pushed /// to worker's queues. +#[derive(Debug)] pub struct CustomRouting where TKey: JobKey, diff --git a/ractor/src/factory/tests/worker_lifecycle.rs b/ractor/src/factory/tests/worker_lifecycle.rs index 33063073..11be3e8d 100644 --- a/ractor/src/factory/tests/worker_lifecycle.rs +++ b/ractor/src/factory/tests/worker_lifecycle.rs @@ -3,12 +3,14 @@ // This source code is licensed under both the MIT license found in the // LICENSE-MIT file in the root directory of this source tree. +use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicU16; use std::sync::atomic::Ordering; use std::sync::Arc; use crate::concurrency::sleep; use crate::concurrency::Duration; +use crate::concurrency::JoinHandle; use crate::Actor; use crate::ActorProcessingErr; use crate::ActorRef; @@ -175,3 +177,130 @@ async fn test_worker_death_restarts_and_gets_next_message() { factory.stop(None); factory_handle.await.unwrap(); } + +enum MockFactoryMessage { + Boom(bool, crate::concurrency::MpscUnboundedSender<()>), +} + +#[cfg(feature = "cluster")] +impl Message for MockFactoryMessage {} + +/// Create a mock factory with specific, defined logic +async fn make_mock_factory( + panicked: Arc, + mock_logic: F, +) -> ( + ActorRef>>, + JoinHandle<()>, +) +where + F: Fn(&mut Job<(), RetriableJob<(), MockFactoryMessage>>, Arc) + + Send + + Sync + + 'static, +{ + struct MockFactory + where + F2: Fn(&mut Job<(), RetriableJob<(), MockFactoryMessage>>, Arc) + + Send + + Sync + + 'static, + { + message_handler_logic: F2, + panicked: Arc, + } + + #[crate::async_trait] + impl Actor for MockFactory + where + F2: Fn(&mut Job<(), RetriableJob<(), MockFactoryMessage>>, Arc) + + Send + + Sync + + 'static, + { + type Msg = FactoryMessage<(), RetriableJob<(), MockFactoryMessage>>; + type State = (); + type Arguments = (); + + async fn pre_start( + &self, + _: ActorRef, + _: Self::Arguments, + ) -> Result { + Ok(()) + } + + async fn handle( + &self, + _myself: ActorRef, + message: Self::Msg, + _state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + if let FactoryMessage::Dispatch(mut job) = message { + (self.message_handler_logic)(&mut job, self.panicked.clone()); + } + Ok(()) + } + } + + Actor::spawn( + None, + MockFactory { + message_handler_logic: mock_logic, + panicked, + }, + (), + ) + .await + .expect("Failed to spawn mock factory") +} + +#[crate::concurrency::test] +#[tracing_test::traced_test] +async fn test_factory_can_silent_retry() { + let panicked = Arc::new(AtomicBool::new(false)); + + let (factory, handle) = + make_mock_factory(panicked.clone(), move |msg, panic_check| { + match msg.msg.message.as_mut() { + Some(MockFactoryMessage::Boom(should_panic, response)) => { + if *should_panic { + assert!(!panic_check.load(std::sync::atomic::Ordering::SeqCst), "We already fake-panicked once during this test, and we should have updated our internal message state to not doing it > 1 time"); + panic_check.store(true, std::sync::atomic::Ordering::SeqCst); + *should_panic = false; + // Simulate a panic by dropping the message without sending a reply + return; + } + tracing::info!("Sending response"); + _ = response.send(()); + } + _ => { + tracing::info!("Got handler with no message payload"); + } + } + msg.msg.completed(); + }) + .await; + let (tx, mut rx) = crate::concurrency::mpsc_unbounded(); + + let message = MockFactoryMessage::Boom(true, tx); + factory + .submit_retriable_job( + Job { + accepted: None, + options: JobOptions::default(), + key: (), + msg: message, + }, + Some(1), + ) + .expect("Failed to dispatch job"); + + // wait for RPC to complete + let result = rx.recv().await; + assert!(result.is_some()); + + assert!(panicked.load(std::sync::atomic::Ordering::SeqCst)); + factory.stop(None); + handle.await.unwrap(); +} diff --git a/ractor/src/factory/worker.rs b/ractor/src/factory/worker.rs index 2b73d924..dca8b78f 100644 --- a/ractor/src/factory/worker.rs +++ b/ractor/src/factory/worker.rs @@ -6,6 +6,7 @@ //! Factory worker properties use std::collections::{HashMap, VecDeque}; +use std::fmt::Debug; use std::sync::Arc; use crate::concurrency::{Duration, Instant, JoinHandle}; @@ -21,6 +22,7 @@ use super::WorkerId; use super::{DiscardHandler, DiscardReason, JobOptions}; /// The configuration for the dead-man's switch functionality +#[derive(Debug)] pub struct DeadMansSwitchConfiguration { /// Duration before determining worker is stuck pub detection_timeout: Duration, @@ -61,6 +63,7 @@ pub trait WorkerCapacityController: 'static + Send + Sync { } /// Message to a worker +#[derive(Debug)] pub enum WorkerMessage where TKey: JobKey, @@ -86,6 +89,7 @@ where } /// Startup context data (`Arguments`) which are passed to a worker on start +#[derive(Debug)] pub struct WorkerStartContext where TKey: JobKey, @@ -148,6 +152,22 @@ where pub(crate) is_draining: bool, } +impl Debug for WorkerProperties +where + TKey: JobKey, + TMsg: Message, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("WorkerProperties") + .field("wid", &self.wid) + .field("actor", &self.actor) + .field("factory_name", &self.factory_name) + .field("discard_settings", &self.discard_settings) + .field("is_draining", &self.is_draining) + .finish() + } +} + impl WorkerProperties where TKey: JobKey, diff --git a/ractor/src/lib.rs b/ractor/src/lib.rs index 191d873b..ea26c66f 100644 --- a/ractor/src/lib.rs +++ b/ractor/src/lib.rs @@ -143,12 +143,18 @@ //! are how an actor's supervisor(s) are notified of events of their children and can handle lifetime events for them. //! 4. Messages: Regular, user-defined, messages are the last channel of communication to actors. They are the lowest priority of the 4 message types and denote general actor work. The first //! 3 messages types (signals, stop, supervision) are generally quiet unless it's a lifecycle event for the actor, but this channel is the "work" channel doing what your actor wants to do! - -#![warn(unused_imports)] -// #![warn(unsafe_code)] -#![warn(missing_docs)] -#![warn(unused_crate_dependencies)] -// #![cfg_attr(docsrs, feature(doc_cfg))] +#![warn( + dead_code, + missing_debug_implementations, + missing_docs, + rust_2018_idioms, + rustdoc::all, + rustdoc::missing_crate_level_docs, + unreachable_pub, + unused_imports, + unused_variables, + unused_crate_dependencies +)] // ======================== Modules ======================== // diff --git a/ractor/src/message.rs b/ractor/src/message.rs index fb91f7cc..d0a44792 100644 --- a/ractor/src/message.rs +++ b/ractor/src/message.rs @@ -26,6 +26,7 @@ impl std::error::Error for BoxedDowncastErr {} /// Represents a serialized call or cast message #[cfg(feature = "cluster")] +#[derive(Debug)] pub enum SerializedMessage { /// A cast (one-way) with the serialized payload Cast { @@ -66,6 +67,16 @@ pub struct BoxedMessage { pub(crate) span: Option, } +impl std::fmt::Debug for BoxedMessage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if self.msg.is_some() { + write!(f, "BoxedMessage(Local)") + } else { + write!(f, "BoxedMessage(Serialized)") + } + } +} + /// Message type for an actor. Generally an enum /// which muxes the various types of inner-messages the actor /// supports diff --git a/ractor/src/pg/mod.rs b/ractor/src/pg/mod.rs index f2629588..8c5bd4ea 100644 --- a/ractor/src/pg/mod.rs +++ b/ractor/src/pg/mod.rs @@ -91,7 +91,7 @@ pub const ALL_GROUPS_NOTIFICATION: &str = "__world_group_"; mod tests; /// Represents a change in a process group's membership -#[derive(Clone)] +#[derive(Clone, Debug)] pub enum GroupChangeMessage { /// Some actors joined a group Join(ScopeName, GroupName, Vec), @@ -119,7 +119,7 @@ impl GroupChangeMessage { /// Represents the combination of a `ScopeName` and a `GroupName` /// that uniquely identifies a specific group in a specific scope -#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] pub struct ScopeGroupKey { /// the `ScopeName` scope: ScopeName, diff --git a/ractor/src/port/output/mod.rs b/ractor/src/port/output/mod.rs index 2ef783aa..2c3e2e1b 100644 --- a/ractor/src/port/output/mod.rs +++ b/ractor/src/port/output/mod.rs @@ -10,7 +10,7 @@ //! to them which are automatically forwarded to downstream actors waiting for inputs. They optionally //! have a message transformer attached to them to convert them to the appropriate message type -use std::sync::RwLock; +use std::{fmt::Debug, sync::RwLock}; use crate::concurrency::JoinHandle; use tokio::sync::broadcast as pubsub; @@ -40,6 +40,12 @@ where subscriptions: RwLock>, } +impl Debug for OutputPort { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "OutputPort({})", std::any::type_name::()) + } +} + impl Default for OutputPort where TMsg: OutputMessage, @@ -118,17 +124,17 @@ struct OutputPortSubscription { impl OutputPortSubscription { /// Determine if the subscription is dead - pub fn is_dead(&self) -> bool { + pub(crate) fn is_dead(&self) -> bool { self.handle.is_finished() } /// Stop the subscription, by aborting the underlying [JoinHandle] - pub fn stop(&mut self) { + pub(crate) fn stop(&mut self) { self.handle.abort(); } /// Create a new subscription - pub fn new( + pub(crate) fn new( mut port: pubsub::Receiver>, converter: F, receiver: ActorRef,