diff --git a/or_poisoned/src/lib.rs b/or_poisoned/src/lib.rs index fd337f2d44..29bb10a56a 100644 --- a/or_poisoned/src/lib.rs +++ b/or_poisoned/src/lib.rs @@ -35,7 +35,7 @@ pub trait OrPoisoned { fn or_poisoned(self) -> Self::Inner; } -impl<'a, T> OrPoisoned +impl<'a, T: ?Sized> OrPoisoned for Result, PoisonError>> { type Inner = RwLockReadGuard<'a, T>; @@ -45,7 +45,7 @@ impl<'a, T> OrPoisoned } } -impl<'a, T> OrPoisoned +impl<'a, T: ?Sized> OrPoisoned for Result, PoisonError>> { type Inner = RwLockWriteGuard<'a, T>; @@ -55,7 +55,7 @@ impl<'a, T> OrPoisoned } } -impl<'a, T> OrPoisoned for LockResult> { +impl<'a, T: ?Sized> OrPoisoned for LockResult> { type Inner = MutexGuard<'a, T>; fn or_poisoned(self) -> Self::Inner { diff --git a/reactive_graph/src/effect.rs b/reactive_graph/src/effect.rs index bef29d70e3..0268cee240 100644 --- a/reactive_graph/src/effect.rs +++ b/reactive_graph/src/effect.rs @@ -3,11 +3,13 @@ #[allow(clippy::module_inception)] mod effect; mod effect_function; +mod immediate; mod inner; mod render_effect; pub use effect::*; pub use effect_function::*; +pub use immediate::*; pub use render_effect::*; /// Creates a new render effect, which immediately runs `fun`. diff --git a/reactive_graph/src/effect/effect.rs b/reactive_graph/src/effect/effect.rs index 24f1bff25e..e08e8f30a7 100644 --- a/reactive_graph/src/effect/effect.rs +++ b/reactive_graph/src/effect/effect.rs @@ -374,47 +374,16 @@ impl Effect { /// This spawns a task that can be run on any thread. For an effect that will be spawned on /// the current thread, use [`new`](Effect::new). pub fn new_sync( - mut fun: impl EffectFunction + Send + Sync + 'static, + fun: impl EffectFunction + Send + Sync + 'static, ) -> Self where T: Send + Sync + 'static, { - let inner = cfg!(feature = "effects").then(|| { - let (mut rx, owner, inner) = effect_base(); - let mut first_run = true; - let value = Arc::new(RwLock::new(None::)); - - crate::spawn({ - let value = Arc::clone(&value); - let subscriber = inner.to_any_subscriber(); - - async move { - while rx.next().await.is_some() { - if !owner.paused() - && (subscriber.with_observer(|| { - subscriber.update_if_necessary() - }) || first_run) - { - first_run = false; - subscriber.clear_sources(&subscriber); - - let old_value = - mem::take(&mut *value.write().or_poisoned()); - let new_value = owner.with_cleanup(|| { - subscriber.with_observer(|| { - run_in_effect_scope(|| fun.run(old_value)) - }) - }); - *value.write().or_poisoned() = Some(new_value); - } - } - } - }); - - ArenaItem::new_with_storage(Some(inner)) - }); + if !cfg!(feature = "effects") { + return Self { inner: None }; + } - Self { inner } + Self::new_isomorphic(fun) } /// Creates a new effect, which runs once on the next “tick”, and then runs again when reactive values diff --git a/reactive_graph/src/effect/immediate.rs b/reactive_graph/src/effect/immediate.rs new file mode 100644 index 0000000000..d000ccd683 --- /dev/null +++ b/reactive_graph/src/effect/immediate.rs @@ -0,0 +1,403 @@ +use crate::{ + graph::{AnySubscriber, ReactiveNode, ToAnySubscriber}, + owner::{ArenaItem, LocalStorage, Storage, SyncStorage}, + traits::{DefinedAt, Dispose}, +}; +use or_poisoned::OrPoisoned; +use std::{ + panic::Location, + sync::{Arc, Mutex, RwLock}, +}; + +/// Effects run a certain chunk of code whenever the signals they depend on change. +/// +/// The effect runs on creation and again as soon as any tracked signal changes. +/// +/// NOTE: you probably want use [`Effect`](super::Effect) instead. +/// This is for the few cases where it's important to execute effects immediately and in order. +/// +/// Effects are intended to run *side-effects* of the system, not to synchronize state +/// *within* the system. In other words: In most cases, you usually should not write to +/// signals inside effects. (If you need to define a signal that depends on the value of +/// other signals, use a derived signal or a [`Memo`](crate::computed::Memo)). +/// +/// You can provide an effect function without parameters or one with one parameter. +/// If you provide such a parameter, the effect function is called with an argument containing +/// whatever value it returned the last time it ran. On the initial run, this is `None`. +/// +/// Effects stop running when their reactive [`Owner`](crate::owner::Owner) is disposed. +/// +/// NOTE: since effects are executed immediately, they might recurse. +/// Under recursion only the last run to start is tracked. +/// +/// ## Example +/// +/// ``` +/// # use reactive_graph::computed::*; +/// # use reactive_graph::signal::*; let owner = reactive_graph::owner::Owner::new(); owner.set(); +/// # use reactive_graph::prelude::*; +/// # use reactive_graph::effect::immediateEffect; +/// # use reactive_graph::owner::ArenaItem; +/// # let owner = reactive_graph::owner::Owner::new(); owner.set(); +/// let a = RwSignal::new(0); +/// let b = RwSignal::new(0); +/// +/// // ✅ use effects to interact between reactive state and the outside world +/// ImmediateEffect::new(move || { +/// // on the next “tick” prints "Value: 0" and subscribes to `a` +/// println!("Value: {}", a.get()); +/// }); +/// +/// // The effect runs immediately and subscribes to `a`, in the process it prints "Value: 0" +/// # assert_eq!(a.get(), 0); +/// a.set(1); +/// # assert_eq!(a.get(), 1); +/// // ✅ because it's subscribed to `a`, the effect reruns and prints "Value: 1" +/// +/// // ❌ don't use effects to synchronize state within the reactive system +/// Effect::new(move || { +/// // this technically works but can cause unnecessary runs +/// // and easily lead to problems like infinite loops +/// b.set(a.get() + 1); +/// }); +/// ``` +/// ## Web-Specific Notes +/// +/// 1. **Scheduling**: Effects run immediately, as soon as any tracked signal changes. +/// 2. By default, effects do not run unless the `effects` feature is enabled. If you are using +/// this with a web framework, this generally means that effects **do not run on the server**. +/// and you can call browser-specific APIs within the effect function without causing issues. +/// If you need an effect to run on the server, use [`ImmediateEffect::new_isomorphic`]. +#[derive(Debug, Clone, Copy)] +pub struct ImmediateEffect { + inner: Option>, +} + +type StoredEffect = Option>>; + +impl Dispose for ImmediateEffect { + fn dispose(self) { + if let Some(inner) = self.inner { + inner.dispose() + } + } +} + +impl ImmediateEffect { + /// Creates a new effect which runs immediately, then again as soon as any tracked signal changes. + /// + /// NOTE: this requires a `Fn` function because it might recurse. + /// Use [Self::new_mut] to pass a `FnMut` function, it'll panic on recursion. + #[track_caller] + pub fn new(fun: impl Fn() + Send + Sync + 'static) -> Self { + if !cfg!(feature = "effects") { + return Self { inner: None }; + } + + let inner = inner::EffectInner::new(fun); + + inner.update_if_necessary(); + + Self { + inner: Some(ArenaItem::new_with_storage(Some(inner))), + } + } + /// Creates a new effect which runs immediately, then again as soon as any tracked signal changes. + /// + /// # Panics + /// Panics on recursion. Also see [Self::new] + #[track_caller] + pub fn new_mut(fun: impl FnMut() + Send + Sync + 'static) -> Self { + const MSG: &str = "The effect recursed or its function panicked."; + let fun = Mutex::new(fun); + Self::new(move || fun.try_lock().expect(MSG)()) + } +} + +impl ImmediateEffect { + /// Creates a new effect which runs immediately, then again as soon as any tracked signal changes. + #[track_caller] + pub fn new_sync(fun: impl Fn() + Send + Sync + 'static) -> Self { + if !cfg!(feature = "effects") { + return Self { inner: None }; + } + + Self::new_isomorphic(fun) + } + + /// Creates a new effect which runs immediately, then again as soon as any tracked signal changes. + /// + /// This will run whether the `effects` feature is enabled or not. + #[track_caller] + pub fn new_isomorphic(fun: impl Fn() + Send + Sync + 'static) -> Self { + let inner = inner::EffectInner::new(fun); + + inner.update_if_necessary(); + + Self { + inner: Some(ArenaItem::new_with_storage(Some(inner))), + } + } +} + +impl ToAnySubscriber for ImmediateEffect +where + S: Storage, +{ + fn to_any_subscriber(&self) -> AnySubscriber { + const MSG: &str = "tried to set effect that has been stopped"; + let inner = self.inner.as_ref().expect(MSG); + inner + .try_with_value(|inner| Some(inner.as_ref()?.to_any_subscriber())) + .flatten() + .expect(MSG) + } +} + +impl DefinedAt for ImmediateEffect +where + S: Storage, +{ + fn defined_at(&self) -> Option<&'static Location<'static>> { + self.inner + .as_ref()? + .try_with_value(|inner| { + inner.as_ref()?.read().or_poisoned().defined_at() + }) + .flatten() + } +} + +mod inner { + use crate::{ + graph::{ + AnySource, AnySubscriber, ReactiveNode, ReactiveNodeState, + SourceSet, Subscriber, ToAnySubscriber, WithObserver, + }, + log_warning, + owner::Owner, + traits::DefinedAt, + }; + use or_poisoned::OrPoisoned; + use std::{ + panic::Location, + sync::{Arc, RwLock, Weak}, + thread::{self, ThreadId}, + }; + + /// Handles subscription logic for effects. + /// + /// To handle parallelism and recursion we assign ordered (1..) ids to each run. + /// We only keep the sources tracked by the run with the highest id (the last one). + /// + /// We do this by: + /// - Clearing the sources before every run, so the last one clears anything before it. + /// - We stop tracking sources after the last run has completed. + /// (A parent run will start before and end after a recursive child run.) + /// - To handle parallelism with the last run, we only allow sources to be added by its thread. + pub(super) struct EffectInner { + #[cfg(any(debug_assertions, leptos_debuginfo))] + defined_at: &'static Location<'static>, + owner: Owner, + state: ReactiveNodeState, + /// The number of effect runs in this 'batch'. + /// Cleared when no runs are *ongoing* anymore. + /// Used to assign ordered ids to each run, and to know when we can clear these values. + run_count_start: usize, + /// The number of effect runs that have completed in the current 'batch'. + /// Cleared when no runs are *ongoing* anymore. + /// Used to know when we can clear these values. + run_done_count: usize, + /// Given ordered ids (1..), the run with the highest id that has completed in this 'batch'. + /// Cleared when no runs are *ongoing* anymore. + /// Used to know whether the current run is the latest one. + run_done_max: usize, + /// The [ThreadId] of the run with the highest id. + /// Used to prevent over-subscribing during parallel execution with the last run. + last_run_thread_id: ThreadId, + fun: Arc, + sources: SourceSet, + any_subscriber: AnySubscriber, + } + + impl EffectInner { + #[track_caller] + pub fn new( + fun: impl Fn() + Send + Sync + 'static, + ) -> Arc> { + let owner = Owner::new(); + + Arc::new_cyclic(|weak| { + let any_subscriber = AnySubscriber( + weak.as_ptr() as usize, + Weak::clone(weak) as Weak, + ); + + RwLock::new(EffectInner { + #[cfg(any(debug_assertions, leptos_debuginfo))] + defined_at: Location::caller(), + owner, + state: ReactiveNodeState::Dirty, + run_count_start: 0, + run_done_count: 0, + run_done_max: 0, + last_run_thread_id: thread::current().id(), + fun: Arc::new(fun), + sources: SourceSet::new(), + any_subscriber, + }) + }) + } + } + + impl ToAnySubscriber for Arc> { + fn to_any_subscriber(&self) -> AnySubscriber { + AnySubscriber( + Arc::as_ptr(self) as usize, + Arc::downgrade(self) as Weak, + ) + } + } + + impl ReactiveNode for RwLock { + fn mark_subscribers_check(&self) {} + + fn update_if_necessary(&self) -> bool { + let state = { + let guard = self.read().or_poisoned(); + + if guard.owner.paused() { + return false; + } + + guard.state + }; + + let needs_update = match state { + ReactiveNodeState::Clean => false, + ReactiveNodeState::Check => { + let sources = self.read().or_poisoned().sources.clone(); + sources + .into_iter() + .any(|source| source.update_if_necessary()) + } + ReactiveNodeState::Dirty => true, + }; + + if needs_update { + let mut guard = self.write().or_poisoned(); + + let owner = guard.owner.clone(); + let any_subscriber = guard.any_subscriber.clone(); + let fun = guard.fun.clone(); + + // New run has started. + guard.run_count_start += 1; + // We get a value for this run, the highest value will be what we keep the sources from. + let recursion_count = guard.run_count_start; + // We clear the sources before running the effect. + // Note that this is tied to the ordering of the initial write lock acquisition + // to ensure the last run is also the last to clear them. + guard.sources.clear_sources(&any_subscriber); + // Only this thread will be able to subscribe. + guard.last_run_thread_id = thread::current().id(); + + if recursion_count > 2 { + warn_excessive_recursion(&guard); + } + + drop(guard); + + // We execute the effect. + // Note that *this could happen in parallel across threads*. + owner.with_cleanup(|| any_subscriber.with_observer(|| fun())); + + let mut guard = self.write().or_poisoned(); + + // This run has completed. + guard.run_done_count += 1; + + // We update the done count. + // Sources will only be added if recursion_done_max < recursion_count_start. + // (Meaning the last run is not done yet.) + guard.run_done_max = + Ord::max(recursion_count, guard.run_done_max); + + // The same amount of runs has started and completed, + // so we can clear everything up for next time. + if guard.run_count_start == guard.run_done_count { + guard.run_count_start = 0; + guard.run_done_count = 0; + guard.run_done_max = 0; + // Can be left unchanged, it'll be set again next time. + // guard.last_run_thread_id = thread::current().id(); + } + + guard.state = ReactiveNodeState::Clean; + } + + needs_update + } + + fn mark_check(&self) { + self.write().or_poisoned().state = ReactiveNodeState::Check; + self.update_if_necessary(); + } + + fn mark_dirty(&self) { + self.write().or_poisoned().state = ReactiveNodeState::Dirty; + self.update_if_necessary(); + } + } + + impl Subscriber for RwLock { + fn add_source(&self, source: AnySource) { + let mut guard = self.write().or_poisoned(); + if guard.run_done_max < guard.run_count_start + && guard.last_run_thread_id == thread::current().id() + { + guard.sources.insert(source); + } + } + + fn clear_sources(&self, subscriber: &AnySubscriber) { + self.write().or_poisoned().sources.clear_sources(subscriber); + } + } + + impl DefinedAt for EffectInner { + fn defined_at(&self) -> Option<&'static Location<'static>> { + #[cfg(any(debug_assertions, leptos_debuginfo))] + { + Some(self.defined_at) + } + #[cfg(not(any(debug_assertions, leptos_debuginfo)))] + { + None + } + } + } + + impl std::fmt::Debug for EffectInner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("EffectInner") + .field("owner", &self.owner) + .field("state", &self.state) + .field("sources", &self.sources) + .field("any_subscriber", &self.any_subscriber) + .finish() + } + } + + fn warn_excessive_recursion(effect: &EffectInner) { + const MSG: &str = "ImmediateEffect recursed more than once."; + match effect.defined_at() { + Some(defined_at) => { + log_warning(format_args!("{MSG} Defined at: {}", defined_at)); + } + None => { + log_warning(format_args!("{MSG}")); + } + } + } +} diff --git a/reactive_graph/src/effect/inner.rs b/reactive_graph/src/effect/inner.rs index a781f9c4a0..58224f5a27 100644 --- a/reactive_graph/src/effect/inner.rs +++ b/reactive_graph/src/effect/inner.rs @@ -30,21 +30,19 @@ impl ReactiveNode for RwLock { fn update_if_necessary(&self) -> bool { let mut guard = self.write().or_poisoned(); - let (is_dirty, sources) = - (guard.dirty, (!guard.dirty).then(|| guard.sources.clone())); - if is_dirty { + if guard.dirty { guard.dirty = false; return true; } + let sources = guard.sources.clone(); + drop(guard); - for source in sources.into_iter().flatten() { - if source.update_if_necessary() { - return true; - } - } - false + + sources + .into_iter() + .any(|source| source.update_if_necessary()) } fn mark_check(&self) { diff --git a/reactive_graph/tests/effect_immediate.rs b/reactive_graph/tests/effect_immediate.rs new file mode 100644 index 0000000000..75742fa224 --- /dev/null +++ b/reactive_graph/tests/effect_immediate.rs @@ -0,0 +1,176 @@ +#[cfg(feature = "effects")] +pub mod imports { + pub use any_spawner::Executor; + pub use reactive_graph::{ + effect::ImmediateEffect, owner::Owner, prelude::*, signal::RwSignal, + }; + pub use std::sync::{Arc, RwLock}; + pub use tokio::task; +} + +#[cfg(feature = "effects")] +#[test] +fn effect_runs() { + use imports::*; + + let owner = Owner::new(); + owner.set(); + + let a = RwSignal::new(-1); + + // simulate an arbitrary side effect + let b = Arc::new(RwLock::new(String::new())); + + ImmediateEffect::new({ + let b = b.clone(); + move || { + let formatted = format!("Value is {}", a.get()); + *b.write().unwrap() = formatted; + } + }); + assert_eq!(b.read().unwrap().as_str(), "Value is -1"); + + println!("setting to 1"); + a.set(1); + assert_eq!(b.read().unwrap().as_str(), "Value is 1"); +} + +#[cfg(feature = "effects")] +#[test] +fn dynamic_dependencies() { + use imports::*; + + let owner = Owner::new(); + owner.set(); + + let first = RwSignal::new("Greg"); + let last = RwSignal::new("Johnston"); + let use_last = RwSignal::new(true); + + let combined_count = Arc::new(RwLock::new(0)); + + ImmediateEffect::new({ + let combined_count = Arc::clone(&combined_count); + move || { + *combined_count.write().unwrap() += 1; + if use_last.get() { + println!("{} {}", first.get(), last.get()); + } else { + println!("{}", first.get()); + } + } + }); + + assert_eq!(*combined_count.read().unwrap(), 1); + + println!("\nsetting `first` to Bob"); + first.set("Bob"); + assert_eq!(*combined_count.read().unwrap(), 2); + + println!("\nsetting `last` to Bob"); + last.set("Thompson"); + assert_eq!(*combined_count.read().unwrap(), 3); + + println!("\nsetting `use_last` to false"); + use_last.set(false); + assert_eq!(*combined_count.read().unwrap(), 4); + + println!("\nsetting `last` to Jones"); + last.set("Jones"); + assert_eq!(*combined_count.read().unwrap(), 4); + + println!("\nsetting `last` to Jones"); + last.set("Smith"); + assert_eq!(*combined_count.read().unwrap(), 4); + + println!("\nsetting `last` to Stevens"); + last.set("Stevens"); + assert_eq!(*combined_count.read().unwrap(), 4); + + println!("\nsetting `use_last` to true"); + use_last.set(true); + assert_eq!(*combined_count.read().unwrap(), 5); +} + +#[cfg(feature = "effects")] +#[test] +fn recursive_effect_runs_recursively() { + use imports::*; + + let owner = Owner::new(); + owner.set(); + + let s = RwSignal::new(0); + + let logged_values = Arc::new(RwLock::new(Vec::new())); + + ImmediateEffect::new({ + let logged_values = Arc::clone(&logged_values); + move || { + let a = s.get(); + println!("a = {a}"); + logged_values.write().unwrap().push(a); + if a == 0 { + return; + } + s.set(0); + } + }); + + s.set(1); + s.set(2); + s.set(3); + + assert_eq!(0, s.get_untracked()); + assert_eq!(&*logged_values.read().unwrap(), &[0, 1, 0, 2, 0, 3, 0]); +} + +#[cfg(feature = "effects")] +#[test] +fn paused_effect_pauses() { + use imports::*; + use reactive_graph::owner::StoredValue; + + let owner = Owner::new(); + owner.set(); + + let a = RwSignal::new(-1); + + // simulate an arbitrary side effect + let runs = StoredValue::new(0); + + let owner = StoredValue::new(None); + + ImmediateEffect::new({ + move || { + *owner.write_value() = Owner::current(); + + let _ = a.get(); + *runs.write_value() += 1; + } + }); + + assert_eq!(runs.get_value(), 1); + + println!("setting to 1"); + a.set(1); + + assert_eq!(runs.get_value(), 2); + + println!("pausing"); + owner.get_value().unwrap().pause(); + + println!("setting to 2"); + a.set(2); + + assert_eq!(runs.get_value(), 2); + + println!("resuming"); + owner.get_value().unwrap().resume(); + + println!("setting to 3"); + a.set(3); + + println!("checking value"); + assert_eq!(runs.get_value(), 3); +}