Skip to content

Commit

Permalink
refactor: Replace select! with futures-lite::race
Browse files Browse the repository at this point in the history
  • Loading branch information
cmleinz committed Oct 8, 2024
1 parent 969fce9 commit 491f811
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 151 deletions.
74 changes: 0 additions & 74 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ edition = "2021"
authors = ["Caleb Leinz <[email protected]>"]
description = "A minimal actor framework"
readme = "README.md"
repository = "https://github.com/cmleinz/black-box"
license = "MIT OR Apache-2.0"
rust-version = "1.75"

[dependencies]
async-channel = { version = "2.3.1" }
futures-util = { version = "0.3.31" }
pin-project-lite = { version = "0.2.14" }
futures-lite = { version = "2.3.0"}

[dev-dependencies]
async-executor = { version = "1.13.1" }
Expand Down
94 changes: 19 additions & 75 deletions src/executor.rs
Original file line number Diff line number Diff line change
@@ -1,76 +1,16 @@
use std::task::Poll;

use async_channel::Receiver;
use futures_util::{future::FusedFuture, Future};

use crate::{message::Envelope, Actor, Address};

const DEFAULT_CAP: usize = 100;

struct FusedReceiver<T> {
receiver: Receiver<T>,
}

impl<T> From<Receiver<T>> for FusedReceiver<T> {
fn from(value: Receiver<T>) -> Self {
FusedReceiver { receiver: value }
}
}

impl<T> FusedReceiver<T> {
fn recv(&self) -> FusedRecv<'_, T> {
FusedRecv::new(&self.receiver)
}
}

#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum State {
#[default]
Continue,
Shutdown,
}

pin_project_lite::pin_project! {
struct FusedRecv<'a, T> {
#[pin]
recv: async_channel::Recv<'a, T>,
finished: bool,
}
}

impl<'a, T> FusedRecv<'a, T> {
fn new(recv: &'a Receiver<T>) -> Self {
Self {
recv: recv.recv(),
finished: false,
}
}
}

impl<'a, T> Future for FusedRecv<'a, T> {
type Output = Result<T, async_channel::RecvError>;

fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let this = self.project();
match this.recv.poll(cx) {
std::task::Poll::Ready(Err(e)) => {
*this.finished = true;
Poll::Ready(Err(e))
}
other => other,
}
}
}

impl<'a, T> FusedFuture for FusedRecv<'a, T> {
fn is_terminated(&self) -> bool {
self.finished
}
}

/// A cloneable context for the actor.
///
/// Currently this fuctions as a means by which to alter the state of the [`Executor`], it is
Expand Down Expand Up @@ -110,8 +50,8 @@ pub struct Executor<A> {
actor: A,
context: Context,
state: State,
from_context: FusedReceiver<State>,
receiver: FusedReceiver<Envelope<A>>,
from_context: Receiver<State>,
receiver: Receiver<Envelope<A>>,
}

impl<A> Executor<A> {
Expand All @@ -120,9 +60,9 @@ impl<A> Executor<A> {
let (state_tx, state_rx) = async_channel::unbounded();
let me = Self {
actor,
receiver: receiver.into(),
receiver,
context: Context { sender: state_tx },
from_context: state_rx.into(),
from_context: state_rx,
state: Default::default(),
};
let address = Address::new(sender);
Expand All @@ -131,6 +71,11 @@ impl<A> Executor<A> {
}
}

enum Race<A> {
State(State),
Envelope(Envelope<A>),
}

impl<A> Executor<A>
where
A: Actor,
Expand All @@ -151,18 +96,17 @@ where
}

async fn continuation(&mut self) {
futures_util::select! {
state = self.from_context.recv() => {
self.state = state.unwrap_or(State::Shutdown);
}
message = self.receiver.recv() => {
let Ok(message) = message else {
self.state = State::Shutdown;
return
};
let fut1 = async { self.from_context.recv().await.map(|val| Race::State(val)) };
let fut2 = async { self.receiver.recv().await.map(|val| Race::Envelope(val)) };

message.resolve(&mut self.actor, &self.context).await;
let result = futures_lite::future::race(fut1, fut2).await;

match result {
Ok(Race::State(state)) => self.state = state,
Ok(Race::Envelope(env)) => env.resolve(&mut self.actor, &self.context).await,
Err(_) => {
self.state = State::Shutdown;
}
};
}
}
}

0 comments on commit 491f811

Please sign in to comment.