Skip to content

Commit

Permalink
Support factories without the async-trait crate, meaning natively.
Browse files Browse the repository at this point in the history
Resolves #272
  • Loading branch information
slawlor committed Oct 16, 2024
1 parent 7e4f2e9 commit fc230d8
Show file tree
Hide file tree
Showing 10 changed files with 186 additions and 53 deletions.
4 changes: 2 additions & 2 deletions ractor/src/actor/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,7 @@ fn returns_actor_references() {
async fn actor_failing_in_spawn_err_doesnt_poison_registries() {
struct Test;

#[crate::async_trait]
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for Test {
type Msg = ();
type State = ();
Expand All @@ -980,7 +980,7 @@ async fn actor_failing_in_spawn_err_doesnt_poison_registries() {

struct Test2;

#[crate::async_trait]
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for Test2 {
type Msg = ();
type State = ();
Expand Down
15 changes: 15 additions & 0 deletions ractor/src/factory/discard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,22 @@ pub trait DynamicDiscardController: Send + Sync + 'static {
/// `base_controller.factory.{FACTORY_NAME}.{STAT}`
///
/// If no factory name is set, then "all" will be inserted
#[cfg(feature = "async-trait")]
async fn compute(&mut self, current_threshold: usize) -> usize;

/// Compute the new threshold for discarding
///
/// If you want to utilize metrics exposed in [crate::factory::stats] you can gather them
/// by utilizing `stats_facebook::service_data::get_service_data_singleton` to retrieve a
/// accessor to `ServiceData` which you can then resolve stats by name (either timeseries or
/// counters)
///
/// The namespace of stats collected on the base controller factory are
/// `base_controller.factory.{FACTORY_NAME}.{STAT}`
///
/// If no factory name is set, then "all" will be inserted
#[cfg(not(feature = "async-trait"))]
fn compute(&mut self, current_threshold: usize) -> futures::future::BoxFuture<'_, usize>;
}

/// Reason for discarding a job
Expand Down
17 changes: 11 additions & 6 deletions ractor/src/factory/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ pub struct RetriableMessage<TKey: JobKey, TMessage: Message> {
#[allow(clippy::type_complexity)]
pub retry_hook: Option<Arc<dyn Fn(&TKey) + 'static + Send + Sync + RefUnwindSafe>>,

state: Option<(JobOptions, ActorRef<FactoryMessage<TKey, Self>>)>,
retry_state: Option<(JobOptions, ActorRef<FactoryMessage<TKey, Self>>)>,
}

impl<TKey, TMsg> Debug for RetriableMessage<TKey, TMsg>
Expand All @@ -379,7 +379,11 @@ where
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RetriableMessage")
.field("key", &self.key)
.field("strategy", &self.strategy)
.field("message", &self.message.is_some())
.field("retry_hook", &self.retry_hook.is_some())
.field("retry_state", &self.retry_state.is_some())
.finish()
}
}
Expand All @@ -389,12 +393,13 @@ impl<TKey: JobKey, TMessage: Message> Message for RetriableMessage<TKey, TMessag

impl<TKey: JobKey, TMessage: Message> Drop for RetriableMessage<TKey, TMessage> {
fn drop(&mut self) {
if self.strategy.has_retries() || self.message.is_none() {
tracing::trace!("Drop handler for retriable message executing {self:?}");
if !self.strategy.has_retries() || self.message.is_none() {
// no more retries left (None or Some(>0) mean there's still retries left)
// or the payload has been consumed
return;
}
let Some((options, factory)) = self.state.as_ref() else {
let Some((options, factory)) = self.retry_state.as_ref() else {
// can't do a retry if the factory and options are not available
return;
};
Expand All @@ -403,7 +408,7 @@ impl<TKey: JobKey, TMessage: Message> Drop for RetriableMessage<TKey, TMessage>
key: self.key.clone(),
message: self.message.take(),
strategy: self.strategy.decrement(),
state: Some((options.clone(), factory.clone())),
retry_state: Some((options.clone(), factory.clone())),
retry_hook: self.retry_hook.take(),
};
let job = Job {
Expand Down Expand Up @@ -463,7 +468,7 @@ impl<TKey: JobKey, TMessage: Message> RetriableMessage<TKey, TMessage> {
key,
message: Some(message),
strategy,
state: None,
retry_state: None,
retry_hook: None,
}
}
Expand Down Expand Up @@ -509,7 +514,7 @@ impl<TKey: JobKey, TMessage: Message> RetriableMessage<TKey, TMessage> {
options: &JobOptions,
factory: ActorRef<FactoryMessage<TKey, Self>>,
) {
self.state = Some((options.clone(), factory));
self.retry_state = Some((options.clone(), factory));
}

/// Mark this message to not be retried upon being dropped, since it
Expand Down
51 changes: 51 additions & 0 deletions ractor/src/factory/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use crate::ActorProcessingErr;
use crate::ActorRef;
use crate::Message;
use crate::State;
#[cfg(not(feature = "async-trait"))]
use futures::{future::BoxFuture, FutureExt};

/// Hooks for [crate::factory::Factory] lifecycle events based on the
/// underlying actor's lifecycle.
Expand All @@ -31,24 +33,55 @@ where
///
/// WARNING: An error or panic returned here WILL shutdown the factory and notify supervisors
#[allow(unused_variables)]
#[cfg(feature = "async-trait")]
async fn on_factory_started(
&self,
factory_ref: ActorRef<FactoryMessage<TKey, TMsg>>,
) -> Result<(), ActorProcessingErr> {
Ok(())
}

/// Called when the factory has completed it's startup routine but
/// PRIOR to processing any messages. Just before this point, the factory
/// is ready to accept and process requests and all workers are started.
///
/// This hook is there to provide custom startup logic you want to make sure has run
/// prior to processing messages on workers
///
/// WARNING: An error or panic returned here WILL shutdown the factory and notify supervisors
#[allow(unused_variables)]
#[cfg(not(feature = "async-trait"))]
fn on_factory_started(
&self,
factory_ref: ActorRef<FactoryMessage<TKey, TMsg>>,
) -> BoxFuture<'_, Result<(), ActorProcessingErr>> {
async { Ok(()) }.boxed()
}

/// Called when the factory has completed it's shutdown routine but
/// PRIOR to fully exiting and notifying any relevant supervisors. Just prior
/// to this call the factory has processed its last message and will process
/// no more messages.
///
/// This hook is there to provide custom shutdown logic you want to make sure has run
/// prior to the factory fully exiting
#[cfg(feature = "async-trait")]
async fn on_factory_stopped(&self) -> Result<(), ActorProcessingErr> {
Ok(())
}

/// Called when the factory has completed it's shutdown routine but
/// PRIOR to fully exiting and notifying any relevant supervisors. Just prior
/// to this call the factory has processed its last message and will process
/// no more messages.
///
/// This hook is there to provide custom shutdown logic you want to make sure has run
/// prior to the factory fully exiting
#[cfg(not(feature = "async-trait"))]
fn on_factory_stopped(&self) -> BoxFuture<'_, Result<(), ActorProcessingErr>> {
async { Ok(()) }.boxed()
}

/// Called when the factory has received a signal to drain requests and exit after
/// draining has completed.
///
Expand All @@ -58,10 +91,28 @@ where
///
/// WARNING: An error or panic returned here WILL shutdown the factory and notify supervisors
#[allow(unused_variables)]
#[cfg(feature = "async-trait")]
async fn on_factory_draining(
&self,
factory_ref: ActorRef<FactoryMessage<TKey, TMsg>>,
) -> Result<(), ActorProcessingErr> {
Ok(())
}

/// Called when the factory has received a signal to drain requests and exit after
/// draining has completed.
///
/// This hook is to provide the ability to notify external services that the factory
/// is in the process of shutting down. If the factory is never "drained" formally,
/// this hook won't be called.
///
/// WARNING: An error or panic returned here WILL shutdown the factory and notify supervisors
#[allow(unused_variables)]
#[cfg(not(feature = "async-trait"))]
fn on_factory_draining(
&self,
factory_ref: ActorRef<FactoryMessage<TKey, TMsg>>,
) -> BoxFuture<'_, Result<(), ActorProcessingErr>> {
async { Ok(()) }.boxed()
}
}
8 changes: 8 additions & 0 deletions ractor/src/factory/tests/dynamic_discarding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.

#[cfg(not(feature = "async-trait"))]
use futures::{future::BoxFuture, FutureExt};
use std::sync::atomic::AtomicU16;
use std::sync::atomic::Ordering;
use std::sync::Arc;
Expand Down Expand Up @@ -120,9 +122,15 @@ struct DiscardController {}

#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl DynamicDiscardController for DiscardController {
#[cfg(feature = "async-trait")]
async fn compute(&mut self, _current_threshold: usize) -> usize {
10
}

#[cfg(not(feature = "async-trait"))]
fn compute(&mut self, _current_threshold: usize) -> BoxFuture<'_, usize> {
async { 10 }.boxed()
}
}

#[crate::concurrency::test]
Expand Down
9 changes: 9 additions & 0 deletions ractor/src/factory/tests/dynamic_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
use std::sync::Arc;

#[cfg(not(feature = "async-trait"))]
use futures::{future::BoxFuture, FutureExt};

use crate::concurrency::Duration;
use crate::Actor;
use crate::ActorProcessingErr;
Expand Down Expand Up @@ -200,9 +203,15 @@ async fn test_worker_pool_adjustment_automatic() {

#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl WorkerCapacityController for DynamicWorkerController {
#[cfg(feature = "async-trait")]
async fn get_pool_size(&mut self, _current: usize) -> usize {
10
}

#[cfg(not(feature = "async-trait"))]
fn get_pool_size(&mut self, _current: usize) -> BoxFuture<'_, usize> {
async { 10 }.boxed()
}
}

let id_map = Arc::new(dashmap::DashSet::new());
Expand Down
39 changes: 39 additions & 0 deletions ractor/src/factory/tests/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ use std::sync::atomic::AtomicU8;
use std::sync::atomic::Ordering;
use std::sync::Arc;

#[cfg(not(feature = "async-trait"))]
use futures::{future::BoxFuture, FutureExt};

use crate::concurrency::sleep;
use crate::concurrency::Duration;
use crate::Actor;
Expand All @@ -25,6 +28,7 @@ struct AtomicHooks {

#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl FactoryLifecycleHooks<(), ()> for AtomicHooks {
#[cfg(feature = "async-trait")]
async fn on_factory_started(
&self,
_factory_ref: ActorRef<FactoryMessage<(), ()>>,
Expand All @@ -33,18 +37,53 @@ impl FactoryLifecycleHooks<(), ()> for AtomicHooks {
Ok(())
}

#[cfg(not(feature = "async-trait"))]
fn on_factory_started(
&self,
_factory_ref: ActorRef<FactoryMessage<(), ()>>,
) -> BoxFuture<'_, Result<(), ActorProcessingErr>> {
async {
self.state.store(1, Ordering::SeqCst);
Ok(())
}
.boxed()
}

#[cfg(feature = "async-trait")]
async fn on_factory_stopped(&self) -> Result<(), ActorProcessingErr> {
self.state.store(3, Ordering::SeqCst);
Ok(())
}

#[cfg(not(feature = "async-trait"))]
fn on_factory_stopped(&self) -> BoxFuture<'_, Result<(), ActorProcessingErr>> {
async {
self.state.store(3, Ordering::SeqCst);
Ok(())
}
.boxed()
}

#[cfg(feature = "async-trait")]
async fn on_factory_draining(
&self,
_factory_ref: ActorRef<FactoryMessage<(), ()>>,
) -> Result<(), ActorProcessingErr> {
self.state.store(2, Ordering::SeqCst);
Ok(())
}

#[cfg(not(feature = "async-trait"))]
fn on_factory_draining(
&self,
_factory_ref: ActorRef<FactoryMessage<(), ()>>,
) -> BoxFuture<'_, Result<(), ActorProcessingErr>> {
async {
self.state.store(2, Ordering::SeqCst);
Ok(())
}
.boxed()
}
}

struct TestWorker;
Expand Down
Loading

0 comments on commit fc230d8

Please sign in to comment.