From d651856a64bb30c0c1e10780eee3b6482fafe0e7 Mon Sep 17 00:00:00 2001 From: Sean Lawlor Date: Wed, 22 Jan 2025 20:09:18 -0500 Subject: [PATCH] Fix async-std and re-enable Ci for async-std builds --- .github/workflows/ci.yaml | 15 ++- ractor/Cargo.toml | 2 +- ractor/src/actor/supervision.rs | 36 +++++-- ractor/src/actor/tests/supervisor.rs | 101 ++++++++---------- .../src/concurrency/async_std_primitives.rs | 20 ++++ ractor_cluster/Cargo.toml | 2 +- ractor_cluster_derive/Cargo.toml | 2 +- 7 files changed, 110 insertions(+), 68 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 8f805734..7d22b5ec 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -17,18 +17,27 @@ jobs: - name: Run the default tests package: ractor # flags: - - name: Test ractor without async-trait + - name: Test ractor with async-trait package: ractor - flags: --no-default-features -F tokio_runtime,message_span_propogation + flags: -F async-trait - name: Test ractor without span propogation package: ractor - flags: --no-default-features -F tokio_runtime,async-trait + flags: --no-default-features -F tokio_runtime - name: Test ractor with the `cluster` feature package: ractor flags: -F cluster - name: Test ractor with the `blanket_serde` feature package: ractor flags: -F blanket_serde + - name: Test ractor with async-std runtime + package: ractor + flags: --no-default-features -F async-std,message_span_propogation + - name: Test ractor with async-std runtime but no span propagation + package: ractor + flags: --no-default-features -F async-std + - name: Test ractor with async-std runtime and async-trait + package: ractor + flags: --no-default-features -F async-std,async-trait - name: Test ractor_cluster with native async traits package: ractor_cluster # flags: diff --git a/ractor/Cargo.toml b/ractor/Cargo.toml index 3887fbe6..7911dcd6 100644 --- a/ractor/Cargo.toml +++ b/ractor/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ractor" -version = "0.14.6" +version = "0.14.7" authors = ["Sean Lawlor", "Evan Au", "Dillon George"] description = "A actor framework for Rust" documentation = "https://docs.rs/ractor" diff --git a/ractor/src/actor/supervision.rs b/ractor/src/actor/supervision.rs index e8ac370d..cc4f15b5 100644 --- a/ractor/src/actor/supervision.rs +++ b/ractor/src/actor/supervision.rs @@ -134,10 +134,20 @@ impl SupervisionTree { } // drain the tasks while let Some(res) = js.join_next().await { - match res { - Err(err) if err.is_panic() => std::panic::resume_unwind(err.into_panic()), - Err(err) => panic!("{err}"), - _ => {} + #[cfg(feature = "async-std")] + { + match res { + Err(_) => panic!("JoinSet join error"), + _ => {} + } + } + #[cfg(not(feature = "async-std"))] + { + match res { + Err(err) if err.is_panic() => std::panic::resume_unwind(err.into_panic()), + Err(err) => panic!("{err}"), + _ => {} + } } } } @@ -155,10 +165,20 @@ impl SupervisionTree { } // drain the tasks while let Some(res) = js.join_next().await { - match res { - Err(err) if err.is_panic() => std::panic::resume_unwind(err.into_panic()), - Err(err) => panic!("{err}"), - _ => {} + #[cfg(feature = "async-std")] + { + match res { + Err(_) => panic!("JoinSet join error"), + _ => {} + } + } + #[cfg(not(feature = "async-std"))] + { + match res { + Err(err) if err.is_panic() => std::panic::resume_unwind(err.into_panic()), + Err(err) => panic!("{err}"), + _ => {} + } } } } diff --git a/ractor/src/actor/tests/supervisor.rs b/ractor/src/actor/tests/supervisor.rs index 5230e11e..fa497942 100644 --- a/ractor/src/actor/tests/supervisor.rs +++ b/ractor/src/actor/tests/supervisor.rs @@ -10,10 +10,7 @@ use std::sync::{ Arc, }; -use crate::{ - common_test::periodic_check, concurrency::Duration, message::BoxedDowncastErr, - ActorProcessingErr, -}; +use crate::{concurrency::Duration, message::BoxedDowncastErr, periodic_check, ActorProcessingErr}; use crate::{Actor, ActorCell, ActorRef, ActorStatus, SupervisionEvent}; @@ -77,8 +74,7 @@ async fn test_supervision_panic_in_post_startup() { // check that the panic was captured if let SupervisionEvent::ActorFailed(dead_actor, _panic_msg) = message { - self.flag - .store(dead_actor.get_id().pid(), Ordering::Relaxed); + self.flag.store(dead_actor.get_id().pid(), Ordering::SeqCst); this_actor.stop(None); } Ok(()) @@ -103,7 +99,7 @@ async fn test_supervision_panic_in_post_startup() { let (_, _) = tokio::join!(s_handle, c_handle); - assert_eq!(child_ref.get_id().pid(), flag.load(Ordering::Relaxed)); + assert_eq!(child_ref.get_id().pid(), flag.load(Ordering::SeqCst)); // supervisor relationship cleaned up correctly assert_eq!(0, supervisor_ref.get_num_children()); @@ -169,8 +165,7 @@ async fn test_supervision_error_in_post_startup() { // check that the panic was captured if let SupervisionEvent::ActorFailed(dead_actor, _panic_msg) = message { - self.flag - .store(dead_actor.get_id().pid(), Ordering::Relaxed); + self.flag.store(dead_actor.get_id().pid(), Ordering::SeqCst); this_actor.stop(None); } Ok(()) @@ -191,7 +186,7 @@ async fn test_supervision_error_in_post_startup() { let (_, _) = tokio::join!(s_handle, c_handle); - assert_eq!(child_ref.get_id().pid(), flag.load(Ordering::Relaxed)); + assert_eq!(child_ref.get_id().pid(), flag.load(Ordering::SeqCst)); // supervisor relationship cleaned up correctly assert_eq!(0, supervisor_ref.get_num_children()); @@ -258,8 +253,7 @@ async fn test_supervision_panic_in_handle() { // check that the panic was captured if let SupervisionEvent::ActorFailed(dead_actor, _panic_msg) = message { - self.flag - .store(dead_actor.get_id().pid(), Ordering::Relaxed); + self.flag.store(dead_actor.get_id().pid(), Ordering::SeqCst); this_actor.stop(None); } Ok(()) @@ -288,7 +282,7 @@ async fn test_supervision_panic_in_handle() { let _ = s_handle.await; let _ = c_handle.await; - assert_eq!(child_ref.get_id().pid(), flag.load(Ordering::Relaxed)); + assert_eq!(child_ref.get_id().pid(), flag.load(Ordering::SeqCst)); // supervisor relationship cleaned up correctly assert_eq!(0, supervisor_ref.get_num_children()); @@ -355,8 +349,7 @@ async fn test_supervision_error_in_handle() { // check that the panic was captured if let SupervisionEvent::ActorFailed(dead_actor, _panic_msg) = message { - self.flag - .store(dead_actor.get_id().pid(), Ordering::Relaxed); + self.flag.store(dead_actor.get_id().pid(), Ordering::SeqCst); this_actor.stop(None); } Ok(()) @@ -385,7 +378,7 @@ async fn test_supervision_error_in_handle() { let _ = s_handle.await; let _ = c_handle.await; - assert_eq!(child_ref.get_id().pid(), flag.load(Ordering::Relaxed)); + assert_eq!(child_ref.get_id().pid(), flag.load(Ordering::SeqCst)); // supervisor relationship cleaned up correctly assert_eq!(0, supervisor_ref.get_num_children()); @@ -444,8 +437,7 @@ async fn test_supervision_panic_in_post_stop() { // check that the panic was captured if let SupervisionEvent::ActorFailed(dead_actor, _panic_msg) = message { - self.flag - .store(dead_actor.get_id().pid(), Ordering::Relaxed); + self.flag.store(dead_actor.get_id().pid(), Ordering::SeqCst); this_actor.stop(None); } @@ -466,7 +458,7 @@ async fn test_supervision_panic_in_post_stop() { let _ = s_handle.await; let _ = c_handle.await; - assert_eq!(child_ref.get_id().pid(), flag.load(Ordering::Relaxed)); + assert_eq!(child_ref.get_id().pid(), flag.load(Ordering::SeqCst)); // supervisor relationship cleaned up correctly assert_eq!(0, supervisor_ref.get_num_children()); @@ -525,8 +517,7 @@ async fn test_supervision_error_in_post_stop() { // check that the panic was captured if let SupervisionEvent::ActorFailed(dead_actor, _panic_msg) = message { - self.flag - .store(dead_actor.get_id().pid(), Ordering::Relaxed); + self.flag.store(dead_actor.get_id().pid(), Ordering::SeqCst); this_actor.stop(None); } @@ -547,7 +538,7 @@ async fn test_supervision_error_in_post_stop() { let _ = s_handle.await; let _ = c_handle.await; - assert_eq!(child_ref.get_id().pid(), flag.load(Ordering::Relaxed)); + assert_eq!(child_ref.get_id().pid(), flag.load(Ordering::SeqCst)); // supervisor relationship cleaned up correctly assert_eq!(0, supervisor_ref.get_num_children()); @@ -642,8 +633,7 @@ async fn test_supervision_panic_in_supervisor_handle() { // check that the panic was captured if let SupervisionEvent::ActorFailed(dead_actor, _panic_msg) = message { - self.flag - .store(dead_actor.get_id().pid(), Ordering::Relaxed); + self.flag.store(dead_actor.get_id().pid(), Ordering::SeqCst); this_actor.stop(None); } Ok(()) @@ -688,7 +678,7 @@ async fn test_supervision_panic_in_supervisor_handle() { // check that we got the midpoint's ref id assert_eq!( midpoint_ref_clone.get_id().pid(), - flag.load(Ordering::Relaxed) + flag.load(Ordering::SeqCst) ); // supervisor relationship cleaned up correctly @@ -784,8 +774,7 @@ async fn test_supervision_error_in_supervisor_handle() { // check that the panic was captured if let SupervisionEvent::ActorFailed(dead_actor, _panic_msg) = message { - self.flag - .store(dead_actor.get_id().pid(), Ordering::Relaxed); + self.flag.store(dead_actor.get_id().pid(), Ordering::SeqCst); this_actor.stop(None); } Ok(()) @@ -830,7 +819,7 @@ async fn test_supervision_error_in_supervisor_handle() { // check that we got the midpoint's ref id assert_eq!( midpoint_ref_clone.get_id().pid(), - flag.load(Ordering::Relaxed) + flag.load(Ordering::SeqCst) ); // supervisor relationship cleaned up correctly @@ -928,9 +917,12 @@ async fn instant_supervised_spawns() { async fn handle_supervisor_evt( &self, _: ActorRef, - _: SupervisionEvent, + evt: SupervisionEvent, _: &mut Self::State, ) -> Result<(), ActorProcessingErr> { + if let SupervisionEvent::ActorStarted(_) = evt { + return Ok(()); + } Err(From::from( "Supervision event received when it shouldn't have been!", )) @@ -949,7 +941,7 @@ async fn instant_supervised_spawns() { counter: Arc, ) -> Result { // delay startup by some amount - crate::concurrency::sleep(Duration::from_millis(200)).await; + crate::concurrency::sleep(Duration::from_millis(100)).await; Ok(counter) } @@ -959,7 +951,8 @@ async fn instant_supervised_spawns() { _message: String, state: &mut Self::State, ) -> Result<(), ActorProcessingErr> { - state.fetch_add(1, Ordering::Relaxed); + tracing::error!("message: {_message}"); + state.fetch_add(1, Ordering::SeqCst); Ok(()) } } @@ -982,20 +975,25 @@ async fn instant_supervised_spawns() { } // actor is still starting up - assert_eq!(0, counter.load(Ordering::Relaxed)); + assert_eq!(0, counter.load(Ordering::SeqCst)); // wait for everything processed periodic_check( - || counter.load(Ordering::Relaxed) >= 10, + || actor.get_status() == ActorStatus::Running, Duration::from_secs(5), ) .await; + actor + .drain_and_wait(None) + .await + .expect("Failed to drain actor"); + assert_eq!(counter.load(Ordering::SeqCst), 10); // Cleanup supervisor.stop(None); shandle.await.unwrap(); - actor.stop(None); + // actor.stop(None); handles .await .unwrap() @@ -1071,8 +1069,7 @@ async fn test_supervisor_captures_dead_childs_state() { ) = message { if let Ok(1) = boxed_state.take::() { - self.flag - .store(dead_actor.get_id().pid(), Ordering::Relaxed); + self.flag.store(dead_actor.get_id().pid(), Ordering::SeqCst); this_actor.stop(None); } } @@ -1100,7 +1097,7 @@ async fn test_supervisor_captures_dead_childs_state() { let (_, _) = tokio::join!(s_handle, c_handle); - assert_eq!(child_ref.get_id().pid(), flag.load(Ordering::Relaxed)); + assert_eq!(child_ref.get_id().pid(), flag.load(Ordering::SeqCst)); // supervisor relationship cleaned up correctly assert_eq!(0, supervisor_ref.get_num_children()); @@ -1176,7 +1173,7 @@ async fn test_supervisor_exit_doesnt_call_child_post_stop() { _this_actor: ActorRef, _state: &mut Self::State, ) -> Result<(), ActorProcessingErr> { - self.post_stop_calls.fetch_add(1, Ordering::Relaxed); + self.post_stop_calls.fetch_add(1, Ordering::SeqCst); Ok(()) } } @@ -1232,7 +1229,7 @@ async fn test_supervisor_exit_doesnt_call_child_post_stop() { c_handle.await.unwrap(); // Child's post-stop should NOT have been called. - assert_eq!(0, flag.load(Ordering::Relaxed)); + assert_eq!(0, flag.load(Ordering::SeqCst)); } #[crate::concurrency::test] @@ -1260,7 +1257,7 @@ async fn stopping_children_and_wait_during_parent_shutdown() { _this_actor: ActorRef, _state: &mut Self::State, ) -> Result<(), ActorProcessingErr> { - self.post_stop_calls.fetch_add(1, Ordering::Relaxed); + self.post_stop_calls.fetch_add(1, Ordering::SeqCst); Ok(()) } } @@ -1314,7 +1311,7 @@ async fn stopping_children_and_wait_during_parent_shutdown() { c_handle.await.unwrap(); // Child's post-stop should have been called. - assert_eq!(1, flag.load(Ordering::Relaxed)); + assert_eq!(1, flag.load(Ordering::SeqCst)); } #[crate::concurrency::test] @@ -1342,7 +1339,7 @@ async fn stopping_children_will_shutdown_parent_too() { _this_actor: ActorRef, _state: &mut Self::State, ) -> Result<(), ActorProcessingErr> { - self.post_stop_calls.fetch_add(1, Ordering::Relaxed); + self.post_stop_calls.fetch_add(1, Ordering::SeqCst); Ok(()) } } @@ -1397,7 +1394,7 @@ async fn stopping_children_will_shutdown_parent_too() { c_handle.await.unwrap(); // Child's post-stop should have been called. - assert_eq!(1, flag.load(Ordering::Relaxed)); + assert_eq!(1, flag.load(Ordering::SeqCst)); } #[crate::concurrency::test] @@ -1425,7 +1422,7 @@ async fn draining_children_and_wait_during_parent_shutdown() { _this_actor: ActorRef, _state: &mut Self::State, ) -> Result<(), ActorProcessingErr> { - self.post_stop_calls.fetch_add(1, Ordering::Relaxed); + self.post_stop_calls.fetch_add(1, Ordering::SeqCst); Ok(()) } } @@ -1479,7 +1476,7 @@ async fn draining_children_and_wait_during_parent_shutdown() { c_handle.await.unwrap(); // Child's post-stop should have been called. - assert_eq!(1, flag.load(Ordering::Relaxed)); + assert_eq!(1, flag.load(Ordering::SeqCst)); } #[crate::concurrency::test] @@ -1507,7 +1504,7 @@ async fn draining_children_will_shutdown_parent_too() { _this_actor: ActorRef, _state: &mut Self::State, ) -> Result<(), ActorProcessingErr> { - self.post_stop_calls.fetch_add(1, Ordering::Relaxed); + self.post_stop_calls.fetch_add(1, Ordering::SeqCst); Ok(()) } } @@ -1565,7 +1562,7 @@ async fn draining_children_will_shutdown_parent_too() { c_handle.await.unwrap(); // Child's post-stop should have been called. - assert_eq!(1, flag.load(Ordering::Relaxed)); + assert_eq!(1, flag.load(Ordering::SeqCst)); } #[crate::concurrency::test] @@ -1624,7 +1621,7 @@ async fn test_simple_monitor() { ) -> Result<(), ActorProcessingErr> { if let SupervisionEvent::ActorTerminated(_who, _state, Some(msg)) = evt { if msg.as_str() == "oh no!" { - self.counter.fetch_add(1, Ordering::Relaxed); + self.counter.fetch_add(1, Ordering::SeqCst); } } Ok(()) @@ -1650,11 +1647,7 @@ async fn test_simple_monitor() { // stopping the peer should notify the monitor, who can capture the state p.cast(()).expect("Failed to contact peer"); - periodic_check( - || count.load(Ordering::Relaxed) == 1, - Duration::from_secs(1), - ) - .await; + periodic_check(|| count.load(Ordering::SeqCst) == 1, Duration::from_secs(1)).await; ph.await.unwrap(); let (p, ph) = Actor::spawn(None, Peer, ()) @@ -1669,7 +1662,7 @@ async fn test_simple_monitor() { // The count doesn't increment when the peer exits (we give some time // to schedule the supervision evt) crate::concurrency::sleep(Duration::from_millis(100)).await; - assert_eq!(1, count.load(Ordering::Relaxed)); + assert_eq!(1, count.load(Ordering::SeqCst)); m.stop(None); mh.await.unwrap(); diff --git a/ractor/src/concurrency/async_std_primitives.rs b/ractor/src/concurrency/async_std_primitives.rs index e6f8ac64..c1d7b870 100644 --- a/ractor/src/concurrency/async_std_primitives.rs +++ b/ractor/src/concurrency/async_std_primitives.rs @@ -9,6 +9,7 @@ //! such as channels (see: https://github.com/tokio-rs/tokio/issues/4232#issuecomment-968329443). use std::{ + fmt::Debug, future::Future, pin::Pin, sync::{ @@ -28,6 +29,16 @@ pub struct JoinHandle { handle: Option>, is_done: Arc, } + +impl Debug for JoinHandle { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("JoinHandle") + .field("name", &self.is_done.load(Ordering::Relaxed)) + .field("handle", &self.handle.is_some()) + .finish() + } +} + impl JoinHandle { /// Determine if the handle is currently finished pub fn is_finished(&self) -> bool { @@ -74,6 +85,7 @@ pub type Instant = std::time::Instant; /// An asynchronous interval calculation which waits until /// a checkpoint time to tick. This is a replication of the /// basic functionality from `tokio`'s `Interval`. +#[derive(Debug, Clone)] pub struct Interval { dur: Duration, next_tick: Instant, @@ -110,6 +122,14 @@ pub struct JoinSet { set: FuturesUnordered>, } +impl Debug for JoinSet { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("JoinSet") + .field("size", &self.set.len()) + .finish() + } +} + impl JoinSet { /// Creates a new [JoinSet] pub fn new() -> JoinSet { diff --git a/ractor_cluster/Cargo.toml b/ractor_cluster/Cargo.toml index d6b6288e..68889a39 100644 --- a/ractor_cluster/Cargo.toml +++ b/ractor_cluster/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ractor_cluster" -version = "0.14.6" +version = "0.14.7" authors = ["Sean Lawlor "] description = "Distributed cluster environment of Ractor actors" documentation = "https://docs.rs/ractor" diff --git a/ractor_cluster_derive/Cargo.toml b/ractor_cluster_derive/Cargo.toml index ca527c49..1acae860 100644 --- a/ractor_cluster_derive/Cargo.toml +++ b/ractor_cluster_derive/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ractor_cluster_derive" -version = "0.14.6" +version = "0.14.7" authors = ["Sean Lawlor "] description = "Derives for ractor_cluster" license = "MIT"