Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion misc/python/materialize/cloudtest/k8s/environmentd.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,14 @@ def __init__(self, namespace: str = DEFAULT_K8S_NAMESPACE) -> None:
service_port = V1ServicePort(name="sql", port=6875)
http_port = V1ServicePort(name="http", port=6876)
internal_port = V1ServicePort(name="internal", port=6877)
internal_http_port = V1ServicePort(name="internalhttp", port=6878)
self.service = V1Service(
api_version="v1",
kind="Service",
metadata=V1ObjectMeta(name="environmentd", labels={"app": "environmentd"}),
spec=V1ServiceSpec(
type="NodePort",
ports=[service_port, internal_port, http_port],
ports=[service_port, internal_port, http_port, internal_http_port],
selector={"app": "environmentd"},
),
)
Expand Down Expand Up @@ -184,6 +185,7 @@ def args(self) -> list[str]:
f"--adapter-stash-url=postgres://root@cockroach.{self.cockroach_namespace}:26257?options=--search_path=adapter",
f"--storage-stash-url=postgres://root@cockroach.{self.cockroach_namespace}:26257?options=--search_path=storage",
"--internal-sql-listen-addr=0.0.0.0:6877",
"--internal-http-listen-addr=0.0.0.0:6878",
"--unsafe-mode",
# cloudtest may be called upon to spin up older versions of
# Materialize too! If you are adding a command-line option that is
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ use crate::{AdapterError, AdapterNotice, ExecuteResponse};

mod builtin_table_updates;
mod config;
mod consistency;
pub(crate) mod consistency;
mod error;
mod migrate;

Expand Down
10 changes: 5 additions & 5 deletions src/adapter/src/catalog/consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use serde::Serialize;

use super::CatalogState;

#[derive(Debug, Default, Serialize)]
#[derive(Debug, Default, Clone, Serialize, PartialEq)]
pub struct CatalogInconsistencies {
/// Inconsistencies found with internal fields, if any.
internal_fields: Vec<InternalFieldsInconsistency>,
Expand Down Expand Up @@ -347,7 +347,7 @@ impl CatalogState {
}
}

#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Clone, PartialEq)]
enum InternalFieldsInconsistency {
Database(String, DatabaseId),
AmbientSchema(String, SchemaId),
Expand All @@ -356,7 +356,7 @@ enum InternalFieldsInconsistency {
Role(String, RoleId),
}

#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Clone, PartialEq)]
enum RoleInconsistency {
Database(DatabaseId, RoleId),
Schema(SchemaId, RoleId),
Expand All @@ -374,15 +374,15 @@ enum RoleInconsistency {
},
}

#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Clone, PartialEq)]
/// Inconsistencies found among comments stored in the catalog.
enum CommentInconsistency {
/// A comment was found for an object that no longer exists.
Dangling(CommentObjectId),
/// A comment with a column position was found on a non-relation.
NonRelation(CommentObjectId, usize),
}

#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Clone, PartialEq)]
enum ObjectDependencyInconsistency {
/// Object A uses Object B, but Object B does not exist.
MissingUses {
Expand Down
18 changes: 17 additions & 1 deletion src/adapter/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,21 @@ impl SessionClient {
catalog.check_consistency()
}

/// Asks the coordinator to run its internal consistency checks.
///
/// Returns `Ok(())` when no inconsistencies are reported. Otherwise the reported
/// inconsistencies are returned as a JSON value; if they cannot be serialized, a JSON
/// string saying so is returned instead. This is a superset of the checks that
/// `check_catalog` performs.
///
/// No authorization is performed, so access to this function must be limited to internal
/// servers or superusers.
pub async fn check_coordinator(&mut self) -> Result<(), serde_json::Value> {
    let outcome = self
        .send_without_session(|tx| Command::CheckConsistency { tx })
        .await;

    match outcome {
        Ok(()) => Ok(()),
        Err(inconsistencies) => {
            // Serialization is best-effort: never lose the fact that something was wrong.
            let report = match serde_json::to_value(inconsistencies) {
                Ok(json) => json,
                Err(_) => {
                    serde_json::Value::String("failed to serialize inconsistencies".to_string())
                }
            };
            Err(report)
        }
    }
}

/// Tells the coordinator a statement has finished execution, in the cases
/// where we have no other reason to communicate with the coordinator.
pub fn retire_execute(
Expand Down Expand Up @@ -727,7 +742,8 @@ impl SessionClient {
| Command::GetSystemVars { .. }
| Command::SetSystemVars { .. }
| Command::Terminate { .. }
| Command::RetireExecute { .. } => {}
| Command::RetireExecute { .. }
| Command::CheckConsistency { .. } => {}
};
cmd
});
Expand Down
11 changes: 9 additions & 2 deletions src/adapter/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use uuid::Uuid;

use crate::catalog::Catalog;
use crate::client::{ConnectionId, ConnectionIdType};
use crate::coord::consistency::CoordinatorInconsistencies;
use crate::coord::peek::PeekResponseUnary;
use crate::coord::ExecuteContextExtra;
use crate::error::AdapterError;
Expand Down Expand Up @@ -137,6 +138,10 @@ pub enum Command {
data: ExecuteContextExtra,
reason: StatementEndedExecutionReason,
},

CheckConsistency {
tx: oneshot::Sender<Result<(), CoordinatorInconsistencies>>,
},
}

impl Command {
Expand All @@ -151,7 +156,8 @@ impl Command {
| Command::Terminate { .. }
| Command::GetSystemVars { .. }
| Command::SetSystemVars { .. }
| Command::RetireExecute { .. } => None,
| Command::RetireExecute { .. }
| Command::CheckConsistency { .. } => None,
}
}

Expand All @@ -166,7 +172,8 @@ impl Command {
| Command::Terminate { .. }
| Command::GetSystemVars { .. }
| Command::SetSystemVars { .. }
| Command::RetireExecute { .. } => None,
| Command::RetireExecute { .. }
| Command::CheckConsistency { .. } => None,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ pub(crate) mod timestamp_selection;

mod appends;
mod command_handler;
pub mod consistency;
mod ddl;
mod indexes;
mod introspection;
Expand Down
4 changes: 4 additions & 0 deletions src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ impl Coordinator {
catalog: self.owned_catalog(),
});
}

Command::CheckConsistency { tx } => {
let _ = tx.send(self.check_consistency());
}
}
}

Expand Down
82 changes: 82 additions & 0 deletions src/adapter/src/coord/consistency.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Internal consistency checks that validate invariants of [`Coordinator`].

use super::Coordinator;
use crate::catalog::consistency::CatalogInconsistencies;
use mz_repr::GlobalId;
use serde::Serialize;

/// The combined result of the coordinator-level consistency checks: everything the catalog
/// check reported plus any read capabilities that reference unknown objects.
#[derive(Debug, Default, Serialize, PartialEq)]
pub struct CoordinatorInconsistencies {
/// Inconsistencies found in the catalog.
catalog_inconsistencies: CatalogInconsistencies,
/// Inconsistencies found in read capabilities.
read_capabilities: Vec<ReadCapabilitiesInconsistency>,
}

impl CoordinatorInconsistencies {
    /// Returns `true` when neither the catalog check nor the read-capability check
    /// recorded any inconsistency.
    pub fn is_empty(&self) -> bool {
        // Destructure so that adding a field forces this method to be revisited.
        let Self {
            catalog_inconsistencies,
            read_capabilities,
        } = self;
        catalog_inconsistencies.is_empty() && read_capabilities.is_empty()
    }
}

impl Coordinator {
/// Checks the [`Coordinator`] to make sure we're internally consistent.
pub fn check_consistency(&self) -> Result<(), CoordinatorInconsistencies> {
let mut inconsistencies = CoordinatorInconsistencies::default();

if let Err(catalog_inconsistencies) = self.catalog().state().check_consistency() {
inconsistencies.catalog_inconsistencies = catalog_inconsistencies;
}

if let Err(read_capabilities) = self.check_read_capabilities() {
inconsistencies.read_capabilities = read_capabilities;
}

if inconsistencies.is_empty() {
Ok(())
} else {
Err(inconsistencies)
}
}

/// # Invariants:
///
/// * Read capabilities should reference known objects.
///
fn check_read_capabilities(&self) -> Result<(), Vec<ReadCapabilitiesInconsistency>> {
let mut read_capabilities_inconsistencies = Vec::new();
for (gid, _) in &self.storage_read_capabilities {
if self.catalog().try_get_entry(gid).is_none() {
read_capabilities_inconsistencies
.push(ReadCapabilitiesInconsistency::Storage(gid.clone()));
}
}
for (gid, _) in &self.compute_read_capabilities {
if !gid.is_transient() && self.catalog().try_get_entry(gid).is_none() {
read_capabilities_inconsistencies
.push(ReadCapabilitiesInconsistency::Compute(gid.clone()));
}
}

if read_capabilities_inconsistencies.is_empty() {
Ok(())
} else {
Err(read_capabilities_inconsistencies)
}
}
}

/// A read capability held by the coordinator for an id that has no entry in the catalog.
#[derive(Debug, Serialize, PartialEq, Eq)]
enum ReadCapabilitiesInconsistency {
/// An id in the storage read capabilities has no catalog entry.
Storage(GlobalId),
/// A non-transient id in the compute read capabilities has no catalog entry.
Compute(GlobalId),
}
15 changes: 14 additions & 1 deletion src/adapter/src/coord/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ impl Coordinator {
let mut update_metrics_retention = false;
let mut update_secrets_caching_config = false;
let mut update_cluster_scheduling_config = false;
let mut log_indexes_to_drop = Vec::new();

for op in &ops {
match op {
Expand Down Expand Up @@ -198,6 +199,13 @@ impl Coordinator {
}
catalog::Op::DropObject(ObjectId::Cluster(id)) => {
clusters_to_drop.push(*id);
log_indexes_to_drop.extend(
self.catalog()
.get_cluster(*id)
.log_indexes
.values()
.cloned(),
);
}
catalog::Op::DropObject(ObjectId::ClusterReplica((cluster_id, replica_id))) => {
// Drop the cluster replica itself.
Expand Down Expand Up @@ -440,6 +448,11 @@ impl Coordinator {
self.drop_replica(cluster_id, replica_id).await;
}
}
if !log_indexes_to_drop.is_empty() {
for id in log_indexes_to_drop {
self.drop_compute_read_policy(&id);
}
}
if !clusters_to_drop.is_empty() {
for cluster_id in clusters_to_drop {
self.controller.drop_cluster(cluster_id);
Expand Down Expand Up @@ -517,7 +530,7 @@ impl Coordinator {

// Note: It's important that we keep the function call inside macro, this way we only run
// the consistency checks if soft assertions are enabled.
mz_ore::soft_assert_eq!(self.catalog().check_consistency(), Ok(()));
mz_ore::soft_assert_eq!(self.check_consistency(), Ok(()));

Ok(result)
}
Expand Down
4 changes: 4 additions & 0 deletions src/environmentd/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,10 @@ impl InternalHttpServer {
"/api/catalog/check",
routing::get(catalog::handle_catalog_check),
)
.route(
"/api/coordinator/check",
routing::get(catalog::handle_coordinator_check),
)
.route(
"/api/internal-console",
routing::get(|| async move {
Expand Down
8 changes: 8 additions & 0 deletions src/environmentd/src/http/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,11 @@ pub async fn handle_catalog_check(mut client: AuthedClient) -> impl IntoResponse
};
(TypedHeader(ContentType::json()), response.to_string())
}

/// HTTP handler that runs the coordinator consistency check.
///
/// Responds with a JSON body: the empty string `""` when the check passes, or an object of
/// the form `{"err": ...}` carrying the reported inconsistencies when it does not.
pub async fn handle_coordinator_check(mut client: AuthedClient) -> impl IntoResponse {
    let body = client.client.check_coordinator().await.map_or_else(
        |inconsistencies| serde_json::json!({ "err": inconsistencies }),
        |_| serde_json::Value::String(String::new()),
    );
    (TypedHeader(ContentType::json()), body.to_string())
}
46 changes: 38 additions & 8 deletions src/testdrive/src/action/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ use std::ascii;
use std::error::Error;
use std::fmt::{self, Display, Formatter, Write as _};
use std::io::{self, Write};
use std::time::SystemTime;
use std::time::{Duration, SystemTime};

use anyhow::{anyhow, bail, Context};
use http::StatusCode;
use md5::{Digest, Md5};
use mz_ore::collections::CollectionExt;
use mz_ore::retry::Retry;
Expand Down Expand Up @@ -136,18 +137,47 @@ pub async fn run_sql(mut cmd: SqlCommand, state: &mut State) -> Result<ControlFl
| Statement::GrantRole { .. }
| Statement::RevokePrivileges { .. }
| Statement::RevokeRole { .. } => {
let response = Retry::default()
.max_duration(Duration::from_secs(3))
.clamp_backoff(Duration::from_millis(500))
.retry_async(|_| async {
reqwest::get(&format!(
"http://{}/api/coordinator/check",
state.materialize_internal_http_addr,
))
.await
.context("while getting response from coordinator check")
})
.await?;
if response.status() == StatusCode::NOT_FOUND {
tracing::info!(
"not performing coordinator check because the endpoint doesn't exist"
);
} else {
// Note: the 404 case handled above can happen if we're testing an older version of environmentd that lacks this endpoint.
let inconsistencies = response
.error_for_status()
.context("response from coordinator check returned an error")?
.text()
.await
.context("while getting text from coordinator check")?;
let inconsistencies: serde_json::Value = serde_json::from_str(&inconsistencies)
.with_context(|| {
format!(
"while parsing result from consistency check: {:?}",
inconsistencies
)
})?;
if inconsistencies != serde_json::json!("") {
bail!("Internal catalog inconsistencies {inconsistencies:#?}");
}
}

let catalog_state = state
.with_catalog_copy(|catalog| catalog.state().clone())
.await
.map_err(|e| anyhow!("failed to read on-disk catalog state: {e}"))?;

// Run internal consistency checks.
if let Some(state) = &catalog_state {
if let Err(inconsistencies) = state.check_consistency() {
bail!("Internal catalog inconsistencies {inconsistencies:#?}");
}
}

// Check that our on-disk state matches the in-memory state.
let disk_state =
catalog_state.map(|state| state.dump().expect("state must be dumpable"));
Expand Down