@@ -350,7 +350,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
350
350
return Ok ( KeepChain ) ;
351
351
}
352
352
BatchState :: Poisoned => unreachable ! ( "Poisoned batch" ) ,
353
- BatchState :: Processing ( _) | BatchState :: AwaitingDownload | BatchState :: Failed => {
353
+ // Batches can be in `AwaitingDownload` state if there weren't good data column subnet
354
+ // peers to send the request to.
355
+ BatchState :: AwaitingDownload => return Ok ( KeepChain ) ,
356
+ BatchState :: Processing ( _) | BatchState :: Failed => {
354
357
// these are all inconsistent states:
355
358
// - Processing -> `self.current_processing_batch` is None
356
359
// - Failed -> non recoverable batch. For an optimistic batch, it should
@@ -384,7 +387,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
384
387
// Batch is not ready, nothing to process
385
388
}
386
389
BatchState :: Poisoned => unreachable ! ( "Poisoned batch" ) ,
387
- BatchState :: Failed | BatchState :: AwaitingDownload | BatchState :: Processing ( _) => {
390
+ // Batches can be in `AwaitingDownload` state if there weren't good data column subnet
391
+ // peers to send the request to.
392
+ BatchState :: AwaitingDownload => return Ok ( KeepChain ) ,
393
+ BatchState :: Failed | BatchState :: Processing ( _) => {
388
394
// these are all inconsistent states:
389
395
// - Failed -> non recoverable batch. Chain should have been removed
390
396
// - AwaitingDownload -> A recoverable failed batch should have been
@@ -582,8 +588,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
582
588
BatchProcessResult :: NonFaultyFailure => {
583
589
batch. processing_completed ( BatchProcessingResult :: NonFaultyFailure ) ?;
584
590
585
- // Simply re-download the batch .
586
- self . send_batch ( network, batch_id )
591
+ // Simply re-download all batches in `AwaitingDownload` state .
592
+ self . attempt_send_awaiting_download_batches ( network, "non-faulty-failure" )
587
593
}
588
594
}
589
595
}
@@ -717,6 +723,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
717
723
previous_start = %old_start,
718
724
new_start = %self . start_epoch,
719
725
processing_target = %self . processing_target,
726
+ id=%self . id,
720
727
"Chain advanced"
721
728
) ;
722
729
}
@@ -753,7 +760,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
753
760
}
754
761
// this is our robust `processing_target`. All previous batches must be awaiting
755
762
// validation
756
- let mut redownload_queue = Vec :: new ( ) ;
757
763
758
764
for ( id, batch) in self . batches . range_mut ( ..batch_id) {
759
765
if let BatchOperationOutcome :: Failed { blacklist } = batch. validation_failed ( ) ? {
@@ -763,18 +769,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
763
769
failing_batch : * id,
764
770
} ) ;
765
771
}
766
- redownload_queue. push ( * id) ;
767
772
}
768
773
769
774
// no batch maxed out its processing attempts, so now the chain's volatile progress must be
770
775
// reset
771
776
self . processing_target = self . start_epoch ;
772
777
773
- for id in redownload_queue {
774
- self . send_batch ( network, id) ?;
775
- }
776
- // finally, re-request the failed batch.
777
- self . send_batch ( network, batch_id)
778
+ // finally, re-request the failed batch and all other batches in `AwaitingDownload` state.
779
+ self . attempt_send_awaiting_download_batches ( network, "handle_invalid_batch" )
778
780
}
779
781
780
782
pub fn stop_syncing ( & mut self ) {
@@ -810,6 +812,9 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
810
812
811
813
// advance the chain to the new validating epoch
812
814
self . advance_chain ( network, validating_epoch) ;
815
+ // attempt to download any batches stuck in the `AwaitingDownload` state because of
816
+ // a lack of peers earlier
817
+ self . attempt_send_awaiting_download_batches ( network, "start_syncing" ) ?;
813
818
if self . optimistic_start . is_none ( )
814
819
&& optimistic_epoch > self . processing_target
815
820
&& !self . attempted_optimistic_starts . contains ( & optimistic_epoch)
@@ -939,6 +944,41 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
939
944
}
940
945
}
941
946
947
+ /// Attempts to send all batches that are in `AwaitingDownload` state.
948
+ ///
949
+ /// Batches might get stuck in `AwaitingDownload` post peerdas because of lack of peers
950
+ /// in required subnets. We need to progress them if peers are available at a later point.
951
+ pub fn attempt_send_awaiting_download_batches (
952
+ & mut self ,
953
+ network : & mut SyncNetworkContext < T > ,
954
+ src : & str ,
955
+ ) -> ProcessingResult {
956
+ // Collect all batches in AwaitingDownload state and see if they can be sent
957
+ let awaiting_downloads: Vec < _ > = self
958
+ . batches
959
+ . iter ( )
960
+ . filter ( |( _, batch) | matches ! ( batch. state( ) , BatchState :: AwaitingDownload ) )
961
+ . map ( |( batch_id, _) | batch_id)
962
+ . copied ( )
963
+ . collect ( ) ;
964
+ debug ! (
965
+ ?awaiting_downloads,
966
+ src, "Attempting to send batches awaiting downlaod"
967
+ ) ;
968
+
969
+ for batch_id in awaiting_downloads {
970
+ if self . good_peers_on_sampling_subnets ( batch_id, network) {
971
+ self . send_batch ( network, batch_id) ?;
972
+ } else {
973
+ debug ! (
974
+ src = "attempt_send_awaiting_download_batches" ,
975
+ "Waiting for peers to be available on sampling column subnets"
976
+ ) ;
977
+ }
978
+ }
979
+ Ok ( KeepChain )
980
+ }
981
+
942
982
/// Requests the batch assigned to the given id from a given peer.
943
983
pub fn send_batch (
944
984
& mut self ,
@@ -1089,14 +1129,16 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
1089
1129
if !matches ! ( self . state, ChainSyncingState :: Syncing ) {
1090
1130
return Ok ( KeepChain ) ;
1091
1131
}
1092
-
1093
1132
// find the next pending batch and request it from the peer
1094
1133
1095
1134
// check if we have the batch for our optimistic start. If not, request it first.
1096
1135
// We wait for this batch before requesting any other batches.
1097
1136
if let Some ( epoch) = self . optimistic_start {
1098
1137
if !self . good_peers_on_sampling_subnets ( epoch, network) {
1099
- debug ! ( "Waiting for peers to be available on sampling column subnets" ) ;
1138
+ debug ! (
1139
+ src = "request_batches_optimistic" ,
1140
+ "Waiting for peers to be available on sampling column subnets"
1141
+ ) ;
1100
1142
return Ok ( KeepChain ) ;
1101
1143
}
1102
1144
@@ -1105,6 +1147,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
1105
1147
let optimistic_batch = BatchInfo :: new ( & epoch, EPOCHS_PER_BATCH , batch_type) ;
1106
1148
entry. insert ( optimistic_batch) ;
1107
1149
self . send_batch ( network, epoch) ?;
1150
+ } else {
1151
+ self . attempt_send_awaiting_download_batches ( network, "request_batches_optimistic" ) ?;
1108
1152
}
1109
1153
return Ok ( KeepChain ) ;
1110
1154
}
@@ -1179,7 +1223,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
1179
1223
// block and data column requests are currently coupled. This can be removed once we find a
1180
1224
// way to decouple the requests and do retries individually, see issue #6258.
1181
1225
if !self . good_peers_on_sampling_subnets ( self . to_be_downloaded , network) {
1182
- debug ! ( "Waiting for peers to be available on custody column subnets" ) ;
1226
+ debug ! (
1227
+ src = "include_next_batch" ,
1228
+ "Waiting for peers to be available on custody column subnets"
1229
+ ) ;
1183
1230
return None ;
1184
1231
}
1185
1232
0 commit comments