Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 115 additions & 106 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ mod operation;
mod storage;

use std::{
any::Any,
borrow::Cow,
cmp::min,
fmt::{self, Write},
future::Future,
hash::BuildHasherDefault,
mem::take,
ops::Range,
pin::Pin,
sync::{
Arc,
Expand All @@ -26,11 +27,11 @@ use smallvec::{SmallVec, smallvec};
use tokio::time::{Duration, Instant};
use tracing::{field::Empty, info_span};
use turbo_tasks::{
CellId, FxDashMap, KeyValuePair, RawVc, ReadCellOptions, ReadConsistency, SessionId,
TRANSIENT_TASK_BIT, TaskExecutionReason, TaskId, TraitTypeId, TurboTasksBackendApi,
CellId, FxDashMap, FxIndexMap, KeyValuePair, RawVc, ReadCellOptions, ReadConsistency,
SessionId, TRANSIENT_TASK_BIT, TaskExecutionReason, TaskId, TraitTypeId, TurboTasksBackendApi,
ValueTypeId,
backend::{
Backend, BackendJobId, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskRoot,
Backend, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskRoot,
TransientTaskType, TurboTasksExecutionError, TypedCellContent,
},
event::{Event, EventListener},
Expand All @@ -39,7 +40,7 @@ use turbo_tasks::{
task_statistics::TaskStatisticsApi,
trace::TraceRawVcs,
turbo_tasks,
util::{IdFactoryWithReuse, good_chunk_size, into_chunks},
util::{IdFactoryWithReuse, good_chunk_size},
};

pub use self::{operation::AnyOperation, storage::TaskDataCategory};
Expand Down Expand Up @@ -70,11 +71,6 @@ use crate::{
},
};

const BACKEND_JOB_INITIAL_SNAPSHOT: BackendJobId = unsafe { BackendJobId::new_unchecked(1) };
const BACKEND_JOB_FOLLOW_UP_SNAPSHOT: BackendJobId = unsafe { BackendJobId::new_unchecked(2) };
const BACKEND_JOB_PREFETCH_TASK: BackendJobId = unsafe { BackendJobId::new_unchecked(3) };
const BACKEND_JOB_PREFETCH_CHUNK_TASK: BackendJobId = unsafe { BackendJobId::new_unchecked(4) };

const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);

struct SnapshotRequest {
Expand Down Expand Up @@ -160,6 +156,15 @@ impl Default for BackendOptions {
}
}

pub enum TurboTasksBackendJob {
InitialSnapshot,
FollowUpSnapshot,
Prefetch {
data: Arc<FxIndexMap<TaskId, bool>>,
range: Option<Range<usize>>,
},
}

pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);

type TaskCacheLog = Sharded<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>;
Expand Down Expand Up @@ -1210,7 +1215,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {

if self.should_persist() {
// Schedule the snapshot job
turbo_tasks.schedule_backend_background_job(BACKEND_JOB_INITIAL_SNAPSHOT, None);
turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
}
}

Expand Down Expand Up @@ -2135,115 +2140,118 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {

fn run_backend_job<'a>(
self: &'a Arc<Self>,
id: BackendJobId,
data: Option<Box<dyn Any + Send>>,
job: TurboTasksBackendJob,
turbo_tasks: &'a dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
Box::pin(async move {
if id == BACKEND_JOB_INITIAL_SNAPSHOT || id == BACKEND_JOB_FOLLOW_UP_SNAPSHOT {
debug_assert!(self.should_persist());

let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
loop {
const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
const IDLE_TIMEOUT: Duration = Duration::from_secs(2);

let time = if id == BACKEND_JOB_INITIAL_SNAPSHOT {
FIRST_SNAPSHOT_WAIT
} else {
SNAPSHOT_INTERVAL
};

let until = last_snapshot + time;
if until > Instant::now() {
let mut stop_listener = self.stopping_event.listen();
if self.stopping.load(Ordering::Acquire) {
return;
}
let mut idle_start_listener = self.idle_start_event.listen();
let mut idle_end_listener = self.idle_end_event.listen();
let mut idle_time = if turbo_tasks.is_idle() {
Instant::now() + IDLE_TIMEOUT
match job {
TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => {
debug_assert!(self.should_persist());

let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
loop {
const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300);
const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120);
const IDLE_TIMEOUT: Duration = Duration::from_secs(2);

let time = if matches!(job, TurboTasksBackendJob::InitialSnapshot) {
FIRST_SNAPSHOT_WAIT
} else {
far_future()
SNAPSHOT_INTERVAL
};
loop {
tokio::select! {
_ = &mut stop_listener => {
return;
},
_ = &mut idle_start_listener => {
idle_time = Instant::now() + IDLE_TIMEOUT;
idle_start_listener = self.idle_start_event.listen()
},
_ = &mut idle_end_listener => {
idle_time = until + IDLE_TIMEOUT;
idle_end_listener = self.idle_end_event.listen()
},
_ = tokio::time::sleep_until(until) => {
break;
},
_ = tokio::time::sleep_until(idle_time) => {
if turbo_tasks.is_idle() {

let until = last_snapshot + time;
if until > Instant::now() {
let mut stop_listener = self.stopping_event.listen();
if self.stopping.load(Ordering::Acquire) {
return;
}
let mut idle_start_listener = self.idle_start_event.listen();
let mut idle_end_listener = self.idle_end_event.listen();
let mut idle_time = if turbo_tasks.is_idle() {
Instant::now() + IDLE_TIMEOUT
} else {
far_future()
};
loop {
tokio::select! {
_ = &mut stop_listener => {
return;
},
_ = &mut idle_start_listener => {
idle_time = Instant::now() + IDLE_TIMEOUT;
idle_start_listener = self.idle_start_event.listen()
},
_ = &mut idle_end_listener => {
idle_time = until + IDLE_TIMEOUT;
idle_end_listener = self.idle_end_event.listen()
},
_ = tokio::time::sleep_until(until) => {
break;
}
},
},
_ = tokio::time::sleep_until(idle_time) => {
if turbo_tasks.is_idle() {
break;
}
},
}
}
}
}

let this = self.clone();
let snapshot = this.snapshot();
if let Some((snapshot_start, new_data)) = snapshot {
last_snapshot = snapshot_start;
if new_data {
continue;
}
let last_snapshot = last_snapshot.duration_since(self.start_time);
self.last_snapshot.store(
last_snapshot.as_millis().try_into().unwrap(),
Ordering::Relaxed,
);
let this = self.clone();
let snapshot = this.snapshot();
if let Some((snapshot_start, new_data)) = snapshot {
last_snapshot = snapshot_start;
if new_data {
continue;
}
let last_snapshot = last_snapshot.duration_since(self.start_time);
self.last_snapshot.store(
last_snapshot.as_millis().try_into().unwrap(),
Ordering::Relaxed,
);

turbo_tasks
.schedule_backend_background_job(BACKEND_JOB_FOLLOW_UP_SNAPSHOT, None);
return;
turbo_tasks.schedule_backend_background_job(
TurboTasksBackendJob::FollowUpSnapshot,
);
return;
}
}
}
} else if id == BACKEND_JOB_PREFETCH_TASK || id == BACKEND_JOB_PREFETCH_CHUNK_TASK {
const DATA_EXPECTATION: &str =
"Expected data to be a FxHashMap<TaskId, bool> for BACKEND_JOB_PREFETCH_TASK";
let data = Box::<dyn Any + Send>::downcast::<Vec<(TaskId, bool)>>(
data.expect(DATA_EXPECTATION),
)
.expect(DATA_EXPECTATION);

fn prefetch_task(ctx: &mut impl ExecuteContext<'_>, task: TaskId, with_data: bool) {
let category = if with_data {
TaskDataCategory::All
TurboTasksBackendJob::Prefetch { data, range } => {
let range = if let Some(range) = range {
range
} else {
TaskDataCategory::Meta
if data.len() > 128 {
let chunk_size = good_chunk_size(data.len());
let chunks = data.len().div_ceil(chunk_size);
for i in 0..chunks {
turbo_tasks.schedule_backend_foreground_job(
TurboTasksBackendJob::Prefetch {
data: data.clone(),
range: Some(
(i * chunk_size)..min(data.len(), (i + 1) * chunk_size),
),
},
);
}
return;
}
0..data.len()
};
// Prefetch the task
drop(ctx.task(task, category));
}

if id == BACKEND_JOB_PREFETCH_TASK && data.len() > 128 {
let chunk_size = good_chunk_size(data.len());
for chunk in into_chunks(*data, chunk_size) {
let data: Box<Vec<(TaskId, bool)>> = Box::new(chunk.collect::<Vec<_>>());
turbo_tasks.schedule_backend_foreground_job(
BACKEND_JOB_PREFETCH_CHUNK_TASK,
Some(data),
);
}
} else {
let _span = info_span!("prefetching").entered();
let mut ctx = self.execute_context(turbo_tasks);
for (task, with_data) in data.into_iter() {
prefetch_task(&mut ctx, task, with_data);
for i in range {
let (&task, &with_data) = data.get_index(i).unwrap();
let category = if with_data {
TaskDataCategory::All
} else {
TaskDataCategory::Meta
};
// Prefetch the task
drop(ctx.task(task, category));
}
}
}
Expand Down Expand Up @@ -2934,13 +2942,14 @@ impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
)
}

type BackendJob = TurboTasksBackendJob;

fn run_backend_job<'a>(
&'a self,
id: BackendJobId,
data: Option<Box<dyn Any + Send>>,
job: Self::BackendJob,
turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
self.0.run_backend_job(id, data, turbo_tasks)
self.0.run_backend_job(job, turbo_tasks)
}

fn try_read_task_output(
Expand Down
26 changes: 13 additions & 13 deletions turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,16 @@ mod update_output;
use std::{
fmt::{Debug, Formatter},
mem::transmute,
sync::atomic::Ordering,
sync::{Arc, atomic::Ordering},
};

use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize};
use turbo_tasks::{KeyValuePair, SessionId, TaskId, TurboTasksBackendApi};
use turbo_tasks::{FxIndexMap, KeyValuePair, SessionId, TaskId, TurboTasksBackendApi};

use crate::{
backend::{
BACKEND_JOB_PREFETCH_TASK, OperationGuard, TaskDataCategory, TransientTask,
TurboTasksBackend, TurboTasksBackendInner,
OperationGuard, TaskDataCategory, TransientTask, TurboTasksBackend, TurboTasksBackendInner,
TurboTasksBackendJob,
storage::{SpecificTaskDataCategory, StorageWriteGuard, iter_many},
},
backing_storage::BackingStorage,
Expand Down Expand Up @@ -265,10 +264,11 @@ where

fn schedule_task(&self, mut task: impl TaskGuard + '_) {
if let Some(tasks_to_prefetch) = task.prefetch() {
self.turbo_tasks.schedule_backend_foreground_job(
BACKEND_JOB_PREFETCH_TASK,
Some(Box::new(tasks_to_prefetch)),
);
self.turbo_tasks
.schedule_backend_foreground_job(TurboTasksBackendJob::Prefetch {
data: Arc::new(tasks_to_prefetch),
range: None,
});
}
self.turbo_tasks.schedule(task.id());
}
Expand Down Expand Up @@ -336,7 +336,7 @@ pub trait TaskGuard: Debug {
where
F: for<'a> FnMut(CachedDataItemKey, CachedDataItemValueRef<'a>) -> bool + 'l;
fn invalidate_serialization(&mut self);
fn prefetch(&mut self) -> Option<Vec<(TaskId, bool)>>;
fn prefetch(&mut self) -> Option<FxIndexMap<TaskId, bool>>;
fn is_immutable(&self) -> bool;
}

Expand Down Expand Up @@ -526,15 +526,15 @@ impl<B: BackingStorage> TaskGuard for TaskGuardImpl<'_, B> {
}
}

fn prefetch(&mut self) -> Option<Vec<(TaskId, bool)>> {
fn prefetch(&mut self) -> Option<FxIndexMap<TaskId, bool>> {
if !self.task.state().prefetched() {
self.task.state_mut().set_prefetched(true);
let map = iter_many!(self, OutputDependency { target } => (target, false))
.chain(iter_many!(self, CellDependency { target } => (target.task, true)))
.chain(iter_many!(self, CollectiblesDependency { target } => (target.task, true)))
.collect::<FxHashMap<_, _>>();
.collect::<FxIndexMap<_, _>>();
if map.len() > 16 {
return Some(map.into_iter().collect());
return Some(map);
}
}
None
Expand Down
7 changes: 3 additions & 4 deletions turbopack/crates/turbo-tasks/src/backend.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::{
any::Any,
borrow::Cow,
error::Error,
fmt::{self, Debug, Display},
Expand All @@ -17,7 +16,6 @@ use serde::{Deserialize, Serialize};
use tracing::Span;
use turbo_rcstr::RcStr;

pub use crate::id::BackendJobId;
use crate::{
RawVc, ReadCellOptions, ReadRef, SharedReference, TaskId, TaskIdSet, TraitRef, TraitTypeId,
TurboTasksPanic, ValueTypeId, VcRead, VcValueTrait, VcValueType,
Expand Down Expand Up @@ -583,10 +581,11 @@ pub trait Backend: Sync + Send {
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
) -> bool;

type BackendJob: Send + 'static;

fn run_backend_job<'a>(
&'a self,
id: BackendJobId,
data: Option<Box<dyn Any + Send>>,
job: Self::BackendJob,
turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;

Expand Down
Loading
Loading