Skip to content

Commit

Permalink
Add utility functions for spawning actors on
Browse files Browse the repository at this point in the history
(a) the ActorRef for spawning children (instance method of `spawn_linked`)
(b) at the crate root for spawning unmanaged actors
  • Loading branch information
slawlor committed Feb 4, 2025
1 parent f9ccc48 commit bebc427
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 16 deletions.
24 changes: 22 additions & 2 deletions ractor/src/actor/actor_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ use super::actor_properties::MuxedMessage;
use super::messages::{Signal, StopMessage};
use super::SupervisionEvent;
use crate::actor::actor_properties::ActorProperties;
use crate::concurrency::{MpscUnboundedReceiver as InputPortReceiver, OneshotReceiver};
use crate::concurrency::{JoinHandle, MpscUnboundedReceiver as InputPortReceiver, OneshotReceiver};
use crate::errors::MessagingErr;
#[cfg(feature = "cluster")]
use crate::message::SerializedMessage;
use crate::RactorErr;
use crate::{Actor, ActorName, SpawnErr};
use crate::{ActorId, Message};
use crate::{ActorRef, RactorErr};

/// [ActorStatus] represents the status of an actor's lifecycle
#[derive(Debug, Clone, Eq, PartialEq, Copy, PartialOrd, Ord)]
Expand Down Expand Up @@ -655,6 +655,26 @@ impl ActorCell {
}
}

/// Spawn an actor of the given type as a child of this actor, automatically starting the actor.
/// This [ActorCell] becomes the supervisor of the child actor.
///
/// * `name`: A name to give the actor. Useful for global referencing or debug printing
/// * `handler` The implementation of Self
/// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and
/// initial state creation

Check failure on line 664 in ractor/src/actor/actor_cell.rs

View workflow job for this annotation

GitHub Actions / Clippy

doc list item without indentation
///
/// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference
/// along with the join handle which will complete when the actor terminates. Returns [Err(SpawnErr)] if
/// the actor failed to start
pub async fn spawn_linked<T: Actor>(
&self,
name: Option<String>,
handler: T,
startup_args: T::Arguments,
) -> Result<(ActorRef<T::Msg>, JoinHandle<()>), SpawnErr> {
crate::actor::ActorRuntime::spawn_linked(name, handler, startup_args, self.clone()).await
}

// ================== Test Utilities ================== //

#[cfg(test)]
Expand Down
16 changes: 11 additions & 5 deletions ractor/src/actor/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ impl crate::Message for EmptyMessage {}
#[crate::concurrency::test]
#[tracing_test::traced_test]
async fn test_panic_on_start_captured() {
#[derive(Default)]
struct TestActor;

#[cfg_attr(feature = "async-trait", crate::async_trait)]
Expand All @@ -47,7 +48,7 @@ async fn test_panic_on_start_captured() {
}
}

let actor_output = Actor::spawn(None, TestActor, ()).await;
let actor_output = crate::spawn::<TestActor>(()).await;
assert!(matches!(actor_output, Err(SpawnErr::StartupFailed(_))));
}

Expand Down Expand Up @@ -364,6 +365,7 @@ async fn test_sending_message_to_invalid_actor_type() {
#[crate::concurrency::test]
#[tracing_test::traced_test]
async fn test_sending_message_to_dead_actor() {
#[derive(Default)]
struct TestActor;

#[cfg_attr(feature = "async-trait", crate::async_trait)]
Expand All @@ -381,13 +383,15 @@ async fn test_sending_message_to_dead_actor() {
}
}

let (actor, handle) = Actor::spawn(None, TestActor, ())
let (actor, _) = crate::spawn::<TestActor>(())
.await
.expect("Actor failed to start");

// Stop the actor and wait for it to die
actor.stop(None);
handle.await.unwrap();
actor
.stop_and_wait(None, None)
.await
.expect("Failed to stop");

// assert that if we send a message, it doesn't transmit since
// the receiver is dropped
Expand Down Expand Up @@ -968,6 +972,7 @@ fn returns_actor_references() {
#[crate::concurrency::test]
#[tracing_test::traced_test]
async fn actor_failing_in_spawn_err_doesnt_poison_registries() {
#[derive(Default)]
struct Test;

#[cfg_attr(feature = "async-trait", crate::async_trait)]
Expand All @@ -981,6 +986,7 @@ async fn actor_failing_in_spawn_err_doesnt_poison_registries() {
}
}

#[derive(Default)]
struct Test2;

#[cfg_attr(feature = "async-trait", crate::async_trait)]
Expand All @@ -994,7 +1000,7 @@ async fn actor_failing_in_spawn_err_doesnt_poison_registries() {
}
}

let a = Actor::spawn(Some("test".to_owned()), Test, ()).await;
let a = crate::spawn_named::<Test>("test".to_owned(), ()).await;
assert!(a.is_err());
drop(a);

Expand Down
10 changes: 4 additions & 6 deletions ractor/src/actor/tests/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,8 @@ async fn test_supervision_panic_in_post_startup() {
.await
.expect("Supervisor panicked on startup");

let supervisor_cell: ActorCell = supervisor_ref.clone().into();

let (child_ref, c_handle) = Actor::spawn_linked(None, Child, (), supervisor_cell)
let (child_ref, c_handle) = supervisor_ref
.spawn_linked(None, Child, ())
.await
.expect("Child panicked on startup");

Expand Down Expand Up @@ -1088,9 +1087,8 @@ async fn test_supervisor_captures_dead_childs_state() {
.await
.expect("Supervisor panicked on startup");

let supervisor_cell: ActorCell = supervisor_ref.clone().into();

let (child_ref, c_handle) = Actor::spawn_linked(None, Child, (), supervisor_cell)
let (child_ref, c_handle) = supervisor_ref
.spawn_linked(None, Child, ())
.await
.expect("Child panicked on startup");

Expand Down
31 changes: 31 additions & 0 deletions ractor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ pub mod rpc;
pub mod serialization;
pub mod time;

use concurrency::JoinHandle;
#[cfg(not(feature = "async-trait"))]
use strum as _;

Expand Down Expand Up @@ -228,3 +229,33 @@ pub type ScopeName = String;
/// to send between threads (same bounds as a [Message])
pub trait State: std::any::Any + Send + 'static {}
impl<T: std::any::Any + Send + 'static> State for T {}

// ======================== Helper Functionality ======================== //

/// Perform a background-spawn of an actor. This is a utility wrapper over [Actor::spawn] which drops
/// the [crate::concurrency::JoinHandle] for convenience and assumes the actor implementation implements
/// [Default].
///
/// * `args` - The arguments to start the actor
///
/// Returns [Ok((ActorRef, JoinHandle<()>))] upon successful actor startup, [Err(SpawnErr)] otherwise
pub async fn spawn<T: Actor + Default>(
args: T::Arguments,
) -> Result<(ActorRef<T::Msg>, JoinHandle<()>), SpawnErr> {
T::spawn(None, T::default(), args).await
}

/// Perform a background-spawn of an actor with the provided name. This is a utility wrapper
/// over [Actor::spawn] which drops the [crate::concurrency::JoinHandle] for convenience and
/// assumes the actor implementation implements [Default].
///
/// * `name` - The name for the actor to spawn
/// * `args` - The arguments to start the actor
///
/// Returns [Ok((ActorRef, JoinHandle<()>))] upon successful actor startup, [Err(SpawnErr)] otherwise
pub async fn spawn_named<T: Actor + Default>(
name: crate::ActorName,
args: T::Arguments,
) -> Result<(ActorRef<T::Msg>, JoinHandle<()>), SpawnErr> {
T::spawn(Some(name), T::default(), args).await
}
9 changes: 6 additions & 3 deletions ractor/src/registry/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::{Actor, ActorProcessingErr, SpawnErr};
#[crate::concurrency::test]
#[tracing_test::traced_test]
async fn test_basic_registation() {
#[derive(Default)]
struct EmptyActor;

#[cfg_attr(feature = "async-trait", crate::async_trait)]
Expand All @@ -29,7 +30,7 @@ async fn test_basic_registation() {
}
}

let (actor, handle) = Actor::spawn(Some("my_actor".to_string()), EmptyActor, ())
let (actor, _) = crate::spawn_named::<EmptyActor>("my_actor".to_string(), ())
.await
.expect("Actor failed to start");

Expand All @@ -38,8 +39,10 @@ async fn test_basic_registation() {
// Coverage for Issue #70
assert!(crate::ActorRef::<()>::where_is("my_actor".to_string()).is_some());

actor.stop(None);
handle.await.expect("Failed to clean stop the actor");
actor
.stop_and_wait(None, None)
.await
.expect("Failed to wait for stop");
}

#[crate::concurrency::test]
Expand Down

0 comments on commit bebc427

Please sign in to comment.