diff --git a/misc/python/materialize/cloudtest/k8s/environmentd.py b/misc/python/materialize/cloudtest/k8s/environmentd.py index 9770797f49d34..7fc700c096f7e 100644 --- a/misc/python/materialize/cloudtest/k8s/environmentd.py +++ b/misc/python/materialize/cloudtest/k8s/environmentd.py @@ -50,13 +50,14 @@ def __init__(self, namespace: str = DEFAULT_K8S_NAMESPACE) -> None: service_port = V1ServicePort(name="sql", port=6875) http_port = V1ServicePort(name="http", port=6876) internal_port = V1ServicePort(name="internal", port=6877) + internal_http_port = V1ServicePort(name="internalhttp", port=6878) self.service = V1Service( api_version="v1", kind="Service", metadata=V1ObjectMeta(name="environmentd", labels={"app": "environmentd"}), spec=V1ServiceSpec( type="NodePort", - ports=[service_port, internal_port, http_port], + ports=[service_port, internal_port, http_port, internal_http_port], selector={"app": "environmentd"}, ), ) @@ -184,6 +185,7 @@ def args(self) -> list[str]: f"--adapter-stash-url=postgres://root@cockroach.{self.cockroach_namespace}:26257?options=--search_path=adapter", f"--storage-stash-url=postgres://root@cockroach.{self.cockroach_namespace}:26257?options=--search_path=storage", "--internal-sql-listen-addr=0.0.0.0:6877", + "--internal-http-listen-addr=0.0.0.0:6878", "--unsafe-mode", # cloudtest may be called upon to spin up older versions of # Materialize too! If you are adding a command-line option that is diff --git a/src/adapter/src/catalog.rs b/src/adapter/src/catalog.rs index 1836919b60774..6caa251074e55 100644 --- a/src/adapter/src/catalog.rs +++ b/src/adapter/src/catalog.rs @@ -117,7 +117,7 @@ use crate::{AdapterError, AdapterNotice, ExecuteResponse}; mod builtin_table_updates; mod config; -mod consistency; +pub(crate) mod consistency; mod error; mod migrate; diff --git a/src/adapter/src/catalog/consistency.rs b/src/adapter/src/catalog/consistency.rs index ac9bc75c6998e..dc3dc9e9f8bbb 100644 --- a/src/adapter/src/catalog/consistency.rs +++ b/src/adapter/src/catalog/consistency.rs @@ -23,7 +23,7 @@ use serde::Serialize; use super::CatalogState; -#[derive(Debug, Default, Serialize)] +#[derive(Debug, Default, Clone, Serialize, PartialEq)] pub struct CatalogInconsistencies { /// Inconsistencies found with internal fields, if any. internal_fields: Vec, @@ -347,7 +347,7 @@ impl CatalogState { } } -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, Clone, PartialEq)] enum InternalFieldsInconsistency { Database(String, DatabaseId), AmbientSchema(String, SchemaId), @@ -356,7 +356,7 @@ enum InternalFieldsInconsistency { Role(String, RoleId), } -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, Clone, PartialEq)] enum RoleInconsistency { Database(DatabaseId, RoleId), Schema(SchemaId, RoleId), @@ -374,7 +374,7 @@ enum RoleInconsistency { }, } -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, Clone, PartialEq)] enum CommentInconsistency { /// A comment was found for an object that no longer exists. Dangling(CommentObjectId), @@ -382,7 +382,7 @@ enum CommentInconsistency { NonRelation(CommentObjectId, usize), } -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, Clone, PartialEq)] enum ObjectDependencyInconsistency { /// Object A uses Object B, but Object B does not exist. MissingUses { diff --git a/src/adapter/src/client.rs b/src/adapter/src/client.rs index ba50f2a9fec56..91023d9c473a4 100644 --- a/src/adapter/src/client.rs +++ b/src/adapter/src/client.rs @@ -590,6 +590,21 @@ impl SessionClient { catalog.check_consistency() } + /// Checks the coordinator for internal consistency, returning a JSON object describing the + /// inconsistencies, if there are any. This is a superset of checks that check_catalog performs, + /// + /// No authorization is performed, so access to this function must be limited to internal + /// servers or superusers. + pub async fn check_coordinator(&mut self) -> Result<(), serde_json::Value> { + self.send_without_session(|tx| Command::CheckConsistency { tx }) + .await + .map_err(|inconsistencies| { + serde_json::to_value(inconsistencies).unwrap_or_else(|_| { + serde_json::Value::String("failed to serialize inconsistencies".to_string()) + }) + }) + } + /// Tells the coordinator a statement has finished execution, in the cases /// where we have no other reason to communicate with the coordinator. pub fn retire_execute( @@ -727,7 +742,8 @@ impl SessionClient { | Command::GetSystemVars { .. } | Command::SetSystemVars { .. } | Command::Terminate { .. } - | Command::RetireExecute { .. } => {} + | Command::RetireExecute { .. } + | Command::CheckConsistency { .. } => {} }; cmd }); diff --git a/src/adapter/src/command.rs b/src/adapter/src/command.rs index e33039b4d8195..945a4a5427b1f 100644 --- a/src/adapter/src/command.rs +++ b/src/adapter/src/command.rs @@ -43,6 +43,7 @@ use uuid::Uuid; use crate::catalog::Catalog; use crate::client::{ConnectionId, ConnectionIdType}; +use crate::coord::consistency::CoordinatorInconsistencies; use crate::coord::peek::PeekResponseUnary; use crate::coord::ExecuteContextExtra; use crate::error::AdapterError; @@ -137,6 +138,10 @@ pub enum Command { data: ExecuteContextExtra, reason: StatementEndedExecutionReason, }, + + CheckConsistency { + tx: oneshot::Sender>, + }, } impl Command { @@ -151,7 +156,8 @@ impl Command { | Command::Terminate { .. } | Command::GetSystemVars { .. } | Command::SetSystemVars { .. } - | Command::RetireExecute { .. } => None, + | Command::RetireExecute { .. } + | Command::CheckConsistency { .. } => None, } } @@ -166,7 +172,8 @@ impl Command { | Command::Terminate { .. } | Command::GetSystemVars { .. } | Command::SetSystemVars { .. } - | Command::RetireExecute { .. } => None, + | Command::RetireExecute { .. } + | Command::CheckConsistency { .. } => None, } } } diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 865c6aa0e7862..9c5828d61f6c7 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -159,6 +159,7 @@ pub(crate) mod timestamp_selection; mod appends; mod command_handler; +pub mod consistency; mod ddl; mod indexes; mod introspection; diff --git a/src/adapter/src/coord/command_handler.rs b/src/adapter/src/coord/command_handler.rs index ca83928812e31..6a9f0891f3f03 100644 --- a/src/adapter/src/coord/command_handler.rs +++ b/src/adapter/src/coord/command_handler.rs @@ -210,6 +210,10 @@ impl Coordinator { catalog: self.owned_catalog(), }); } + + Command::CheckConsistency { tx } => { + let _ = tx.send(self.check_consistency()); + } } } diff --git a/src/adapter/src/coord/consistency.rs b/src/adapter/src/coord/consistency.rs new file mode 100644 index 0000000000000..495727e1bf151 --- /dev/null +++ b/src/adapter/src/coord/consistency.rs @@ -0,0 +1,82 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! Internal consistency checks that validate invariants of [`Coordinator`]. + +use super::Coordinator; +use crate::catalog::consistency::CatalogInconsistencies; +use mz_repr::GlobalId; +use serde::Serialize; + +#[derive(Debug, Default, Serialize, PartialEq)] +pub struct CoordinatorInconsistencies { + /// Inconsistencies found in the catalog. + catalog_inconsistencies: CatalogInconsistencies, + /// Inconsistencies found in read capabilities. + read_capabilities: Vec, +} + +impl CoordinatorInconsistencies { + pub fn is_empty(&self) -> bool { + self.catalog_inconsistencies.is_empty() && self.read_capabilities.is_empty() + } +} + +impl Coordinator { + /// Checks the [`Coordinator`] to make sure we're internally consistent. + pub fn check_consistency(&self) -> Result<(), CoordinatorInconsistencies> { + let mut inconsistencies = CoordinatorInconsistencies::default(); + + if let Err(catalog_inconsistencies) = self.catalog().state().check_consistency() { + inconsistencies.catalog_inconsistencies = catalog_inconsistencies; + } + + if let Err(read_capabilities) = self.check_read_capabilities() { + inconsistencies.read_capabilities = read_capabilities; + } + + if inconsistencies.is_empty() { + Ok(()) + } else { + Err(inconsistencies) + } + } + + /// # Invariants: + /// + /// * Read capabilities should reference known objects. + /// + fn check_read_capabilities(&self) -> Result<(), Vec> { + let mut read_capabilities_inconsistencies = Vec::new(); + for (gid, _) in &self.storage_read_capabilities { + if self.catalog().try_get_entry(gid).is_none() { + read_capabilities_inconsistencies + .push(ReadCapabilitiesInconsistency::Storage(gid.clone())); + } + } + for (gid, _) in &self.compute_read_capabilities { + if !gid.is_transient() && self.catalog().try_get_entry(gid).is_none() { + read_capabilities_inconsistencies + .push(ReadCapabilitiesInconsistency::Compute(gid.clone())); + } + } + + if read_capabilities_inconsistencies.is_empty() { + Ok(()) + } else { + Err(read_capabilities_inconsistencies) + } + } +} + +#[derive(Debug, Serialize, PartialEq, Eq)] +enum ReadCapabilitiesInconsistency { + Storage(GlobalId), + Compute(GlobalId), +} diff --git a/src/adapter/src/coord/ddl.rs b/src/adapter/src/coord/ddl.rs index a911dd191bb83..1242dd5c10987 100644 --- a/src/adapter/src/coord/ddl.rs +++ b/src/adapter/src/coord/ddl.rs @@ -124,6 +124,7 @@ impl Coordinator { let mut update_metrics_retention = false; let mut update_secrets_caching_config = false; let mut update_cluster_scheduling_config = false; + let mut log_indexes_to_drop = Vec::new(); for op in &ops { match op { @@ -198,6 +199,13 @@ impl Coordinator { } catalog::Op::DropObject(ObjectId::Cluster(id)) => { clusters_to_drop.push(*id); + log_indexes_to_drop.extend( + self.catalog() + .get_cluster(*id) + .log_indexes + .values() + .cloned(), + ); } catalog::Op::DropObject(ObjectId::ClusterReplica((cluster_id, replica_id))) => { // Drop the cluster replica itself. @@ -440,6 +448,11 @@ impl Coordinator { self.drop_replica(cluster_id, replica_id).await; } } + if !log_indexes_to_drop.is_empty() { + for id in log_indexes_to_drop { + self.drop_compute_read_policy(&id); + } + } if !clusters_to_drop.is_empty() { for cluster_id in clusters_to_drop { self.controller.drop_cluster(cluster_id); @@ -517,7 +530,7 @@ impl Coordinator { // Note: It's important that we keep the function call inside macro, this way we only run // the consistency checks if sort assertions are enabled. - mz_ore::soft_assert_eq!(self.catalog().check_consistency(), Ok(())); + mz_ore::soft_assert_eq!(self.check_consistency(), Ok(())); Ok(result) } diff --git a/src/environmentd/src/http.rs b/src/environmentd/src/http.rs index aac7de0f58346..6a7c0d6ab8af2 100644 --- a/src/environmentd/src/http.rs +++ b/src/environmentd/src/http.rs @@ -429,6 +429,10 @@ impl InternalHttpServer { "/api/catalog/check", routing::get(catalog::handle_catalog_check), ) + .route( + "/api/coordinator/check", + routing::get(catalog::handle_coordinator_check), + ) .route( "/api/internal-console", routing::get(|| async move { diff --git a/src/environmentd/src/http/catalog.rs b/src/environmentd/src/http/catalog.rs index 820e62b7d4c86..75f1f8819a7f3 100644 --- a/src/environmentd/src/http/catalog.rs +++ b/src/environmentd/src/http/catalog.rs @@ -30,3 +30,11 @@ pub async fn handle_catalog_check(mut client: AuthedClient) -> impl IntoResponse }; (TypedHeader(ContentType::json()), response.to_string()) } + +pub async fn handle_coordinator_check(mut client: AuthedClient) -> impl IntoResponse { + let response = match client.client.check_coordinator().await { + Ok(_) => serde_json::Value::String("".to_string()), + Err(inconsistencies) => serde_json::json!({ "err": inconsistencies }), + }; + (TypedHeader(ContentType::json()), response.to_string()) +} diff --git a/src/testdrive/src/action/sql.rs b/src/testdrive/src/action/sql.rs index 049dc06a889ca..24bf721605fde 100644 --- a/src/testdrive/src/action/sql.rs +++ b/src/testdrive/src/action/sql.rs @@ -11,9 +11,10 @@ use std::ascii; use std::error::Error; use std::fmt::{self, Display, Formatter, Write as _}; use std::io::{self, Write}; -use std::time::SystemTime; +use std::time::{Duration, SystemTime}; use anyhow::{anyhow, bail, Context}; +use http::StatusCode; use md5::{Digest, Md5}; use mz_ore::collections::CollectionExt; use mz_ore::retry::Retry; @@ -136,18 +137,47 @@ pub async fn run_sql(mut cmd: SqlCommand, state: &mut State) -> Result { + let response = Retry::default() + .max_duration(Duration::from_secs(3)) + .clamp_backoff(Duration::from_millis(500)) + .retry_async(|_| async { + reqwest::get(&format!( + "http://{}/api/coordinator/check", + state.materialize_internal_http_addr, + )) + .await + .context("while getting response from coordinator check") + }) + .await?; + if response.status() == StatusCode::NOT_FOUND { + tracing::info!( + "not performing coordinator check because the endpoint doesn't exist" + ); + } else { + // 404 can happen if we're testing an older version of environmentd + let inconsistencies = response + .error_for_status() + .context("response from coordinator check returned an error")? + .text() + .await + .context("while getting text from coordinator check")?; + let inconsistencies: serde_json::Value = serde_json::from_str(&inconsistencies) + .with_context(|| { + format!( + "while parsing result from consistency check: {:?}", + inconsistencies + ) + })?; + if inconsistencies != serde_json::json!("") { + bail!("Internal catalog inconsistencies {inconsistencies:#?}"); + } + } + let catalog_state = state .with_catalog_copy(|catalog| catalog.state().clone()) .await .map_err(|e| anyhow!("failed to read on-disk catalog state: {e}"))?; - // Run internal consistency checks. - if let Some(state) = &catalog_state { - if let Err(inconsistencies) = state.check_consistency() { - bail!("Internal catalog inconsistencies {inconsistencies:#?}"); - } - } - // Check that our on-disk state matches the in-memory state. let disk_state = catalog_state.map(|state| state.dump().expect("state must be dumpable"));