Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add silent-retry support for factory messages + support for factories without async-trait #275

Merged
merged 2 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ractor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor"
version = "0.12.2"
version = "0.12.3"
authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
description = "A actor framework for Rust"
documentation = "https://docs.rs/ractor"
Expand Down
6 changes: 4 additions & 2 deletions ractor/src/actor/actor_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl ActorPortSet {
/// Returns [Ok(`TState`)] when the future completes without
/// signal interruption, [Err(Signal)] in the event the
/// signal interrupts the async work.
pub async fn run_with_signal<TState>(
pub(crate) async fn run_with_signal<TState>(
&mut self,
future: impl std::future::Future<Output = TState>,
) -> Result<TState, Signal>
Expand Down Expand Up @@ -148,7 +148,9 @@ impl ActorPortSet {
///
/// Returns [Ok(ActorPortMessage)] on a successful message reception, [MessagingErr]
/// in the event any of the channels is closed.
pub async fn listen_in_priority(&mut self) -> Result<ActorPortMessage, MessagingErr<()>> {
pub(crate) async fn listen_in_priority(
&mut self,
) -> Result<ActorPortMessage, MessagingErr<()>> {
#[cfg(feature = "async-std")]
{
crate::concurrency::select! {
Expand Down
35 changes: 22 additions & 13 deletions ractor/src/actor/actor_properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub(crate) struct ActorProperties {
}

impl ActorProperties {
pub fn new<TActor>(
pub(crate) fn new<TActor>(
name: Option<ActorName>,
) -> (
Self,
Expand All @@ -57,7 +57,7 @@ impl ActorProperties {
Self::new_remote::<TActor>(name, crate::actor::actor_id::get_new_local_id())
}

pub fn new_remote<TActor>(
pub(crate) fn new_remote<TActor>(
name: Option<ActorName>,
id: ActorId,
) -> (
Expand Down Expand Up @@ -96,7 +96,7 @@ impl ActorProperties {
)
}

pub fn get_status(&self) -> ActorStatus {
pub(crate) fn get_status(&self) -> ActorStatus {
match self.status.load(Ordering::SeqCst) {
0u8 => ActorStatus::Unstarted,
1u8 => ActorStatus::Starting,
Expand All @@ -108,11 +108,11 @@ impl ActorProperties {
}
}

pub fn set_status(&self, status: ActorStatus) {
pub(crate) fn set_status(&self, status: ActorStatus) {
self.status.store(status as u8, Ordering::SeqCst);
}

pub fn send_signal(&self, signal: Signal) -> Result<(), MessagingErr<()>> {
pub(crate) fn send_signal(&self, signal: Signal) -> Result<(), MessagingErr<()>> {
self.signal
.lock()
.unwrap()
Expand All @@ -122,14 +122,17 @@ impl ActorProperties {
})
}

pub fn send_supervisor_evt(
pub(crate) fn send_supervisor_evt(
&self,
message: SupervisionEvent,
) -> Result<(), MessagingErr<SupervisionEvent>> {
self.supervision.send(message).map_err(|e| e.into())
}

pub fn send_message<TMessage>(&self, message: TMessage) -> Result<(), MessagingErr<TMessage>>
pub(crate) fn send_message<TMessage>(
&self,
message: TMessage,
) -> Result<(), MessagingErr<TMessage>>
where
TMessage: Message,
{
Expand All @@ -156,7 +159,7 @@ impl ActorProperties {
})
}

pub fn drain(&self) -> Result<(), MessagingErr<()>> {
pub(crate) fn drain(&self) -> Result<(), MessagingErr<()>> {
let _ = self
.status
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |f| {
Expand All @@ -172,7 +175,7 @@ impl ActorProperties {
}

#[cfg(feature = "cluster")]
pub fn send_serialized(
pub(crate) fn send_serialized(
&self,
message: SerializedMessage,
) -> Result<(), MessagingErr<SerializedMessage>> {
Expand All @@ -189,7 +192,10 @@ impl ActorProperties {
})
}

pub fn send_stop(&self, reason: Option<String>) -> Result<(), MessagingErr<StopMessage>> {
pub(crate) fn send_stop(
&self,
reason: Option<String>,
) -> Result<(), MessagingErr<StopMessage>> {
let msg = reason.map(StopMessage::Reason).unwrap_or(StopMessage::Stop);
self.stop
.lock()
Expand All @@ -201,7 +207,7 @@ impl ActorProperties {
}

/// Send the stop signal, threading in a OneShot sender which notifies when the shutdown is completed
pub async fn send_stop_and_wait(
pub(crate) async fn send_stop_and_wait(
&self,
reason: Option<String>,
) -> Result<(), MessagingErr<StopMessage>> {
Expand All @@ -212,15 +218,18 @@ impl ActorProperties {
}

/// Send the kill signal, threading in a OneShot sender which notifies when the shutdown is completed
pub async fn send_signal_and_wait(&self, signal: Signal) -> Result<(), MessagingErr<()>> {
pub(crate) async fn send_signal_and_wait(
&self,
signal: Signal,
) -> Result<(), MessagingErr<()>> {
// first bind the wait handler
let rx = self.wait_handler.notified();
let _ = self.send_signal(signal);
rx.await;
Ok(())
}

pub fn notify_stop_listener(&self) {
pub(crate) fn notify_stop_listener(&self) {
self.wait_handler.notify_waiters();
}
}
6 changes: 6 additions & 0 deletions ractor/src/actor/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@
pub msg: Option<Box<dyn Any + Send>>,
}

impl Debug for BoxedState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "BoxedState")
}

Check warning on line 30 in ractor/src/actor/messages.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/actor/messages.rs#L28-L30

Added lines #L28 - L30 were not covered by tests
}

impl BoxedState {
/// Create a new [BoxedState] from a strongly-typed message
pub fn new<T>(msg: T) -> Self
Expand Down
11 changes: 11 additions & 0 deletions ractor/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
//! log to `stderr` for tracing. You can additionally setup a [panic hook](https://doc.rust-lang.org/std/panic/fn.set_hook.html)
//! to do things like capturing backtraces on the unwinding panic.

use std::fmt::Debug;
#[cfg(not(feature = "async-trait"))]
use std::future::Future;
use std::panic::AssertUnwindSafe;
Expand Down Expand Up @@ -478,6 +479,16 @@
name: Option<String>,
}

impl<TActor: Actor> Debug for ActorRuntime<TActor> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if let Some(name) = self.name.as_ref() {
write!(f, "ActorRuntime('{}' - {})", name, self.id)

Check warning on line 485 in ractor/src/actor/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/actor/mod.rs#L483-L485

Added lines #L483 - L485 were not covered by tests
} else {
write!(f, "ActorRuntime({})", self.id)

Check warning on line 487 in ractor/src/actor/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/actor/mod.rs#L487

Added line #L487 was not covered by tests
}
}

Check warning on line 489 in ractor/src/actor/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/actor/mod.rs#L489

Added line #L489 was not covered by tests
}

impl<TActor> ActorRuntime<TActor>
where
TActor: Actor,
Expand Down
22 changes: 11 additions & 11 deletions ractor/src/actor/supervision.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,37 +21,37 @@ use super::{actor_cell::ActorCell, messages::SupervisionEvent};
use crate::ActorId;

/// A supervision tree
#[derive(Default)]
pub struct SupervisionTree {
#[derive(Default, Debug)]
pub(crate) struct SupervisionTree {
children: Arc<Mutex<HashMap<ActorId, ActorCell>>>,
supervisor: Arc<Mutex<Option<ActorCell>>>,
}

impl SupervisionTree {
/// Push a child into the tere
pub fn insert_child(&self, child: ActorCell) {
pub(crate) fn insert_child(&self, child: ActorCell) {
self.children.lock().unwrap().insert(child.get_id(), child);
}

/// Remove a specific actor from the supervision tree (e.g. actor died)
pub fn remove_child(&self, child: ActorId) {
pub(crate) fn remove_child(&self, child: ActorId) {
self.children.lock().unwrap().remove(&child);
}

/// Push a parent into the tere
pub fn set_supervisor(&self, parent: ActorCell) {
pub(crate) fn set_supervisor(&self, parent: ActorCell) {
*(self.supervisor.lock().unwrap()) = Some(parent);
}

/// Remove a specific actor from the supervision tree (e.g. actor died)
pub fn clear_supervisor(&self) {
pub(crate) fn clear_supervisor(&self) {
*(self.supervisor.lock().unwrap()) = None;
}

/// Terminate all your supervised children and unlink them
/// from the supervision tree since the supervisor is shutting down
/// and can't deal with superivison events anyways
pub fn terminate_all_children(&self) {
pub(crate) fn terminate_all_children(&self) {
let mut guard = self.children.lock().unwrap();
let cells = guard.iter().map(|(_, a)| a.clone()).collect::<Vec<_>>();
guard.clear();
Expand All @@ -64,7 +64,7 @@ impl SupervisionTree {
}

/// Determine if the specified actor is a parent of this actor
pub fn is_child_of(&self, id: ActorId) -> bool {
pub(crate) fn is_child_of(&self, id: ActorId) -> bool {
if let Some(parent) = &*(self.supervisor.lock().unwrap()) {
parent.get_id() == id
} else {
Expand All @@ -73,21 +73,21 @@ impl SupervisionTree {
}

/// Send a notification to the supervisor.
pub fn notify_supervisor(&self, evt: SupervisionEvent) {
pub(crate) fn notify_supervisor(&self, evt: SupervisionEvent) {
if let Some(parent) = &*(self.supervisor.lock().unwrap()) {
let _ = parent.send_supervisor_evt(evt);
}
}

/// Retrieve the number of supervised children
#[cfg(test)]
pub fn get_num_children(&self) -> usize {
pub(crate) fn get_num_children(&self) -> usize {
self.children.lock().unwrap().len()
}

/// Retrieve the number of supervised children
#[cfg(test)]
pub fn get_num_parents(&self) -> usize {
pub(crate) fn get_num_parents(&self) -> usize {
usize::from(self.supervisor.lock().unwrap().is_some())
}
}
4 changes: 2 additions & 2 deletions ractor/src/actor/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,7 @@ fn returns_actor_references() {
async fn actor_failing_in_spawn_err_doesnt_poison_registries() {
struct Test;

#[crate::async_trait]
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for Test {
type Msg = ();
type State = ();
Expand All @@ -980,7 +980,7 @@ async fn actor_failing_in_spawn_err_doesnt_poison_registries() {

struct Test2;

#[crate::async_trait]
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for Test2 {
type Msg = ();
type State = ();
Expand Down
39 changes: 38 additions & 1 deletion ractor/src/factory/discard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
use crate::Message;

/// The discard mode of a factory
#[derive(Eq, PartialEq, Clone, Copy)]
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub enum DiscardMode {
/// Discard oldest incoming jobs under backpressure
Oldest,
Expand All @@ -28,6 +28,7 @@
/// workers. The workers "think" it's static, but the factory handles the dynamics.
/// This way the factory can keep the [DynamicDiscardHandler] as a single, uncloned
/// instance. It also moves NUM_WORKER calculations to 1.
#[derive(Debug)]
pub(crate) enum WorkerDiscardSettings {
None,
Static { limit: usize, mode: DiscardMode },
Expand Down Expand Up @@ -84,6 +85,26 @@
},
}

impl std::fmt::Debug for DiscardSettings {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {

Check warning on line 90 in ractor/src/factory/discard.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/factory/discard.rs#L89-L90

Added lines #L89 - L90 were not covered by tests
DiscardSettings::None => {
write!(f, "DiscardSettings::None")

Check warning on line 92 in ractor/src/factory/discard.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/factory/discard.rs#L92

Added line #L92 was not covered by tests
}
DiscardSettings::Static { limit, mode } => f
.debug_struct("DiscardSettings::Static")
.field("limit", limit)
.field("mode", mode)
.finish(),
DiscardSettings::Dynamic { limit, mode, .. } => f
.debug_struct("DiscardSettings::Dynamic")
.field("limit", limit)
.field("mode", mode)
.finish(),

Check warning on line 103 in ractor/src/factory/discard.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/factory/discard.rs#L94-L103

Added lines #L94 - L103 were not covered by tests
}
}

Check warning on line 105 in ractor/src/factory/discard.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/factory/discard.rs#L105

Added line #L105 was not covered by tests
}

impl DiscardSettings {
pub(crate) fn get_worker_settings(&self) -> WorkerDiscardSettings {
match &self {
Expand Down Expand Up @@ -124,10 +145,26 @@
/// `base_controller.factory.{FACTORY_NAME}.{STAT}`
///
/// If no factory name is set, then "all" will be inserted
#[cfg(feature = "async-trait")]
async fn compute(&mut self, current_threshold: usize) -> usize;

/// Compute the new threshold for discarding
///
/// If you want to utilize metrics exposed in [crate::factory::stats] you can gather them
/// by utilizing `stats_facebook::service_data::get_service_data_singleton` to retrieve a
/// accessor to `ServiceData` which you can then resolve stats by name (either timeseries or
/// counters)
///
/// The namespace of stats collected on the base controller factory are
/// `base_controller.factory.{FACTORY_NAME}.{STAT}`
///
/// If no factory name is set, then "all" will be inserted
#[cfg(not(feature = "async-trait"))]
fn compute(&mut self, current_threshold: usize) -> futures::future::BoxFuture<'_, usize>;
}

/// Reason for discarding a job
#[derive(Debug)]
pub enum DiscardReason {
/// The job TTLd
TtlExpired,
Expand Down
Loading
Loading