Skip to content

Commit

Permalink
Add proper span handling to factories through extension trait (#300)
Browse files Browse the repository at this point in the history
  • Loading branch information
slawlor authored Dec 17, 2024
1 parent 6a08fe2 commit f15be0c
Show file tree
Hide file tree
Showing 9 changed files with 596 additions and 115 deletions.
2 changes: 1 addition & 1 deletion ractor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor"
version = "0.13.5"
version = "0.14.0"
authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
description = "An actor framework for Rust"
documentation = "https://docs.rs/ractor"
Expand Down
87 changes: 78 additions & 9 deletions ractor/src/factory/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::sync::Arc;
use std::{hash::Hash, time::SystemTime};

use bon::Builder;
use tracing::Span;

use crate::{concurrency::Duration, Message};
use crate::{ActorRef, RpcReplyPort};
Expand Down Expand Up @@ -40,27 +41,91 @@ pub trait JobKey: Debug + Hash + Send + Sync + Clone + Eq + PartialEq + 'static
impl<T: Debug + Hash + Send + Sync + Clone + Eq + PartialEq + 'static> JobKey for T {}

/// Represents options for the specified job
#[derive(Debug, Eq, PartialEq, Clone)]
#[derive(Debug, PartialEq, Clone)]
pub struct JobOptions {
/// Time job was submitted from the client
pub submit_time: SystemTime,
submit_time: SystemTime,
/// Time job was processed by the factory
pub factory_time: SystemTime,
factory_time: SystemTime,
/// Time job was sent to a worker
pub worker_time: SystemTime,
worker_time: SystemTime,
/// Time-to-live for the job
pub ttl: Option<Duration>,
ttl: Option<Duration>,
/// The parent span we want to propagate to the worker.
/// Spans don't propagate over the wire in networks
span: Option<Span>,
}

impl Default for JobOptions {
fn default() -> Self {
impl JobOptions {
/// Create a new [JobOptions] instance, optionally supplying the ttl for the job
///
/// * `ttl` - The Time-to-live specification for this job, which is the maximum amount
/// of time the job can remain in the factory's (or worker's) queue before being expired
/// and discarded
///
/// Returns a new [JobOptions] instance.
pub fn new(ttl: Option<Duration>) -> Self {
let span = {
#[cfg(feature = "message_span_propogation")]
{
Some(Span::current())
}
#[cfg(not(feature = "message_span_propogation"))]
{
None
}
};
Self {
submit_time: SystemTime::now(),
factory_time: SystemTime::now(),
worker_time: SystemTime::now(),
ttl: None,
ttl,
span,
}
}

/// Retrieve the TTL for this job
pub fn ttl(&self) -> Option<Duration> {
self.ttl
}

/// Set the TTL for this job
pub fn set_ttl(&mut self, ttl: Option<Duration>) {
self.ttl = ttl;
}

/// Time the job was submitted to the factory
/// (i.e. the time `cast` was called)
pub fn submit_time(&self) -> SystemTime {
self.submit_time
}

/// Time the job was dispatched to a worker
pub fn worker_time(&self) -> SystemTime {
self.worker_time
}

/// Time the job was received by the factory and first either dispatched
/// or enqueued to the factory's queue
pub fn factory_time(&self) -> SystemTime {
self.factory_time
}

/// Clone and return the [Span] attached to this
/// [JobOptions] instance.
pub fn span(&self) -> Option<Span> {
self.span.clone()
}

pub(crate) fn take_span(&mut self) -> Option<Span> {
self.span.take()
}
}

impl Default for JobOptions {
fn default() -> Self {
Self::new(None)
}
}

#[cfg(feature = "cluster")]
Expand All @@ -86,7 +151,10 @@ impl BytesConvertable for JobOptions {

fn from_bytes(mut data: Vec<u8>) -> Self {
if data.len() != 16 {
Self::default()
Self {
span: None,
..Default::default()
}
} else {
let ttl_bytes = data.split_off(8);

Expand All @@ -100,6 +168,7 @@ impl BytesConvertable for JobOptions {
} else {
None
},
span: None,
..Default::default()
}
}
Expand Down
57 changes: 21 additions & 36 deletions ractor/src/factory/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,53 +66,38 @@
//! /// the business logic for each message that will be done in parallel.
//! struct ExampleWorker;
//! #[cfg_attr(feature = "async-trait", ractor::async_trait)]
//! impl Actor for ExampleWorker {
//! type Msg = WorkerMessage<(), ExampleMessage>;
//! type State = WorkerStartContext<(), ExampleMessage, ()>;
//! type Arguments = WorkerStartContext<(), ExampleMessage, ()>;
//! impl Worker for ExampleWorker {
//! type Key = ();
//! type Message = ExampleMessage;
//! type State = ();
//! type Arguments = ();
//! async fn pre_start(
//! &self,
//! _myself: ActorRef<Self::Msg>,
//! wid: WorkerId,
//! factory: &ActorRef<FactoryMessage<(), ExampleMessage>>,
//! startup_context: Self::Arguments,
//! ) -> Result<Self::State, ActorProcessingErr> {
//! Ok(startup_context)
//! }
//! async fn handle(
//! &self,
//! _myself: ActorRef<Self::Msg>,
//! message: Self::Msg,
//! state: &mut Self::State,
//! wid: WorkerId,
//! factory: &ActorRef<FactoryMessage<(), ExampleMessage>>,
//! Job {msg, key, ..}: Job<(), ExampleMessage>,
//! _state: &mut Self::State,
//! ) -> Result<(), ActorProcessingErr> {
//! match message {
//! WorkerMessage::FactoryPing(time) => {
//! // This is a message which all factory workers **must**
//! // adhere to. It is a background processing message from the
//! // factory which is used for (a) metrics and (b) detecting
//! // stuck workers, i.e. workers which aren't making progress
//! // processing their messages
//! state
//! .factory
//! .cast(FactoryMessage::WorkerPong(state.wid, time.elapsed()))?;
//! // Actual business logic that we want to parallelize
//! tracing::trace!("Worker {} received {:?}", wid, msg);
//! match msg {
//! ExampleMessage::PrintValue(value) => {
//! tracing::info!("Worker {} printing value {value}", wid);
//! }
//! WorkerMessage::Dispatch(job) => {
//! // Actual business logic that we want to parallelize
//! tracing::trace!("Worker {} received {:?}", state.wid, job.msg);
//! match job.msg {
//! ExampleMessage::PrintValue(value) => {
//! tracing::info!("Worker {} printing value {value}", state.wid);
//! }
//! ExampleMessage::EchoValue(value, reply) => {
//! tracing::info!("Worker {} echoing value {value}", state.wid);
//! let _ = reply.send(value);
//! }
//! }
//! // job finished, on success or err we report back to the factory
//! state
//! .factory
//! .cast(FactoryMessage::Finished(state.wid, job.key))?;
//! ExampleMessage::EchoValue(value, reply) => {
//! tracing::info!("Worker {} echoing value {value}", wid);
//! let _ = reply.send(value);
//! }
//! }
//! Ok(())
//! Ok(key)
//! }
//! }
//! /// Used by the factory to build new [ExampleWorker]s.
Expand Down Expand Up @@ -200,7 +185,7 @@ pub use factoryimpl::{Factory, FactoryArguments, FactoryArgumentsBuilder};
pub use job::{Job, JobKey, JobOptions, MessageRetryStrategy, RetriableMessage};
pub use lifecycle::FactoryLifecycleHooks;
pub use worker::{
DeadMansSwitchConfiguration, WorkerBuilder, WorkerCapacityController, WorkerMessage,
DeadMansSwitchConfiguration, Worker, WorkerBuilder, WorkerCapacityController, WorkerMessage,
WorkerProperties, WorkerStartContext,
};

Expand Down
10 changes: 2 additions & 8 deletions ractor/src/factory/queues.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,10 +423,7 @@ mod tests {
key: 99,
accepted: None,
msg: (),
options: JobOptions {
ttl: Some(Duration::from_millis(1)),
..Default::default()
},
options: JobOptions::new(Some(Duration::from_millis(1))),
});

let oldest = queue.discard_oldest();
Expand Down Expand Up @@ -480,10 +477,7 @@ mod tests {
key: 99,
accepted: None,
msg: (),
options: JobOptions {
ttl: Some(Duration::from_millis(1)),
..Default::default()
},
options: JobOptions::new(Some(Duration::from_millis(1))),
});

// should discard lowest pri first
Expand Down
46 changes: 18 additions & 28 deletions ractor/src/factory/tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,47 +55,37 @@ struct TestWorker {
}

#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for TestWorker {
type Msg = WorkerMessage<TestKey, TestMessage>;
type State = Self::Arguments;
type Arguments = WorkerStartContext<TestKey, TestMessage, ()>;
impl Worker for TestWorker {
type Key = TestKey;
type Message = TestMessage;
type State = ();
type Arguments = ();

async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
_wid: WorkerId,
_factory: &ActorRef<FactoryMessage<TestKey, TestMessage>>,
startup_context: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
Ok(startup_context)
}

async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
WorkerMessage::FactoryPing(time) => {
state
.factory
.cast(FactoryMessage::WorkerPong(state.wid, time.elapsed()))?;
}
WorkerMessage::Dispatch(job) => {
tracing::debug!("Worker received {:?}", job.msg);
_wid: WorkerId,
_factory: &ActorRef<FactoryMessage<TestKey, TestMessage>>,
Job { msg, key, .. }: Job<Self::Key, Self::Message>,
_state: &mut Self::State,
) -> Result<TestKey, ActorProcessingErr> {
tracing::debug!("Worker received {:?}", msg);

self.counter.fetch_add(1, Ordering::Relaxed);

if let Some(timeout_ms) = self.slow {
crate::concurrency::sleep(Duration::from_millis(timeout_ms)).await;
}
self.counter.fetch_add(1, Ordering::Relaxed);

// job finished, on success or err we report back to the factory
state
.factory
.cast(FactoryMessage::Finished(state.wid, job.key))?;
}
if let Some(timeout_ms) = self.slow {
crate::concurrency::sleep(Duration::from_millis(timeout_ms)).await;
}
Ok(())

Ok(key)
}
}

Expand Down
41 changes: 15 additions & 26 deletions ractor/src/factory/tests/dynamic_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,43 +48,32 @@ struct TestWorker {
impl crate::Message for TestMessage {}

#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for TestWorker {
type Msg = WorkerMessage<TestKey, TestMessage>;
impl Worker for TestWorker {
type Key = TestKey;
type Message = TestMessage;
type State = Self::Arguments;
type Arguments = WorkerStartContext<TestKey, TestMessage, ()>;
type Arguments = ();

async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
_wid: WorkerId,
_factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
startup_context: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
Ok(startup_context)
}

async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
WorkerMessage::FactoryPing(time) => {
state
.factory
.cast(FactoryMessage::WorkerPong(state.wid, time.elapsed()))?;
}
WorkerMessage::Dispatch(job) => {
tracing::debug!("Worker received {:?}", job.msg);

self.id_map.insert(state.wid);

// job finished, on success or err we report back to the factory
state
.factory
.cast(FactoryMessage::Finished(state.wid, job.key))?;
}
}
Ok(())
wid: WorkerId,
_factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
Job { msg, key, .. }: Job<Self::Key, Self::Message>,
_state: &mut Self::State,
) -> Result<Self::Key, ActorProcessingErr> {
tracing::debug!("Worker received {:?}", msg);

self.id_map.insert(wid);
Ok(key)
}
}

Expand Down
Loading

0 comments on commit f15be0c

Please sign in to comment.