Skip to content

Commit 5f28806

Browse files
committed
feat: introduce ImmediateEffect
1 parent c75397e commit 5f28806

File tree

2 files changed

+310
-0
lines changed

2 files changed

+310
-0
lines changed

reactive_graph/src/effect.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@
33
#[allow(clippy::module_inception)]
44
mod effect;
55
mod effect_function;
6+
mod immediate;
67
mod inner;
78
mod render_effect;
89

910
pub use effect::*;
1011
pub use effect_function::*;
12+
pub use immediate::*;
1113
pub use render_effect::*;
1214

1315
/// Creates a new render effect, which immediately runs `fun`.
Lines changed: 308 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,308 @@
1+
use crate::{
2+
effect::EffectFunction,
3+
graph::{AnySubscriber, ReactiveNode, ToAnySubscriber},
4+
owner::{ArenaItem, LocalStorage, Storage, SyncStorage},
5+
traits::Dispose,
6+
};
7+
use std::sync::{Arc, RwLock};
8+
9+
/// Effects run a certain chunk of code whenever the signals they depend on change.
10+
///
11+
/// The effect runs on creation and again as soon as any tracked signal changes.
12+
///
13+
/// NOTE: you probably want use [`Effect`](super::Effect) instead.
14+
/// This is for the few cases where it's important to execute effects immediately and in order.
15+
///
16+
/// Effects are intended to run *side-effects* of the system, not to synchronize state
17+
/// *within* the system. In other words: In most cases, you usually should not write to
18+
/// signals inside effects. (If you need to define a signal that depends on the value of
19+
/// other signals, use a derived signal or a [`Memo`](crate::computed::Memo)).
20+
///
21+
/// You can provide an effect function without parameters or one with one parameter.
22+
/// If you provide such a parameter, the effect function is called with an argument containing
23+
/// whatever value it returned the last time it ran. On the initial run, this is `None`.
24+
///
25+
/// Effects stop running when their reactive [`Owner`](crate::owner::Owner) is disposed.
26+
///
27+
/// ## Example
28+
///
29+
/// ```
30+
/// # use reactive_graph::computed::*;
31+
/// # use reactive_graph::signal::*; let owner = reactive_graph::owner::Owner::new(); owner.set();
32+
/// # use reactive_graph::prelude::*;
33+
/// # use reactive_graph::effect::Effect;
34+
/// # use reactive_graph::owner::ArenaItem;
35+
/// # let owner = reactive_graph::owner::Owner::new(); owner.set();
36+
/// let a = RwSignal::new(0);
37+
/// let b = RwSignal::new(0);
38+
///
39+
/// // ✅ use effects to interact between reactive state and the outside world
40+
/// ImmediateEffect::new(move || {
41+
/// // on the next “tick” prints "Value: 0" and subscribes to `a`
42+
/// println!("Value: {}", a.get());
43+
/// });
44+
///
45+
/// // The effect runs immediately and subscribes to `a`, in the process it prints "Value: 0"
46+
/// # assert_eq!(a.get(), 0);
47+
/// a.set(1);
48+
/// # assert_eq!(a.get(), 1);
49+
/// // ✅ because it's subscribed to `a`, the effect reruns and prints "Value: 1"
50+
///
51+
/// // ❌ don't use effects to synchronize state within the reactive system
52+
/// Effect::new(move || {
53+
/// // this technically works but can cause unnecessary runs
54+
/// // and easily lead to problems like infinite loops
55+
/// b.set(a.get() + 1);
56+
/// });
57+
/// ```
58+
/// ## Web-Specific Notes
59+
///
60+
/// 1. **Scheduling**: Effects run immediately, as soon as any tracked signal changes.
61+
/// 2. By default, effects do not run unless the `effects` feature is enabled. If you are using
62+
/// this with a web framework, this generally means that effects **do not run on the server**.
63+
/// and you can call browser-specific APIs within the effect function without causing issues.
64+
/// If you need an effect to run on the server, use [`ImmediateEffect::new_isomorphic`].
65+
#[derive(Debug, Clone, Copy)]
66+
pub struct ImmediateEffect<S> {
67+
inner: Option<ArenaItem<StoredEffect, S>>,
68+
}
69+
70+
type StoredEffect = Option<Arc<RwLock<inner::EffectInner>>>;
71+
72+
impl<S> Dispose for ImmediateEffect<S> {
73+
fn dispose(self) {
74+
if let Some(inner) = self.inner {
75+
inner.dispose()
76+
}
77+
}
78+
}
79+
80+
impl ImmediateEffect<LocalStorage> {
81+
/// Creates a new effect, which runs immediately, then again as soon as any tracked signal changes.
82+
pub fn new<T, M>(mut fun: impl EffectFunction<T, M> + 'static) -> Self
83+
where
84+
T: Send + Sync + 'static,
85+
{
86+
if !cfg!(feature = "effects") {
87+
return Self { inner: None };
88+
}
89+
90+
let fun = {
91+
let thread_id = std::thread::current().id();
92+
93+
/// A wrapper type that is always `Send` and `Sync`.
94+
struct UnsafeSendSync<T>(T);
95+
unsafe impl<T> Send for UnsafeSendSync<T> {}
96+
unsafe impl<T> Sync for UnsafeSendSync<T> {}
97+
impl<F, T, M> EffectFunction<T, M> for UnsafeSendSync<F>
98+
where
99+
F: EffectFunction<T, M> + 'static,
100+
{
101+
fn run(&mut self, p: Option<T>) -> T {
102+
self.0.run(p)
103+
}
104+
}
105+
106+
UnsafeSendSync(move |v| {
107+
assert_eq!(thread_id, std::thread::current().id());
108+
fun.run(v)
109+
})
110+
};
111+
112+
let inner = inner::EffectInner::new(fun);
113+
114+
inner.update_if_necessary();
115+
116+
Self {
117+
inner: Some(ArenaItem::new_with_storage(Some(inner))),
118+
}
119+
}
120+
}
121+
122+
impl ImmediateEffect<SyncStorage> {
123+
/// Creates a new effect, which runs immediately, then again as soon as any tracked signal changes.
124+
pub fn new<T, M>(
125+
fun: impl EffectFunction<T, M> + Send + Sync + 'static,
126+
) -> Self
127+
where
128+
T: Send + Sync + 'static,
129+
{
130+
if !cfg!(feature = "effects") {
131+
return Self { inner: None };
132+
}
133+
134+
Self::new_isomorphic(fun)
135+
}
136+
137+
/// Creates a new effect, which runs immediately, then again as soon as any tracked signal changes.
138+
///
139+
/// This will run whether the `effects` feature is enabled or not.
140+
pub fn new_isomorphic<T, M>(
141+
fun: impl EffectFunction<T, M> + Send + Sync + 'static,
142+
) -> Self
143+
where
144+
T: Send + Sync + 'static,
145+
{
146+
let inner = inner::EffectInner::new(fun);
147+
148+
inner.update_if_necessary();
149+
150+
Self {
151+
inner: Some(ArenaItem::new_with_storage(Some(inner))),
152+
}
153+
}
154+
}
155+
156+
impl<S> ToAnySubscriber for ImmediateEffect<S>
157+
where
158+
S: Storage<StoredEffect>,
159+
{
160+
fn to_any_subscriber(&self) -> AnySubscriber {
161+
const MSG: &str = "tried to set effect that has been stopped";
162+
let inner = self.inner.as_ref().expect(MSG);
163+
inner
164+
.try_with_value(|inner| Some(inner.as_ref()?.to_any_subscriber()))
165+
.flatten()
166+
.expect(MSG)
167+
}
168+
}
169+
170+
mod inner {
171+
use crate::{
172+
effect::EffectFunction,
173+
graph::{
174+
AnySource, AnySubscriber, ReactiveNode, ReactiveNodeState,
175+
SourceSet, Subscriber, ToAnySubscriber, WithObserver,
176+
},
177+
owner::Owner,
178+
};
179+
use or_poisoned::OrPoisoned;
180+
use std::sync::{Arc, RwLock, Weak};
181+
182+
/// Handles subscription logic for effects.
183+
pub(super) struct EffectInner {
184+
pub(super) owner: Owner,
185+
pub(super) state: ReactiveNodeState,
186+
pub(super) fun: Box<dyn FnMut() + Send + Sync>,
187+
pub(super) sources: SourceSet,
188+
pub(super) any_subscriber: AnySubscriber,
189+
}
190+
191+
impl EffectInner {
192+
pub fn new<T, M>(
193+
mut fun: impl EffectFunction<T, M> + Send + Sync + 'static,
194+
) -> Arc<RwLock<EffectInner>>
195+
where
196+
T: Send + Sync + 'static,
197+
{
198+
let owner = Owner::new();
199+
200+
let fun = {
201+
let mut value = None;
202+
move || value = Some(fun.run(value.take()))
203+
};
204+
205+
Arc::new_cyclic(|weak| {
206+
let any_subscriber = AnySubscriber(
207+
weak.as_ptr() as usize,
208+
Weak::clone(weak) as Weak<dyn Subscriber + Send + Sync>,
209+
);
210+
211+
RwLock::new(EffectInner {
212+
owner,
213+
state: ReactiveNodeState::Dirty,
214+
fun: Box::new(fun),
215+
sources: SourceSet::new(),
216+
any_subscriber,
217+
})
218+
})
219+
}
220+
}
221+
222+
impl ToAnySubscriber for Arc<RwLock<EffectInner>> {
223+
fn to_any_subscriber(&self) -> AnySubscriber {
224+
AnySubscriber(
225+
Arc::as_ptr(self) as usize,
226+
Arc::downgrade(self) as Weak<dyn Subscriber + Send + Sync>,
227+
)
228+
}
229+
}
230+
231+
impl ReactiveNode for RwLock<EffectInner> {
232+
fn mark_subscribers_check(&self) {}
233+
234+
fn update_if_necessary(&self) -> bool {
235+
let guard = self.read().or_poisoned();
236+
237+
if guard.owner.paused() {
238+
return false;
239+
}
240+
241+
let needs_update = match guard.state {
242+
ReactiveNodeState::Clean => false,
243+
ReactiveNodeState::Check => {
244+
let sources = guard.sources.clone();
245+
246+
drop(guard);
247+
248+
sources
249+
.into_iter()
250+
.any(|source| source.update_if_necessary())
251+
}
252+
ReactiveNodeState::Dirty => true,
253+
};
254+
255+
if needs_update {
256+
let guard = self.read().or_poisoned();
257+
258+
let owner = guard.owner.clone();
259+
let any_subscriber = guard.any_subscriber.clone();
260+
261+
drop(guard);
262+
263+
any_subscriber.clear_sources(&any_subscriber);
264+
265+
let mut guard = self.write().or_poisoned();
266+
267+
owner.with_cleanup(|| {
268+
any_subscriber.with_observer(|| (guard.fun)());
269+
});
270+
271+
guard.state = ReactiveNodeState::Clean;
272+
}
273+
274+
needs_update
275+
}
276+
277+
fn mark_check(&self) {
278+
self.write().or_poisoned().state = ReactiveNodeState::Check;
279+
self.update_if_necessary();
280+
}
281+
282+
fn mark_dirty(&self) {
283+
self.write().or_poisoned().state = ReactiveNodeState::Dirty;
284+
self.update_if_necessary();
285+
}
286+
}
287+
288+
impl Subscriber for RwLock<EffectInner> {
289+
fn add_source(&self, source: AnySource) {
290+
self.write().or_poisoned().sources.insert(source);
291+
}
292+
293+
fn clear_sources(&self, subscriber: &AnySubscriber) {
294+
self.write().or_poisoned().sources.clear_sources(subscriber);
295+
}
296+
}
297+
298+
impl std::fmt::Debug for EffectInner {
299+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
300+
f.debug_struct("EffectInner")
301+
.field("owner", &self.owner)
302+
.field("state", &self.state)
303+
.field("sources", &self.sources)
304+
.field("any_subscriber", &self.any_subscriber)
305+
.finish()
306+
}
307+
}
308+
}

0 commit comments

Comments
 (0)