Skip to content

Commit

Permalink
Hydration fix
Browse files Browse the repository at this point in the history
  • Loading branch information
zakstucke committed Nov 23, 2024
1 parent 980595f commit 7f45361
Showing 1 changed file with 45 additions and 19 deletions.
64 changes: 45 additions & 19 deletions reactive_graph/src/computed/async_derived/arc_async_derived.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ macro_rules! spawn_derived {
let inner = Arc::downgrade(&this.inner);
let wakers = Arc::downgrade(&this.wakers);
let loading = Arc::downgrade(&this.loading);
#[cfg(debug_assertions)]
let defined_at = this.defined_at;
let fut = async move {
while rx.next().await.is_some() {
let update_if_necessary = if $should_track {
Expand All @@ -323,8 +325,29 @@ macro_rules! spawn_derived {
if update_if_necessary || first_run.is_some() {
match (value.upgrade(), inner.upgrade(), wakers.upgrade(), loading.upgrade()) {
(Some(value), Some(inner), Some(wakers), Some(loading)) => {
let this = ArcAsyncDerived {
#[cfg(debug_assertions)]
defined_at,
value,
wakers,
inner,
loading,
};

let owner = this.inner.read().or_poisoned().owner.clone();

this.loading.store(true, Ordering::Relaxed);

if first_run.is_none() {
owner.with(|| {
let _ = this.try_read_untracked();
for sub in (&this.inner.read().or_poisoned().subscribers).into_iter() {
sub.mark_dirty();
}
});
}

// generate new Future
let owner = inner.read().or_poisoned().owner.clone();
let fut = initial_fut.take().unwrap_or_else(|| {
let fut = if $should_track {
owner.with_cleanup(|| {
Expand All @@ -342,6 +365,8 @@ macro_rules! spawn_derived {
Box::pin(fut)
});

this.loading.store(true, Ordering::Relaxed);

// register with global transition listener, if any
let ready_tx = first_run.take().unwrap_or_else(|| {
let (ready_tx, ready_rx) = oneshot::channel();
Expand All @@ -351,11 +376,8 @@ macro_rules! spawn_derived {
ready_tx
});

// generate and assign new value
loading.store(true, Ordering::Relaxed);

let (this_version, suspense_ids) = {
let mut guard = inner.write().or_poisoned();
let mut guard = this.inner.write().or_poisoned();
guard.version += 1;
let version = guard.version;
let suspense_ids = mem::take(&mut guard.suspenses)
Expand All @@ -369,10 +391,10 @@ macro_rules! spawn_derived {

drop(suspense_ids);

let latest_version = inner.read().or_poisoned().version;
let latest_version = this.inner.read().or_poisoned().version;

if latest_version == this_version {
Self::set_inner_value(new_value, value, wakers, inner, loading, Some(ready_tx)).await;
Self::set_inner_value(new_value, this.value, this.wakers, this.inner, this.loading, Some(ready_tx)).await;
}
}
_ => break,
Expand Down Expand Up @@ -583,18 +605,22 @@ impl<T: 'static> ReadUntracked for ArcAsyncDerived<T> {
type Value = ReadGuard<Option<T>, AsyncPlain<Option<T>>>;

fn try_read_untracked(&self) -> Option<Self::Value> {
if let Some(suspense_context) = use_context::<SuspenseContext>() {
let handle = suspense_context.task_id();
let ready = SpecialNonReactiveFuture::new(self.ready());
crate::spawn(async move {
ready.await;
drop(handle);
});
self.inner
.write()
.or_poisoned()
.suspenses
.push(suspense_context);
if self.loading.load(Ordering::Relaxed)
|| self.value.blocking_read().is_none()
{
if let Some(suspense_context) = use_context::<SuspenseContext>() {
let handle = suspense_context.task_id();
let ready = SpecialNonReactiveFuture::new(self.ready());
crate::spawn(async move {
ready.await;
drop(handle);
});
self.inner
.write()
.or_poisoned()
.suspenses
.push(suspense_context);
}
}
AsyncPlain::try_new(&self.value).map(ReadGuard::new)
}
Expand Down

0 comments on commit 7f45361

Please sign in to comment.