@@ -3,12 +3,13 @@ mod operation;
3
3
mod storage;
4
4
5
5
use std:: {
6
- any:: Any ,
7
6
borrow:: Cow ,
7
+ cmp:: min,
8
8
fmt:: { self , Write } ,
9
9
future:: Future ,
10
10
hash:: BuildHasherDefault ,
11
11
mem:: take,
12
+ ops:: Range ,
12
13
pin:: Pin ,
13
14
sync:: {
14
15
Arc ,
@@ -26,11 +27,11 @@ use smallvec::{SmallVec, smallvec};
26
27
use tokio:: time:: { Duration , Instant } ;
27
28
use tracing:: { field:: Empty , info_span} ;
28
29
use turbo_tasks:: {
29
- CellId , FxDashMap , KeyValuePair , RawVc , ReadCellOptions , ReadConsistency , SessionId ,
30
- TRANSIENT_TASK_BIT , TaskExecutionReason , TaskId , TraitTypeId , TurboTasksBackendApi ,
30
+ CellId , FxDashMap , FxIndexMap , KeyValuePair , RawVc , ReadCellOptions , ReadConsistency ,
31
+ SessionId , TRANSIENT_TASK_BIT , TaskExecutionReason , TaskId , TraitTypeId , TurboTasksBackendApi ,
31
32
ValueTypeId ,
32
33
backend:: {
33
- Backend , BackendJobId , CachedTaskType , CellContent , TaskExecutionSpec , TransientTaskRoot ,
34
+ Backend , CachedTaskType , CellContent , TaskExecutionSpec , TransientTaskRoot ,
34
35
TransientTaskType , TurboTasksExecutionError , TypedCellContent ,
35
36
} ,
36
37
event:: { Event , EventListener } ,
@@ -39,7 +40,7 @@ use turbo_tasks::{
39
40
task_statistics:: TaskStatisticsApi ,
40
41
trace:: TraceRawVcs ,
41
42
turbo_tasks,
42
- util:: { IdFactoryWithReuse , good_chunk_size, into_chunks } ,
43
+ util:: { IdFactoryWithReuse , good_chunk_size} ,
43
44
} ;
44
45
45
46
pub use self :: { operation:: AnyOperation , storage:: TaskDataCategory } ;
@@ -70,11 +71,6 @@ use crate::{
70
71
} ,
71
72
} ;
72
73
73
- const BACKEND_JOB_INITIAL_SNAPSHOT : BackendJobId = unsafe { BackendJobId :: new_unchecked ( 1 ) } ;
74
- const BACKEND_JOB_FOLLOW_UP_SNAPSHOT : BackendJobId = unsafe { BackendJobId :: new_unchecked ( 2 ) } ;
75
- const BACKEND_JOB_PREFETCH_TASK : BackendJobId = unsafe { BackendJobId :: new_unchecked ( 3 ) } ;
76
- const BACKEND_JOB_PREFETCH_CHUNK_TASK : BackendJobId = unsafe { BackendJobId :: new_unchecked ( 4 ) } ;
77
-
78
74
const SNAPSHOT_REQUESTED_BIT : usize = 1 << ( usize:: BITS - 1 ) ;
79
75
80
76
struct SnapshotRequest {
@@ -160,6 +156,15 @@ impl Default for BackendOptions {
160
156
}
161
157
}
162
158
159
+ pub enum TurboTasksBackendJob {
160
+ InitialSnapshot ,
161
+ FollowUpSnapshot ,
162
+ Prefetch {
163
+ data : Arc < FxIndexMap < TaskId , bool > > ,
164
+ range : Option < Range < usize > > ,
165
+ } ,
166
+ }
167
+
163
168
pub struct TurboTasksBackend < B : BackingStorage > ( Arc < TurboTasksBackendInner < B > > ) ;
164
169
165
170
type TaskCacheLog = Sharded < ChunkedVec < ( Arc < CachedTaskType > , TaskId ) > > ;
@@ -1210,7 +1215,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
1210
1215
1211
1216
if self . should_persist ( ) {
1212
1217
// Schedule the snapshot job
1213
- turbo_tasks. schedule_backend_background_job ( BACKEND_JOB_INITIAL_SNAPSHOT , None ) ;
1218
+ turbo_tasks. schedule_backend_background_job ( TurboTasksBackendJob :: InitialSnapshot ) ;
1214
1219
}
1215
1220
}
1216
1221
@@ -2135,115 +2140,118 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
2135
2140
2136
2141
fn run_backend_job < ' a > (
2137
2142
self : & ' a Arc < Self > ,
2138
- id : BackendJobId ,
2139
- data : Option < Box < dyn Any + Send > > ,
2143
+ job : TurboTasksBackendJob ,
2140
2144
turbo_tasks : & ' a dyn TurboTasksBackendApi < TurboTasksBackend < B > > ,
2141
2145
) -> Pin < Box < dyn Future < Output = ( ) > + Send + ' a > > {
2142
2146
Box :: pin ( async move {
2143
- if id == BACKEND_JOB_INITIAL_SNAPSHOT || id == BACKEND_JOB_FOLLOW_UP_SNAPSHOT {
2144
- debug_assert ! ( self . should_persist( ) ) ;
2145
-
2146
- let last_snapshot = self . last_snapshot . load ( Ordering :: Relaxed ) ;
2147
- let mut last_snapshot = self . start_time + Duration :: from_millis ( last_snapshot) ;
2148
- loop {
2149
- const FIRST_SNAPSHOT_WAIT : Duration = Duration :: from_secs ( 300 ) ;
2150
- const SNAPSHOT_INTERVAL : Duration = Duration :: from_secs ( 120 ) ;
2151
- const IDLE_TIMEOUT : Duration = Duration :: from_secs ( 2 ) ;
2152
-
2153
- let time = if id == BACKEND_JOB_INITIAL_SNAPSHOT {
2154
- FIRST_SNAPSHOT_WAIT
2155
- } else {
2156
- SNAPSHOT_INTERVAL
2157
- } ;
2158
-
2159
- let until = last_snapshot + time;
2160
- if until > Instant :: now ( ) {
2161
- let mut stop_listener = self . stopping_event . listen ( ) ;
2162
- if self . stopping . load ( Ordering :: Acquire ) {
2163
- return ;
2164
- }
2165
- let mut idle_start_listener = self . idle_start_event . listen ( ) ;
2166
- let mut idle_end_listener = self . idle_end_event . listen ( ) ;
2167
- let mut idle_time = if turbo_tasks. is_idle ( ) {
2168
- Instant :: now ( ) + IDLE_TIMEOUT
2147
+ match job {
2148
+ TurboTasksBackendJob :: InitialSnapshot | TurboTasksBackendJob :: FollowUpSnapshot => {
2149
+ debug_assert ! ( self . should_persist( ) ) ;
2150
+
2151
+ let last_snapshot = self . last_snapshot . load ( Ordering :: Relaxed ) ;
2152
+ let mut last_snapshot = self . start_time + Duration :: from_millis ( last_snapshot) ;
2153
+ loop {
2154
+ const FIRST_SNAPSHOT_WAIT : Duration = Duration :: from_secs ( 300 ) ;
2155
+ const SNAPSHOT_INTERVAL : Duration = Duration :: from_secs ( 120 ) ;
2156
+ const IDLE_TIMEOUT : Duration = Duration :: from_secs ( 2 ) ;
2157
+
2158
+ let time = if matches ! ( job, TurboTasksBackendJob :: InitialSnapshot ) {
2159
+ FIRST_SNAPSHOT_WAIT
2169
2160
} else {
2170
- far_future ( )
2161
+ SNAPSHOT_INTERVAL
2171
2162
} ;
2172
- loop {
2173
- tokio:: select! {
2174
- _ = & mut stop_listener => {
2175
- return ;
2176
- } ,
2177
- _ = & mut idle_start_listener => {
2178
- idle_time = Instant :: now( ) + IDLE_TIMEOUT ;
2179
- idle_start_listener = self . idle_start_event. listen( )
2180
- } ,
2181
- _ = & mut idle_end_listener => {
2182
- idle_time = until + IDLE_TIMEOUT ;
2183
- idle_end_listener = self . idle_end_event. listen( )
2184
- } ,
2185
- _ = tokio:: time:: sleep_until( until) => {
2186
- break ;
2187
- } ,
2188
- _ = tokio:: time:: sleep_until( idle_time) => {
2189
- if turbo_tasks. is_idle( ) {
2163
+
2164
+ let until = last_snapshot + time;
2165
+ if until > Instant :: now ( ) {
2166
+ let mut stop_listener = self . stopping_event . listen ( ) ;
2167
+ if self . stopping . load ( Ordering :: Acquire ) {
2168
+ return ;
2169
+ }
2170
+ let mut idle_start_listener = self . idle_start_event . listen ( ) ;
2171
+ let mut idle_end_listener = self . idle_end_event . listen ( ) ;
2172
+ let mut idle_time = if turbo_tasks. is_idle ( ) {
2173
+ Instant :: now ( ) + IDLE_TIMEOUT
2174
+ } else {
2175
+ far_future ( )
2176
+ } ;
2177
+ loop {
2178
+ tokio:: select! {
2179
+ _ = & mut stop_listener => {
2180
+ return ;
2181
+ } ,
2182
+ _ = & mut idle_start_listener => {
2183
+ idle_time = Instant :: now( ) + IDLE_TIMEOUT ;
2184
+ idle_start_listener = self . idle_start_event. listen( )
2185
+ } ,
2186
+ _ = & mut idle_end_listener => {
2187
+ idle_time = until + IDLE_TIMEOUT ;
2188
+ idle_end_listener = self . idle_end_event. listen( )
2189
+ } ,
2190
+ _ = tokio:: time:: sleep_until( until) => {
2190
2191
break ;
2191
- }
2192
- } ,
2192
+ } ,
2193
+ _ = tokio:: time:: sleep_until( idle_time) => {
2194
+ if turbo_tasks. is_idle( ) {
2195
+ break ;
2196
+ }
2197
+ } ,
2198
+ }
2193
2199
}
2194
2200
}
2195
- }
2196
2201
2197
- let this = self . clone ( ) ;
2198
- let snapshot = this. snapshot ( ) ;
2199
- if let Some ( ( snapshot_start, new_data) ) = snapshot {
2200
- last_snapshot = snapshot_start;
2201
- if new_data {
2202
- continue ;
2203
- }
2204
- let last_snapshot = last_snapshot. duration_since ( self . start_time ) ;
2205
- self . last_snapshot . store (
2206
- last_snapshot. as_millis ( ) . try_into ( ) . unwrap ( ) ,
2207
- Ordering :: Relaxed ,
2208
- ) ;
2202
+ let this = self . clone ( ) ;
2203
+ let snapshot = this. snapshot ( ) ;
2204
+ if let Some ( ( snapshot_start, new_data) ) = snapshot {
2205
+ last_snapshot = snapshot_start;
2206
+ if new_data {
2207
+ continue ;
2208
+ }
2209
+ let last_snapshot = last_snapshot. duration_since ( self . start_time ) ;
2210
+ self . last_snapshot . store (
2211
+ last_snapshot. as_millis ( ) . try_into ( ) . unwrap ( ) ,
2212
+ Ordering :: Relaxed ,
2213
+ ) ;
2209
2214
2210
- turbo_tasks
2211
- . schedule_backend_background_job ( BACKEND_JOB_FOLLOW_UP_SNAPSHOT , None ) ;
2212
- return ;
2215
+ turbo_tasks. schedule_backend_background_job (
2216
+ TurboTasksBackendJob :: FollowUpSnapshot ,
2217
+ ) ;
2218
+ return ;
2219
+ }
2213
2220
}
2214
2221
}
2215
- } else if id == BACKEND_JOB_PREFETCH_TASK || id == BACKEND_JOB_PREFETCH_CHUNK_TASK {
2216
- const DATA_EXPECTATION : & str =
2217
- "Expected data to be a FxHashMap<TaskId, bool> for BACKEND_JOB_PREFETCH_TASK" ;
2218
- let data = Box :: < dyn Any + Send > :: downcast :: < Vec < ( TaskId , bool ) > > (
2219
- data. expect ( DATA_EXPECTATION ) ,
2220
- )
2221
- . expect ( DATA_EXPECTATION ) ;
2222
-
2223
- fn prefetch_task ( ctx : & mut impl ExecuteContext < ' _ > , task : TaskId , with_data : bool ) {
2224
- let category = if with_data {
2225
- TaskDataCategory :: All
2222
+ TurboTasksBackendJob :: Prefetch { data, range } => {
2223
+ let range = if let Some ( range) = range {
2224
+ range
2226
2225
} else {
2227
- TaskDataCategory :: Meta
2226
+ if data. len ( ) > 128 {
2227
+ let chunk_size = good_chunk_size ( data. len ( ) ) ;
2228
+ let chunks = data. len ( ) . div_ceil ( chunk_size) ;
2229
+ for i in 0 ..chunks {
2230
+ turbo_tasks. schedule_backend_foreground_job (
2231
+ TurboTasksBackendJob :: Prefetch {
2232
+ data : data. clone ( ) ,
2233
+ range : Some (
2234
+ ( i * chunk_size) ..min ( data. len ( ) , ( i + 1 ) * chunk_size) ,
2235
+ ) ,
2236
+ } ,
2237
+ ) ;
2238
+ }
2239
+ return ;
2240
+ }
2241
+ 0 ..data. len ( )
2228
2242
} ;
2229
- // Prefetch the task
2230
- drop ( ctx. task ( task, category) ) ;
2231
- }
2232
2243
2233
- if id == BACKEND_JOB_PREFETCH_TASK && data. len ( ) > 128 {
2234
- let chunk_size = good_chunk_size ( data. len ( ) ) ;
2235
- for chunk in into_chunks ( * data, chunk_size) {
2236
- let data: Box < Vec < ( TaskId , bool ) > > = Box :: new ( chunk. collect :: < Vec < _ > > ( ) ) ;
2237
- turbo_tasks. schedule_backend_foreground_job (
2238
- BACKEND_JOB_PREFETCH_CHUNK_TASK ,
2239
- Some ( data) ,
2240
- ) ;
2241
- }
2242
- } else {
2243
2244
let _span = info_span ! ( "prefetching" ) . entered ( ) ;
2244
2245
let mut ctx = self . execute_context ( turbo_tasks) ;
2245
- for ( task, with_data) in data. into_iter ( ) {
2246
- prefetch_task ( & mut ctx, task, with_data) ;
2246
+ for i in range {
2247
+ let ( & task, & with_data) = data. get_index ( i) . unwrap ( ) ;
2248
+ let category = if with_data {
2249
+ TaskDataCategory :: All
2250
+ } else {
2251
+ TaskDataCategory :: Meta
2252
+ } ;
2253
+ // Prefetch the task
2254
+ drop ( ctx. task ( task, category) ) ;
2247
2255
}
2248
2256
}
2249
2257
}
@@ -2934,13 +2942,14 @@ impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
2934
2942
)
2935
2943
}
2936
2944
2945
+ type BackendJob = TurboTasksBackendJob ;
2946
+
2937
2947
fn run_backend_job < ' a > (
2938
2948
& ' a self ,
2939
- id : BackendJobId ,
2940
- data : Option < Box < dyn Any + Send > > ,
2949
+ job : Self :: BackendJob ,
2941
2950
turbo_tasks : & ' a dyn TurboTasksBackendApi < Self > ,
2942
2951
) -> Pin < Box < dyn Future < Output = ( ) > + Send + ' a > > {
2943
- self . 0 . run_backend_job ( id , data , turbo_tasks)
2952
+ self . 0 . run_backend_job ( job , turbo_tasks)
2944
2953
}
2945
2954
2946
2955
fn try_read_task_output (
0 commit comments