Skip to content

Add support for "Expunge" messages #8874

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: tqdb
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion trust-quorum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ pub use node::{Node, NodeDiff};
// public only for docs.
pub use node_ctx::NodeHandlerCtx;
pub use node_ctx::{NodeCallerCtx, NodeCommonCtx, NodeCtx, NodeCtxDiff};
pub use persistent_state::{PersistentState, PersistentStateSummary};
pub use persistent_state::{
ExpungedMetadata, PersistentState, PersistentStateSummary,
};

#[derive(
Debug,
Expand Down
2 changes: 1 addition & 1 deletion trust-quorum/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub enum PeerMsgKind {
LrtqShare(LrtqShare),

/// Inform a node that it is no longer part of the trust quorum as of the
/// given epoch
/// given epoch, which the responder knows is commmitted.
Expunged(Epoch),

/// Inform a node that it is utilizing an old committed onfiguration and
Expand Down
144 changes: 132 additions & 12 deletions trust-quorum/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use crate::validators::{
MismatchedRackIdError, ReconfigurationError, ValidatedReconfigureMsg,
};
use crate::{
Alarm, Configuration, CoordinatorState, Epoch, NodeHandlerCtx, PlatformId,
messages::*,
Alarm, Configuration, CoordinatorState, Epoch, ExpungedMetadata,
NodeHandlerCtx, PlatformId, messages::*,
};
use daft::{Diffable, Leaf};
use gfss::shamir::Share;
Expand Down Expand Up @@ -101,9 +101,10 @@ impl Node {
};

if let Some(kcs) = &self.key_share_computer {
// We know from our `ValidatedReconfigureMsg` that we haven't seen a newer
// configuration and we have the correct last committed configuration. Therefore if we are computing a key share,
// we must be doing it for a stale commit and should cancel it.
// We know from our `ValidatedReconfigureMsg` that we haven't seen
// a newer configuration and we have the correct last committed
// configuration. Therefore if we are computing a key share, we must
// be doing it for a stale commit and should cancel it.
//
// I don't think it's actually possible to hit this condition, but
// we check anyway.
Expand Down Expand Up @@ -139,6 +140,19 @@ impl Node {
{
let ps = ctx.persistent_state();

if let Some(expunged) = &ps.expunged {
error!(
self.log,
"Commit attempted on expunged node";
"expunged_epoch" => %expunged.epoch,
"expunging_node" => %expunged.from
);
return Err(CommitError::Expunged {
epoch: expunged.epoch,
from: expunged.from.clone(),
});
}

// If we have a configuration the rack id must match the one from
// Nexus
if let Some(ps_rack_id) = ps.rack_id() {
Expand Down Expand Up @@ -243,13 +257,26 @@ impl Node {
from: PlatformId,
msg: PeerMsg,
) {
if ctx.persistent_state().is_expunged() {
warn!(
self.log,
"Received message while expunged. Dropping.";
"from" => %from,
"msg" => msg.kind.name()
);
return;
}

if let Some(rack_id) = ctx.persistent_state().rack_id() {
if rack_id != msg.rack_id {
error!(self.log, "Mismatched rack id";
"from" => %from,
"msg" => msg.kind.name(),
"expected" => %rack_id,
"got" => %msg.rack_id);
error!(
self.log,
"Mismatched rack id";
"from" => %from,
"msg" => msg.kind.name(),
"expected" => %rack_id,
"got" => %msg.rack_id
);
return;
}
}
Expand All @@ -269,6 +296,9 @@ impl Node {
PeerMsgKind::CommitAdvance(config) => {
self.handle_commit_advance(ctx, from, config)
}
PeerMsgKind::Expunged(epoch) => {
self.handle_expunged(ctx, from, epoch);
}
_ => todo!(
"cannot handle message variant yet - not implemented: {msg:?}"
),
Expand Down Expand Up @@ -308,6 +338,85 @@ impl Node {
}
}

fn handle_expunged(
&mut self,
ctx: &mut impl NodeHandlerCtx,
from: PlatformId,
epoch: Epoch,
) {
if let Some(config) = ctx.persistent_state().latest_config() {
if epoch < config.epoch {
// It's possible, but unlikely, that we were expunged at `epoch`
// and later re-added to the trust-quorum, but the reply to
// an old message is still floating in the network. This is
// especially unlikely since, we should really have restarted
// sprockets connections in this case. In any event, the race
// condition exists at the protocol level, and so we handle it.
if config.members.contains_key(ctx.platform_id()) {
let m = concat!(
"Received Expunged message for old epoch. ",
"We must have been re-added as a trust-quorum member."
);
warn!(
self.log,
"{m}";
"from" => %from,
"received_epoch" => %epoch,
"epoch" => %config.epoch
);
}
return;
} else if epoch > config.epoch {
let m = concat!(
"Received Expunged message for newer epoch. ",
"Recording expungement in persistent state."
);
warn!(
self.log,
"{m}";
"from" => %from,
"received_epoch" => %epoch,
"epoch" => %config.epoch
);
// Intentionally fall through
} else {
let m = concat!(
"Received Expunged message for latest known epoch. ",
"Recording expungement in persistent state."
);
warn!(
self.log,
"{m}";
"from" => %from,
"received_epoch" => %epoch,
"epoch" => %config.epoch
);
// Intentionally fall through
}

// Perform the actual expunge
ctx.update_persistent_state(|ps| {
ps.expunged = Some(ExpungedMetadata { epoch, from });
true
});

// Stop coordinating and computing a key share
self.coordinator_state = None;
self.key_share_computer = None;
} else {
let m = concat!(
"Received Expunge message, but we have no configurations. ",
"We must have been factory reset already."
);
error!(
self.log,
"{m}";
"from" => %from,
"received_epoch" => %epoch
);
}
}

fn handle_commit_advance(
&mut self,
ctx: &mut impl NodeHandlerCtx,
Expand Down Expand Up @@ -469,7 +578,10 @@ impl Node {
%latest_committed_config.epoch,
"requested_epoch" => %epoch
);
// TODO: Send an expunged message
ctx.send(
from,
PeerMsgKind::Expunged(latest_committed_config.epoch),
);
return;
}
info!(
Expand Down Expand Up @@ -499,7 +611,13 @@ impl Node {
"from" => %from,
"epoch" => %epoch
);
// TODO: Send an expunged message
// Technically, this node does not yet know that the
// configuration at `epoch` has been committed. However,
// requesting nodes only ask for key shares when they know that
// the configuration has been committed. Therefore, rather than
// introduce a new message such as `NotAMember`, we inform the
// requesting node that they have been expunged.
ctx.send(from, PeerMsgKind::Expunged(epoch));
return;
}
}
Expand Down Expand Up @@ -720,6 +838,8 @@ pub enum CommitError {
),
#[error("cannot commit: not prepared for epoch {0}")]
NotPrepared(Epoch),
#[error("cannot commit: expunged at epoch {epoch} by {from}")]
Expunged { epoch: Epoch, from: PlatformId },
}

#[cfg(test)]
Expand Down
11 changes: 9 additions & 2 deletions trust-quorum/src/persistent_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub struct PersistentState {

// Has the node been informed that it is no longer part of the trust quorum?
//
// If at any time this gets set, than the it remains true for the lifetime
// If at any time this gets set, then the it remains true for the lifetime
// of the node. The sled corresponding to the node must be factory reset by
// wiping its storage.
pub expunged: Option<ExpungedMetadata>,
Expand Down Expand Up @@ -62,11 +62,13 @@ impl PersistentState {
self.lrtq.is_some() && self.latest_committed_epoch().is_none()
}

// Are there any committed configurations or lrtq data?
/// Are there any committed configurations or lrtq data?
pub fn is_uninitialized(&self) -> bool {
self.lrtq.is_none() && self.latest_committed_epoch().is_none()
}

/// The latest configuration that we know about, regardless of whether it
/// has been committed.
pub fn latest_config(&self) -> Option<&Configuration> {
self.configs.iter().last()
}
Expand Down Expand Up @@ -108,6 +110,11 @@ impl PersistentState {
pub fn has_prepared(&self, epoch: Epoch) -> bool {
self.configs.contains_key(&epoch) && self.shares.contains_key(&epoch)
}

/// Has this node been expunged?
pub fn is_expunged(&self) -> bool {
self.expunged.is_some()
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
Expand Down
50 changes: 48 additions & 2 deletions trust-quorum/test-utils/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::nexus::{
use crate::{Event, member_universe};
use daft::{BTreeMapDiff, BTreeSetDiff, Diffable, Leaf};
use iddqd::IdOrdMap;
use slog::Logger;
use slog::{Logger, info};
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Display;
use trust_quorum::{
Expand Down Expand Up @@ -280,6 +280,11 @@ impl TqState {
NexusReply::AckedPreparesFromCoordinator { epoch, acks } => {
if epoch == latest_config.epoch {
latest_config.prepared_members.extend(acks);

if latest_config.can_commit() {
drop(latest_config);
self.nexus_commit();
}
}
}
NexusReply::CommitAck { from, epoch } => {
Expand Down Expand Up @@ -347,6 +352,47 @@ impl TqState {
self.send_reconfigure_msg();
self.send_envelopes_from_coordinator();
}

// Commit at nexus when preparing
fn nexus_commit(&mut self) {
let mut latest_config = self.nexus.latest_config_mut();
info!(
self.log,
"nexus committed";
"epoch" => %latest_config.epoch,
"coordinator" => %latest_config.coordinator
);

latest_config.op = NexusOp::Committed;

let new_members = latest_config.members.clone();
let new_epoch = latest_config.epoch;

// Expunge any removed nodes from the last committed configuration
if let Some(last_committed_epoch) = latest_config.last_committed_epoch {
// Release our mutable borrow
drop(latest_config);

let last_committed_config = self
.nexus
.configs
.get(&last_committed_epoch)
.expect("config exists");

let expunged =
last_committed_config.members.difference(&new_members).cloned();

for e in expunged {
info!(
self.log,
"expunged node";
"epoch" => %new_epoch,
"platform_id" => %e
);
self.expunged.insert(e);
}
}
}
}

/// Broken out of `TqState` to alleviate borrow checker woes
Expand Down Expand Up @@ -528,7 +574,7 @@ fn display_nexus_state_diff(
f: &mut std::fmt::Formatter<'_>,
) -> std::fmt::Result {
if diff.configs.modified().count() != 0 {
writeln!(f, " nexus state changed:")?;
writeln!(f, "nexus state changed:")?;
}

// Nexus configs can only be added or modified
Expand Down
34 changes: 33 additions & 1 deletion trust-quorum/tests/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ impl TestState {
for s in removed_nodes {
// The same selection can be chosen more than once. so we
// must add the extra check rather than shrinking the length
// of the `removed_nodes` iterator with `take`.;
// of the `removed_nodes` iterator with `take`.
if nodes_to_remove.len() == max_nodes_to_remove {
break;
}
Expand Down Expand Up @@ -398,6 +398,38 @@ impl TestState {
self.invariant_nodes_have_committed_if_nexus_has_acks()?;
self.invariant_nodes_not_coordinating_and_computing_key_share_simultaneously()?;
self.invariant_no_alarms()?;
self.invariant_expunged_nodes_have_actually_been_expunged()?;
Ok(())
}

/// For all expunged nodes ensure that either:
/// * they know they are expunged
/// * have a latest committed configuration where they are still a member
/// * have no committed configurations
fn invariant_expunged_nodes_have_actually_been_expunged(
&self,
) -> Result<(), TestCaseError> {
for id in &self.tq_state.expunged {
let (_, ctx) =
self.tq_state.sut.nodes.get(id).expect("node exists");
let ps = ctx.persistent_state();
if ps.is_expunged() {
continue;
}
if let Some(config) = ps.latest_committed_configuration() {
let nexus_config = self
.tq_state
.nexus
.configs
.get(&config.epoch)
.expect("config exists");
prop_assert!(config.members.contains_key(ctx.platform_id()));
prop_assert!(nexus_config.members.contains(ctx.platform_id()));
} else {
continue;
}
}

Ok(())
}

Expand Down
Loading