Skip to content

Commit e0e82be

Browse files
committed
TQ: Add node crash/restart actions to cluster test
We ensure that messages are not sent to crashed nodes and that API calls are not triggered on crashed nodes. On node restart, we clear all in-memory state while preserving persistent state.
1 parent bc9a55b commit e0e82be

File tree

5 files changed

+214
-6
lines changed

5 files changed

+214
-6
lines changed

trust-quorum/src/node_ctx.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,14 @@ impl NodeCtx {
133133
alarms: BTreeSet::new(),
134134
}
135135
}
136+
137+
#[cfg(any(test, feature = "testing"))]
138+
pub fn clear_mutable_state(&mut self) {
139+
self.persistent_state_changed = false;
140+
self.outgoing.clear();
141+
self.connected.clear();
142+
self.alarms.clear();
143+
}
136144
}
137145

138146
impl NodeCommonCtx for NodeCtx {

trust-quorum/src/rack_secret_loader.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,12 +192,14 @@ pub struct ShareCollector {
192192
shares: BTreeMap<PlatformId, Share>,
193193
}
194194

195+
#[cfg(feature = "danger_partial_eq_ct_wrapper")]
195196
impl PartialEq for ShareCollector {
196197
fn eq(&self, other: &Self) -> bool {
197198
self.config == other.config && self.shares == other.shares
198199
}
199200
}
200201

202+
#[cfg(feature = "danger_partial_eq_ct_wrapper")]
201203
impl Eq for ShareCollector {}
202204

203205
impl<'daft> ShareCollectorDiff<'daft> {

trust-quorum/test-utils/src/event.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ pub enum Event {
3333
DeliverNexusReply(NexusReply),
3434
CommitConfiguration(PlatformId),
3535
Reconfigure(NexusConfig),
36+
CrashNode(PlatformId),
37+
RestartNode {
38+
id: PlatformId,
39+
connection_order: Vec<PlatformId>,
40+
},
3641
}
3742

3843
impl Event {
@@ -50,6 +55,12 @@ impl Event {
5055
Self::ClearSecrets(id) => vec![id.clone()],
5156
Self::CommitConfiguration(id) => vec![id.clone()],
5257
Self::Reconfigure(_) => vec![],
58+
Self::CrashNode(id) => vec![id.clone()],
59+
Self::RestartNode { id, connection_order } => {
60+
let mut nodes = connection_order.clone();
61+
nodes.push(id.clone());
62+
nodes
63+
}
5364
}
5465
}
5566
}

trust-quorum/test-utils/src/state.rs

Lines changed: 80 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,11 @@ impl TqState {
176176

177177
pub fn send_envelopes_from(&mut self, id: &PlatformId) {
178178
let (_, ctx) = self.sut.nodes.get_mut(id).expect("node exists");
179-
for envelope in ctx.drain_envelopes() {
179+
// Only send envelopes to alive nodes
180+
for envelope in ctx
181+
.drain_envelopes()
182+
.filter(|e| !self.faults.crashed_nodes.contains(&e.to))
183+
{
180184
let msgs =
181185
self.bootstrap_network.entry(envelope.to.clone()).or_default();
182186
msgs.push(envelope);
@@ -220,6 +224,12 @@ impl TqState {
220224
Event::Reconfigure(nexus_config) => {
221225
self.apply_event_reconfigure(nexus_config)
222226
}
227+
Event::CrashNode(id) => {
228+
self.apply_event_crash_node(id);
229+
}
230+
Event::RestartNode { id, connection_order } => {
231+
self.apply_event_restart_node(id, connection_order);
232+
}
223233
}
224234
}
225235

@@ -327,6 +337,70 @@ impl TqState {
327337
self.underlay_network.push(reply);
328338
}
329339

340+
fn apply_event_crash_node(&mut self, id: PlatformId) {
341+
// Drop all in-flight messages destined for the crashed node
342+
self.bootstrap_network.remove(&id);
343+
344+
// Keep track of the crashed node
345+
self.faults.crashed_nodes.insert(id.clone());
346+
347+
// We get to define the semantics of the network with regard to an
348+
// in-flight message sourced from a crashed node. We have two choices:
349+
// drop the message or let it be eventually delivered to the destination
350+
// if the destination node doesn't crash before delivery. We choose
351+
// the latter mostly for efficiency: we don't want to have to loop over
352+
// every destination in the bootstrap network and filter messages.
353+
//
354+
// However, we do still have to call `node.on_disconnect()` at all
355+
// connected nodes, so do that now. For simplicity, we do this at every
356+
// alive node in the same step.
357+
for (_, (node, ctx)) in self
358+
.sut
359+
.nodes
360+
.iter_mut()
361+
.filter(|(id, _)| !self.faults.crashed_nodes.contains(id))
362+
{
363+
node.on_disconnect(ctx, id.clone());
364+
}
365+
}
366+
367+
fn apply_event_restart_node(
368+
&mut self,
369+
id: PlatformId,
370+
connection_order: Vec<PlatformId>,
371+
) {
372+
// We need to clear the mutable state of the `Node`. We do this by
373+
// creating a new `Node` and passing in the existing context which
374+
// contains the persistent state.
375+
{
376+
let (node, ctx) = self.sut.nodes.get_mut(&id).expect("node exists");
377+
ctx.clear_mutable_state();
378+
*node = Node::new(&self.log, ctx);
379+
}
380+
381+
// We now need to connect to each node in the order given in
382+
// `connection_order`. We do this by calling `on_connect` at the
383+
// restarted node and at each peer in `connection_order`.
384+
for peer in connection_order {
385+
let (peer_node, peer_ctx) =
386+
self.sut.nodes.get_mut(&peer).expect("node exists");
387+
// Inform the peer of the connection
388+
peer_node.on_connect(peer_ctx, id.clone());
389+
// Send any messages output as a result of the connection
390+
send_envelopes(
391+
peer_ctx,
392+
&mut self.bootstrap_network,
393+
&mut self.faults,
394+
);
395+
396+
let (node, ctx) = self.sut.nodes.get_mut(&id).expect("node exists");
397+
// Inform the restarted node of the connection
398+
node.on_connect(ctx, peer);
399+
// Send any messages output as a result of the connection
400+
send_envelopes(ctx, &mut self.bootstrap_network, &self.faults);
401+
}
402+
}
403+
330404
fn apply_event_deliver_nexus_reply(&mut self, recorded_reply: NexusReply) {
331405
let mut latest_config = self.nexus.latest_config_mut();
332406
let reply = self.underlay_network.pop().expect("reply exists");
@@ -404,7 +478,7 @@ impl TqState {
404478
}
405479

406480
// Send any messages as a result of handling this message
407-
send_envelopes(ctx, &mut self.bootstrap_network);
481+
send_envelopes(ctx, &mut self.bootstrap_network, &self.faults);
408482

409483
// Remove any destinations with zero messages in-flight
410484
self.bootstrap_network.retain(|_, msgs| !msgs.is_empty());
@@ -462,8 +536,11 @@ impl TqState {
462536
fn send_envelopes(
463537
ctx: &mut NodeCtx,
464538
bootstrap_network: &mut BTreeMap<PlatformId, Vec<Envelope>>,
539+
faults: &Faults,
465540
) {
466-
for envelope in ctx.drain_envelopes() {
541+
for envelope in
542+
ctx.drain_envelopes().filter(|e| !faults.crashed_nodes.contains(&e.to))
543+
{
467544
let envelopes =
468545
bootstrap_network.entry(envelope.to.clone()).or_default();
469546
envelopes.push(envelope);

trust-quorum/tests/cluster.rs

Lines changed: 113 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,10 @@ impl TestState {
139139
threshold,
140140
coordinator,
141141
),
142+
Action::CrashNode(index) => self.action_to_events_crash_node(index),
143+
Action::RestartNode { id, connection_order } => {
144+
self.action_to_events_restart_node(id, connection_order)
145+
}
142146
}
143147
}
144148

@@ -154,6 +158,16 @@ impl TestState {
154158
let destination =
155159
selector.select(self.tq_state.bootstrap_network.keys());
156160

161+
// Envelopes should never be sent on the bootstrap network to crashed nodes
162+
// when events are applied in `TqState::apply_event`.
163+
//
164+
// The rationale is that we don't mutate state here, and so we can't
165+
// choose to pop the message that shouldn't be delivered off the
166+
// bootstrap network. We could choose not to actually deliver it when
167+
// applying the event, but that means we have events that don't actually
168+
// do anything in our event log, which is quite misleading.
169+
assert!(!self.tq_state.faults.crashed_nodes.contains(destination));
170+
157171
// We pop from the back and push on the front
158172
let envelope = self
159173
.tq_state
@@ -167,14 +181,84 @@ impl TestState {
167181
events
168182
}
169183

184+
fn action_to_events_crash_node(&self, selector: Selector) -> Vec<Event> {
185+
let mut faultable = self
186+
.tq_state
187+
.member_universe
188+
.iter()
189+
.filter(|m| !self.tq_state.faults.crashed_nodes.contains(&m))
190+
.peekable();
191+
192+
if faultable.peek().is_none() {
193+
// All nodes are down
194+
return vec![];
195+
}
196+
197+
let id = selector.select(faultable).clone();
198+
vec![Event::CrashNode(id)]
199+
}
200+
201+
fn action_to_events_restart_node(
202+
&self,
203+
id: Selector,
204+
connection_order_indexes: Vec<Index>,
205+
) -> Vec<Event> {
206+
if self.tq_state.faults.crashed_nodes.is_empty() {
207+
return vec![];
208+
}
209+
210+
// Choose the node to restart
211+
let id = id.select(self.tq_state.faults.crashed_nodes.iter()).clone();
212+
213+
// Now order the peer connections
214+
215+
// First find all the peers we want to connect to.
216+
let mut to_connect: Vec<_> = self
217+
.tq_state
218+
.member_universe
219+
.iter()
220+
.filter(|id| !self.tq_state.faults.crashed_nodes.contains(id))
221+
.cloned()
222+
.collect();
223+
224+
let total_connections = to_connect.len();
225+
226+
// Then remove them from `to_connect` and put them into `connection_order`.
227+
let mut connection_order = vec![];
228+
for index in connection_order_indexes {
229+
if to_connect.is_empty() {
230+
break;
231+
}
232+
let i = index.index(to_connect.len());
233+
let dst = to_connect.swap_remove(i);
234+
connection_order.push(dst);
235+
}
236+
237+
// If there is anything left in `to_connect`, then just extend
238+
// `connection_order` with it.
239+
connection_order.extend_from_slice(&to_connect);
240+
241+
// Ensure we have exactly the number of connections we want
242+
assert_eq!(connection_order.len(), total_connections);
243+
244+
vec![Event::RestartNode { id, connection_order }]
245+
}
246+
170247
fn action_to_events_load_latest_rack_secret(
171248
&self,
172249
selector: Selector,
173250
) -> Vec<Event> {
174251
let mut events = vec![];
175252
if let Some(c) = self.tq_state.nexus.last_committed_config() {
176-
let id = selector.select(c.members.iter()).clone();
177-
events.push(Event::LoadRackSecret(id, c.epoch));
253+
let mut loadable = c
254+
.members
255+
.iter()
256+
.filter(|m| !self.tq_state.faults.crashed_nodes.contains(m))
257+
.peekable();
258+
if loadable.peek().is_some() {
259+
let id = selector.select(loadable).clone();
260+
events.push(Event::LoadRackSecret(id, c.epoch));
261+
}
178262
}
179263
events
180264
}
@@ -205,6 +289,14 @@ impl TestState {
205289
return vec![];
206290
}
207291
let c = config.select(committed_configs_iter);
292+
let mut loadable = c
293+
.members
294+
.iter()
295+
.filter(|m| !self.tq_state.faults.crashed_nodes.contains(m))
296+
.peekable();
297+
if loadable.peek().is_none() {
298+
return vec![];
299+
}
208300
let id = id.select(c.members.iter()).clone();
209301
vec![Event::LoadRackSecret(id, c.epoch)]
210302
}
@@ -277,6 +369,7 @@ impl TestState {
277369
let committable: Vec<_> = latest_config
278370
.prepared_members
279371
.difference(&latest_config.committed_members)
372+
.filter(|m| !self.tq_state.faults.crashed_nodes.contains(m))
280373
.collect();
281374

282375
if committable.is_empty() {
@@ -703,6 +796,22 @@ pub enum Action {
703796
threshold: Index,
704797
coordinator: Selector,
705798
},
799+
800+
/// Crash a random node in the universe
801+
#[weight(2)]
802+
CrashNode(Selector),
803+
804+
/// Restart a crashed node if there is one
805+
///
806+
/// We randomize the connection order, because that influences the order
807+
/// that messages sent on reconnect will get delivered to the newly
808+
/// connected node.
809+
#[weight(2)]
810+
RestartNode {
811+
id: Selector,
812+
#[any(size_range(MEMBER_UNIVERSE_SIZE-1..MEMBER_UNIVERSE_SIZE).lift())]
813+
connection_order: Vec<Index>,
814+
},
706815
}
707816

708817
const MIN_CLUSTER_SIZE: usize = 3;
@@ -770,6 +879,7 @@ fn test_trust_quorum_protocol(input: TestInput) {
770879
let (parent_dir, _) = log_prefix_for_test(logctx.test_name());
771880
let event_log_path = parent_dir.join(format!("{test_name}.events.json"));
772881
let mut event_log = EventLog::new(&event_log_path);
882+
println!("Event log path: {event_log_path}");
773883

774884
let log = logctx.log.new(o!("component" => "tq-proptest"));
775885
let mut state = TestState::new(log.clone());
@@ -789,6 +899,6 @@ fn test_trust_quorum_protocol(input: TestInput) {
789899
"skipped_actions" => state.skipped_actions
790900
);
791901

792-
let _ = std::fs::remove_file(event_log_path);
902+
// let _ = std::fs::remove_file(event_log_path);
793903
logctx.cleanup_successful();
794904
}

0 commit comments

Comments
 (0)