Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,11 @@ use crate::{AdapterError, AdapterNotice, ExecuteResponse};

mod builtin_table_updates;
mod config;
mod consistency;
mod error;
mod migrate;

pub mod builtin;
pub(crate) mod consistency;
mod inner;
pub mod storage;

Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ pub(crate) mod timestamp_selection;

mod appends;
mod command_handler;
mod consistency;
mod ddl;
mod indexes;
mod introspection;
Expand Down
82 changes: 82 additions & 0 deletions src/adapter/src/coord/consistency.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Internal consistency checks that validate invariants of [`Coordinator`].

use super::Coordinator;
use crate::catalog::consistency::CatalogInconsistencies;
use mz_repr::GlobalId;
use serde::Serialize;

#[derive(Debug, Default, Serialize)]
pub struct CoordinatorInconsistencies {
/// Inconsistencies found in the catalog.
catalog_inconsistencies: CatalogInconsistencies,
/// Inconsistencies found in read capabilities.
read_capabilities: Vec<ReadCapabilitiesInconsistency>,
}

impl CoordinatorInconsistencies {
pub fn is_empty(&self) -> bool {
self.catalog_inconsistencies.is_empty() && self.read_capabilities.is_empty()
}
}

impl Coordinator {
/// Checks the [`Coordinator`] to make sure we're internally consistent.
pub fn check_consistency(&self) -> Result<(), CoordinatorInconsistencies> {
let mut inconsistencies = CoordinatorInconsistencies::default();

if let Err(catalog_inconsistencies) = self.catalog().state().check_consistency() {
inconsistencies.catalog_inconsistencies = catalog_inconsistencies;
}

if let Err(read_capabilities) = self.check_read_capabilities() {
inconsistencies.read_capabilities = read_capabilities;
}

if inconsistencies.is_empty() {
Ok(())
} else {
Err(inconsistencies)
}
}

/// # Invariants:
///
/// * Read capabilities should reference known objects.
///
fn check_read_capabilities(&self) -> Result<(), Vec<ReadCapabilitiesInconsistency>> {
let mut read_capabilities_inconsistencies = Vec::new();
for (gid, _) in &self.storage_read_capabilities {
if self.catalog().try_get_entry(gid).is_none() {
read_capabilities_inconsistencies
.push(ReadCapabilitiesInconsistency::Storage(gid.clone()));
}
}
for (gid, _) in &self.compute_read_capabilities {
if self.catalog().try_get_entry(gid).is_none() {
read_capabilities_inconsistencies
.push(ReadCapabilitiesInconsistency::Compute(gid.clone()));
}
}

if read_capabilities_inconsistencies.is_empty() {
Ok(())
} else {
Err(read_capabilities_inconsistencies)
}
}
}

#[derive(Debug, Serialize)]
enum ReadCapabilitiesInconsistency {
Storage(GlobalId),
Compute(GlobalId),
}
2 changes: 1 addition & 1 deletion src/adapter/src/coord/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ impl Coordinator {

// Note: It's important that we keep the function call inside macro, this way we only run
// the consistency checks if sort assertions are enabled.
mz_ore::soft_assert_eq!(self.catalog().check_consistency(), Ok(()));
mz_ore::soft_assert_eq!(self.check_consistency(), Ok(()));

Ok(result)
}
Expand Down