From 06dfa4dbf6fdc64adac98541927cc9088523fb19 Mon Sep 17 00:00:00 2001 From: "Andrew J. Stone" Date: Wed, 20 Aug 2025 19:53:00 +0000 Subject: [PATCH 1/2] Add support for "Expunge" messages When a node requests a share for an old configuration it will receive an `Expunged` message if the share request receiving node has a later committed configuration where the node is not a member. When the `Expunged` message is receieved, the expunged node persists this fact and stops replying to peer messages. We also fix a bug in the test where nexus wasn't actually committing configurations at all. So far invariants really slow the test down. On my machine checking the invariants after every applied event makes the test take ~120s. Without invariant checking, it takes about 14s. I definitely want to add more invariants for correctness, but maybe not check them when they aren't applicable. Some may become postconditions on certain events instead. --- trust-quorum/src/lib.rs | 4 +- trust-quorum/src/messages.rs | 2 +- trust-quorum/src/node.rs | 118 +++++++++++++++++++++++++-- trust-quorum/src/persistent_state.rs | 11 ++- trust-quorum/test-utils/src/state.rs | 50 +++++++++++- trust-quorum/tests/cluster.rs | 34 +++++++- 6 files changed, 203 insertions(+), 16 deletions(-) diff --git a/trust-quorum/src/lib.rs b/trust-quorum/src/lib.rs index aed8a518b9..bd0536f10d 100644 --- a/trust-quorum/src/lib.rs +++ b/trust-quorum/src/lib.rs @@ -39,7 +39,9 @@ pub use node::{Node, NodeDiff}; // public only for docs. pub use node_ctx::NodeHandlerCtx; pub use node_ctx::{NodeCallerCtx, NodeCommonCtx, NodeCtx, NodeCtxDiff}; -pub use persistent_state::{PersistentState, PersistentStateSummary}; +pub use persistent_state::{ + ExpungedMetadata, PersistentState, PersistentStateSummary, +}; #[derive( Debug, diff --git a/trust-quorum/src/messages.rs b/trust-quorum/src/messages.rs index 3167cba500..c373a4350d 100644 --- a/trust-quorum/src/messages.rs +++ b/trust-quorum/src/messages.rs @@ -65,7 +65,7 @@ pub enum PeerMsgKind { LrtqShare(LrtqShare), /// Inform a node that it is no longer part of the trust quorum as of the - /// given epoch + /// given epoch, which the responder knows is commmitted. Expunged(Epoch), /// Inform a node that it is utilizing an old committed onfiguration and diff --git a/trust-quorum/src/node.rs b/trust-quorum/src/node.rs index 16503dbef8..77e7fd184a 100644 --- a/trust-quorum/src/node.rs +++ b/trust-quorum/src/node.rs @@ -20,8 +20,8 @@ use crate::validators::{ MismatchedRackIdError, ReconfigurationError, ValidatedReconfigureMsg, }; use crate::{ - Alarm, Configuration, CoordinatorState, Epoch, NodeHandlerCtx, PlatformId, - messages::*, + Alarm, Configuration, CoordinatorState, Epoch, ExpungedMetadata, + NodeHandlerCtx, PlatformId, messages::*, }; use daft::{Diffable, Leaf}; use gfss::shamir::Share; @@ -243,13 +243,26 @@ impl Node { from: PlatformId, msg: PeerMsg, ) { + if ctx.persistent_state().is_expunged() { + warn!( + self.log, + "Received message while expunged. Dropping."; + "from" => %from, + "msg" => msg.kind.name() + ); + return; + } + if let Some(rack_id) = ctx.persistent_state().rack_id() { if rack_id != msg.rack_id { - error!(self.log, "Mismatched rack id"; - "from" => %from, - "msg" => msg.kind.name(), - "expected" => %rack_id, - "got" => %msg.rack_id); + error!( + self.log, + "Mismatched rack id"; + "from" => %from, + "msg" => msg.kind.name(), + "expected" => %rack_id, + "got" => %msg.rack_id + ); return; } } @@ -269,6 +282,9 @@ impl Node { PeerMsgKind::CommitAdvance(config) => { self.handle_commit_advance(ctx, from, config) } + PeerMsgKind::Expunged(epoch) => { + self.handle_expunged(ctx, from, epoch); + } _ => todo!( "cannot handle message variant yet - not implemented: {msg:?}" ), @@ -308,6 +324,81 @@ impl Node { } } + fn handle_expunged( + &mut self, + ctx: &mut impl NodeHandlerCtx, + from: PlatformId, + epoch: Epoch, + ) { + if let Some(config) = ctx.persistent_state().latest_config() { + if epoch < config.epoch { + // It's possible, but unlikely, that we were expunged at `epoch` + // and later re-added to the trust-quorum, but the reply to + // an old message is still floating in the network. This is + // especially unlikely since, we should really have restarted + // sprockets connections in this case. In any event, the race + // condition exists at the protocol level, and so we handle it. + if config.members.contains_key(ctx.platform_id()) { + let m = concat!( + "Received Expunged message for old epoch. ", + "We must have been re-added as a trust-quorum member." + ); + warn!( + self.log, + "{m}"; + "from" => %from, + "received_epoch" => %epoch, + "epoch" => %config.epoch + ); + } + return; + } else if epoch > config.epoch { + let m = concat!( + "Received Expunged message for newer epoch. ", + "Recording expungement in persistent state." + ); + warn!( + self.log, + "{m}"; + "from" => %from, + "received_epoch" => %epoch, + "epoch" => %config.epoch + ); + // Intentionally fall through + } else { + let m = concat!( + "Received Expunged message for latest known epoch. ", + "Recording expungement in persistent state." + ); + warn!( + self.log, + "{m}"; + "from" => %from, + "received_epoch" => %epoch, + "epoch" => %config.epoch + ); + // Intentionally fall through + } + + // Perform the actual expunge + ctx.update_persistent_state(|ps| { + ps.expunged = Some(ExpungedMetadata { epoch, from }); + true + }); + } else { + let m = concat!( + "Received Expunge message, but we have no configurations. ", + "We must have been factory reset already." + ); + error!( + self.log, + "{m}"; + "from" => %from, + "received_epoch" => %epoch + ); + } + } + fn handle_commit_advance( &mut self, ctx: &mut impl NodeHandlerCtx, @@ -469,7 +560,10 @@ impl Node { %latest_committed_config.epoch, "requested_epoch" => %epoch ); - // TODO: Send an expunged message + ctx.send( + from, + PeerMsgKind::Expunged(latest_committed_config.epoch), + ); return; } info!( @@ -499,7 +593,13 @@ impl Node { "from" => %from, "epoch" => %epoch ); - // TODO: Send an expunged message + // Technically, this node does not yet know that the + // configuration at `epoch` has been committed. However, + // requesting nodes only ask for key shares when they know that + // the configuration has been committed. Therefore, rather than + // introduce a new message such as `NotAMember`, we inform the + // requesting node that they have been expunged. + ctx.send(from, PeerMsgKind::Expunged(epoch)); return; } } diff --git a/trust-quorum/src/persistent_state.rs b/trust-quorum/src/persistent_state.rs index d2a9a09039..28435de15d 100644 --- a/trust-quorum/src/persistent_state.rs +++ b/trust-quorum/src/persistent_state.rs @@ -31,7 +31,7 @@ pub struct PersistentState { // Has the node been informed that it is no longer part of the trust quorum? // - // If at any time this gets set, than the it remains true for the lifetime + // If at any time this gets set, then the it remains true for the lifetime // of the node. The sled corresponding to the node must be factory reset by // wiping its storage. pub expunged: Option, @@ -62,11 +62,13 @@ impl PersistentState { self.lrtq.is_some() && self.latest_committed_epoch().is_none() } - // Are there any committed configurations or lrtq data? + /// Are there any committed configurations or lrtq data? pub fn is_uninitialized(&self) -> bool { self.lrtq.is_none() && self.latest_committed_epoch().is_none() } + /// The latest configuration that we know about, regardless of whether it + /// has been committed. pub fn latest_config(&self) -> Option<&Configuration> { self.configs.iter().last() } @@ -108,6 +110,11 @@ impl PersistentState { pub fn has_prepared(&self, epoch: Epoch) -> bool { self.configs.contains_key(&epoch) && self.shares.contains_key(&epoch) } + + /// Has this node been expunged? + pub fn is_expunged(&self) -> bool { + self.expunged.is_some() + } } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] diff --git a/trust-quorum/test-utils/src/state.rs b/trust-quorum/test-utils/src/state.rs index 35ae9f13e8..007408e7ae 100644 --- a/trust-quorum/test-utils/src/state.rs +++ b/trust-quorum/test-utils/src/state.rs @@ -10,7 +10,7 @@ use crate::nexus::{ use crate::{Event, member_universe}; use daft::{BTreeMapDiff, BTreeSetDiff, Diffable, Leaf}; use iddqd::IdOrdMap; -use slog::Logger; +use slog::{Logger, info}; use std::collections::{BTreeMap, BTreeSet}; use std::fmt::Display; use trust_quorum::{ @@ -280,6 +280,11 @@ impl TqState { NexusReply::AckedPreparesFromCoordinator { epoch, acks } => { if epoch == latest_config.epoch { latest_config.prepared_members.extend(acks); + + if latest_config.can_commit() { + drop(latest_config); + self.nexus_commit(); + } } } NexusReply::CommitAck { from, epoch } => { @@ -347,6 +352,47 @@ impl TqState { self.send_reconfigure_msg(); self.send_envelopes_from_coordinator(); } + + // Commit at nexus when preparing + fn nexus_commit(&mut self) { + let mut latest_config = self.nexus.latest_config_mut(); + info!( + self.log, + "nexus committed"; + "epoch" => %latest_config.epoch, + "coordinator" => %latest_config.coordinator + ); + + latest_config.op = NexusOp::Committed; + + let new_members = latest_config.members.clone(); + let new_epoch = latest_config.epoch; + + // Expunge any removed nodes from the last committed configuration + if let Some(last_committed_epoch) = latest_config.last_committed_epoch { + // Release our mutable borrow + drop(latest_config); + + let last_committed_config = self + .nexus + .configs + .get(&last_committed_epoch) + .expect("config exists"); + + let expunged = + last_committed_config.members.difference(&new_members).cloned(); + + for e in expunged { + info!( + self.log, + "expunged node"; + "epoch" => %new_epoch, + "platform_id" => %e + ); + self.expunged.insert(e); + } + } + } } /// Broken out of `TqState` to alleviate borrow checker woes @@ -528,7 +574,7 @@ fn display_nexus_state_diff( f: &mut std::fmt::Formatter<'_>, ) -> std::fmt::Result { if diff.configs.modified().count() != 0 { - writeln!(f, " nexus state changed:")?; + writeln!(f, "nexus state changed:")?; } // Nexus configs can only be added or modified diff --git a/trust-quorum/tests/cluster.rs b/trust-quorum/tests/cluster.rs index c4ddd620da..c514e861f2 100644 --- a/trust-quorum/tests/cluster.rs +++ b/trust-quorum/tests/cluster.rs @@ -309,7 +309,7 @@ impl TestState { for s in removed_nodes { // The same selection can be chosen more than once. so we // must add the extra check rather than shrinking the length - // of the `removed_nodes` iterator with `take`.; + // of the `removed_nodes` iterator with `take`. if nodes_to_remove.len() == max_nodes_to_remove { break; } @@ -398,6 +398,38 @@ impl TestState { self.invariant_nodes_have_committed_if_nexus_has_acks()?; self.invariant_nodes_not_coordinating_and_computing_key_share_simultaneously()?; self.invariant_no_alarms()?; + self.invariant_expunged_nodes_have_actually_been_expunged()?; + Ok(()) + } + + /// For all expunged nodes ensure that either: + /// * they know they are expunged + /// * have a latest committed configuration where they are still a member + /// * have no committed configurations + fn invariant_expunged_nodes_have_actually_been_expunged( + &self, + ) -> Result<(), TestCaseError> { + for id in &self.tq_state.expunged { + let (_, ctx) = + self.tq_state.sut.nodes.get(id).expect("node exists"); + let ps = ctx.persistent_state(); + if ps.is_expunged() { + continue; + } + if let Some(config) = ps.latest_committed_configuration() { + let nexus_config = self + .tq_state + .nexus + .configs + .get(&config.epoch) + .expect("config exists"); + prop_assert!(config.members.contains_key(ctx.platform_id())); + prop_assert!(nexus_config.members.contains(ctx.platform_id())); + } else { + continue; + } + } + Ok(()) } From d85c5d9de8ecf76c18fcbd47f6282a39406d4e59 Mon Sep 17 00:00:00 2001 From: "Andrew J. Stone" Date: Wed, 20 Aug 2025 23:11:14 +0000 Subject: [PATCH 2/2] Do not allow commits on expunged nodes --- trust-quorum/src/node.rs | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/trust-quorum/src/node.rs b/trust-quorum/src/node.rs index 77e7fd184a..7684222f9d 100644 --- a/trust-quorum/src/node.rs +++ b/trust-quorum/src/node.rs @@ -101,9 +101,10 @@ impl Node { }; if let Some(kcs) = &self.key_share_computer { - // We know from our `ValidatedReconfigureMsg` that we haven't seen a newer - // configuration and we have the correct last committed configuration. Therefore if we are computing a key share, - // we must be doing it for a stale commit and should cancel it. + // We know from our `ValidatedReconfigureMsg` that we haven't seen + // a newer configuration and we have the correct last committed + // configuration. Therefore if we are computing a key share, we must + // be doing it for a stale commit and should cancel it. // // I don't think it's actually possible to hit this condition, but // we check anyway. @@ -139,6 +140,19 @@ impl Node { { let ps = ctx.persistent_state(); + if let Some(expunged) = &ps.expunged { + error!( + self.log, + "Commit attempted on expunged node"; + "expunged_epoch" => %expunged.epoch, + "expunging_node" => %expunged.from + ); + return Err(CommitError::Expunged { + epoch: expunged.epoch, + from: expunged.from.clone(), + }); + } + // If we have a configuration the rack id must match the one from // Nexus if let Some(ps_rack_id) = ps.rack_id() { @@ -385,6 +399,10 @@ impl Node { ps.expunged = Some(ExpungedMetadata { epoch, from }); true }); + + // Stop coordinating and computing a key share + self.coordinator_state = None; + self.key_share_computer = None; } else { let m = concat!( "Received Expunge message, but we have no configurations. ", @@ -820,6 +838,8 @@ pub enum CommitError { ), #[error("cannot commit: not prepared for epoch {0}")] NotPrepared(Epoch), + #[error("cannot commit: expunged at epoch {epoch} by {from}")] + Expunged { epoch: Epoch, from: PlatformId }, } #[cfg(test)]