Skip to content

Commit

Permalink
Marking that the converter function in the derived actor is cross-thr…
Browse files Browse the repository at this point in the history
…ead safe since it's just an Into() call.
  • Loading branch information
slawlor committed Jan 9, 2025
1 parent d385bc9 commit a5b6189
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 33 deletions.
2 changes: 1 addition & 1 deletion ractor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor"
version = "0.14.4"
version = "0.14.5"
authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
description = "A actor framework for Rust"
documentation = "https://docs.rs/ractor"
Expand Down
24 changes: 16 additions & 8 deletions ractor/src/actor/derived_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,21 @@ use std::sync::Arc;
/// kitchen_actor_handle.await.unwrap();
/// }
/// ```
#[derive(Clone)]
pub struct DerivedActorRef<TFrom> {
converter: Arc<dyn Fn(TFrom) -> Result<(), MessagingErr<TFrom>>>,
inner: ActorCell,
pub struct DerivedActorRef<TFrom: Message> {
converter: Arc<dyn Fn(TFrom) -> Result<(), MessagingErr<TFrom>> + Send + Sync + 'static>,
pub(crate) inner: ActorCell,
}

impl<TFrom> std::fmt::Debug for DerivedActorRef<TFrom> {
impl<TFrom: Message> Clone for DerivedActorRef<TFrom> {
fn clone(&self) -> Self {
Self {
converter: self.converter.clone(),
inner: self.inner.clone(),
}
}
}

impl<TFrom: Message> std::fmt::Debug for DerivedActorRef<TFrom> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DerivedActorRef")
.field("cell", &self.inner)
Expand All @@ -155,15 +163,15 @@ impl<TFrom> std::fmt::Debug for DerivedActorRef<TFrom> {
}

// Allows all the functionality of ActorCell on DerivedActorRef
impl<TMessage> std::ops::Deref for DerivedActorRef<TMessage> {
impl<TMessage: Message> std::ops::Deref for DerivedActorRef<TMessage> {
type Target = ActorCell;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl<TFrom> DerivedActorRef<TFrom> {
impl<TFrom: Message> DerivedActorRef<TFrom> {
/// Casts the message to the target message type of [ActorCell] and sends it
///
/// * `message` - The message to send
Expand Down Expand Up @@ -193,7 +201,7 @@ impl<TMessage: Message> ActorRef<TMessage> {
pub fn get_derived<TFrom>(&self) -> DerivedActorRef<TFrom>
where
TMessage: From<TFrom>,
TFrom: TryFrom<TMessage>,
TFrom: Message + TryFrom<TMessage>,
{
let actor_ref = self.clone();
let cast_and_send = move |msg: TFrom| {
Expand Down
87 changes: 65 additions & 22 deletions ractor/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,53 @@
use crate::concurrency::{self, Duration, JoinHandle};

use crate::{ActorCell, ActorRef, Message, MessagingErr, RpcReplyPort};
use crate::{ActorCell, ActorRef, DerivedActorRef, Message, MessagingErr, RpcReplyPort};

pub mod call_result;
pub use call_result::CallResult;
#[cfg(test)]
mod tests;

fn internal_cast<F, TMessage>(sender: F, msg: TMessage) -> Result<(), MessagingErr<TMessage>>
where
F: Fn(TMessage) -> Result<(), MessagingErr<TMessage>>,
TMessage: Message,
{
sender(msg)
}

async fn internal_call<F, TMessage, TReply, TMsgBuilder>(
sender: F,
msg_builder: TMsgBuilder,
timeout_option: Option<Duration>,
) -> Result<CallResult<TReply>, MessagingErr<TMessage>>
where
F: Fn(TMessage) -> Result<(), MessagingErr<TMessage>>,
TMessage: Message,
TMsgBuilder: FnOnce(RpcReplyPort<TReply>) -> TMessage,
{
let (tx, rx) = concurrency::oneshot();
let port: RpcReplyPort<TReply> = match timeout_option {
Some(duration) => (tx, duration).into(),
None => tx.into(),
};
sender(msg_builder(port))?;

// wait for the reply
Ok(if let Some(duration) = timeout_option {
match crate::concurrency::timeout(duration, rx).await {
Ok(Ok(result)) => CallResult::Success(result),
Ok(Err(_send_err)) => CallResult::SenderError,
Err(_timeout_err) => CallResult::Timeout,
}
} else {
match rx.await {
Ok(result) => CallResult::Success(result),
Err(_send_err) => CallResult::SenderError,
}
})
}

/// Sends an asynchronous request to the specified actor, ignoring if the
/// actor is alive or healthy and simply returns immediately
///
Expand All @@ -107,7 +147,7 @@ pub fn cast<TMessage>(actor: &ActorCell, msg: TMessage) -> Result<(), MessagingE
where
TMessage: Message,
{
actor.send_message::<TMessage>(msg)
internal_cast(|m| actor.send_message::<TMessage>(m), msg)
}

/// Sends an asynchronous request to the specified actor, building a one-time
Expand All @@ -129,26 +169,7 @@ where
TMessage: Message,
TMsgBuilder: FnOnce(RpcReplyPort<TReply>) -> TMessage,
{
let (tx, rx) = concurrency::oneshot();
let port: RpcReplyPort<TReply> = match timeout_option {
Some(duration) => (tx, duration).into(),
None => tx.into(),
};
actor.send_message::<TMessage>(msg_builder(port))?;

// wait for the reply
Ok(if let Some(duration) = timeout_option {
match crate::concurrency::timeout(duration, rx).await {
Ok(Ok(result)) => CallResult::Success(result),
Ok(Err(_send_err)) => CallResult::SenderError,
Err(_timeout_err) => CallResult::Timeout,
}
} else {
match rx.await {
Ok(result) => CallResult::Success(result),
Err(_send_err) => CallResult::SenderError,
}
})
internal_call(|m| actor.send_message(m), msg_builder, timeout_option).await
}

/// Sends an asynchronous request to the specified actors, building a one-time
Expand Down Expand Up @@ -327,3 +348,25 @@ where
)
}
}

impl<TMessage> DerivedActorRef<TMessage>
where
TMessage: Message,
{
/// Alias of [cast]
pub fn cast(&self, msg: TMessage) -> Result<(), MessagingErr<TMessage>> {
internal_cast(|m| self.send_message(m), msg)
}

/// Alias of [call]
pub async fn call<TReply, TMsgBuilder>(
&self,
msg_builder: TMsgBuilder,
timeout_option: Option<Duration>,
) -> Result<CallResult<TReply>, MessagingErr<TMessage>>
where
TMsgBuilder: FnOnce(RpcReplyPort<TReply>) -> TMessage,
{
internal_call(|m| self.send_message(m), msg_builder, timeout_option).await
}
}
36 changes: 36 additions & 0 deletions ractor/src/time/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,39 @@ where
kill_after(period, self.get_cell())
}
}

/// Add the timing functionality on top of the [crate::ActorRef]
impl<TMessage> crate::DerivedActorRef<TMessage>
where
TMessage: crate::Message,
{
/// Alias of [send_interval]
pub fn send_interval<F>(&self, period: Duration, msg: F) -> JoinHandle<()>
where
F: Fn() -> TMessage + Send + 'static,
{
send_interval::<TMessage, F>(period, self.get_cell(), msg)
}

/// Alias of [send_after]
pub fn send_after<F>(
&self,
period: Duration,
msg: F,
) -> JoinHandle<Result<(), MessagingErr<TMessage>>>
where
F: FnOnce() -> TMessage + Send + 'static,
{
send_after::<TMessage, F>(period, self.get_cell(), msg)
}

/// Alias of [exit_after]
pub fn exit_after(&self, period: Duration) -> JoinHandle<()> {
exit_after(period, self.get_cell())
}

/// Alias of [kill_after]
pub fn kill_after(&self, period: Duration) -> JoinHandle<()> {
kill_after(period, self.get_cell())
}
}
2 changes: 1 addition & 1 deletion ractor_cluster/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor_cluster"
version = "0.14.4"
version = "0.14.5"
authors = ["Sean Lawlor <slawlor>"]
description = "Distributed cluster environment of Ractor actors"
documentation = "https://docs.rs/ractor"
Expand Down
2 changes: 1 addition & 1 deletion ractor_cluster_derive/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor_cluster_derive"
version = "0.14.4"
version = "0.14.5"
authors = ["Sean Lawlor <slawlor>"]
description = "Derives for ractor_cluster"
license = "MIT"
Expand Down

0 comments on commit a5b6189

Please sign in to comment.