@@ -167,6 +167,53 @@ impl Node {
167
167
self . rack_secret_loader . is_collecting_shares_for_rack_secret ( epoch)
168
168
}
169
169
170
+ /// Handle a `PrepareAndCommit` message from nexus
171
+ pub fn prepare_and_commit (
172
+ & mut self ,
173
+ ctx : & mut impl NodeHandlerCtx ,
174
+ config : Configuration ,
175
+ ) -> Result < ( ) , PrepareAndCommitError > {
176
+ let ps = ctx. persistent_state ( ) ;
177
+
178
+ if let Some ( expunged) = & ps. expunged {
179
+ error ! (
180
+ self . log,
181
+ "PrepareAndCommit attempted on expunged node" ;
182
+ "expunged_epoch" => %expunged. epoch,
183
+ "expunging_node" => %expunged. from
184
+ ) ;
185
+ return Err ( PrepareAndCommitError :: Expunged {
186
+ epoch : expunged. epoch ,
187
+ from : expunged. from . clone ( ) ,
188
+ } ) ;
189
+ }
190
+
191
+ // If we have a configuration the rack id must match the one from
192
+ // Nexus
193
+ if let Some ( ps_rack_id) = ps. rack_id ( ) {
194
+ if config. rack_id != ps_rack_id {
195
+ error ! (
196
+ self . log,
197
+ "PrepareAndCommit attempted with invalid rack_id" ;
198
+ "expected" => %ps_rack_id,
199
+ "got" => %config. rack_id
200
+ ) ;
201
+ return Err ( PrepareAndCommitError :: InvalidRackId (
202
+ MismatchedRackIdError {
203
+ expected : ps_rack_id,
204
+ got : config. rack_id ,
205
+ } ,
206
+ ) ) ;
207
+ }
208
+ }
209
+
210
+ // `PrepareAndCommit` from Nexus shares a behavior with a
211
+ // `CommitAdvance` message from a peer node.
212
+ self . handle_commit_advance ( ctx, "Nexus" , "PrepareAndCommit" , config) ;
213
+
214
+ Ok ( ( ) )
215
+ }
216
+
170
217
/// Commit a configuration
171
218
///
172
219
/// This is triggered by a message from Nexus for each node in the
@@ -278,8 +325,9 @@ impl Node {
278
325
ctx. add_connection ( peer. clone ( ) ) ;
279
326
self . send_coordinator_msgs_to ( ctx, peer. clone ( ) ) ;
280
327
if let Some ( ksc) = & mut self . key_share_computer {
281
- ksc. on_connect ( ctx, peer) ;
328
+ ksc. on_connect ( ctx, peer. clone ( ) ) ;
282
329
}
330
+ self . rack_secret_loader . on_connect ( ctx, peer) ;
283
331
}
284
332
285
333
/// A peer node has disconnected from this one
@@ -335,7 +383,8 @@ impl Node {
335
383
self . handle_share ( ctx, from, epoch, share) ;
336
384
}
337
385
PeerMsgKind :: CommitAdvance ( config) => {
338
- self . handle_commit_advance ( ctx, from, config)
386
+ let from = from. to_string ( ) ;
387
+ self . handle_commit_advance ( ctx, & from, "CommitAdvance" , config)
339
388
}
340
389
PeerMsgKind :: Expunged ( epoch) => {
341
390
self . handle_expunged ( ctx, from, epoch) ;
@@ -461,7 +510,8 @@ impl Node {
461
510
fn handle_commit_advance (
462
511
& mut self ,
463
512
ctx : & mut impl NodeHandlerCtx ,
464
- from : PlatformId ,
513
+ from : & str ,
514
+ op : & str ,
465
515
config : Configuration ,
466
516
) {
467
517
// The sender sent us a configuration even though we are not part of the
@@ -470,100 +520,121 @@ impl Node {
470
520
if !config. members . contains_key ( ctx. platform_id ( ) ) {
471
521
error ! (
472
522
self . log,
473
- "Received CommitAdvance , but not a member of configuration" ;
523
+ "Received {op} , but not a member of configuration" ;
474
524
"from" => %from,
475
525
"epoch" => %config. epoch
476
526
) ;
477
527
return ;
478
528
}
479
529
480
- // We may have already advanced by the time we receive this message.
481
- // Let's check.
482
- if ctx. persistent_state ( ) . commits . contains ( & config. epoch ) {
483
- info ! (
484
- self . log,
485
- "Received CommitAdvance, but already committed" ;
486
- "from" => %from,
487
- "epoch" => %config. epoch
488
- ) ;
489
- return ;
530
+ if let Some ( latest_committed_epoch) =
531
+ ctx. persistent_state ( ) . latest_committed_epoch ( )
532
+ {
533
+ if latest_committed_epoch > config. epoch {
534
+ info ! (
535
+ self . log,
536
+ "Received {op}, but already committed at later epoch" ;
537
+ "from" => %from,
538
+ "epoch" => %config. epoch,
539
+ "latest_committed_epoch" => %latest_committed_epoch
540
+ ) ;
541
+ return ;
542
+ } else if latest_committed_epoch == config. epoch {
543
+ info ! (
544
+ self . log,
545
+ "Received {op}, but already committed" ;
546
+ "from" => %from,
547
+ "epoch" => %config. epoch
548
+ ) ;
549
+ return ;
550
+ }
490
551
}
552
+
553
+ let mut just_committed = false ;
491
554
if ctx. persistent_state ( ) . has_prepared ( config. epoch ) {
492
555
// Go ahead and commit
493
556
info ! (
494
557
self . log,
495
- "Received CommitAdvance . Already prepared, now committing" ;
558
+ "Received {op} . Already prepared, now committing" ;
496
559
"from" => %from,
497
560
"epoch" => %config. epoch
498
561
) ;
499
562
ctx. update_persistent_state ( |ps| ps. commits . insert ( config. epoch ) ) ;
500
- return ;
501
- }
502
-
503
- // Do we have the configuration in our persistent state? If not save it.
504
- if let Some ( existing) =
505
- ctx. persistent_state ( ) . configuration ( config. epoch )
506
- {
507
- if existing != & config {
508
- error ! (
509
- self . log,
510
- "Received a configuration mismatch" ;
511
- "from" => %from,
512
- "existing_config" => #?existing,
513
- "received_config" => #?config
514
- ) ;
515
- ctx. raise_alarm ( Alarm :: MismatchedConfigurations {
516
- config1 : ( * existing) . clone ( ) ,
517
- config2 : config. clone ( ) ,
518
- from : from. clone ( ) ,
563
+ just_committed = true ;
564
+ } else {
565
+ // Do we have the configuration in our persistent state? If not save it.
566
+ if let Some ( existing) =
567
+ ctx. persistent_state ( ) . configuration ( config. epoch )
568
+ {
569
+ if existing != & config {
570
+ error ! (
571
+ self . log,
572
+ "Received a configuration mismatch" ;
573
+ "from" => %from,
574
+ "existing_config" => #?existing,
575
+ "received_config" => #?config
576
+ ) ;
577
+ ctx. raise_alarm ( Alarm :: MismatchedConfigurations {
578
+ config1 : ( * existing) . clone ( ) ,
579
+ config2 : config. clone ( ) ,
580
+ from : from. to_string ( ) ,
581
+ } ) ;
582
+ return ;
583
+ }
584
+ } else {
585
+ ctx. update_persistent_state ( |ps| {
586
+ ps. configs
587
+ . insert_unique ( config. clone ( ) )
588
+ . expect ( "new config" ) ;
589
+ true
519
590
} ) ;
520
- return ;
521
591
}
522
- } else {
523
- ctx. update_persistent_state ( |ps| {
524
- ps. configs . insert_unique ( config. clone ( ) ) . expect ( "new config" ) ;
525
- true
526
- } ) ;
527
592
}
528
593
529
- // Are we coordinating for an older epoch? If so, cancel.
530
594
if let Some ( cs) = & self . coordinator_state {
531
595
let coordinating_epoch = cs. reconfigure_msg ( ) . epoch ( ) ;
596
+
597
+ // Are we coordinating for an older epoch? If so, cancel.
532
598
if coordinating_epoch < config. epoch {
533
599
info ! (
534
600
self . log,
535
- "Received CommitAdvance . Cancelling stale coordination" ;
601
+ "Received {op} . Cancelling stale coordination" ;
536
602
"from" => %from,
537
603
"coordinating_epoch" => %coordinating_epoch,
538
604
"received_epoch" => %config. epoch
539
605
) ;
540
606
self . coordinator_state = None ;
541
- // Intentionally fall through
542
607
} else if coordinating_epoch == config. epoch {
608
+ // We want to cancel coordination here as well. Nexus has
609
+ // informed the sending node of the commit (or it learned from
610
+ // another node), and Nexus will eventually inform us. But since
611
+ // we have committed above by updating the persistent state, the
612
+ // message from Nexus will be a no-op.
543
613
info ! (
544
614
self . log,
545
- "Received CommitAdvance while coordinating for same epoch!" ;
615
+ "Received {op} while coordinating for same epoch. \
616
+ Cancelling coordination.";
546
617
"from" => %from,
547
618
"epoch" => %config. epoch
548
619
) ;
549
- return ;
620
+ self . coordinator_state = None ;
550
621
} else {
622
+ // We are coordinating for a later epoch. Continue to do so.
551
623
info ! (
552
624
self . log,
553
- "Received CommitAdvance for stale epoch while coordinating" ;
625
+ "Received {op} for stale epoch while coordinating" ;
554
626
"from" => %from,
555
627
"received_epoch" => %config. epoch,
556
628
"coordinating_epoch" => %coordinating_epoch
557
629
) ;
558
- return ;
559
630
}
560
631
}
561
632
562
633
// Are we already trying to compute our share for this config?
563
634
if let Some ( ksc) = & mut self . key_share_computer {
564
635
if ksc. config ( ) . epoch > config. epoch {
565
636
let msg = concat ! (
566
- "Received stale CommitAdvance . " ,
637
+ "Received stale {op} . " ,
567
638
"Already computing for later epoch"
568
639
) ;
569
640
info ! (
@@ -577,27 +648,32 @@ impl Node {
577
648
} else if ksc. config ( ) . epoch == config. epoch {
578
649
info ! (
579
650
self . log,
580
- "Received CommitAdvance while already computing share" ;
651
+ "Received {op} while already computing share" ;
581
652
"from" => %from,
582
653
"epoch" => %config. epoch
583
654
) ;
655
+ if just_committed {
656
+ self . key_share_computer = None ;
657
+ }
584
658
return ;
585
659
} else {
586
660
info ! (
587
661
self . log,
588
- "Received CommitAdvance while computing share for old epoch" ;
662
+ "Received {op} while computing share for old epoch" ;
589
663
"from" => %from,
590
664
"epoch" => %ksc. config( ) . epoch,
591
665
"received_epoch" => %config. epoch
592
666
) ;
667
+ self . key_share_computer = None ;
593
668
// Intentionally fall through
594
669
}
595
670
}
596
671
597
- // We either were collecting shares for an old epoch or haven't started
598
- // yet.
599
- self . key_share_computer =
600
- Some ( KeyShareComputer :: new ( & self . log , ctx, config) ) ;
672
+ // We need to compute our key share for this epoch if we have gotten here and not committed.
673
+ if !just_committed {
674
+ self . key_share_computer =
675
+ Some ( KeyShareComputer :: new ( & self . log , ctx, config) ) ;
676
+ }
601
677
}
602
678
603
679
fn handle_get_share (
@@ -814,9 +890,8 @@ impl Node {
814
890
ctx : & mut impl NodeHandlerCtx ,
815
891
platform_id : PlatformId ,
816
892
) {
817
- // This function is called unconditionally in `tick` callbacks. In this
818
- // case we may not actually be a coordinator. We ignore the call in
819
- // that case.
893
+ // This function is called unconditionally in callbacks. We may not
894
+ // actually be a coordinator. We ignore the call in that case.
820
895
if let Some ( c) = self . coordinator_state . as_mut ( ) {
821
896
c. send_msgs_to ( ctx, platform_id) ;
822
897
}
@@ -878,6 +953,18 @@ pub enum CommitError {
878
953
Expunged { epoch : Epoch , from : PlatformId } ,
879
954
}
880
955
956
+ #[ derive( Debug , Clone , thiserror:: Error , PartialEq , Eq ) ]
957
+ pub enum PrepareAndCommitError {
958
+ #[ error( "invalid rack id" ) ]
959
+ InvalidRackId (
960
+ #[ from]
961
+ #[ source]
962
+ MismatchedRackIdError ,
963
+ ) ,
964
+ #[ error( "cannot commit: expunged at epoch {epoch} by {from}" ) ]
965
+ Expunged { epoch : Epoch , from : PlatformId } ,
966
+ }
967
+
881
968
#[ cfg( test) ]
882
969
mod tests {
883
970
use super :: * ;
0 commit comments