diff --git a/packages/yew-agent/Cargo.toml b/packages/yew-agent/Cargo.toml index 8592b0327b3..099b212f90c 100644 --- a/packages/yew-agent/Cargo.toml +++ b/packages/yew-agent/Cargo.toml @@ -9,7 +9,7 @@ edition = "2021" readme = "../../README.md" description = "Agents for Yew" license = "MIT OR Apache-2.0" -rust-version = "1.60.0" +rust-version = "1.64.0" [dependencies] yew = { version = "0.20.0", path = "../yew" } diff --git a/packages/yew-agent/src/lib.rs b/packages/yew-agent/src/lib.rs index 19d8e0f3555..3426ff211ea 100644 --- a/packages/yew-agent/src/lib.rs +++ b/packages/yew-agent/src/lib.rs @@ -6,7 +6,7 @@ //! //! #### Oneshot //! -//! A kind of agent that for each input, a single output is expected. +//! A kind of agent that for each input, a single output is returned. //! //! #### Reactor //! @@ -73,22 +73,20 @@ extern crate self as yew_agent; -// pub mod reactor; pub mod oneshot; +pub mod reactor; pub mod worker; #[doc(inline)] pub use gloo_worker::{Bincode, Codec, Registrable, Spawnable}; -/// A procedural macro to create oneshot agents. -pub use yew_agent_macro::oneshot; -/// A procedural macro to create reactor agents. -pub use yew_agent_macro::reactor; mod reach; pub mod scope_ext; pub use reach::Reach; +mod utils; + #[doc(hidden)] pub mod __vendored { pub use futures; @@ -98,12 +96,12 @@ pub mod prelude { //! Prelude module to be imported when working with `yew-agent`. //! //! This module re-exports the frequently used types from the crate. - pub use crate::oneshot::{use_bridge_oneshot, UseBridgeOneshotHandle}; + pub use crate::oneshot::{oneshot, use_bridge_oneshot, UseBridgeOneshotHandle}; pub use crate::reach::Reach; - // pub use crate::reactor::{ - // use_reactor_bridge, use_reactor_subscription, ReactorOutput, UseReactorBridgeHandle, - // UseReactorSubscriptionHandle, - // }; + pub use crate::reactor::{ + use_reactor_bridge, use_reactor_subscription, ReactorEvent, UseReactorBridgeHandle, + UseReactorSubscriptionHandle, + }; pub use crate::scope_ext::{AgentScopeExt, /* ReactorBridgeHandle, */ WorkerBridgeHandle}; pub use crate::worker::{ use_worker_bridge, use_worker_subscription, UseWorkerBridgeHandle, diff --git a/packages/yew-agent/src/oneshot/mod.rs b/packages/yew-agent/src/oneshot/mod.rs index 6ad8fcd558c..523d6c4a53c 100644 --- a/packages/yew-agent/src/oneshot/mod.rs +++ b/packages/yew-agent/src/oneshot/mod.rs @@ -8,3 +8,5 @@ pub use gloo_worker::oneshot::{Oneshot, OneshotBridge, OneshotRegistrar, Oneshot pub use hooks::{use_bridge_oneshot, UseBridgeOneshotHandle}; pub use provider::OneshotProvider; pub(crate) use provider::OneshotProviderState; +/// A procedural macro to create oneshot agents. +pub use yew_agent_macro::oneshot; diff --git a/packages/yew-agent/src/oneshot/provider.rs b/packages/yew-agent/src/oneshot/provider.rs index 5e1d90c1231..0dbc4d85d11 100644 --- a/packages/yew-agent/src/oneshot/provider.rs +++ b/packages/yew-agent/src/oneshot/provider.rs @@ -1,13 +1,12 @@ use core::fmt; use std::cell::RefCell; use std::rc::Rc; -use std::sync::atomic::{AtomicUsize, Ordering}; -use gloo_worker::oneshot::OneshotSpawner; use serde::{Deserialize, Serialize}; use yew::prelude::*; -use super::{Oneshot, OneshotBridge}; +use super::{Oneshot, OneshotBridge, OneshotSpawner}; +use crate::utils::get_next_id; use crate::worker::WorkerProviderProps; use crate::{Bincode, Codec, Reach}; @@ -15,7 +14,7 @@ pub(crate) struct OneshotProviderState where T: Oneshot + 'static, { - ctr: usize, + id: usize, spawn_bridge_fn: Rc OneshotBridge>, reach: Reach, held_bridge: Rc>>>, @@ -59,13 +58,13 @@ where } } -impl Clone for OneshotProviderState +impl Clone for OneshotProviderState where - W: Oneshot, + T: Oneshot, { fn clone(&self) -> Self { Self { - ctr: self.ctr, + id: self.id, spawn_bridge_fn: self.spawn_bridge_fn.clone(), reach: self.reach, held_bridge: self.held_bridge.clone(), @@ -73,17 +72,15 @@ where } } -impl PartialEq for OneshotProviderState +impl PartialEq for OneshotProviderState where - W: Oneshot, + T: Oneshot, { fn eq(&self, rhs: &Self) -> bool { - self.ctr == rhs.ctr + self.id == rhs.id } } -static CTR: AtomicUsize = AtomicUsize::new(0); - /// A Oneshot Agent Provider. /// /// This component provides its children access to an oneshot agent. @@ -95,12 +92,6 @@ where T::Output: Serialize + for<'de> Deserialize<'de> + 'static, CODEC: Codec + 'static, { - // Creates a spawning function so CODEC is can be erased from contexts. - let spawn_bridge_fn: Rc OneshotBridge> = { - let path = props.path.clone(); - Rc::new(move || OneshotSpawner::::new().encoding::().spawn(&path)) - }; - let WorkerProviderProps { children, path, @@ -108,12 +99,16 @@ where reach, } = props.clone(); + // Creates a spawning function so CODEC is can be erased from contexts. + let spawn_bridge_fn: Rc OneshotBridge> = { + let path = path.clone(); + Rc::new(move || OneshotSpawner::::new().encoding::().spawn(&path)) + }; + let state = { use_memo((path, lazy, reach), move |(_path, lazy, reach)| { - let ctr = CTR.fetch_add(1, Ordering::SeqCst); - let state = OneshotProviderState:: { - ctr, + id: get_next_id(), spawn_bridge_fn, reach: *reach, held_bridge: Rc::default(), diff --git a/packages/yew-agent/src/reactor/hooks.rs b/packages/yew-agent/src/reactor/hooks.rs index af7218e5250..cf79b1853cb 100644 --- a/packages/yew-agent/src/reactor/hooks.rs +++ b/packages/yew-agent/src/reactor/hooks.rs @@ -2,28 +2,61 @@ use std::fmt; use std::ops::Deref; use std::rc::Rc; +use futures::sink::SinkExt; +use futures::stream::{SplitSink, StreamExt}; +use wasm_bindgen::UnwrapThrowExt; +use yew::platform::pinned::RwLock; +use yew::platform::spawn_local; use yew::prelude::*; -use super::messages::{ReactorInput, ReactorOutput}; -use super::traits::{Reactor, ReactorWorker}; -use super::tx_rx::{ReactorReceivable, ReactorSendable}; -use crate::worker::{use_worker_bridge, UseWorkerBridgeHandle}; +use super::provider::ReactorProviderState; +use super::{Reactor, ReactorBridge, ReactorScoped}; +use crate::utils::BridgeCounter; + +type ReactorTx = + Rc, <::Scope as ReactorScoped>::Input>>>; + +/// A type that represents events from a reactor. +pub enum ReactorEvent +where + R: Reactor, +{ + /// The reactor agent has sent an output. + Output(::Output), + /// The reactor agent has exited. + Finished, +} + +impl fmt::Debug for ReactorEvent +where + R: Reactor, + ::Output: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Output(m) => f.debug_tuple("ReactorEvent::Output").field(&m).finish(), + Self::Finished => f.debug_tuple("ReactorEvent::Finished").finish(), + } + } +} /// Handle for the [use_reactor_bridge] hook. pub struct UseReactorBridgeHandle where R: 'static + Reactor, { - inner: UseWorkerBridgeHandle>, + tx: ReactorTx, + ctr: UseReducerDispatcher, } impl fmt::Debug for UseReactorBridgeHandle where R: 'static + Reactor, + ::Input: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("UseReactorBridgeHandle<_>") - .field("inner", &self.inner) + .field("inner", &self.tx) .finish() } } @@ -34,7 +67,8 @@ where { fn clone(&self) -> Self { Self { - inner: self.inner.clone(), + tx: self.tx.clone(), + ctr: self.ctr.clone(), } } } @@ -44,15 +78,19 @@ where R: 'static + Reactor, { /// Send an input to a reactor agent. - pub fn send(&self, msg: ::Input) { - self.inner.send(ReactorInput::Input(msg)); + pub fn send(&self, msg: ::Input) { + let tx = self.tx.clone(); + spawn_local(async move { + let mut tx = tx.write().await; + let _ = tx.send(msg).await; + }); } /// Reset the bridge. /// /// Disconnect the old bridge and re-connects the agent with a new bridge. pub fn reset(&self) { - self.inner.reset(); + self.ctr.dispatch(()); } } @@ -61,7 +99,7 @@ where R: 'static + Reactor, { fn eq(&self, rhs: &Self) -> bool { - self.inner == rhs.inner + self.ctr == rhs.ctr } } @@ -77,20 +115,61 @@ where pub fn use_reactor_bridge(on_output: F) -> UseReactorBridgeHandle where R: 'static + Reactor, - F: Fn(ReactorOutput<::Output>) + 'static, + F: Fn(ReactorEvent) + 'static, { - let bridge = use_worker_bridge::, _>(on_output); + let ctr = use_reducer(BridgeCounter::default); - { - let bridge = bridge.clone(); + let worker_state = use_context::>() + .expect_throw("cannot find a provider for current agent."); - use_effect(move || { - bridge.send(ReactorInput::Start); - || {} - }); + let on_output = { + let current_bridge_id = ctr.inner; + Rc::new(move |event: ReactorEvent, event_bridge_id: usize| { + // If a new bridge is created, then we discard messages from the previous bridge. + if current_bridge_id != event_bridge_id { + return; + } + + on_output(event); + }) + }; + + let on_output_ref = { + let on_output_clone = on_output.clone(); + use_mut_ref(move || on_output_clone) + }; + + // Refresh the callback on every render. + { + let mut on_output_ref = on_output_ref.borrow_mut(); + *on_output_ref = on_output; } - UseReactorBridgeHandle { inner: bridge } + let tx = { + use_memo((worker_state, ctr.inner), |(state, ctr)| { + let bridge = state.create_bridge(); + let ctr = *ctr; + + let (tx, mut rx) = bridge.split(); + + spawn_local(async move { + while let Some(m) = rx.next().await { + let on_output = on_output_ref.borrow().clone(); + on_output(ReactorEvent::::Output(m), ctr); + } + + let on_output = on_output_ref.borrow().clone(); + on_output(ReactorEvent::::Finished, ctr); + }); + + RwLock::new(tx) + }) + }; + + UseReactorBridgeHandle { + tx: tx.clone(), + ctr: ctr.dispatcher(), + } } /// State handle for the [`use_reactor_subscription`] hook. @@ -99,7 +178,7 @@ where R: 'static + Reactor, { bridge: UseReactorBridgeHandle, - outputs: Vec::Output>>, + outputs: Vec::Output>>, finished: bool, ctr: usize, } @@ -109,7 +188,7 @@ where R: 'static + Reactor, { /// Send an input to a reactor agent. - pub fn send(&self, msg: ::Input) { + pub fn send(&self, msg: ::Input) { self.bridge.send(msg); } @@ -144,7 +223,8 @@ where impl fmt::Debug for UseReactorSubscriptionHandle where R: 'static + Reactor, - ::Output: fmt::Debug, + ::Input: fmt::Debug, + ::Output: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("UseReactorSubscriptionHandle<_>") @@ -158,7 +238,7 @@ impl Deref for UseReactorSubscriptionHandle where R: 'static + Reactor, { - type Target = [Rc<::Output>]; + type Target = [Rc<::Output>]; fn deref(&self) -> &Self::Target { &self.outputs @@ -186,7 +266,7 @@ where where R: Reactor + 'static, { - Output(ReactorOutput<::Output>), + Output(ReactorEvent), Reset, } @@ -195,7 +275,7 @@ where R: Reactor + 'static, { ctr: usize, - inner: Vec::Output>>, + inner: Vec::Output>>, finished: bool, } @@ -211,8 +291,8 @@ where let mut finished = self.finished; match action { - OutputsAction::Output(ReactorOutput::Output(m)) => outputs.push(m.into()), - OutputsAction::Output(ReactorOutput::Finish) => { + OutputsAction::Output(ReactorEvent::::Output(m)) => outputs.push(m.into()), + OutputsAction::Output(ReactorEvent::::Finished) => { finished = true; } OutputsAction::Reset => { @@ -256,14 +336,11 @@ where { let outputs = outputs.clone(); - use_effect_with_deps( - move |_| { - outputs.dispatch(OutputsAction::Reset); - - || {} - }, - bridge.clone(), - ); + use_effect_with(bridge.clone(), move |_| { + outputs.dispatch(OutputsAction::Reset); + + || {} + }); } UseReactorSubscriptionHandle { diff --git a/packages/yew-agent/src/reactor/messages.rs b/packages/yew-agent/src/reactor/messages.rs deleted file mode 100644 index d44f0f0110b..00000000000 --- a/packages/yew-agent/src/reactor/messages.rs +++ /dev/null @@ -1,25 +0,0 @@ -use serde::{Deserialize, Serialize}; - -/// The Bridge Input. -#[derive(Serialize, Deserialize)] -pub(crate) enum ReactorInput -where - I: 'static, -{ - /// Starts the bridge. - Start, - /// An input message. - Input(I), -} - -/// The Bridge Output. -#[derive(Debug, Serialize, Deserialize)] -pub enum ReactorOutput -where - O: 'static, -{ - /// An output message has been received. - Output(O), - /// Reactor for current bridge has exited. - Finish, -} diff --git a/packages/yew-agent/src/reactor/mod.rs b/packages/yew-agent/src/reactor/mod.rs index d5c639b1482..f67e31d1169 100644 --- a/packages/yew-agent/src/reactor/mod.rs +++ b/packages/yew-agent/src/reactor/mod.rs @@ -36,18 +36,16 @@ //! ``` mod hooks; -mod messages; mod provider; -mod traits; -mod tx_rx; +#[doc(inline)] +pub use gloo_worker::reactor::{ + Reactor, ReactorBridge, ReactorRegistrar, ReactorScope, ReactorScoped, ReactorSpawner, +}; pub use hooks::{ - use_reactor_bridge, use_reactor_subscription, UseReactorBridgeHandle, + use_reactor_bridge, use_reactor_subscription, ReactorEvent, UseReactorBridgeHandle, UseReactorSubscriptionHandle, }; -pub(crate) use messages::ReactorInput; -pub use messages::ReactorOutput; pub use provider::ReactorProvider; -pub(crate) use traits::ReactorWorker; -pub use traits::{Reactor, ReactorRegistrar}; -pub use tx_rx::{ReactorReceivable, ReactorReceiver, ReactorSendable, ReactorSender}; +/// A procedural macro to create reactor agents. +pub use yew_agent_macro::reactor; diff --git a/packages/yew-agent/src/reactor/provider.rs b/packages/yew-agent/src/reactor/provider.rs index 0ad730499f1..fd5d2192e99 100644 --- a/packages/yew-agent/src/reactor/provider.rs +++ b/packages/yew-agent/src/reactor/provider.rs @@ -1,26 +1,130 @@ +use std::cell::RefCell; +use std::fmt; +use std::rc::Rc; + +use gloo_worker::reactor::ReactorScoped; +use serde::{Deserialize, Serialize}; use yew::prelude::*; -use super::traits::{Reactor, ReactorWorker}; -use crate::worker::{WorkerProvider, WorkerProviderProps}; -use crate::{Bincode, Codec}; +use super::{Reactor, ReactorBridge, ReactorSpawner}; +use crate::utils::get_next_id; +use crate::worker::WorkerProviderProps; +use crate::{Bincode, Codec, Reach}; + +pub(crate) struct ReactorProviderState +where + T: Reactor + 'static, +{ + id: usize, + spawn_bridge_fn: Rc ReactorBridge>, + reach: Reach, + held_bridge: Rc>>>, +} + +impl fmt::Debug for ReactorProviderState +where + T: Reactor, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("ReactorProviderState<_>") + } +} + +impl ReactorProviderState +where + T: Reactor, +{ + fn get_held_bridge(&self) -> ReactorBridge { + let mut held_bridge = self.held_bridge.borrow_mut(); + + match held_bridge.as_mut() { + Some(m) => m.fork(), + None => { + let bridge = (self.spawn_bridge_fn)(); + *held_bridge = Some(bridge.fork()); + bridge + } + } + } + + /// Creates a bridge, uses "fork" for public agents. + pub fn create_bridge(&self) -> ReactorBridge { + match self.reach { + Reach::Public => { + let held_bridge = self.get_held_bridge(); + held_bridge.fork() + } + Reach::Private => (self.spawn_bridge_fn)(), + } + } +} + +impl Clone for ReactorProviderState +where + T: Reactor, +{ + fn clone(&self) -> Self { + Self { + id: self.id, + spawn_bridge_fn: self.spawn_bridge_fn.clone(), + reach: self.reach, + held_bridge: self.held_bridge.clone(), + } + } +} + +impl PartialEq for ReactorProviderState +where + T: Reactor, +{ + fn eq(&self, rhs: &Self) -> bool { + self.id == rhs.id + } +} /// The reactor provider. #[function_component] pub fn ReactorProvider(props: &WorkerProviderProps) -> Html where R: 'static + Reactor, + <::Scope as ReactorScoped>::Input: + Serialize + for<'de> Deserialize<'de> + 'static, + <::Scope as ReactorScoped>::Output: + Serialize + for<'de> Deserialize<'de> + 'static, CODEC: Codec + 'static, { let WorkerProviderProps { children, + path, lazy, reach, - path, } = props.clone(); + // Creates a spawning function so CODEC is can be erased from contexts. + let spawn_bridge_fn: Rc ReactorBridge> = { + let path = path.clone(); + Rc::new(move || ReactorSpawner::::new().encoding::().spawn(&path)) + }; + + let state = { + use_memo((path, lazy, reach), move |(_path, lazy, reach)| { + let state = ReactorProviderState:: { + id: get_next_id(), + spawn_bridge_fn, + reach: *reach, + held_bridge: Rc::default(), + }; + + if *reach == Reach::Public && !*lazy { + state.get_held_bridge(); + } + state + }) + }; + html! { - , CODEC> {lazy} {path} {reach}> + > context={(*state).clone()}> {children} - , CODEC>> + >> } } diff --git a/packages/yew-agent/src/reactor/traits.rs b/packages/yew-agent/src/reactor/traits.rs deleted file mode 100644 index 711253762b1..00000000000 --- a/packages/yew-agent/src/reactor/traits.rs +++ /dev/null @@ -1,164 +0,0 @@ -use std::collections::HashMap; -use std::fmt; - -use futures::future::LocalBoxFuture; -use futures::stream::StreamExt; -use yew::platform::pinned::mpsc; -use yew::platform::spawn_local; - -use super::messages::{ReactorInput, ReactorOutput}; -use super::tx_rx::{ReactorReceivable, ReactorSendable}; -use crate::worker::{HandlerId, Worker, WorkerDestroyHandle, WorkerRegistrar, WorkerScope}; -use crate::{Bincode, Codec, Registrable}; - -/// A reactor agent. -pub trait Reactor { - /// The Reactor Receiver. - type Receiver: ReactorReceivable; - /// The Reactor Sender. - type Sender: ReactorSendable; - - /// Runs a reactor agent. - fn run(tx: Self::Sender, rx: Self::Receiver) -> LocalBoxFuture<'static, ()>; -} - -/// A registrar for reactor agents. -pub struct ReactorRegistrar -where - R: Reactor + 'static, - CODEC: Codec + 'static, -{ - inner: WorkerRegistrar, CODEC>, -} - -impl ReactorRegistrar -where - R: Reactor + 'static, - CODEC: Codec + 'static, -{ - /// Creates a new Reactor Registrar. - pub fn new() -> ReactorRegistrar { - ReactorRegistrar { - inner: ReactorWorker::::registrar(), - } - } - - /// Sets the encoding. - pub fn encoding(&self) -> ReactorRegistrar - where - C: Codec + 'static, - { - ReactorRegistrar { - inner: self.inner.encoding::(), - } - } - - /// Registers the agent. - pub fn register(&self) { - self.inner.register() - } -} - -impl fmt::Debug for ReactorRegistrar -where - R: Reactor + 'static, - CODEC: Codec + 'static, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("ReactorRegistrar<_>").finish() - } -} - -pub(crate) enum ReactorWorkerMsg { - ReactorExited(HandlerId), -} - -pub(crate) struct ReactorWorker -where - R: 'static + Reactor, -{ - senders: HashMap::Input>>, - destruct_handle: Option>, -} - -impl Worker for ReactorWorker -where - R: 'static + Reactor, -{ - type Input = ReactorInput<::Input>; - type Message = ReactorWorkerMsg; - type Output = ReactorOutput<::Output>; - - fn create(_scope: &WorkerScope) -> Self { - Self { - senders: HashMap::new(), - destruct_handle: None, - } - } - - fn update(&mut self, _scope: &WorkerScope, msg: Self::Message) { - match msg { - ReactorWorkerMsg::ReactorExited(id) => { - self.senders.remove(&id); - } - } - - // All reactors have closed themselves, the worker can now close. - if self.destruct_handle.is_some() && self.senders.is_empty() { - self.destruct_handle = None; - } - } - - fn received(&mut self, scope: &WorkerScope, input: Self::Input, id: HandlerId) { - match input { - // We don't expose any bridge unless they send start message. - Self::Input::Start => { - let receiver = { - let (tx, rx) = mpsc::unbounded(); - self.senders.insert(id, tx); - R::Receiver::new(rx) - }; - - let sender = { - let (tx, mut rx) = mpsc::unbounded(); - let scope = scope.clone(); - - spawn_local(async move { - while let Some(m) = rx.next().await { - scope.respond(id, ReactorOutput::Output(m)); - } - - scope.respond(id, ReactorOutput::Finish); - }); - - R::Sender::new(tx) - }; - - scope.send_future(async move { - R::run(sender, receiver).await; - - ReactorWorkerMsg::ReactorExited(id) - }); - } - - Self::Input::Input(input) => { - if let Some(m) = self.senders.get_mut(&id) { - let _result = m.send_now(input); - } - } - } - } - - fn disconnected(&mut self, _scope: &WorkerScope, id: HandlerId) { - // We close this channel, but drop it when the reactor has exited itself. - if let Some(m) = self.senders.get_mut(&id) { - m.close_now(); - } - } - - fn destroy(&mut self, _scope: &WorkerScope, destruct: WorkerDestroyHandle) { - if !self.senders.is_empty() { - self.destruct_handle = Some(destruct); - } - } -} diff --git a/packages/yew-agent/src/reactor/tx_rx.rs b/packages/yew-agent/src/reactor/tx_rx.rs deleted file mode 100644 index dc34043ea4a..00000000000 --- a/packages/yew-agent/src/reactor/tx_rx.rs +++ /dev/null @@ -1,145 +0,0 @@ -use std::pin::Pin; - -use futures::sink::Sink; -use futures::stream::{FusedStream, Stream}; -use futures::task::{Context, Poll}; -use pin_project::pin_project; -use serde::{Deserialize, Serialize}; -use yew::platform::pinned::mpsc; -use yew::platform::pinned::mpsc::{SendError, TrySendError}; - -/// A receiver for reactors. -#[pin_project] -#[derive(Debug)] -pub struct ReactorReceiver -where - I: Serialize + for<'de> Deserialize<'de>, -{ - #[pin] - rx: mpsc::UnboundedReceiver, -} - -impl Stream for ReactorReceiver -where - I: Serialize + for<'de> Deserialize<'de>, -{ - type Item = I; - - #[inline] - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - this.rx.poll_next(cx) - } - - #[inline] - fn size_hint(&self) -> (usize, Option) { - self.rx.size_hint() - } -} - -impl FusedStream for ReactorReceiver -where - I: Serialize + for<'de> Deserialize<'de>, -{ - #[inline] - fn is_terminated(&self) -> bool { - self.rx.is_terminated() - } -} - -/// A trait to extract input type from [ReactorReceiver]. -pub trait ReactorReceivable { - /// The input message type. - type Input: Serialize + for<'de> Deserialize<'de>; - - /// Creates a ReactorReceiver. - fn new(rx: mpsc::UnboundedReceiver) -> Self; -} - -impl ReactorReceivable for ReactorReceiver -where - I: Serialize + for<'de> Deserialize<'de>, -{ - type Input = I; - - fn new(rx: mpsc::UnboundedReceiver) -> Self { - Self { rx } - } -} - -/// A sender for reactors. -#[derive(Debug)] -pub struct ReactorSender -where - O: Serialize + for<'de> Deserialize<'de>, -{ - tx: mpsc::UnboundedSender, -} - -impl Clone for ReactorSender -where - O: Serialize + for<'de> Deserialize<'de>, -{ - fn clone(&self) -> Self { - Self { - tx: self.tx.clone(), - } - } -} - -impl ReactorSender -where - O: Serialize + for<'de> Deserialize<'de>, -{ - /// Send an output. - pub fn send_now(&self, output: O) -> std::result::Result<(), SendError> { - self.tx.send_now(output) - } -} - -impl Sink for &'_ ReactorSender -where - O: Serialize + for<'de> Deserialize<'de>, -{ - type Error = TrySendError; - - #[inline] - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut &self.tx).poll_ready(cx) - } - - #[inline] - fn start_send(self: Pin<&mut Self>, item: O) -> Result<(), Self::Error> { - Pin::new(&mut &self.tx).start_send(item) - } - - #[inline] - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut &self.tx).poll_flush(cx) - } - - #[inline] - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut &self.tx).poll_close(cx) - } -} - -/// A trait to extract output type from [ReactorSender]. -pub trait ReactorSendable { - /// The output message type. - type Output: Serialize + for<'de> Deserialize<'de>; - - /// Creates a ReactorSender. - fn new(tx: mpsc::UnboundedSender) -> Self; -} - -impl ReactorSendable for ReactorSender -where - O: Serialize + for<'de> Deserialize<'de>, -{ - type Output = O; - - fn new(tx: mpsc::UnboundedSender) -> Self { - Self { tx } - } -} diff --git a/packages/yew-agent/src/utils.rs b/packages/yew-agent/src/utils.rs new file mode 100644 index 00000000000..cea8b87d1aa --- /dev/null +++ b/packages/yew-agent/src/utils.rs @@ -0,0 +1,27 @@ +use std::rc::Rc; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use yew::Reducible; + +/// Gets a unique worker id +pub(crate) fn get_next_id() -> usize { + static CTR: AtomicUsize = AtomicUsize::new(0); + + CTR.fetch_add(1, Ordering::SeqCst) +} + +#[derive(Default, PartialEq)] +pub(crate) struct BridgeCounter { + pub inner: usize, +} + +impl Reducible for BridgeCounter { + type Action = (); + + fn reduce(self: Rc, _: Self::Action) -> Rc { + Self { + inner: self.inner + 1, + } + .into() + } +} diff --git a/packages/yew-agent/src/worker/hooks.rs b/packages/yew-agent/src/worker/hooks.rs index 87f18ff6a3b..846d94683b3 100644 --- a/packages/yew-agent/src/worker/hooks.rs +++ b/packages/yew-agent/src/worker/hooks.rs @@ -7,32 +7,17 @@ use std::rc::Rc; use wasm_bindgen::prelude::*; use yew::prelude::*; +use crate::utils::BridgeCounter; use crate::worker::provider::WorkerProviderState; use crate::worker::{Worker, WorkerBridge}; -#[derive(Default)] -struct UseBridgeCounter { - inner: usize, -} - -impl Reducible for UseBridgeCounter { - type Action = (); - - fn reduce(self: Rc, _: Self::Action) -> Rc { - Self { - inner: self.inner + 1, - } - .into() - } -} - /// State handle for the [`use_worker_bridge`] hook. pub struct UseWorkerBridgeHandle where T: Worker, { inner: WorkerBridge, - ctr: UseReducerDispatcher, + ctr: UseReducerDispatcher, } impl UseWorkerBridgeHandle @@ -99,7 +84,7 @@ where T: Worker + 'static, F: Fn(T::Output) + 'static, { - let ctr = use_reducer(UseBridgeCounter::default); + let ctr = use_reducer(BridgeCounter::default); let worker_state = use_context::>() .expect_throw("cannot find a provider for current agent."); diff --git a/packages/yew-agent/src/worker/provider.rs b/packages/yew-agent/src/worker/provider.rs index 06cc8a007b7..69874f5d801 100644 --- a/packages/yew-agent/src/worker/provider.rs +++ b/packages/yew-agent/src/worker/provider.rs @@ -1,7 +1,6 @@ use std::cell::RefCell; use std::fmt; use std::rc::Rc; -use std::sync::atomic::{AtomicUsize, Ordering}; use gloo_worker::Spawnable; use serde::{Deserialize, Serialize}; @@ -9,6 +8,7 @@ use yew::prelude::*; use super::{Worker, WorkerBridge}; use crate::reach::Reach; +use crate::utils::get_next_id; use crate::{Bincode, Codec}; /// Properties for [WorkerProvider]. @@ -42,7 +42,7 @@ pub(crate) struct WorkerProviderState where W: Worker, { - ctr: usize, + id: usize, spawn_bridge_fn: Rc WorkerBridge>, reach: Reach, held_bridge: Rc>>>, @@ -93,7 +93,7 @@ where { fn clone(&self) -> Self { Self { - ctr: self.ctr, + id: self.id, spawn_bridge_fn: self.spawn_bridge_fn.clone(), reach: self.reach, held_bridge: self.held_bridge.clone(), @@ -106,12 +106,10 @@ where W: Worker, { fn eq(&self, rhs: &Self) -> bool { - self.ctr == rhs.ctr + self.id == rhs.id } } -static CTR: AtomicUsize = AtomicUsize::new(0); - /// A Worker Agent Provider. /// /// This component provides its children access to an worker agent. @@ -123,12 +121,6 @@ where W::Output: Serialize + for<'de> Deserialize<'de> + 'static, CODEC: Codec + 'static, { - // Creates a spawning function so CODEC is can be erased from contexts. - let spawn_bridge_fn: Rc WorkerBridge> = { - let path = props.path.clone(); - Rc::new(move || W::spawner().encoding::().spawn(&path)) - }; - let WorkerProviderProps { children, path, @@ -136,12 +128,16 @@ where reach, } = props.clone(); + // Creates a spawning function so CODEC is can be erased from contexts. + let spawn_bridge_fn: Rc WorkerBridge> = { + let path = path.clone(); + Rc::new(move || W::spawner().encoding::().spawn(&path)) + }; + let state = { use_memo((path, lazy, reach), move |(_path, lazy, reach)| { - let ctr = CTR.fetch_add(1, Ordering::SeqCst); - let state = WorkerProviderState:: { - ctr, + id: get_next_id(), spawn_bridge_fn, reach: *reach, held_bridge: Rc::default(),