Skip to content

Commit

Permalink
feat: adds actor framework
Browse files Browse the repository at this point in the history
This change adds a lightweight actor framework. The main features that
are important to our use case are strongly typed messages and preserving
of tracing spans.
  • Loading branch information
nathanielc committed Jan 14, 2025
1 parent 2f28a80 commit 2241a85
Show file tree
Hide file tree
Showing 6 changed files with 654 additions and 0 deletions.
14 changes: 14 additions & 0 deletions actor-macros/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "ceramic-actor-macros"
version.workspace = true
edition.workspace = true
authors.workspace = true
license.workspace = true
repository.workspace = true

[lib]
proc-macro = true

[dependencies]
syn = "2.0"
quote = "1.0"
137 changes: 137 additions & 0 deletions actor-macros/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
use proc_macro::TokenStream;
use quote::quote;
use syn::{parse_macro_input, Attribute, DeriveInput, GenericParam, Lit};

#[proc_macro_derive(Actor, attributes(actor))]
pub fn actor(item: TokenStream) -> TokenStream {
let item = parse_macro_input!(item as DeriveInput);
// Extract struct name
let struct_name = item.ident;

let Config {
trait_name,
envelope_name,
handle_name,
} = Config::from_attributes(&struct_name, &item.attrs);

let generics = item.generics;
let generic_types: Vec<_> = generics
.params
.iter()
.filter_map(|param| match param {
GenericParam::Type(type_param) => Some(&type_param.ident),
_ => None,
})
.collect();
let phantom_fields = generic_types.iter().map(|ty| {
let name = syn::Ident::new(&format!("__{}", ty).to_lowercase(), ty.span());
quote! {
#name: std::marker::PhantomData<#ty>
}
});
let phantom_values: Vec<_> = generic_types
.iter()
.map(|ty| {
let name = syn::Ident::new(&format!("__{}", ty).to_lowercase(), ty.span());
quote! {
#name: Default::default()
}
})
.collect();

// Generate the implementation
let expanded = quote! {
impl #generics ceramic_actor::Actor for #struct_name < #(#generic_types,)*> {
type Envelope = #envelope_name;
}
impl #generics #trait_name for #struct_name < #(#generic_types,)*> { }

impl #generics #struct_name < #(#generic_types,)*> {
/// Start the actor returning a handle that can be easily cloned and shared.
/// The actor stops once all handles are dropped.
pub fn spawn(size: usize, actor: impl #trait_name + ::std::marker::Send + 'static, shutdown: impl ::std::future::Future<Output=()> + ::std::marker::Send + 'static) -> (#handle_name < #(#generic_types,)*>, tokio::task::JoinHandle<()>) {
let (sender, receiver) = ceramic_actor::channel(size);
let task_handle = tokio::spawn(async move { #envelope_name::run(actor, receiver, shutdown).await });

(
#handle_name {
sender,
#(#phantom_values,)*
},
task_handle,
)
}
}

/// Handle for [`#struct_name`].
#[derive(Debug)]
pub struct #handle_name #generics {
sender: ceramic_actor::Sender<#envelope_name>,
#(#phantom_fields,)*
}
impl #generics ::core::clone::Clone for #handle_name < #(#generic_types,)*> {
fn clone(&self) -> Self {
Self{
sender:self.sender.clone(),
#(#phantom_values,)*
}
}
}

#[async_trait::async_trait]
impl #generics ceramic_actor::ActorHandle for #handle_name < #(#generic_types,)*> {
type Actor = #struct_name < #(#generic_types,)*>;
fn sender(&self) -> ceramic_actor::Sender<<#struct_name < #(#generic_types,)*> as ceramic_actor::Actor>::Envelope> {
self.sender.clone()
}
}

};

TokenStream::from(expanded)
}

struct Config {
trait_name: syn::Ident,
envelope_name: syn::Ident,
handle_name: syn::Ident,
}

impl Config {
fn from_attributes(struct_name: &syn::Ident, attrs: &[Attribute]) -> Self {
let mut trait_name = syn::Ident::new(&format!("{}Actor", struct_name), struct_name.span());
let mut envelope_name =
syn::Ident::new(&format!("{}Envelope", struct_name), struct_name.span());
let mut handle_name =
syn::Ident::new(&format!("{}Handle", struct_name), struct_name.span());
for attr in attrs {
if attr.path().is_ident("actor") {
attr.parse_nested_meta(|meta| {
if meta.path.is_ident("envelope") {
let value: Lit = meta.value()?.parse()?;
if let Lit::Str(lit_str) = value {
envelope_name = syn::Ident::new(&lit_str.value(), lit_str.span())
}
} else if meta.path.is_ident("handle") {
let value: Lit = meta.value()?.parse()?;
if let Lit::Str(lit_str) = value {
handle_name = syn::Ident::new(&lit_str.value(), lit_str.span())
}
} else if meta.path.is_ident("actor_trait") {
let value: Lit = meta.value()?.parse()?;
if let Lit::Str(lit_str) = value {
trait_name = syn::Ident::new(&lit_str.value(), lit_str.span())
}
}
Ok(())
})
.expect("should be able to parse attributes");
}
}
Self {
trait_name,
envelope_name,
handle_name,
}
}
}
19 changes: 19 additions & 0 deletions actor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "ceramic-actor"
version.workspace = true
edition.workspace = true
authors.workspace = true
license.workspace = true
repository.workspace = true

[dependencies]
async-trait.workspace = true
tokio.workspace = true
tracing.workspace = true
ceramic-actor-macros.workspace = true
snafu.workspace = true

[dev-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "time"] }
tracing-subscriber.workspace = true
shutdown.workspace = true
140 changes: 140 additions & 0 deletions actor/examples/game/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
use std::ops::AddAssign;

use async_trait::async_trait;
use ceramic_actor::{actor_envelope, Actor, ActorHandle, Error, Handler, Message};
use shutdown::Shutdown;
use tracing::{instrument, Level};

#[derive(Actor)]
pub struct Game {
scores: Scores,
}
impl Game {
pub fn new() -> Self {
Self {
scores: Default::default(),
}
}
}

actor_envelope! {
GameEnvelope,
GameActor,
GetScore => GetScoreMessage,
Score => ScoreMessage,
}

#[derive(Debug)]
struct ScoreMessage {
scores: Scores,
}
impl Message for ScoreMessage {
type Result = ();
}

#[derive(Debug)]
struct GetScoreMessage;
impl Message for GetScoreMessage {
type Result = Scores;
}

#[derive(Clone, Debug, Default)]
struct Scores {
home: usize,
away: usize,
}
impl AddAssign for Scores {
fn add_assign(&mut self, rhs: Self) {
self.home += rhs.home;
self.away += rhs.away;
}
}

#[async_trait]
impl Handler<ScoreMessage> for Game {
#[instrument(skip(self), ret(level = Level::DEBUG))]
async fn handle(&mut self, message: ScoreMessage) -> <ScoreMessage as Message>::Result {
self.scores += message.scores;
}
}
#[async_trait]
impl Handler<GetScoreMessage> for Game {
#[instrument(skip(self), ret(level = Level::DEBUG))]
async fn handle(&mut self, _message: GetScoreMessage) -> <GetScoreMessage as Message>::Result {
self.scores.clone()
}
}

#[derive(Actor)]
// The envelope and handle types names can be explicitly named.
#[actor(envelope = "PlayerEnv", handle = "PlayerH", actor_trait = "PlayerI")]
pub struct Player {
is_home: bool,
game: GameHandle,
}

impl Player {
fn new(is_home: bool, game: GameHandle) -> Self {
Self { is_home, game }
}
}

actor_envelope! {
PlayerEnv,
PlayerI,
Shoot => ShootMessage,
}

#[derive(Debug)]
struct ShootMessage;
impl Message for ShootMessage {
type Result = ();
}

#[async_trait]
impl Handler<ShootMessage> for Player {
#[instrument(skip(self), ret(level = Level::DEBUG))]
async fn handle(&mut self, _message: ShootMessage) -> <ScoreMessage as Message>::Result {
// Player always scores two points
let message = if self.is_home {
ScoreMessage {
scores: Scores { home: 2, away: 0 },
}
} else {
ScoreMessage {
scores: Scores { home: 0, away: 2 },
}
};
self.game.notify(message).await.unwrap();
}
}

#[tokio::main]
async fn main() {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.pretty()
.init();
let shutdown = Shutdown::new();
let (game, _) = Game::spawn(1_000, Game::new(), shutdown.wait_fut());
let (player_home, _) =
Player::spawn(1_000, Player::new(true, game.clone()), shutdown.wait_fut());
let (player_away, _) =
Player::spawn(1_000, Player::new(false, game.clone()), shutdown.wait_fut());
player_home.notify(ShootMessage).await.unwrap();
player_away.send(ShootMessage).await.unwrap();
// Send with retry without cloning the message to be sent.
let mut msg = ShootMessage;
loop {
match player_home.send(msg).await {
Ok(_) => break,
Err(Error::Send { message }) => msg = message.0,
Err(_) => panic!(),
};
}
println!(
"Game score is: {:?}",
game.send(GetScoreMessage).await.unwrap()
);
shutdown.shutdown();
}
Loading

0 comments on commit 2241a85

Please sign in to comment.