Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 47 additions & 100 deletions trust-quorum/test-utils/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub struct TqState {
pub member_universe: Vec<PlatformId>,

/// All possible system faults in our test
pub faults: Faults,
pub crashed_nodes: BTreeSet<PlatformId>,

/// All configurations ever generated by a coordinator.
///
Expand All @@ -79,7 +79,7 @@ impl TqState {
underlay_network: Vec::new(),
nexus: NexusState::new(),
member_universe,
faults: Faults::default(),
crashed_nodes: BTreeSet::new(),
all_coordinated_configs: IdOrdMap::new(),
expunged: BTreeSet::new(),
}
Expand All @@ -91,7 +91,7 @@ impl TqState {
pub fn send_reconfigure_msg(&mut self) {
let (coordinator, msg) = self.nexus.reconfigure_msg_for_latest_config();
let epoch_to_config = msg.epoch;
if !self.faults.crashed_nodes.contains(coordinator) {
if !self.crashed_nodes.contains(coordinator) {
let (node, ctx) = self
.sut
.nodes
Expand Down Expand Up @@ -122,7 +122,7 @@ impl TqState {
let (coordinator, msg) = self.nexus.reconfigure_msg_for_latest_config();

// The coordinator should have received the `ReconfigureMsg` from Nexus
if !self.faults.crashed_nodes.contains(coordinator) {
if !self.crashed_nodes.contains(coordinator) {
let (node, ctx) = self
.sut
.nodes
Expand All @@ -131,22 +131,24 @@ impl TqState {
let mut connected_members = 0;
// The coordinator should start preparing by sending a `PrepareMsg` to all
// connected nodes in the membership set.
for member in
msg.members.iter().filter(|&id| id != coordinator).cloned()
for member in msg
.members
.iter()
.filter(|&id| {
!self.crashed_nodes.contains(id) && id != ctx.platform_id()
})
.cloned()
{
if self.faults.is_connected(coordinator.clone(), member.clone())
{
connected_members += 1;
let msg_found = ctx.envelopes().any(|envelope| {
envelope.to == member
&& envelope.from == *coordinator
&& matches!(
envelope.msg.kind,
PeerMsgKind::Prepare { .. }
)
});
assert!(msg_found);
}
connected_members += 1;
let msg_found = ctx.envelopes().any(|envelope| {
envelope.to == member
&& envelope.from == *coordinator
&& matches!(
envelope.msg.kind,
PeerMsgKind::Prepare { .. }
)
});
assert!(msg_found);
}
assert_eq!(connected_members, ctx.envelopes().count());

Expand Down Expand Up @@ -176,7 +178,7 @@ impl TqState {
// Only send envelopes to alive nodes
for envelope in ctx
.drain_envelopes()
.filter(|e| !self.faults.crashed_nodes.contains(&e.to))
.filter(|e| !self.crashed_nodes.contains(&e.to))
{
let msgs =
self.bootstrap_network.entry(envelope.to.clone()).or_default();
Expand Down Expand Up @@ -241,7 +243,7 @@ impl TqState {
// Create the SUT nodes
self.sut = Sut::new(&self.log, self.member_universe.clone());

self.faults.crashed_nodes = crashed_nodes;
self.crashed_nodes = crashed_nodes;

// Inform nexus about the initial configuration
self.nexus.configs.insert_unique(config).expect("new config");
Expand All @@ -251,11 +253,13 @@ impl TqState {
.sut
.nodes
.iter_mut()
.filter(|(id, _)| !self.faults.crashed_nodes.contains(id))
.filter(|(id, _)| !self.crashed_nodes.contains(id))
{
for to in self.member_universe.iter().filter(|id| {
!self.faults.crashed_nodes.contains(id) && from != *id
}) {
for to in self
.member_universe
.iter()
.filter(|id| !self.crashed_nodes.contains(id) && from != *id)
{
node.on_connect(ctx, to.clone());
}
}
Expand Down Expand Up @@ -339,7 +343,7 @@ impl TqState {
self.bootstrap_network.remove(&id);

// Keep track of the crashed node
self.faults.crashed_nodes.insert(id.clone());
self.crashed_nodes.insert(id.clone());

// We get to define the semantics of the network with regard to an
// in-flight message sourced from a crashed node. We have two choices:
Expand All @@ -355,7 +359,7 @@ impl TqState {
.sut
.nodes
.iter_mut()
.filter(|(id, _)| !self.faults.crashed_nodes.contains(id))
.filter(|(id, _)| !self.crashed_nodes.contains(id))
{
node.on_disconnect(ctx, id.clone());
}
Expand All @@ -367,7 +371,7 @@ impl TqState {
connection_order: Vec<PlatformId>,
) {
// The node is no longer crashed.
self.faults.crashed_nodes.remove(&id);
self.crashed_nodes.remove(&id);

// We need to clear the mutable state of the `Node`. We do this by
// creating a new `Node` and passing in the existing context which
Expand All @@ -390,14 +394,18 @@ impl TqState {
send_envelopes(
peer_ctx,
&mut self.bootstrap_network,
&mut self.faults,
&self.crashed_nodes,
);

let (node, ctx) = self.sut.nodes.get_mut(&id).expect("node exists");
// Inform the restarted node of the connection
node.on_connect(ctx, peer);
// Send any messages output as a result of the connection
send_envelopes(ctx, &mut self.bootstrap_network, &self.faults);
send_envelopes(
ctx,
&mut self.bootstrap_network,
&self.crashed_nodes,
);
}
}

Expand Down Expand Up @@ -478,7 +486,7 @@ impl TqState {
}

// Send any messages as a result of handling this message
send_envelopes(ctx, &mut self.bootstrap_network, &self.faults);
send_envelopes(ctx, &mut self.bootstrap_network, &self.crashed_nodes);

// Remove any destinations with zero messages in-flight
self.bootstrap_network.retain(|_, msgs| !msgs.is_empty());
Expand Down Expand Up @@ -536,10 +544,10 @@ impl TqState {
fn send_envelopes(
ctx: &mut NodeCtx,
bootstrap_network: &mut BTreeMap<PlatformId, Vec<Envelope>>,
faults: &Faults,
crashed_nodes: &BTreeSet<PlatformId>,
) {
for envelope in
ctx.drain_envelopes().filter(|e| !faults.crashed_nodes.contains(&e.to))
ctx.drain_envelopes().filter(|e| !crashed_nodes.contains(&e.to))
{
let envelopes =
bootstrap_network.entry(envelope.to.clone()).or_default();
Expand Down Expand Up @@ -574,55 +582,6 @@ impl Sut {
}
}

/// Faults in our system. It's useful to keep these self contained and not
/// in separate fields in `TestState` so that we can access them all at once
/// independently of other `TestState` fields.
///
/// Two kinds of fault are tracked: nodes that have crashed, and pairs of
/// nodes that have been explicitly disconnected from each other. The derived
/// `Default` produces a value with no faults recorded.
///
/// NOTE(review): in this file the value is held in the `faults` field of
/// `TqState`; the `TestState` wording above may be stale — confirm.
#[derive(Default, Debug, Clone, Diffable)]
pub struct Faults {
/// Nodes that are currently crashed.
///
/// We allow nodes to crash and restart and therefore track crashed nodes here.
///
/// A crashed node is implicitly disconnected from every other node. We don't
/// bother storing the pairs in `disconnected_nodes`, but instead check both
/// fields when necessary (see `Faults::is_connected`).
pub crashed_nodes: BTreeSet<PlatformId>,

/// The set of disconnected nodes
pub disconnected_nodes: DisconnectedNodes,
}

impl Faults {
    /// Returns `true` when `node1` and `node2` are currently connected.
    ///
    /// Two nodes are connected only if neither of them has crashed and the
    /// pair has not been explicitly disconnected. The crash checks run first,
    /// so the pair lookup in `disconnected_nodes` is only performed when both
    /// nodes are up.
    pub fn is_connected(&self, node1: PlatformId, node2: PlatformId) -> bool {
        let either_crashed = self.crashed_nodes.contains(&node1)
            || self.crashed_nodes.contains(&node2);
        if either_crashed {
            // A crashed node is implicitly disconnected from everyone.
            return false;
        }

        !self.disconnected_nodes.contains(node1, node2)
    }
}

/// For cardinality purposes, we assume all nodes are connected and explicitly
/// disconnect some of them. This allows us to track and compare much less data.
///
/// A disconnection is recorded per unordered pair of nodes: `insert` and
/// `contains` accept the two nodes in either order.
#[derive(Default, Debug, Clone, Diffable)]
pub struct DisconnectedNodes {
/// Disconnected pairs, each stored with the smaller `PlatformId` first.
///
/// We sort each pair on insert for quick lookups
pairs: BTreeSet<(PlatformId, PlatformId)>,
}

impl DisconnectedNodes {
    /// Records that `node1` and `node2` are disconnected from each other.
    ///
    /// Returns `true` if the pair is newly inserted, `false` if it was
    /// already recorded. The order of the two arguments does not matter.
    ///
    /// # Panics
    ///
    /// Panics if `node1 == node2`.
    pub fn insert(&mut self, node1: PlatformId, node2: PlatformId) -> bool {
        self.pairs.insert(Self::canonical_pair(node1, node2))
    }

    /// Returns `true` if the pair of nodes is disconnected, `false` otherwise.
    ///
    /// The order of the two arguments does not matter.
    ///
    /// # Panics
    ///
    /// Panics if `node1 == node2`.
    pub fn contains(&self, node1: PlatformId, node2: PlatformId) -> bool {
        self.pairs.contains(&Self::canonical_pair(node1, node2))
    }

    /// Builds the stored form of a pair: two distinct nodes, smaller first.
    fn canonical_pair(
        first: PlatformId,
        second: PlatformId,
    ) -> (PlatformId, PlatformId) {
        assert_ne!(first, second);

        if first < second {
            (first, second)
        } else {
            (second, first)
        }
    }
}

/*****************************************************************************
*
* Diff related display code
Expand Down Expand Up @@ -657,7 +616,7 @@ impl Display for TqStateDiff<'_> {
display_bootstrap_network_diff(&self.bootstrap_network, f)?;
display_underlay_network_diff(&self.underlay_network, f)?;
display_nexus_state_diff(&self.nexus, f)?;
display_faults_diff(&self.faults, f)?;
display_faults_diff(&self.crashed_nodes, f)?;
display_expunged_diff(&self.expunged, f)?;

Ok(())
Expand All @@ -678,34 +637,22 @@ fn display_expunged_diff(
}

fn display_faults_diff(
diff: &FaultsDiff<'_>,
crashed_nodes: &BTreeSetDiff<'_, PlatformId>,
f: &mut std::fmt::Formatter<'_>,
) -> std::fmt::Result {
if !diff.crashed_nodes.added.is_empty() {
if !crashed_nodes.added.is_empty() {
writeln!(f, " Nodes crashed:")?;
for id in &diff.crashed_nodes.added {
for id in &crashed_nodes.added {
writeln!(f, " {id}")?;
}
}
if !diff.crashed_nodes.removed.is_empty() {
if !crashed_nodes.removed.is_empty() {
writeln!(f, " nodes started:")?;
for id in &diff.crashed_nodes.removed {
for id in &crashed_nodes.removed {
writeln!(f, " {id}")?;
}
}

if !diff.disconnected_nodes.pairs.added.is_empty() {
writeln!(f, " nodes disconnected from each other:")?;
for pair in &diff.disconnected_nodes.pairs.added {
writeln!(f, " {}, {}", pair.0, pair.1)?;
}
}
if !diff.disconnected_nodes.pairs.removed.is_empty() {
writeln!(f, " nodes connected to each other:")?;
for pair in &diff.disconnected_nodes.pairs.removed {
writeln!(f, " {}, {}", pair.0, pair.1)?;
}
}
Ok(())
}

Expand Down
29 changes: 12 additions & 17 deletions trust-quorum/tests/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ impl TestState {
// bootstrap network. We could choose not to actually deliver it when
// applying the event, but that means we have events that don't actually
// do anything in our event log, which is quite misleading.
assert!(!self.tq_state.faults.crashed_nodes.contains(destination));
assert!(!self.tq_state.crashed_nodes.contains(destination));

// We pop from the back and push on the front
let envelope = self
Expand All @@ -192,7 +192,7 @@ impl TestState {
.tq_state
.member_universe
.iter()
.filter(|m| !self.tq_state.faults.crashed_nodes.contains(&m))
.filter(|m| !self.tq_state.crashed_nodes.contains(&m))
.peekable();

if faultable.peek().is_none() {
Expand All @@ -209,11 +209,11 @@ impl TestState {
// out or receiving an error response on node restart because the
// configuration was lost.
vec![
Event::CrashNode(id.clone()),
Event::CrashNode(id),
Event::AbortConfiguration(latest_config.epoch),
]
} else {
vec![Event::CrashNode(id.clone())]
vec![Event::CrashNode(id)]
}
}

Expand All @@ -222,12 +222,12 @@ impl TestState {
id: Selector,
connection_order_indexes: Vec<Index>,
) -> Vec<Event> {
if self.tq_state.faults.crashed_nodes.is_empty() {
if self.tq_state.crashed_nodes.is_empty() {
return vec![];
}

// Choose the node to restart
let id = id.select(self.tq_state.faults.crashed_nodes.iter()).clone();
let id = id.select(self.tq_state.crashed_nodes.iter()).clone();

// Now order the peer connections

Expand All @@ -236,7 +236,7 @@ impl TestState {
.tq_state
.member_universe
.iter()
.filter(|id| !self.tq_state.faults.crashed_nodes.contains(id))
.filter(|id| !self.tq_state.crashed_nodes.contains(id))
.cloned()
.collect();

Expand Down Expand Up @@ -272,7 +272,7 @@ impl TestState {
let mut loadable = c
.members
.iter()
.filter(|m| !self.tq_state.faults.crashed_nodes.contains(m))
.filter(|m| !self.tq_state.crashed_nodes.contains(m))
.peekable();
if loadable.peek().is_some() {
let id = selector.select(loadable).clone();
Expand Down Expand Up @@ -311,7 +311,7 @@ impl TestState {
let mut loadable = c
.members
.iter()
.filter(|m| !self.tq_state.faults.crashed_nodes.contains(m))
.filter(|m| !self.tq_state.crashed_nodes.contains(m))
.peekable();
if loadable.peek().is_none() {
return vec![];
Expand All @@ -329,12 +329,7 @@ impl TestState {
}

// If the coordinator is currently down then Nexus should abort.
if self
.tq_state
.faults
.crashed_nodes
.contains(&latest_config.coordinator)
{
if self.tq_state.crashed_nodes.contains(&latest_config.coordinator) {
events.push(Event::AbortConfiguration(latest_config.epoch));
return events;
}
Expand Down Expand Up @@ -387,7 +382,7 @@ impl TestState {
let committable: Vec<_> = latest_config
.prepared_members
.difference(&latest_config.committed_members)
.filter(|m| !self.tq_state.faults.crashed_nodes.contains(m))
.filter(|m| !self.tq_state.crashed_nodes.contains(m))
.collect();

if committable.is_empty() {
Expand Down Expand Up @@ -534,7 +529,7 @@ impl TestState {
);
let mut events = vec![Event::Reconfigure(nexus_config)];

if self.tq_state.faults.crashed_nodes.contains(&coordinator) {
if self.tq_state.crashed_nodes.contains(&coordinator) {
// This simulates a timeout on the reply from the coordinator which
// triggers an abort.
events.push(Event::AbortConfiguration(epoch));
Expand Down
Loading