diff --git a/trust-quorum/src/lib.rs b/trust-quorum/src/lib.rs index aed8a518b9..bd0536f10d 100644 --- a/trust-quorum/src/lib.rs +++ b/trust-quorum/src/lib.rs @@ -39,7 +39,9 @@ pub use node::{Node, NodeDiff}; // public only for docs. pub use node_ctx::NodeHandlerCtx; pub use node_ctx::{NodeCallerCtx, NodeCommonCtx, NodeCtx, NodeCtxDiff}; -pub use persistent_state::{PersistentState, PersistentStateSummary}; +pub use persistent_state::{ + ExpungedMetadata, PersistentState, PersistentStateSummary, +}; #[derive( Debug, diff --git a/trust-quorum/src/messages.rs b/trust-quorum/src/messages.rs index 3167cba500..c373a4350d 100644 --- a/trust-quorum/src/messages.rs +++ b/trust-quorum/src/messages.rs @@ -65,7 +65,7 @@ pub enum PeerMsgKind { LrtqShare(LrtqShare), /// Inform a node that it is no longer part of the trust quorum as of the - /// given epoch + /// given epoch, which the responder knows is commmitted. Expunged(Epoch), /// Inform a node that it is utilizing an old committed onfiguration and diff --git a/trust-quorum/src/node.rs b/trust-quorum/src/node.rs index 16503dbef8..7684222f9d 100644 --- a/trust-quorum/src/node.rs +++ b/trust-quorum/src/node.rs @@ -20,8 +20,8 @@ use crate::validators::{ MismatchedRackIdError, ReconfigurationError, ValidatedReconfigureMsg, }; use crate::{ - Alarm, Configuration, CoordinatorState, Epoch, NodeHandlerCtx, PlatformId, - messages::*, + Alarm, Configuration, CoordinatorState, Epoch, ExpungedMetadata, + NodeHandlerCtx, PlatformId, messages::*, }; use daft::{Diffable, Leaf}; use gfss::shamir::Share; @@ -101,9 +101,10 @@ impl Node { }; if let Some(kcs) = &self.key_share_computer { - // We know from our `ValidatedReconfigureMsg` that we haven't seen a newer - // configuration and we have the correct last committed configuration. Therefore if we are computing a key share, - // we must be doing it for a stale commit and should cancel it. + // We know from our `ValidatedReconfigureMsg` that we haven't seen + // a newer configuration and we have the correct last committed + // configuration. Therefore if we are computing a key share, we must + // be doing it for a stale commit and should cancel it. // // I don't think it's actually possible to hit this condition, but // we check anyway. @@ -139,6 +140,19 @@ impl Node { { let ps = ctx.persistent_state(); + if let Some(expunged) = &ps.expunged { + error!( + self.log, + "Commit attempted on expunged node"; + "expunged_epoch" => %expunged.epoch, + "expunging_node" => %expunged.from + ); + return Err(CommitError::Expunged { + epoch: expunged.epoch, + from: expunged.from.clone(), + }); + } + // If we have a configuration the rack id must match the one from // Nexus if let Some(ps_rack_id) = ps.rack_id() { @@ -243,13 +257,26 @@ impl Node { from: PlatformId, msg: PeerMsg, ) { + if ctx.persistent_state().is_expunged() { + warn!( + self.log, + "Received message while expunged. Dropping."; + "from" => %from, + "msg" => msg.kind.name() + ); + return; + } + if let Some(rack_id) = ctx.persistent_state().rack_id() { if rack_id != msg.rack_id { - error!(self.log, "Mismatched rack id"; - "from" => %from, - "msg" => msg.kind.name(), - "expected" => %rack_id, - "got" => %msg.rack_id); + error!( + self.log, + "Mismatched rack id"; + "from" => %from, + "msg" => msg.kind.name(), + "expected" => %rack_id, + "got" => %msg.rack_id + ); return; } } @@ -269,6 +296,9 @@ impl Node { PeerMsgKind::CommitAdvance(config) => { self.handle_commit_advance(ctx, from, config) } + PeerMsgKind::Expunged(epoch) => { + self.handle_expunged(ctx, from, epoch); + } _ => todo!( "cannot handle message variant yet - not implemented: {msg:?}" ), @@ -308,6 +338,85 @@ impl Node { } } + fn handle_expunged( + &mut self, + ctx: &mut impl NodeHandlerCtx, + from: PlatformId, + epoch: Epoch, + ) { + if let Some(config) = ctx.persistent_state().latest_config() { + if epoch < config.epoch { + // It's possible, but unlikely, that we were expunged at `epoch` + // and later re-added to the trust-quorum, but the reply to + // an old message is still floating in the network. This is + // especially unlikely since, we should really have restarted + // sprockets connections in this case. In any event, the race + // condition exists at the protocol level, and so we handle it. + if config.members.contains_key(ctx.platform_id()) { + let m = concat!( + "Received Expunged message for old epoch. ", + "We must have been re-added as a trust-quorum member." + ); + warn!( + self.log, + "{m}"; + "from" => %from, + "received_epoch" => %epoch, + "epoch" => %config.epoch + ); + } + return; + } else if epoch > config.epoch { + let m = concat!( + "Received Expunged message for newer epoch. ", + "Recording expungement in persistent state." + ); + warn!( + self.log, + "{m}"; + "from" => %from, + "received_epoch" => %epoch, + "epoch" => %config.epoch + ); + // Intentionally fall through + } else { + let m = concat!( + "Received Expunged message for latest known epoch. ", + "Recording expungement in persistent state." + ); + warn!( + self.log, + "{m}"; + "from" => %from, + "received_epoch" => %epoch, + "epoch" => %config.epoch + ); + // Intentionally fall through + } + + // Perform the actual expunge + ctx.update_persistent_state(|ps| { + ps.expunged = Some(ExpungedMetadata { epoch, from }); + true + }); + + // Stop coordinating and computing a key share + self.coordinator_state = None; + self.key_share_computer = None; + } else { + let m = concat!( + "Received Expunge message, but we have no configurations. ", + "We must have been factory reset already." + ); + error!( + self.log, + "{m}"; + "from" => %from, + "received_epoch" => %epoch + ); + } + } + fn handle_commit_advance( &mut self, ctx: &mut impl NodeHandlerCtx, @@ -469,7 +578,10 @@ impl Node { %latest_committed_config.epoch, "requested_epoch" => %epoch ); - // TODO: Send an expunged message + ctx.send( + from, + PeerMsgKind::Expunged(latest_committed_config.epoch), + ); return; } info!( @@ -499,7 +611,13 @@ impl Node { "from" => %from, "epoch" => %epoch ); - // TODO: Send an expunged message + // Technically, this node does not yet know that the + // configuration at `epoch` has been committed. However, + // requesting nodes only ask for key shares when they know that + // the configuration has been committed. Therefore, rather than + // introduce a new message such as `NotAMember`, we inform the + // requesting node that they have been expunged. + ctx.send(from, PeerMsgKind::Expunged(epoch)); return; } } @@ -720,6 +838,8 @@ pub enum CommitError { ), #[error("cannot commit: not prepared for epoch {0}")] NotPrepared(Epoch), + #[error("cannot commit: expunged at epoch {epoch} by {from}")] + Expunged { epoch: Epoch, from: PlatformId }, } #[cfg(test)] diff --git a/trust-quorum/src/persistent_state.rs b/trust-quorum/src/persistent_state.rs index d2a9a09039..28435de15d 100644 --- a/trust-quorum/src/persistent_state.rs +++ b/trust-quorum/src/persistent_state.rs @@ -31,7 +31,7 @@ pub struct PersistentState { // Has the node been informed that it is no longer part of the trust quorum? // - // If at any time this gets set, than the it remains true for the lifetime + // If at any time this gets set, then the it remains true for the lifetime // of the node. The sled corresponding to the node must be factory reset by // wiping its storage. pub expunged: Option, @@ -62,11 +62,13 @@ impl PersistentState { self.lrtq.is_some() && self.latest_committed_epoch().is_none() } - // Are there any committed configurations or lrtq data? + /// Are there any committed configurations or lrtq data? pub fn is_uninitialized(&self) -> bool { self.lrtq.is_none() && self.latest_committed_epoch().is_none() } + /// The latest configuration that we know about, regardless of whether it + /// has been committed. pub fn latest_config(&self) -> Option<&Configuration> { self.configs.iter().last() } @@ -108,6 +110,11 @@ impl PersistentState { pub fn has_prepared(&self, epoch: Epoch) -> bool { self.configs.contains_key(&epoch) && self.shares.contains_key(&epoch) } + + /// Has this node been expunged? + pub fn is_expunged(&self) -> bool { + self.expunged.is_some() + } } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] diff --git a/trust-quorum/test-utils/src/state.rs b/trust-quorum/test-utils/src/state.rs index 35ae9f13e8..007408e7ae 100644 --- a/trust-quorum/test-utils/src/state.rs +++ b/trust-quorum/test-utils/src/state.rs @@ -10,7 +10,7 @@ use crate::nexus::{ use crate::{Event, member_universe}; use daft::{BTreeMapDiff, BTreeSetDiff, Diffable, Leaf}; use iddqd::IdOrdMap; -use slog::Logger; +use slog::{Logger, info}; use std::collections::{BTreeMap, BTreeSet}; use std::fmt::Display; use trust_quorum::{ @@ -280,6 +280,11 @@ impl TqState { NexusReply::AckedPreparesFromCoordinator { epoch, acks } => { if epoch == latest_config.epoch { latest_config.prepared_members.extend(acks); + + if latest_config.can_commit() { + drop(latest_config); + self.nexus_commit(); + } } } NexusReply::CommitAck { from, epoch } => { @@ -347,6 +352,47 @@ impl TqState { self.send_reconfigure_msg(); self.send_envelopes_from_coordinator(); } + + // Commit at nexus when preparing + fn nexus_commit(&mut self) { + let mut latest_config = self.nexus.latest_config_mut(); + info!( + self.log, + "nexus committed"; + "epoch" => %latest_config.epoch, + "coordinator" => %latest_config.coordinator + ); + + latest_config.op = NexusOp::Committed; + + let new_members = latest_config.members.clone(); + let new_epoch = latest_config.epoch; + + // Expunge any removed nodes from the last committed configuration + if let Some(last_committed_epoch) = latest_config.last_committed_epoch { + // Release our mutable borrow + drop(latest_config); + + let last_committed_config = self + .nexus + .configs + .get(&last_committed_epoch) + .expect("config exists"); + + let expunged = + last_committed_config.members.difference(&new_members).cloned(); + + for e in expunged { + info!( + self.log, + "expunged node"; + "epoch" => %new_epoch, + "platform_id" => %e + ); + self.expunged.insert(e); + } + } + } } /// Broken out of `TqState` to alleviate borrow checker woes @@ -528,7 +574,7 @@ fn display_nexus_state_diff( f: &mut std::fmt::Formatter<'_>, ) -> std::fmt::Result { if diff.configs.modified().count() != 0 { - writeln!(f, " nexus state changed:")?; + writeln!(f, "nexus state changed:")?; } // Nexus configs can only be added or modified diff --git a/trust-quorum/tests/cluster.rs b/trust-quorum/tests/cluster.rs index c4ddd620da..c514e861f2 100644 --- a/trust-quorum/tests/cluster.rs +++ b/trust-quorum/tests/cluster.rs @@ -309,7 +309,7 @@ impl TestState { for s in removed_nodes { // The same selection can be chosen more than once. so we // must add the extra check rather than shrinking the length - // of the `removed_nodes` iterator with `take`.; + // of the `removed_nodes` iterator with `take`. if nodes_to_remove.len() == max_nodes_to_remove { break; } @@ -398,6 +398,38 @@ impl TestState { self.invariant_nodes_have_committed_if_nexus_has_acks()?; self.invariant_nodes_not_coordinating_and_computing_key_share_simultaneously()?; self.invariant_no_alarms()?; + self.invariant_expunged_nodes_have_actually_been_expunged()?; + Ok(()) + } + + /// For all expunged nodes ensure that either: + /// * they know they are expunged + /// * have a latest committed configuration where they are still a member + /// * have no committed configurations + fn invariant_expunged_nodes_have_actually_been_expunged( + &self, + ) -> Result<(), TestCaseError> { + for id in &self.tq_state.expunged { + let (_, ctx) = + self.tq_state.sut.nodes.get(id).expect("node exists"); + let ps = ctx.persistent_state(); + if ps.is_expunged() { + continue; + } + if let Some(config) = ps.latest_committed_configuration() { + let nexus_config = self + .tq_state + .nexus + .configs + .get(&config.epoch) + .expect("config exists"); + prop_assert!(config.members.contains_key(ctx.platform_id())); + prop_assert!(nexus_config.members.contains(ctx.platform_id())); + } else { + continue; + } + } + Ok(()) }