Skip to content

Commit

Permalink
Marking that the converter function in the derived actor is cross-thr…
Browse files Browse the repository at this point in the history
…ead safe since it's just an Into() call. (#321)
  • Loading branch information
slawlor authored Jan 9, 2025
1 parent d385bc9 commit 3b7cf3d
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 32 deletions.
4 changes: 2 additions & 2 deletions ractor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor"
version = "0.14.4"
version = "0.14.5"
authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
description = "A actor framework for Rust"
documentation = "https://docs.rs/ractor"
Expand All @@ -14,7 +14,7 @@ categories = ["asynchronous"]
rust-version = "1.64"

[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)', 'cfg(rust_analyzer)'] }

[features]
### Other features
Expand Down
14 changes: 11 additions & 3 deletions ractor/src/actor/derived_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,18 @@ use std::sync::Arc;
/// kitchen_actor_handle.await.unwrap();
/// }
/// ```
#[derive(Clone)]
pub struct DerivedActorRef<TFrom> {
converter: Arc<dyn Fn(TFrom) -> Result<(), MessagingErr<TFrom>>>,
inner: ActorCell,
converter: Arc<dyn Fn(TFrom) -> Result<(), MessagingErr<TFrom>> + Send + Sync + 'static>,
pub(crate) inner: ActorCell,
}

impl<TFrom> Clone for DerivedActorRef<TFrom> {
fn clone(&self) -> Self {
Self {
converter: self.converter.clone(),
inner: self.inner.clone(),
}
}
}

impl<TFrom> std::fmt::Debug for DerivedActorRef<TFrom> {
Expand Down
4 changes: 1 addition & 3 deletions ractor/src/factory/factoryimpl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;

use bon::Builder;

use self::routing::RouteResult;
use crate::concurrency::Duration;
use crate::concurrency::Instant;
Expand Down Expand Up @@ -99,7 +97,7 @@ where
}

/// Arguments for configuring and starting a [Factory] actor instance.
#[derive(Builder)]
#[derive(bon::Builder)]
#[builder(on(String, into))]
pub struct FactoryArguments<TKey, TMsg, TWorkerStart, TWorker, TRouter, TQueue>
where
Expand Down
87 changes: 65 additions & 22 deletions ractor/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,53 @@
use crate::concurrency::{self, Duration, JoinHandle};

use crate::{ActorCell, ActorRef, Message, MessagingErr, RpcReplyPort};
use crate::{ActorCell, ActorRef, DerivedActorRef, Message, MessagingErr, RpcReplyPort};

pub mod call_result;
pub use call_result::CallResult;
#[cfg(test)]
mod tests;

fn internal_cast<F, TMessage>(sender: F, msg: TMessage) -> Result<(), MessagingErr<TMessage>>
where
F: Fn(TMessage) -> Result<(), MessagingErr<TMessage>>,
TMessage: Message,
{
sender(msg)
}

async fn internal_call<F, TMessage, TReply, TMsgBuilder>(
sender: F,
msg_builder: TMsgBuilder,
timeout_option: Option<Duration>,
) -> Result<CallResult<TReply>, MessagingErr<TMessage>>
where
F: Fn(TMessage) -> Result<(), MessagingErr<TMessage>>,
TMessage: Message,
TMsgBuilder: FnOnce(RpcReplyPort<TReply>) -> TMessage,
{
let (tx, rx) = concurrency::oneshot();
let port: RpcReplyPort<TReply> = match timeout_option {
Some(duration) => (tx, duration).into(),
None => tx.into(),
};
sender(msg_builder(port))?;

// wait for the reply
Ok(if let Some(duration) = timeout_option {
match crate::concurrency::timeout(duration, rx).await {
Ok(Ok(result)) => CallResult::Success(result),
Ok(Err(_send_err)) => CallResult::SenderError,
Err(_timeout_err) => CallResult::Timeout,
}
} else {
match rx.await {
Ok(result) => CallResult::Success(result),
Err(_send_err) => CallResult::SenderError,
}
})
}

/// Sends an asynchronous request to the specified actor, ignoring if the
/// actor is alive or healthy and simply returns immediately
///
Expand All @@ -107,7 +147,7 @@ pub fn cast<TMessage>(actor: &ActorCell, msg: TMessage) -> Result<(), MessagingE
where
TMessage: Message,
{
actor.send_message::<TMessage>(msg)
internal_cast(|m| actor.send_message::<TMessage>(m), msg)
}

/// Sends an asynchronous request to the specified actor, building a one-time
Expand All @@ -129,26 +169,7 @@ where
TMessage: Message,
TMsgBuilder: FnOnce(RpcReplyPort<TReply>) -> TMessage,
{
let (tx, rx) = concurrency::oneshot();
let port: RpcReplyPort<TReply> = match timeout_option {
Some(duration) => (tx, duration).into(),
None => tx.into(),
};
actor.send_message::<TMessage>(msg_builder(port))?;

// wait for the reply
Ok(if let Some(duration) = timeout_option {
match crate::concurrency::timeout(duration, rx).await {
Ok(Ok(result)) => CallResult::Success(result),
Ok(Err(_send_err)) => CallResult::SenderError,
Err(_timeout_err) => CallResult::Timeout,
}
} else {
match rx.await {
Ok(result) => CallResult::Success(result),
Err(_send_err) => CallResult::SenderError,
}
})
internal_call(|m| actor.send_message(m), msg_builder, timeout_option).await
}

/// Sends an asynchronous request to the specified actors, building a one-time
Expand Down Expand Up @@ -327,3 +348,25 @@ where
)
}
}

impl<TMessage> DerivedActorRef<TMessage>
where
TMessage: Message,
{
/// Alias of [cast]
pub fn cast(&self, msg: TMessage) -> Result<(), MessagingErr<TMessage>> {
internal_cast(|m| self.send_message(m), msg)
}

/// Alias of [call]
pub async fn call<TReply, TMsgBuilder>(
&self,
msg_builder: TMsgBuilder,
timeout_option: Option<Duration>,
) -> Result<CallResult<TReply>, MessagingErr<TMessage>>
where
TMsgBuilder: FnOnce(RpcReplyPort<TReply>) -> TMessage,
{
internal_call(|m| self.send_message(m), msg_builder, timeout_option).await
}
}
36 changes: 36 additions & 0 deletions ractor/src/time/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,39 @@ where
kill_after(period, self.get_cell())
}
}

/// Add the timing functionality on top of the [crate::ActorRef]
impl<TMessage> crate::DerivedActorRef<TMessage>
where
TMessage: crate::Message,
{
/// Alias of [send_interval]
pub fn send_interval<F>(&self, period: Duration, msg: F) -> JoinHandle<()>
where
F: Fn() -> TMessage + Send + 'static,
{
send_interval::<TMessage, F>(period, self.get_cell(), msg)
}

/// Alias of [send_after]
pub fn send_after<F>(
&self,
period: Duration,
msg: F,
) -> JoinHandle<Result<(), MessagingErr<TMessage>>>
where
F: FnOnce() -> TMessage + Send + 'static,
{
send_after::<TMessage, F>(period, self.get_cell(), msg)
}

/// Alias of [exit_after]
pub fn exit_after(&self, period: Duration) -> JoinHandle<()> {
exit_after(period, self.get_cell())
}

/// Alias of [kill_after]
pub fn kill_after(&self, period: Duration) -> JoinHandle<()> {
kill_after(period, self.get_cell())
}
}
2 changes: 1 addition & 1 deletion ractor_cluster/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor_cluster"
version = "0.14.4"
version = "0.14.5"
authors = ["Sean Lawlor <slawlor>"]
description = "Distributed cluster environment of Ractor actors"
documentation = "https://docs.rs/ractor"
Expand Down
2 changes: 1 addition & 1 deletion ractor_cluster_derive/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor_cluster_derive"
version = "0.14.4"
version = "0.14.5"
authors = ["Sean Lawlor <slawlor>"]
description = "Derives for ractor_cluster"
license = "MIT"
Expand Down

0 comments on commit 3b7cf3d

Please sign in to comment.