Skip to content

Commit 34c12b4

Browse files
committed
Turbopack: refactor backend jobs
avoids any boxes and casting avoid extra vec collecting avoid cloning lists for parallelization
1 parent 4e6fb8b commit 34c12b4

File tree

6 files changed

+139
-133
lines changed

6 files changed

+139
-133
lines changed

turbopack/crates/turbo-tasks-backend/src/backend/mod.rs

Lines changed: 115 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@ mod operation;
33
mod storage;
44

55
use std::{
6-
any::Any,
76
borrow::Cow,
7+
cmp::min,
88
fmt::{self, Write},
99
future::Future,
1010
hash::BuildHasherDefault,
1111
mem::take,
12+
ops::Range,
1213
pin::Pin,
1314
sync::{
1415
Arc,
@@ -26,11 +27,11 @@ use smallvec::{SmallVec, smallvec};
2627
use tokio::time::{Duration, Instant};
2728
use tracing::{field::Empty, info_span};
2829
use turbo_tasks::{
29-
CellId, FxDashMap, KeyValuePair, RawVc, ReadCellOptions, ReadConsistency, SessionId,
30-
TRANSIENT_TASK_BIT, TaskExecutionReason, TaskId, TraitTypeId, TurboTasksBackendApi,
30+
CellId, FxDashMap, FxIndexMap, KeyValuePair, RawVc, ReadCellOptions, ReadConsistency,
31+
SessionId, TRANSIENT_TASK_BIT, TaskExecutionReason, TaskId, TraitTypeId, TurboTasksBackendApi,
3132
ValueTypeId,
3233
backend::{
33-
Backend, BackendJobId, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskRoot,
34+
Backend, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskRoot,
3435
TransientTaskType, TurboTasksExecutionError, TypedCellContent,
3536
},
3637
event::{Event, EventListener},
@@ -39,7 +40,7 @@ use turbo_tasks::{
3940
task_statistics::TaskStatisticsApi,
4041
trace::TraceRawVcs,
4142
turbo_tasks,
42-
util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
43+
util::{IdFactoryWithReuse, good_chunk_size},
4344
};
4445

4546
pub use self::{operation::AnyOperation, storage::TaskDataCategory};
@@ -70,11 +71,6 @@ use crate::{
7071
},
7172
};
7273

73-
const BACKEND_JOB_INITIAL_SNAPSHOT: BackendJobId = unsafe { BackendJobId::new_unchecked(1) };
74-
const BACKEND_JOB_FOLLOW_UP_SNAPSHOT: BackendJobId = unsafe { BackendJobId::new_unchecked(2) };
75-
const BACKEND_JOB_PREFETCH_TASK: BackendJobId = unsafe { BackendJobId::new_unchecked(3) };
76-
const BACKEND_JOB_PREFETCH_CHUNK_TASK: BackendJobId = unsafe { BackendJobId::new_unchecked(4) };
77-
7874
const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);
7975

8076
struct SnapshotRequest {
@@ -160,6 +156,15 @@ impl Default for BackendOptions {
160156
}
161157
}
162158

159+
pub enum TurboTasksBackendJob {
160+
InitialSnapshot,
161+
FollowUpSnapshot,
162+
Prefetch {
163+
data: Arc<FxIndexMap<TaskId, bool>>,
164+
range: Option<Range<usize>>,
165+
},
166+
}
167+
163168
pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
164169

165170
type TaskCacheLog = Sharded<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>;
@@ -1210,7 +1215,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
12101215

12111216
if self.should_persist() {
12121217
// Schedule the snapshot job
1213-
turbo_tasks.schedule_backend_background_job(BACKEND_JOB_INITIAL_SNAPSHOT, None);
1218+
turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
12141219
}
12151220
}
12161221

@@ -2135,115 +2140,118 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
21352140

21362141
fn run_backend_job<'a>(
21372142
self: &'a Arc<Self>,
2138-
id: BackendJobId,
2139-
data: Option<Box<dyn Any + Send>>,
2143+
job: TurboTasksBackendJob,
21402144
turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
21412145
) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
21422146
Box::pin(async move {
2143-
if id == BACKEND_JOB_INITIAL_SNAPSHOT || id == BACKEND_JOB_FOLLOW_UP_SNAPSHOT {
2144-
debug_assert!(self.should_persist());
2145-
2146-
let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
2147-
let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
2148-
loop {
2149-
const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
2150-
const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
2151-
const IDLE_TIMEOUT: Duration = Duration::from_secs(2);
2152-
2153-
let time = if id == BACKEND_JOB_INITIAL_SNAPSHOT {
2154-
FIRST_SNAPSHOT_WAIT
2155-
} else {
2156-
SNAPSHOT_INTERVAL
2157-
};
2158-
2159-
let until = last_snapshot + time;
2160-
if until > Instant::now() {
2161-
let mut stop_listener = self.stopping_event.listen();
2162-
if self.stopping.load(Ordering::Acquire) {
2163-
return;
2164-
}
2165-
let mut idle_start_listener = self.idle_start_event.listen();
2166-
let mut idle_end_listener = self.idle_end_event.listen();
2167-
let mut idle_time = if turbo_tasks.is_idle() {
2168-
Instant::now() + IDLE_TIMEOUT
2147+
match job {
2148+
TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => {
2149+
debug_assert!(self.should_persist());
2150+
2151+
let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
2152+
let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
2153+
loop {
2154+
const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
2155+
const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
2156+
const IDLE_TIMEOUT: Duration = Duration::from_secs(2);
2157+
2158+
let time = if matches!(job, TurboTasksBackendJob::InitialSnapshot) {
2159+
FIRST_SNAPSHOT_WAIT
21692160
} else {
2170-
far_future()
2161+
SNAPSHOT_INTERVAL
21712162
};
2172-
loop {
2173-
tokio::select! {
2174-
_ = &mut stop_listener => {
2175-
return;
2176-
},
2177-
_ = &mut idle_start_listener => {
2178-
idle_time = Instant::now() + IDLE_TIMEOUT;
2179-
idle_start_listener = self.idle_start_event.listen()
2180-
},
2181-
_ = &mut idle_end_listener => {
2182-
idle_time = until + IDLE_TIMEOUT;
2183-
idle_end_listener = self.idle_end_event.listen()
2184-
},
2185-
_ = tokio::time::sleep_until(until) => {
2186-
break;
2187-
},
2188-
_ = tokio::time::sleep_until(idle_time) => {
2189-
if turbo_tasks.is_idle() {
2163+
2164+
let until = last_snapshot + time;
2165+
if until > Instant::now() {
2166+
let mut stop_listener = self.stopping_event.listen();
2167+
if self.stopping.load(Ordering::Acquire) {
2168+
return;
2169+
}
2170+
let mut idle_start_listener = self.idle_start_event.listen();
2171+
let mut idle_end_listener = self.idle_end_event.listen();
2172+
let mut idle_time = if turbo_tasks.is_idle() {
2173+
Instant::now() + IDLE_TIMEOUT
2174+
} else {
2175+
far_future()
2176+
};
2177+
loop {
2178+
tokio::select! {
2179+
_ = &mut stop_listener => {
2180+
return;
2181+
},
2182+
_ = &mut idle_start_listener => {
2183+
idle_time = Instant::now() + IDLE_TIMEOUT;
2184+
idle_start_listener = self.idle_start_event.listen()
2185+
},
2186+
_ = &mut idle_end_listener => {
2187+
idle_time = until + IDLE_TIMEOUT;
2188+
idle_end_listener = self.idle_end_event.listen()
2189+
},
2190+
_ = tokio::time::sleep_until(until) => {
21902191
break;
2191-
}
2192-
},
2192+
},
2193+
_ = tokio::time::sleep_until(idle_time) => {
2194+
if turbo_tasks.is_idle() {
2195+
break;
2196+
}
2197+
},
2198+
}
21932199
}
21942200
}
2195-
}
21962201

2197-
let this = self.clone();
2198-
let snapshot = this.snapshot();
2199-
if let Some((snapshot_start, new_data)) = snapshot {
2200-
last_snapshot = snapshot_start;
2201-
if new_data {
2202-
continue;
2203-
}
2204-
let last_snapshot = last_snapshot.duration_since(self.start_time);
2205-
self.last_snapshot.store(
2206-
last_snapshot.as_millis().try_into().unwrap(),
2207-
Ordering::Relaxed,
2208-
);
2202+
let this = self.clone();
2203+
let snapshot = this.snapshot();
2204+
if let Some((snapshot_start, new_data)) = snapshot {
2205+
last_snapshot = snapshot_start;
2206+
if new_data {
2207+
continue;
2208+
}
2209+
let last_snapshot = last_snapshot.duration_since(self.start_time);
2210+
self.last_snapshot.store(
2211+
last_snapshot.as_millis().try_into().unwrap(),
2212+
Ordering::Relaxed,
2213+
);
22092214

2210-
turbo_tasks
2211-
.schedule_backend_background_job(BACKEND_JOB_FOLLOW_UP_SNAPSHOT, None);
2212-
return;
2215+
turbo_tasks.schedule_backend_background_job(
2216+
TurboTasksBackendJob::FollowUpSnapshot,
2217+
);
2218+
return;
2219+
}
22132220
}
22142221
}
2215-
} else if id == BACKEND_JOB_PREFETCH_TASK || id == BACKEND_JOB_PREFETCH_CHUNK_TASK {
2216-
const DATA_EXPECTATION: &str =
2217-
"Expected data to be a FxHashMap<TaskId, bool> for BACKEND_JOB_PREFETCH_TASK";
2218-
let data = Box::<dyn Any + Send>::downcast::<Vec<(TaskId, bool)>>(
2219-
data.expect(DATA_EXPECTATION),
2220-
)
2221-
.expect(DATA_EXPECTATION);
2222-
2223-
fn prefetch_task(ctx: &mut impl ExecuteContext<'_>, task: TaskId, with_data: bool) {
2224-
let category = if with_data {
2225-
TaskDataCategory::All
2222+
TurboTasksBackendJob::Prefetch { data, range } => {
2223+
let range = if let Some(range) = range {
2224+
range
22262225
} else {
2227-
TaskDataCategory::Meta
2226+
if data.len() > 128 {
2227+
let chunk_size = good_chunk_size(data.len());
2228+
let chunks = data.len().div_ceil(chunk_size);
2229+
for i in 0..chunks {
2230+
turbo_tasks.schedule_backend_foreground_job(
2231+
TurboTasksBackendJob::Prefetch {
2232+
data: data.clone(),
2233+
range: Some(
2234+
(i * chunk_size)..min(data.len(), (i + 1) * chunk_size),
2235+
),
2236+
},
2237+
);
2238+
}
2239+
return;
2240+
}
2241+
0..data.len()
22282242
};
2229-
// Prefetch the task
2230-
drop(ctx.task(task, category));
2231-
}
22322243

2233-
if id == BACKEND_JOB_PREFETCH_TASK && data.len() > 128 {
2234-
let chunk_size = good_chunk_size(data.len());
2235-
for chunk in into_chunks(*data, chunk_size) {
2236-
let data: Box<Vec<(TaskId, bool)>> = Box::new(chunk.collect::<Vec<_>>());
2237-
turbo_tasks.schedule_backend_foreground_job(
2238-
BACKEND_JOB_PREFETCH_CHUNK_TASK,
2239-
Some(data),
2240-
);
2241-
}
2242-
} else {
22432244
let _span = info_span!("prefetching").entered();
22442245
let mut ctx = self.execute_context(turbo_tasks);
2245-
for (task, with_data) in data.into_iter() {
2246-
prefetch_task(&mut ctx, task, with_data);
2246+
for i in range {
2247+
let (&task, &with_data) = data.get_index(i).unwrap();
2248+
let category = if with_data {
2249+
TaskDataCategory::All
2250+
} else {
2251+
TaskDataCategory::Meta
2252+
};
2253+
// Prefetch the task
2254+
drop(ctx.task(task, category));
22472255
}
22482256
}
22492257
}
@@ -2934,13 +2942,14 @@ impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
29342942
)
29352943
}
29362944

2945+
type BackendJob = TurboTasksBackendJob;
2946+
29372947
fn run_backend_job<'a>(
29382948
&'a self,
2939-
id: BackendJobId,
2940-
data: Option<Box<dyn Any + Send>>,
2949+
job: Self::BackendJob,
29412950
turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
29422951
) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
2943-
self.0.run_backend_job(id, data, turbo_tasks)
2952+
self.0.run_backend_job(job, turbo_tasks)
29442953
}
29452954

29462955
fn try_read_task_output(

turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,16 @@ mod update_output;
1111
use std::{
1212
fmt::{Debug, Formatter},
1313
mem::transmute,
14-
sync::atomic::Ordering,
14+
sync::{Arc, atomic::Ordering},
1515
};
1616

17-
use rustc_hash::FxHashMap;
1817
use serde::{Deserialize, Serialize};
19-
use turbo_tasks::{KeyValuePair, SessionId, TaskId, TurboTasksBackendApi};
18+
use turbo_tasks::{FxIndexMap, KeyValuePair, SessionId, TaskId, TurboTasksBackendApi};
2019

2120
use crate::{
2221
backend::{
23-
BACKEND_JOB_PREFETCH_TASK, OperationGuard, TaskDataCategory, TransientTask,
24-
TurboTasksBackend, TurboTasksBackendInner,
22+
OperationGuard, TaskDataCategory, TransientTask, TurboTasksBackend, TurboTasksBackendInner,
23+
TurboTasksBackendJob,
2524
storage::{SpecificTaskDataCategory, StorageWriteGuard, iter_many},
2625
},
2726
backing_storage::BackingStorage,
@@ -265,10 +264,11 @@ where
265264

266265
fn schedule_task(&self, mut task: impl TaskGuard + '_) {
267266
if let Some(tasks_to_prefetch) = task.prefetch() {
268-
self.turbo_tasks.schedule_backend_foreground_job(
269-
BACKEND_JOB_PREFETCH_TASK,
270-
Some(Box::new(tasks_to_prefetch)),
271-
);
267+
self.turbo_tasks
268+
.schedule_backend_foreground_job(TurboTasksBackendJob::Prefetch {
269+
data: Arc::new(tasks_to_prefetch),
270+
range: None,
271+
});
272272
}
273273
self.turbo_tasks.schedule(task.id());
274274
}
@@ -336,7 +336,7 @@ pub trait TaskGuard: Debug {
336336
where
337337
F: for<'a> FnMut(CachedDataItemKey, CachedDataItemValueRef<'a>) -> bool + 'l;
338338
fn invalidate_serialization(&mut self);
339-
fn prefetch(&mut self) -> Option<Vec<(TaskId, bool)>>;
339+
fn prefetch(&mut self) -> Option<FxIndexMap<TaskId, bool>>;
340340
fn is_immutable(&self) -> bool;
341341
}
342342

@@ -526,15 +526,15 @@ impl<B: BackingStorage> TaskGuard for TaskGuardImpl<'_, B> {
526526
}
527527
}
528528

529-
fn prefetch(&mut self) -> Option<Vec<(TaskId, bool)>> {
529+
fn prefetch(&mut self) -> Option<FxIndexMap<TaskId, bool>> {
530530
if !self.task.state().prefetched() {
531531
self.task.state_mut().set_prefetched(true);
532532
let map = iter_many!(self, OutputDependency { target } => (target, false))
533533
.chain(iter_many!(self, CellDependency { target } => (target.task, true)))
534534
.chain(iter_many!(self, CollectiblesDependency { target } => (target.task, true)))
535-
.collect::<FxHashMap<_, _>>();
535+
.collect::<FxIndexMap<_, _>>();
536536
if map.len() > 16 {
537-
return Some(map.into_iter().collect());
537+
return Some(map);
538538
}
539539
}
540540
None

turbopack/crates/turbo-tasks/src/backend.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use std::{
2-
any::Any,
32
borrow::Cow,
43
error::Error,
54
fmt::{self, Debug, Display},
@@ -17,7 +16,6 @@ use serde::{Deserialize, Serialize};
1716
use tracing::Span;
1817
use turbo_rcstr::RcStr;
1918

20-
pub use crate::id::BackendJobId;
2119
use crate::{
2220
RawVc, ReadCellOptions, ReadRef, SharedReference, TaskId, TaskIdSet, TraitRef, TraitTypeId,
2321
TurboTasksPanic, ValueTypeId, VcRead, VcValueTrait, VcValueType,
@@ -583,10 +581,11 @@ pub trait Backend: Sync + Send {
583581
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
584582
) -> bool;
585583

584+
type BackendJob: Send + 'static;
585+
586586
fn run_backend_job<'a>(
587587
&'a self,
588-
id: BackendJobId,
589-
data: Option<Box<dyn Any + Send>>,
588+
job: Self::BackendJob,
590589
turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
591590
) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
592591

0 commit comments

Comments
 (0)