From 98793e015946c06a2d9493e5e6637d52a24ebe7d Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Mon, 25 Aug 2025 22:37:12 +0200 Subject: [PATCH] Turbopack: refactor backend jobs avoids any boxes and casting avoid extra vec collecting avoid cloning lists for parallelization --- .../turbo-tasks-backend/src/backend/mod.rs | 221 +++++++++--------- .../src/backend/operation/mod.rs | 26 +-- turbopack/crates/turbo-tasks/src/backend.rs | 7 +- turbopack/crates/turbo-tasks/src/id.rs | 1 - .../crates/turbo-tasks/src/id_factory.rs | 3 +- turbopack/crates/turbo-tasks/src/manager.rs | 14 +- 6 files changed, 139 insertions(+), 133 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index ac6f70958a374..9f677fa10334a 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -3,12 +3,13 @@ mod operation; mod storage; use std::{ - any::Any, borrow::Cow, + cmp::min, fmt::{self, Write}, future::Future, hash::BuildHasherDefault, mem::take, + ops::Range, pin::Pin, sync::{ Arc, @@ -26,11 +27,11 @@ use smallvec::{SmallVec, smallvec}; use tokio::time::{Duration, Instant}; use tracing::{field::Empty, info_span}; use turbo_tasks::{ - CellId, FxDashMap, KeyValuePair, RawVc, ReadCellOptions, ReadConsistency, SessionId, - TRANSIENT_TASK_BIT, TaskExecutionReason, TaskId, TraitTypeId, TurboTasksBackendApi, + CellId, FxDashMap, FxIndexMap, KeyValuePair, RawVc, ReadCellOptions, ReadConsistency, + SessionId, TRANSIENT_TASK_BIT, TaskExecutionReason, TaskId, TraitTypeId, TurboTasksBackendApi, ValueTypeId, backend::{ - Backend, BackendJobId, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskRoot, + Backend, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskRoot, TransientTaskType, TurboTasksExecutionError, TypedCellContent, }, event::{Event, EventListener}, @@ -39,7 +40,7 @@ use turbo_tasks::{ task_statistics::TaskStatisticsApi, trace::TraceRawVcs, turbo_tasks, - util::{IdFactoryWithReuse, good_chunk_size, into_chunks}, + util::{IdFactoryWithReuse, good_chunk_size}, }; pub use self::{operation::AnyOperation, storage::TaskDataCategory}; @@ -70,11 +71,6 @@ use crate::{ }, }; -const BACKEND_JOB_INITIAL_SNAPSHOT: BackendJobId = unsafe { BackendJobId::new_unchecked(1) }; -const BACKEND_JOB_FOLLOW_UP_SNAPSHOT: BackendJobId = unsafe { BackendJobId::new_unchecked(2) }; -const BACKEND_JOB_PREFETCH_TASK: BackendJobId = unsafe { BackendJobId::new_unchecked(3) }; -const BACKEND_JOB_PREFETCH_CHUNK_TASK: BackendJobId = unsafe { BackendJobId::new_unchecked(4) }; - const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1); struct SnapshotRequest { @@ -160,6 +156,15 @@ impl Default for BackendOptions { } } +pub enum TurboTasksBackendJob { + InitialSnapshot, + FollowUpSnapshot, + Prefetch { + data: Arc>, + range: Option>, + }, +} + pub struct TurboTasksBackend(Arc>); type TaskCacheLog = Sharded, TaskId)>>; @@ -1210,7 +1215,7 @@ impl TurboTasksBackendInner { if self.should_persist() { // Schedule the snapshot job - turbo_tasks.schedule_backend_background_job(BACKEND_JOB_INITIAL_SNAPSHOT, None); + turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot); } } @@ -2135,115 +2140,118 @@ impl TurboTasksBackendInner { fn run_backend_job<'a>( self: &'a Arc, - id: BackendJobId, - data: Option>, + job: TurboTasksBackendJob, turbo_tasks: &'a dyn TurboTasksBackendApi>, ) -> Pin + Send + 'a>> { Box::pin(async move { - if id == BACKEND_JOB_INITIAL_SNAPSHOT || id == BACKEND_JOB_FOLLOW_UP_SNAPSHOT { - debug_assert!(self.should_persist()); - - let last_snapshot = self.last_snapshot.load(Ordering::Relaxed); - let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot); - loop { - const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300); - const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120); - const IDLE_TIMEOUT: Duration = Duration::from_secs(2); - - let time = if id == BACKEND_JOB_INITIAL_SNAPSHOT { - FIRST_SNAPSHOT_WAIT - } else { - SNAPSHOT_INTERVAL - }; - - let until = last_snapshot + time; - if until > Instant::now() { - let mut stop_listener = self.stopping_event.listen(); - if self.stopping.load(Ordering::Acquire) { - return; - } - let mut idle_start_listener = self.idle_start_event.listen(); - let mut idle_end_listener = self.idle_end_event.listen(); - let mut idle_time = if turbo_tasks.is_idle() { - Instant::now() + IDLE_TIMEOUT + match job { + TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => { + debug_assert!(self.should_persist()); + + let last_snapshot = self.last_snapshot.load(Ordering::Relaxed); + let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot); + loop { + const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300); + const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120); + const IDLE_TIMEOUT: Duration = Duration::from_secs(2); + + let time = if matches!(job, TurboTasksBackendJob::InitialSnapshot) { + FIRST_SNAPSHOT_WAIT } else { - far_future() + SNAPSHOT_INTERVAL }; - loop { - tokio::select! { - _ = &mut stop_listener => { - return; - }, - _ = &mut idle_start_listener => { - idle_time = Instant::now() + IDLE_TIMEOUT; - idle_start_listener = self.idle_start_event.listen() - }, - _ = &mut idle_end_listener => { - idle_time = until + IDLE_TIMEOUT; - idle_end_listener = self.idle_end_event.listen() - }, - _ = tokio::time::sleep_until(until) => { - break; - }, - _ = tokio::time::sleep_until(idle_time) => { - if turbo_tasks.is_idle() { + + let until = last_snapshot + time; + if until > Instant::now() { + let mut stop_listener = self.stopping_event.listen(); + if self.stopping.load(Ordering::Acquire) { + return; + } + let mut idle_start_listener = self.idle_start_event.listen(); + let mut idle_end_listener = self.idle_end_event.listen(); + let mut idle_time = if turbo_tasks.is_idle() { + Instant::now() + IDLE_TIMEOUT + } else { + far_future() + }; + loop { + tokio::select! { + _ = &mut stop_listener => { + return; + }, + _ = &mut idle_start_listener => { + idle_time = Instant::now() + IDLE_TIMEOUT; + idle_start_listener = self.idle_start_event.listen() + }, + _ = &mut idle_end_listener => { + idle_time = until + IDLE_TIMEOUT; + idle_end_listener = self.idle_end_event.listen() + }, + _ = tokio::time::sleep_until(until) => { break; - } - }, + }, + _ = tokio::time::sleep_until(idle_time) => { + if turbo_tasks.is_idle() { + break; + } + }, + } } } - } - let this = self.clone(); - let snapshot = this.snapshot(); - if let Some((snapshot_start, new_data)) = snapshot { - last_snapshot = snapshot_start; - if new_data { - continue; - } - let last_snapshot = last_snapshot.duration_since(self.start_time); - self.last_snapshot.store( - last_snapshot.as_millis().try_into().unwrap(), - Ordering::Relaxed, - ); + let this = self.clone(); + let snapshot = this.snapshot(); + if let Some((snapshot_start, new_data)) = snapshot { + last_snapshot = snapshot_start; + if new_data { + continue; + } + let last_snapshot = last_snapshot.duration_since(self.start_time); + self.last_snapshot.store( + last_snapshot.as_millis().try_into().unwrap(), + Ordering::Relaxed, + ); - turbo_tasks - .schedule_backend_background_job(BACKEND_JOB_FOLLOW_UP_SNAPSHOT, None); - return; + turbo_tasks.schedule_backend_background_job( + TurboTasksBackendJob::FollowUpSnapshot, + ); + return; + } } } - } else if id == BACKEND_JOB_PREFETCH_TASK || id == BACKEND_JOB_PREFETCH_CHUNK_TASK { - const DATA_EXPECTATION: &str = - "Expected data to be a FxHashMap for BACKEND_JOB_PREFETCH_TASK"; - let data = Box::::downcast::>( - data.expect(DATA_EXPECTATION), - ) - .expect(DATA_EXPECTATION); - - fn prefetch_task(ctx: &mut impl ExecuteContext<'_>, task: TaskId, with_data: bool) { - let category = if with_data { - TaskDataCategory::All + TurboTasksBackendJob::Prefetch { data, range } => { + let range = if let Some(range) = range { + range } else { - TaskDataCategory::Meta + if data.len() > 128 { + let chunk_size = good_chunk_size(data.len()); + let chunks = data.len().div_ceil(chunk_size); + for i in 0..chunks { + turbo_tasks.schedule_backend_foreground_job( + TurboTasksBackendJob::Prefetch { + data: data.clone(), + range: Some( + (i * chunk_size)..min(data.len(), (i + 1) * chunk_size), + ), + }, + ); + } + return; + } + 0..data.len() }; - // Prefetch the task - drop(ctx.task(task, category)); - } - if id == BACKEND_JOB_PREFETCH_TASK && data.len() > 128 { - let chunk_size = good_chunk_size(data.len()); - for chunk in into_chunks(*data, chunk_size) { - let data: Box> = Box::new(chunk.collect::>()); - turbo_tasks.schedule_backend_foreground_job( - BACKEND_JOB_PREFETCH_CHUNK_TASK, - Some(data), - ); - } - } else { let _span = info_span!("prefetching").entered(); let mut ctx = self.execute_context(turbo_tasks); - for (task, with_data) in data.into_iter() { - prefetch_task(&mut ctx, task, with_data); + for i in range { + let (&task, &with_data) = data.get_index(i).unwrap(); + let category = if with_data { + TaskDataCategory::All + } else { + TaskDataCategory::Meta + }; + // Prefetch the task + drop(ctx.task(task, category)); } } } @@ -2934,13 +2942,14 @@ impl Backend for TurboTasksBackend { ) } + type BackendJob = TurboTasksBackendJob; + fn run_backend_job<'a>( &'a self, - id: BackendJobId, - data: Option>, + job: Self::BackendJob, turbo_tasks: &'a dyn TurboTasksBackendApi, ) -> Pin + Send + 'a>> { - self.0.run_backend_job(id, data, turbo_tasks) + self.0.run_backend_job(job, turbo_tasks) } fn try_read_task_output( diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs index c9ca06ded5aa1..cbbf96973114a 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs @@ -11,17 +11,16 @@ mod update_output; use std::{ fmt::{Debug, Formatter}, mem::transmute, - sync::atomic::Ordering, + sync::{Arc, atomic::Ordering}, }; -use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; -use turbo_tasks::{KeyValuePair, SessionId, TaskId, TurboTasksBackendApi}; +use turbo_tasks::{FxIndexMap, KeyValuePair, SessionId, TaskId, TurboTasksBackendApi}; use crate::{ backend::{ - BACKEND_JOB_PREFETCH_TASK, OperationGuard, TaskDataCategory, TransientTask, - TurboTasksBackend, TurboTasksBackendInner, + OperationGuard, TaskDataCategory, TransientTask, TurboTasksBackend, TurboTasksBackendInner, + TurboTasksBackendJob, storage::{SpecificTaskDataCategory, StorageWriteGuard, iter_many}, }, backing_storage::BackingStorage, @@ -265,10 +264,11 @@ where fn schedule_task(&self, mut task: impl TaskGuard + '_) { if let Some(tasks_to_prefetch) = task.prefetch() { - self.turbo_tasks.schedule_backend_foreground_job( - BACKEND_JOB_PREFETCH_TASK, - Some(Box::new(tasks_to_prefetch)), - ); + self.turbo_tasks + .schedule_backend_foreground_job(TurboTasksBackendJob::Prefetch { + data: Arc::new(tasks_to_prefetch), + range: None, + }); } self.turbo_tasks.schedule(task.id()); } @@ -336,7 +336,7 @@ pub trait TaskGuard: Debug { where F: for<'a> FnMut(CachedDataItemKey, CachedDataItemValueRef<'a>) -> bool + 'l; fn invalidate_serialization(&mut self); - fn prefetch(&mut self) -> Option>; + fn prefetch(&mut self) -> Option>; fn is_immutable(&self) -> bool; } @@ -526,15 +526,15 @@ impl TaskGuard for TaskGuardImpl<'_, B> { } } - fn prefetch(&mut self) -> Option> { + fn prefetch(&mut self) -> Option> { if !self.task.state().prefetched() { self.task.state_mut().set_prefetched(true); let map = iter_many!(self, OutputDependency { target } => (target, false)) .chain(iter_many!(self, CellDependency { target } => (target.task, true))) .chain(iter_many!(self, CollectiblesDependency { target } => (target.task, true))) - .collect::>(); + .collect::>(); if map.len() > 16 { - return Some(map.into_iter().collect()); + return Some(map); } } None diff --git a/turbopack/crates/turbo-tasks/src/backend.rs b/turbopack/crates/turbo-tasks/src/backend.rs index d1f59fdb69720..70cf5fc95a2a8 100644 --- a/turbopack/crates/turbo-tasks/src/backend.rs +++ b/turbopack/crates/turbo-tasks/src/backend.rs @@ -1,5 +1,4 @@ use std::{ - any::Any, borrow::Cow, error::Error, fmt::{self, Debug, Display}, @@ -17,7 +16,6 @@ use serde::{Deserialize, Serialize}; use tracing::Span; use turbo_rcstr::RcStr; -pub use crate::id::BackendJobId; use crate::{ RawVc, ReadCellOptions, ReadRef, SharedReference, TaskId, TaskIdSet, TraitRef, TraitTypeId, TurboTasksPanic, ValueTypeId, VcRead, VcValueTrait, VcValueType, @@ -583,10 +581,11 @@ pub trait Backend: Sync + Send { turbo_tasks: &dyn TurboTasksBackendApi, ) -> bool; + type BackendJob: Send + 'static; + fn run_backend_job<'a>( &'a self, - id: BackendJobId, - data: Option>, + job: Self::BackendJob, turbo_tasks: &'a dyn TurboTasksBackendApi, ) -> Pin + Send + 'a>>; diff --git a/turbopack/crates/turbo-tasks/src/id.rs b/turbopack/crates/turbo-tasks/src/id.rs index a1fa5379d8dab..58969b228dd37 100644 --- a/turbopack/crates/turbo-tasks/src/id.rs +++ b/turbopack/crates/turbo-tasks/src/id.rs @@ -115,7 +115,6 @@ macro_rules! define_id { define_id!(TaskId: u32, derive(Serialize, Deserialize), serde(transparent)); define_id!(ValueTypeId: u32); define_id!(TraitTypeId: u32); -define_id!(BackendJobId: u32); define_id!(SessionId: u32, derive(Debug, Serialize, Deserialize), serde(transparent)); define_id!( LocalTaskId: u32, diff --git a/turbopack/crates/turbo-tasks/src/id_factory.rs b/turbopack/crates/turbo-tasks/src/id_factory.rs index 582b437e05898..0651538b18550 100644 --- a/turbopack/crates/turbo-tasks/src/id_factory.rs +++ b/turbopack/crates/turbo-tasks/src/id_factory.rs @@ -121,8 +121,7 @@ where } } -/// An [`IdFactory`], but extended with a free list to allow for id reuse for ids such as -/// [`BackendJobId`][crate::backend::BackendJobId]. +/// An [`IdFactory`], but extended with a free list to allow for id reuse. /// /// If silent untracked re-use of ids is okay, consider using the cheaper /// [`IdFactory::wrapping_get`] method. diff --git a/turbopack/crates/turbo-tasks/src/manager.rs b/turbopack/crates/turbo-tasks/src/manager.rs index dfd0d26c91b12..fcc67f5e8595c 100644 --- a/turbopack/crates/turbo-tasks/src/manager.rs +++ b/turbopack/crates/turbo-tasks/src/manager.rs @@ -30,7 +30,7 @@ use crate::{ }, capture_future::CaptureFuture, event::{Event, EventListener}, - id::{BackendJobId, ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId}, + id::{ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId}, id_factory::IdFactoryWithReuse, macro_helpers::NativeFunction, magic_any::MagicAny, @@ -250,8 +250,8 @@ pub trait TurboTasksBackendApi: TurboTasksCallApi + Sync + unsafe fn reuse_transient_task_id(&self, id: Unused); fn schedule(&self, task: TaskId); - fn schedule_backend_background_job(&self, id: BackendJobId, data: Option>); - fn schedule_backend_foreground_job(&self, id: BackendJobId, data: Option>); + fn schedule_backend_background_job(&self, job: B::BackendJob); + fn schedule_backend_foreground_job(&self, job: B::BackendJob); fn try_foreground_done(&self) -> Result<(), EventListener>; fn wait_foreground_done_excluding_own<'a>( @@ -1478,16 +1478,16 @@ impl TurboTasksBackendApi for TurboTasks { } #[track_caller] - fn schedule_backend_background_job(&self, id: BackendJobId, data: Option>) { + fn schedule_backend_background_job(&self, job: B::BackendJob) { self.schedule_background_job(move |this| async move { - this.backend.run_backend_job(id, data, &*this).await; + this.backend.run_backend_job(job, &*this).await; }) } #[track_caller] - fn schedule_backend_foreground_job(&self, id: BackendJobId, data: Option>) { + fn schedule_backend_foreground_job(&self, job: B::BackendJob) { self.schedule_foreground_job(move |this| async move { - this.backend.run_backend_job(id, data, &*this).await; + this.backend.run_backend_job(job, &*this).await; }) }