Add monitor API back (#317)
* Re-add the monitor API, but wrap the monitor map in an `Option` to save allocations when it is unused.

Additionally move to Option<HashMap> types to save on allocations for regular supervisions (if the actor doesn't supervise anything, we don't need to create the empty map)

* Gate monitor API behind a feature, update crate version, add CI coverage on monitors, and update API and README docs
slawlor authored Jan 8, 2025
1 parent 0f0483f commit b0cd1a0
Showing 10 changed files with 229 additions and 18 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci.yaml
@@ -35,6 +35,9 @@ jobs:
- name: Test ractor_cluster with async_trait
package: ractor_cluster
flags: -F async-trait
- name: Test ractor with the monitor API
package: ractor
flags: -F monitors

steps:
- uses: actions/checkout@main
4 changes: 3 additions & 1 deletion README.md
@@ -66,7 +66,7 @@ Install `ractor` by adding the following to your Cargo.toml dependencies.

```toml
[dependencies]
-ractor = "0.13"
+ractor = "0.14"
```

The minimum supported Rust version (MSRV) of `ractor` is `1.64`. However, to use the native `async fn` support in traits rather than relying on the `async-trait` crate's desugaring functionality, you need Rust version `>= 1.75`. The stabilization of `async fn` in traits [was recently added](https://blog.rust-lang.org/2023/12/21/async-fn-rpit-in-traits.html).
@@ -77,6 +77,8 @@ The minimum supported Rust version (MSRV) of `ractor` is `1.64`. However to util

1. `cluster`, which exposes various functionality required for `ractor_cluster` to set up and manage a cluster of actors over a network link. This is work-in-progress and is being tracked in [#16](https://github.com/slawlor/ractor/issues/16).
2. `async-std`, which enables usage of `async-std`'s asynchronous runtime instead of the `tokio` runtime. **However**, `tokio` with the `sync` feature remains a dependency because we use the messaging synchronization primitives from `tokio` regardless of runtime, as they are not specific to the `tokio` runtime. This work is tracked in [#173](https://github.com/slawlor/ractor/pull/173). You can remove default features to "minimize" the tokio dependencies to just the synchronization primitives.
3. `monitors`, which adds support for an Erlang-style monitoring API as an alternative to direct linkage, akin to Erlang's [process monitors](https://www.erlang.org/doc/system/ref_man_processes.html#monitors).
4. `message_span_propogation`, which propagates the tracing span with each message sent between actors so that the tracing context is preserved.
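
As a minimal sketch of opting in, the `monitors` feature can be requested in the dependency declaration. The version shown simply mirrors the install snippet earlier in this README; adjust it to the release you actually depend on.

```toml
[dependencies]
ractor = { version = "0.14", features = ["monitors"] }
```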

## Working with Actors

3 changes: 2 additions & 1 deletion ractor/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "ractor"
-version = "0.14.3"
+version = "0.14.4"
authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
description = "A actor framework for Rust"
documentation = "https://docs.rs/ractor"
@@ -19,6 +19,7 @@ unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
[features]
### Other features
cluster = []
monitors = []
message_span_propogation = []
tokio_runtime = ["tokio/time", "tokio/rt", "tokio/macros", "tokio/tracing"]
blanket_serde = ["serde", "pot", "cluster"]
19 changes: 19 additions & 0 deletions ractor/src/actor/actor_cell.rs
@@ -388,6 +388,25 @@ impl ActorCell {
self.inner.tree.clear_supervisor();
}

/// Monitor the provided [super::Actor] for supervision events. An actor in `ractor` can
/// only have a single supervisor, denoted by the `link` function; however, it
/// may have multiple `monitors`. Monitors receive copies of the [SupervisionEvent]s,
/// with non-cloneable information removed.
///
/// * `who`: The actor to monitor
#[cfg(feature = "monitors")]
pub fn monitor(&self, who: ActorCell) {
who.inner.tree.set_monitor(self.clone());
}

/// Stop monitoring the provided [super::Actor] for supervision events.
///
/// * `who`: The actor to stop monitoring
#[cfg(feature = "monitors")]
pub fn unmonitor(&self, who: ActorCell) {
who.inner.tree.remove_monitor(self.get_id());
}
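
The direction of registration above is easy to misread: `a.monitor(b)` stores `a` inside `b`'s tree, so that `b` knows whom to notify. The following std-only sketch models just that bookkeeping. `Cell`, `Inner`, and `monitor_count` are hypothetical stand-ins invented for illustration and are not `ractor` types or methods.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for `ActorCell`: a cheaply cloneable handle.
#[derive(Clone)]
struct Cell {
    inner: Arc<Inner>,
}

struct Inner {
    id: u64,
    /// Handles of the cells watching this one, created lazily.
    monitors: Mutex<Option<HashMap<u64, Cell>>>,
}

impl Cell {
    fn new(id: u64) -> Self {
        Cell {
            inner: Arc::new(Inner {
                id,
                monitors: Mutex::new(None),
            }),
        }
    }

    /// `self` starts watching `who`: the entry is stored on `who`, not on `self`.
    fn monitor(&self, who: &Cell) {
        let mut guard = who.inner.monitors.lock().unwrap();
        if let Some(map) = &mut *guard {
            map.insert(self.inner.id, self.clone());
        } else {
            *guard = Some(HashMap::from_iter([(self.inner.id, self.clone())]));
        }
    }

    /// `self` stops watching `who`: the entry keyed by `self`'s id is removed from `who`.
    fn unmonitor(&self, who: &Cell) {
        let mut guard = who.inner.monitors.lock().unwrap();
        if let Some(map) = &mut *guard {
            map.remove(&self.inner.id);
        }
    }

    /// Number of cells currently watching this one.
    fn monitor_count(&self) -> usize {
        let guard = self.inner.monitors.lock().unwrap();
        match &*guard {
            Some(map) => map.len(),
            None => 0,
        }
    }
}

fn main() {
    let peer = Cell::new(1);
    let watcher = Cell::new(2);
    watcher.monitor(&peer);
    println!("peer is watched by {} cell(s)", peer.monitor_count());
    watcher.unmonitor(&peer);
    println!("peer is watched by {} cell(s)", peer.monitor_count());
}
```

Because the watched cell holds a clone of the watcher's handle, two cells that monitor each other would form a reference cycle in this simplified model; the sketch does not attempt to address that.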

/// Kill this [super::Actor] forcefully (terminates async work)
pub fn kill(&self) {
let _ = self.inner.send_signal(Signal::Kill);
21 changes: 21 additions & 0 deletions ractor/src/actor/messages.rs
@@ -126,6 +126,27 @@ impl SupervisionEvent {
pub fn actor_id(&self) -> Option<super::actor_id::ActorId> {
self.actor_cell().map(|cell| cell.get_id())
}

/// Clone the supervision event without requiring the inner data to be
/// cloneable. The actor error (if present) is converted to a string and
/// copied, and the state upon termination is not propagated. If the state
/// were cloneable we could propagate it, but requiring that would be
/// overly restrictive, so we've avoided it.
#[cfg(feature = "monitors")]
pub(crate) fn clone_no_data(&self) -> Self {
match self {
Self::ActorStarted(who) => Self::ActorStarted(who.clone()),
Self::ActorFailed(who, what) => {
Self::ActorFailed(who.clone(), From::from(format!("{what}")))
}
Self::ProcessGroupChanged(what) => Self::ProcessGroupChanged(what.clone()),
Self::ActorTerminated(who, _state, msg) => {
Self::ActorTerminated(who.clone(), None, msg.as_ref().cloned())
}
#[cfg(feature = "cluster")]
Self::PidLifecycleEvent(evt) => Self::PidLifecycleEvent(evt.clone()),
}
}
}
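
The lossy-clone idea can be tried in isolation. The sketch below uses a simplified, hypothetical `Event` enum (not `SupervisionEvent`) whose payloads do not implement `Clone`: the boxed error is rendered to text and re-boxed through the standard `From<String>` impl, and the boxed state is replaced with `None`.

```rust
use std::any::Any;
use std::error::Error;

type BoxedError = Box<dyn Error + Send + Sync + 'static>;
type BoxedState = Box<dyn Any + Send>;

/// Simplified, hypothetical event type; neither payload implements `Clone`.
enum Event {
    Failed(u64, BoxedError),
    Terminated(u64, Option<BoxedState>, Option<String>),
}

impl Event {
    /// Duplicate the event, keeping only what can be copied.
    fn clone_no_data(&self) -> Self {
        match self {
            // Render the error with `Display`, then box the resulting string as a new error.
            Event::Failed(who, what) => Event::Failed(*who, From::from(format!("{what}"))),
            // The final state cannot be copied, so the duplicate carries `None`.
            Event::Terminated(who, _state, msg) => Event::Terminated(*who, None, msg.clone()),
        }
    }
}

fn main() {
    let failed = Event::Failed(7, From::from("disk full"));
    if let Event::Failed(who, what) = failed.clone_no_data() {
        println!("actor {who} failed: {what}");
    }

    let state: BoxedState = Box::new(42u32);
    let done = Event::Terminated(9, Some(state), Some("bye".to_string()));
    if let Event::Terminated(who, state, msg) = done.clone_no_data() {
        println!("actor {who} stopped, state kept: {}, reason: {msg:?}", state.is_some());
    }
}
```

The cost of this approach is that the duplicate loses the concrete error type: a receiver can read the message but can no longer downcast to the original error.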

impl Debug for SupervisionEvent {
79 changes: 66 additions & 13 deletions ractor/src/actor/supervision.rs
@@ -10,32 +10,39 @@
//! when a child actor starts, stops, or panics (when possible). The supervisor can then decide
//! how to handle the event. Should it restart the actor, leave it dead, potentially die itself
//! notifying the supervisor's supervisor? That's up to the implementation of the [super::Actor]
//!
//! This is currently an initial implementation of [Erlang supervisors](https://www.erlang.org/doc/man/supervisor.html)
//! which will be expanded upon as the library develops.
use std::collections::HashMap;
-use std::sync::{Arc, Mutex};
+use std::sync::Mutex;

use super::{actor_cell::ActorCell, messages::SupervisionEvent};
use crate::ActorId;

/// A supervision tree
#[derive(Default, Debug)]
pub(crate) struct SupervisionTree {
-children: Arc<Mutex<HashMap<ActorId, ActorCell>>>,
-supervisor: Arc<Mutex<Option<ActorCell>>>,
+children: Mutex<Option<HashMap<ActorId, ActorCell>>>,
+supervisor: Mutex<Option<ActorCell>>,
+#[cfg(feature = "monitors")]
+monitors: Mutex<Option<HashMap<ActorId, ActorCell>>>,
}

impl SupervisionTree {
/// Push a child into the tree
pub(crate) fn insert_child(&self, child: ActorCell) {
-self.children.lock().unwrap().insert(child.get_id(), child);
+let mut guard = self.children.lock().unwrap();
+if let Some(map) = &mut *(guard) {
+map.insert(child.get_id(), child);
+} else {
+*guard = Some(HashMap::from_iter([(child.get_id(), child)]));
+}
}

/// Remove a specific actor from the supervision tree (e.g. actor died)
pub(crate) fn remove_child(&self, child: ActorId) {
-self.children.lock().unwrap().remove(&child);
+let mut guard = self.children.lock().unwrap();
+if let Some(map) = &mut *(guard) {
+map.remove(&child);
+}
}

/// Push a parent into the tree
@@ -53,13 +60,40 @@ impl SupervisionTree {
self.supervisor.lock().unwrap().clone()
}

/// Set a monitor of this supervision tree
#[cfg(feature = "monitors")]
pub(crate) fn set_monitor(&self, who: ActorCell) {
let mut guard = self.monitors.lock().unwrap();
if let Some(map) = &mut *guard {
map.insert(who.get_id(), who);
} else {
*guard = Some(HashMap::from_iter([(who.get_id(), who)]))
}
}

/// Remove a specific monitor from the supervision tree
#[cfg(feature = "monitors")]
pub(crate) fn remove_monitor(&self, who: ActorId) {
let mut guard = self.monitors.lock().unwrap();
if let Some(map) = &mut *guard {
map.remove(&who);
if map.is_empty() {
*guard = None;
}
}
}
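
The lazily created map used by `set_monitor` and `remove_monitor` can be exercised on its own. This is a sketch under stated assumptions: `LazyRegistry`, `has_map`, and the `String` payload are hypothetical names chosen for illustration, and `Option::get_or_insert_with` is used as a more compact equivalent of the `if let ... else` form in the diff.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical registry (not a `ractor` type) whose map exists only while non-empty.
#[derive(Default)]
struct LazyRegistry {
    entries: Mutex<Option<HashMap<u64, String>>>,
}

impl LazyRegistry {
    /// Insert an entry, creating the map on first use.
    fn insert(&self, id: u64, name: &str) {
        let mut guard = self.entries.lock().unwrap();
        guard
            .get_or_insert_with(HashMap::new)
            .insert(id, name.to_string());
    }

    /// Remove an entry and fall back to `None` once the map is empty.
    fn remove(&self, id: u64) {
        let mut guard = self.entries.lock().unwrap();
        if let Some(map) = &mut *guard {
            map.remove(&id);
            if map.is_empty() {
                *guard = None;
            }
        }
    }

    /// Number of entries; a missing map counts as zero.
    fn len(&self) -> usize {
        let guard = self.entries.lock().unwrap();
        match &*guard {
            Some(map) => map.len(),
            None => 0,
        }
    }

    /// Whether the map currently exists at all.
    fn has_map(&self) -> bool {
        self.entries.lock().unwrap().is_some()
    }
}

fn main() {
    let registry = LazyRegistry::default();
    println!("map present before insert: {}", registry.has_map());
    registry.insert(1, "worker");
    println!("entries after insert: {}", registry.len());
    registry.remove(1);
    println!("map present after last removal: {}", registry.has_map());
}
```

One point worth keeping in mind: the standard library documents `HashMap::new()` as not allocating until the first insert, so resetting to `None` mainly matters after entries have been added, when dropping the map releases the table's storage.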

/// Terminate all your supervised children and unlink them
/// from the supervision tree since the supervisor is shutting down
/// and can't deal with supervision events anyway
pub(crate) fn terminate_all_children(&self) {
let mut guard = self.children.lock().unwrap();
-let cells = guard.values().cloned().collect::<Vec<_>>();
-guard.clear();
+let cells = if let Some(map) = &mut *guard {
+map.values().cloned().collect()
+} else {
+vec![]
+};
+*guard = None;
// drop the guard to not deadlock on double-link
drop(guard);
for cell in cells {
@@ -141,20 +175,39 @@ impl SupervisionTree {
/// Return all linked children
pub(crate) fn get_children(&self) -> Vec<ActorCell> {
let guard = self.children.lock().unwrap();
-guard.values().cloned().collect()
+if let Some(map) = &*guard {
+map.values().cloned().collect()
+} else {
+vec![]
+}
}

/// Send a notification to the supervisor.
///
/// CAVEAT: Monitors get notified first, in order to save an unnecessary
/// clone if there are no monitors.
pub(crate) fn notify_supervisor(&self, evt: SupervisionEvent) {
#[cfg(feature = "monitors")]
if let Some(monitors) = &mut *(self.monitors.lock().unwrap()) {
// We notify the monitors on a best-effort basis, and if we fail to send the event, we remove
// the monitor
monitors.retain(|_, v| v.send_supervisor_evt(evt.clone_no_data()).is_ok());
}

if let Some(parent) = &*(self.supervisor.lock().unwrap()) {
-let _ = parent.send_supervisor_evt(evt);
+_ = parent.send_supervisor_evt(evt);
}
}
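
The notification order above (monitors first with a copy each, then the supervisor with the original moved in) can be sketched with standard channels. Everything here is an assumption made for illustration: `notify` is a hypothetical free function, `Sender<String>` stands in for an actor handle, and a dropped receiver plays the role of an actor that can no longer accept events.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Sender};

/// Hypothetical model of the notification order: monitors get copies,
/// the supervisor gets the original event.
fn notify(
    monitors: &mut Option<HashMap<u64, Sender<String>>>,
    supervisor: Option<&Sender<String>>,
    evt: String,
) {
    if let Some(map) = monitors {
        // Best effort: keep a monitor only if the send succeeded.
        map.retain(|_, tx| tx.send(evt.clone()).is_ok());
    }
    if let Some(parent) = supervisor {
        // The event is moved here, so no clone is made when there are no monitors.
        let _ = parent.send(evt);
    }
}

fn main() {
    let (tx_live, rx_live) = channel();
    let (tx_dead, rx_dead) = channel();
    let (tx_sup, rx_sup) = channel();
    drop(rx_dead); // simulate a monitor that has already gone away

    let mut monitors = Some(HashMap::from([(1u64, tx_live), (2u64, tx_dead)]));
    notify(&mut monitors, Some(&tx_sup), "actor stopped".to_string());

    println!("live monitor got: {}", rx_live.recv().unwrap());
    println!("supervisor got: {}", rx_sup.recv().unwrap());
    println!("monitors remaining: {}", monitors.as_ref().map_or(0, |m| m.len()));
}
```

Using `retain` folds delivery and cleanup into one pass, at the price of silently dropping a monitor after a single failed send.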

/// Retrieve the number of supervised children
#[cfg(test)]
pub(crate) fn get_num_children(&self) -> usize {
self.children.lock().unwrap().len()
let guard = self.children.lock().unwrap();
if let Some(map) = &*guard {
map.len()
} else {
0
}
}

/// Retrieve the number of supervised children
107 changes: 107 additions & 0 deletions ractor/src/actor/tests/supervisor.rs
@@ -1567,3 +1567,110 @@ async fn draining_children_will_shutdown_parent_too() {
// Child's post-stop should have been called.
assert_eq!(1, flag.load(Ordering::Relaxed));
}

#[crate::concurrency::test]
#[tracing_test::traced_test]
#[cfg(feature = "monitors")]
async fn test_simple_monitor() {
struct Peer;
struct Monitor {
counter: Arc<AtomicU8>,
}

#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for Peer {
type Msg = ();
type State = ();
type Arguments = ();

async fn pre_start(
&self,
_: ActorRef<Self::Msg>,
_: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
Ok(())
}

async fn handle(
&self,
myself: ActorRef<Self::Msg>,
_: Self::Msg,
_: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
myself.stop(Some("oh no!".to_string()));
Ok(())
}
}

#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for Monitor {
type Msg = ();
type State = ();
type Arguments = ();

async fn pre_start(
&self,
_: ActorRef<Self::Msg>,
_: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
Ok(())
}

async fn handle_supervisor_evt(
&self,
_: ActorRef<Self::Msg>,
evt: SupervisionEvent,
_: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
if let SupervisionEvent::ActorTerminated(_who, _state, Some(msg)) = evt {
if msg.as_str() == "oh no!" {
self.counter.fetch_add(1, Ordering::Relaxed);
}
}
Ok(())
}
}

let count = Arc::new(AtomicU8::new(0));

let (p, ph) = Actor::spawn(None, Peer, ())
.await
.expect("Failed to start peer");
let (m, mh) = Actor::spawn(
None,
Monitor {
counter: count.clone(),
},
(),
)
.await
.expect("Failed to start monitor");

m.monitor(p.get_cell());

// stopping the peer should notify the monitor, who can capture the state
p.cast(()).expect("Failed to contact peer");
periodic_check(
|| count.load(Ordering::Relaxed) == 1,
Duration::from_secs(1),
)
.await;
ph.await.unwrap();

let (p, ph) = Actor::spawn(None, Peer, ())
.await
.expect("Failed to start peer");
m.monitor(p.get_cell());
m.unmonitor(p.get_cell());

p.cast(()).expect("Failed to contact peer");
ph.await.unwrap();

// The count doesn't increment when the peer exits (we give some time
// to schedule the supervision evt)
crate::concurrency::sleep(Duration::from_millis(100)).await;
assert_eq!(1, count.load(Ordering::Relaxed));

m.stop(None);
mh.await.unwrap();
}
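
The test waits for the counter with `periodic_check`, whose implementation is not part of this diff. The sketch below is not that helper; it is a hypothetical blocking analogue, `poll_until`, written against the standard library only, to show the poll-until-deadline pattern the test relies on instead of a single fixed sleep.

```rust
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical helper: poll `check` until it returns true or `timeout` elapses.
/// Returns whether the condition was observed to hold.
fn poll_until<F: Fn() -> bool>(check: F, timeout: Duration) -> bool {
    let deadline = Instant::now() + timeout;
    while Instant::now() < deadline {
        if check() {
            return true;
        }
        thread::sleep(Duration::from_millis(10));
    }
    // One final check once the deadline has passed.
    check()
}

fn main() {
    let count = Arc::new(AtomicU8::new(0));
    let worker = {
        let count = Arc::clone(&count);
        thread::spawn(move || {
            // Simulate an event that arrives a little later.
            thread::sleep(Duration::from_millis(20));
            count.fetch_add(1, Ordering::Relaxed);
        })
    };

    let seen = poll_until(
        || count.load(Ordering::Relaxed) == 1,
        Duration::from_secs(1),
    );
    worker.join().unwrap();
    println!("counter reached 1 in time: {seen}");
}
```

Polling suits the positive case, where the wait ends as soon as the event arrives. The negative case in the test (asserting the counter did not change after `unmonitor`) still needs a fixed delay, since there is no event to wait for.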
5 changes: 4 additions & 1 deletion ractor/src/lib.rs
@@ -12,7 +12,7 @@
//!
//! ```toml
//! [dependencies]
-//! ractor = "0.13"
+//! ractor = "0.14"
//! ```
//!
//! The minimum supported Rust version (MSRV) is 1.64. However if you disable the `async-trait` feature, then you need Rust >= 1.75 due to the native
@@ -129,6 +129,9 @@
//! to its supervisor yet. However failures in `post_start`, `handle`, `handle_supervisor_evt`, `post_stop` will notify the supervisor should a failure
//! occur. See [crate::Actor] documentation for more information
//!
//! There is additionally a "monitor" API which provides monitoring outside of the direct supervision link, akin to Erlang's [process monitors](https://www.erlang.org/doc/system/ref_man_processes.html#monitors).
//! This functionality is opt-in via the `monitors` feature on the `ractor` crate.
//!
//! ## Messaging actors
//!
//! The means of communication between actors is that they pass messages to each other. A developer can define any message type which is `Send + 'static` and it
4 changes: 3 additions & 1 deletion ractor_cluster/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "ractor_cluster"
-version = "0.14.3"
+version = "0.14.4"
authors = ["Sean Lawlor <slawlor>"]
description = "Distributed cluster environment of Ractor actors"
documentation = "https://docs.rs/ractor"
@@ -15,6 +15,8 @@ build = "src/build.rs"
rust-version = "1.64"

[features]
monitors = ["ractor/monitors"]
message_span_propogation = ["ractor/message_span_propogation"]
async-trait = ["dep:async-trait", "ractor/async-trait"]

default = []
2 changes: 1 addition & 1 deletion ractor_cluster_derive/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "ractor_cluster_derive"
-version = "0.14.3"
+version = "0.14.4"
authors = ["Sean Lawlor <slawlor>"]
description = "Derives for ractor_cluster"
license = "MIT"