diff --git a/Cargo.lock b/Cargo.lock index cf7d76cb644c4..5b8b9085f2021 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3063,6 +3063,7 @@ dependencies = [ "dec", "derivative", "differential-dataflow", + "enum-kinds", "fail", "futures", "itertools", @@ -4070,6 +4071,7 @@ dependencies = [ "aws-sdk-sts", "chrono", "datadriven", + "enum-kinds", "globset", "hex", "itertools", @@ -4113,6 +4115,7 @@ version = "0.0.0" dependencies = [ "anyhow", "datadriven", + "enum-kinds", "itertools", "mz-ore", "mz-walkabout", diff --git a/src/adapter/Cargo.toml b/src/adapter/Cargo.toml index e3ab3b3c2fed9..38da0cabd4c79 100644 --- a/src/adapter/Cargo.toml +++ b/src/adapter/Cargo.toml @@ -14,6 +14,7 @@ const_format = "0.2.26" dec = "0.4.8" derivative = "2.2.0" differential-dataflow = { git = "https://github.com/TimelyDataflow/differential-dataflow.git" } +enum-kinds = "0.5.1" fail = { version = "0.5.0", features = ["failpoints"] } futures = "0.3.24" itertools = "0.10.3" diff --git a/src/adapter/src/client.rs b/src/adapter/src/client.rs index 818af92b12c9f..2f979c5bd38ad 100644 --- a/src/adapter/src/client.rs +++ b/src/adapter/src/client.rs @@ -12,6 +12,7 @@ use std::future::Future; use std::sync::Arc; use std::time::Instant; +use mz_sql::ast::display::AstDisplay; use tokio::sync::{mpsc, oneshot, watch}; use uuid::Uuid; @@ -19,12 +20,13 @@ use mz_ore::collections::CollectionExt; use mz_ore::id_gen::IdAllocator; use mz_ore::thread::JoinOnDropHandle; use mz_repr::{GlobalId, Row, ScalarType}; -use mz_sql::ast::{Raw, Statement}; +use mz_sql::ast::{Raw, SelectStatement, Statement}; +use mz_sql::plan::Plan; use crate::catalog::SYSTEM_USER; use crate::command::{ - Canceled, Command, ExecuteResponse, Response, SimpleExecuteResponse, SimpleResult, - StartupResponse, + Canceled, Command, ExecuteResponse, ExecuteResponseKind, Response, SimpleExecuteResponse, + SimpleResult, StartupResponse, }; use crate::coord::peek::PeekResponseUnary; use crate::error::AdapterError; @@ -535,6 +537,36 @@ impl SessionClient { let mut results = vec![]; for stmt in stmts { + let execute_responses = Plan::generated_from((&stmt).into()) + .into_iter() + .map(ExecuteResponse::generated_from) + .flatten() + .collect::>(); + + if execute_responses.iter().any(|execute_response| { + matches!( + execute_response, + ExecuteResponseKind::Fetch + | ExecuteResponseKind::SetVariable + | ExecuteResponseKind::Tailing + | ExecuteResponseKind::CopyTo + | ExecuteResponseKind::CopyFrom + | ExecuteResponseKind::Raise + | ExecuteResponseKind::DeclaredCursor + | ExecuteResponseKind::ClosedCursor + ) + }) && !matches!( + stmt, + // Both `SelectStatement` and `CopyStatement` generate + // `PeekPlan`, but `SELECT` should be permitted and `COPY` not. + Statement::Select(SelectStatement { query: _, as_of: _ }) + ) { + results.push(SimpleResult::err(format!( + "unsupported via this API: {}", + stmt.to_ast_string() + ))); + } + if matches!(self.session().transaction(), TransactionStatus::Failed(_)) { break; } diff --git a/src/adapter/src/command.rs b/src/adapter/src/command.rs index d4fffc7baf3b6..a7a0431d2edf1 100644 --- a/src/adapter/src/command.rs +++ b/src/adapter/src/command.rs @@ -7,12 +7,18 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +// `EnumKind` unconditionally introduces a lifetime. TODO: remove this once +// https://github.com/rust-lang/rust-clippy/pull/9037 makes it into stable +#![allow(clippy::extra_unused_lifetimes)] + use std::fmt; use std::future::Future; use std::pin::Pin; use std::sync::Arc; use derivative::Derivative; +use enum_kinds::EnumKind; +use mz_sql::plan::PlanKind; use serde::Serialize; use tokio::sync::oneshot; use tokio::sync::watch; @@ -29,6 +35,7 @@ use crate::coord::peek::PeekResponseUnary; use crate::error::AdapterError; use crate::session::ClientSeverity; use crate::session::{EndTransactionAction, RowBatchStream, Session}; +use crate::util::Transmittable; #[derive(Debug)] pub enum Command { @@ -120,6 +127,15 @@ pub struct StartupResponse { pub messages: Vec, } +// Facile implementation for `StartupResponse`, which does not use the `allowed` +// feature of `ClientTransmitter`. +impl Transmittable for StartupResponse { + type Allowed = bool; + fn to_allowed(&self) -> Self::Allowed { + true + } +} + /// Messages in a [`StartupResponse`]. #[derive(Debug)] pub enum StartupMessage { @@ -165,8 +181,9 @@ pub struct ExecuteResponsePartialError { } /// The response to [`SessionClient::execute`](crate::SessionClient::execute). -#[derive(Derivative)] +#[derive(EnumKind, Derivative)] #[derivative(Debug)] +#[enum_kind(ExecuteResponseKind)] pub enum ExecuteResponse { /// The active transaction was exited. TransactionExited { @@ -592,8 +609,88 @@ impl ExecuteResponse { r } + + /// Expresses which [`PlanKind`] generate which set of + /// `ExecuteResponseKind`. + /// + /// Empty results indicate that the type of response is not known. + pub fn generated_from(plan: PlanKind) -> Vec { + use ExecuteResponseKind::*; + use PlanKind::*; + + match plan { + AbortTransaction | CommitTransaction => vec![TransactionExited], + AlterItemRename | AlterNoop | AlterSecret | AlterSource | RotateKeys => { + vec![AlteredObject] + } + AlterIndexSetOptions | AlterIndexResetOptions => { + vec![AlteredObject, AlteredIndexLogicalCompaction] + } + AlterSystemSet | AlterSystemReset | AlterSystemResetAll => { + vec![AlteredSystemConfiguraion] + } + Close => vec![ClosedCursor], + PlanKind::CopyFrom => vec![ExecuteResponseKind::CopyFrom], + CreateConnection => vec![CreatedConnection], + CreateDatabase => vec![CreatedDatabase], + CreateSchema => vec![CreatedSchema], + CreateRole => vec![CreatedRole], + CreateComputeInstance => vec![CreatedComputeInstance], + CreateComputeInstanceReplica => vec![CreatedComputeInstanceReplica], + CreateSource => vec![CreatedSource, CreatedSources], + CreateSecret => vec![CreatedSecret], + CreateSink => vec![CreatedSink], + CreateTable => vec![CreatedTable], + CreateView => vec![CreatedView], + CreateViews => vec![CreatedViews], + CreateMaterializedView => vec![CreatedMaterializedView], + CreateIndex => vec![CreatedIndex], + CreateType => vec![CreatedType], + PlanKind::Deallocate => vec![ExecuteResponseKind::Deallocate], + Declare => vec![DeclaredCursor], + DiscardTemp => vec![DiscardedTemp], + DiscardAll => vec![DiscardedAll], + DropDatabase => vec![DroppedDatabase], + DropSchema => vec![DroppedSchema], + DropRoles => vec![DroppedRole], + DropComputeInstances => vec![DroppedComputeInstance], + DropComputeInstanceReplica => vec![DroppedComputeInstanceReplicas], + DropItems => vec![ + DroppedConnection, + DroppedSource, + DroppedTable, + DroppedView, + DroppedMaterializedView, + DroppedIndex, + DroppedSink, + DroppedType, + DroppedSecret, + ], + PlanKind::EmptyQuery => vec![ExecuteResponseKind::EmptyQuery], + Explain | Peek | SendRows | ShowAllVariables | ShowVariable => { + vec![CopyTo, SendingRows] + } + Execute | ReadThenWrite | SendDiffs => vec![Deleted, Inserted, SendingRows, Updated], + PlanKind::Fetch => vec![ExecuteResponseKind::Fetch], + Insert => vec![Inserted, SendingRows], + PlanKind::Prepare => vec![ExecuteResponseKind::Prepare], + PlanKind::Raise => vec![ExecuteResponseKind::Raise], + PlanKind::SetVariable | ResetVariable => vec![ExecuteResponseKind::SetVariable], + Tail => vec![Tailing, CopyTo], + StartTransaction => vec![StartedTransaction], + } + } } +/// This implementation is meant to ensure that we maintain updated information +/// about which types of `ExecuteResponse`s are permitted to be sent, which will +/// be a function of which plan we're executing. +impl Transmittable for ExecuteResponse { + type Allowed = ExecuteResponseKind; + fn to_allowed(&self) -> Self::Allowed { + ExecuteResponseKind::from(self) + } +} /// The response to [`SessionClient::simple_execute`](crate::SessionClient::simple_execute). #[derive(Debug, Serialize)] pub struct SimpleExecuteResponse { diff --git a/src/adapter/src/coord/sequencer.rs b/src/adapter/src/coord/sequencer.rs index 3efc8f24a1545..e36e5622e1585 100644 --- a/src/adapter/src/coord/sequencer.rs +++ b/src/adapter/src/coord/sequencer.rs @@ -52,8 +52,9 @@ use mz_sql::plan::{ DropComputeInstanceReplicaPlan, DropComputeInstancesPlan, DropDatabasePlan, DropItemsPlan, DropRolesPlan, DropSchemaPlan, ExecutePlan, ExplainPlan, ExplainPlanNew, ExplainPlanOld, FetchPlan, HirRelationExpr, IndexOption, InsertPlan, MaterializedView, MutationKind, - OptimizerConfig, PeekPlan, Plan, QueryWhen, RaisePlan, ReadThenWritePlan, ResetVariablePlan, - RotateKeysPlan, SendDiffsPlan, SetVariablePlan, ShowVariablePlan, TailFrom, TailPlan, View, + OptimizerConfig, PeekPlan, Plan, PlanKind, QueryWhen, RaisePlan, ReadThenWritePlan, + ResetVariablePlan, RotateKeysPlan, SendDiffsPlan, SetVariablePlan, ShowVariablePlan, TailFrom, + TailPlan, View, }; use mz_stash::Append; use mz_storage::controller::{CollectionDescription, ReadPolicy, StorageError}; @@ -85,12 +86,15 @@ impl Coordinator { #[tracing::instrument(level = "debug", skip_all)] pub(crate) async fn sequence_plan( &mut self, - tx: ClientTransmitter, + mut tx: ClientTransmitter, mut session: Session, plan: Plan, depends_on: Vec, ) { event!(Level::TRACE, plan = format!("{:?}", plan)); + let responses = ExecuteResponse::generated_from(PlanKind::from(&plan)); + tx.set_allowed(responses); + match plan { Plan::CreateSource(_) => unreachable!("handled separately"), Plan::CreateConnection(plan) => { diff --git a/src/adapter/src/util.rs b/src/adapter/src/util.rs index 2a31bfc0f8fad..bc6c92bbf6ef0 100644 --- a/src/adapter/src/util.rs +++ b/src/adapter/src/util.rs @@ -11,6 +11,7 @@ use tokio::sync::mpsc::UnboundedSender; use tokio::sync::oneshot; use mz_compute_client::controller::ComputeInstanceId; +use mz_ore::soft_assert; use mz_repr::{RelationDesc, Row, ScalarType}; use mz_sql::names::FullObjectName; use mz_sql::plan::StatementDesc; @@ -29,12 +30,15 @@ use crate::{ExecuteResponse, PeekResponseUnary}; /// Handles responding to clients. #[derive(Debug)] -pub struct ClientTransmitter { +pub struct ClientTransmitter { tx: Option>>, internal_cmd_tx: UnboundedSender, + /// Expresses an optional [`soft_assert`] on the set of values allowed to be + /// sent from `self`. + allowed: Option>, } -impl ClientTransmitter { +impl ClientTransmitter { /// Creates a new client transmitter. pub fn new( tx: oneshot::Sender>, @@ -43,12 +47,27 @@ impl ClientTransmitter { ClientTransmitter { tx: Some(tx), internal_cmd_tx, + allowed: None, } } /// Transmits `result` to the client, returning ownership of the session /// `session` as well. + /// + /// # Panics + /// - If in `soft_assert`, `result.is_ok()`, `self.allowed.is_some()`, and + /// the result value is not in the set of allowed values. pub fn send(mut self, result: Result, session: Session) { + // Guarantee that the value sent is + soft_assert!( + match (&result, self.allowed.take()) { + (Ok(ref t), Some(allowed)) => allowed.contains(&t.to_allowed()), + _ => true, + }, + "tried to send disallowed value through ClientTransmitter; \ + see ClientTransmitter::set_allowed" + ); + // If we were not able to send a message, we must clean up the session // ourselves. Return it to the caller for disposal. if let Err(res) = self.tx.take().unwrap().send(Response { result, session }) { @@ -63,18 +82,41 @@ impl ClientTransmitter { pub fn take(mut self) -> oneshot::Sender> { self.tx.take().unwrap() } + + /// Sets `self` so that the next call to [`Self::send`] will [`soft_assert`] + /// that, if `Ok`, the value is one of `allowed`, as determined by + /// [`Transmittable::to_allowed`]. + pub fn set_allowed(&mut self, allowed: Vec) { + self.allowed = Some(allowed); + } +} + +/// A helper trait for [`ClientTransmitter`]. +pub trait Transmittable { + /// The type of values used to express which set of values are allowed. + type Allowed: Eq + PartialEq + std::fmt::Debug; + /// The conversion from the [`ClientTransmitter`]'s type to `Allowed`. + /// + /// The benefit of this style of trait, rather than relying on a bound on + /// `Allowed`, are: + /// - Not requiring a clone + /// - The flexibility for facile implementations that do not plan to make + /// use of the `allowed` feature. Those types can simply implement this + /// trait for `bool`, and return `true`. However, it might not be + /// semantically appropriate to expose `From<&Self> for bool`. + fn to_allowed(&self) -> Self::Allowed; } /// `ClientTransmitter` with a response to send. #[derive(Debug)] -pub struct CompletedClientTransmitter { +pub struct CompletedClientTransmitter { client_transmitter: ClientTransmitter, response: Result, session: Session, action: EndTransactionAction, } -impl CompletedClientTransmitter { +impl CompletedClientTransmitter { /// Creates a new completed client transmitter. pub fn new( client_transmitter: ClientTransmitter, @@ -98,7 +140,7 @@ impl CompletedClientTransmitter { } } -impl Drop for ClientTransmitter { +impl Drop for ClientTransmitter { fn drop(&mut self) { if self.tx.is_some() { panic!("client transmitter dropped without send") diff --git a/src/avro/src/types.rs b/src/avro/src/types.rs index 2b9d542b67d58..38c265468dbb4 100644 --- a/src/avro/src/types.rs +++ b/src/avro/src/types.rs @@ -23,7 +23,8 @@ //! Logic handling the intermediate representation of Avro values. -// `EnumKind` unconditionally introduces a lifetime. +// `EnumKind` unconditionally introduces a lifetime. TODO: remove this once +// https://github.com/rust-lang/rust-clippy/pull/9037 makes it into stable #![allow(clippy::extra_unused_lifetimes)] use std::collections::HashMap; diff --git a/src/environmentd/tests/server.rs b/src/environmentd/tests/server.rs index de940af00ce06..0fd786fe146b2 100644 --- a/src/environmentd/tests/server.rs +++ b/src/environmentd/tests/server.rs @@ -179,6 +179,17 @@ fn test_http_sql() -> Result<(), Box> { status: StatusCode::OK, body: r#"{"results":[{"error":"CREATE VIEW v1 AS SELECT 1 cannot be run inside a transaction block"}]}"#, }, + // Unsupported statement types + TestCase { + query: "tail (select * from v1)", + status: StatusCode::OK, + body: r#"{"results":[{"error":"unsupported via this API: TAIL (SELECT * FROM v1)"}]}"#, + }, + TestCase { + query: "set var = 1", + status: StatusCode::OK, + body: r#"{"results":[{"error":"unsupported via this API: SET var = 1"}]}"#, + }, // Syntax errors fail the request. TestCase { query: "'", diff --git a/src/repr/src/scalar.rs b/src/repr/src/scalar.rs index 509b26d34f948..d098f03a263ad 100644 --- a/src/repr/src/scalar.rs +++ b/src/repr/src/scalar.rs @@ -7,7 +7,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -// `EnumKind` and various macros unconditionally introduce lifetimes. +// `EnumKind` unconditionally introduces a lifetime. TODO: remove this once +// https://github.com/rust-lang/rust-clippy/pull/9037 makes it into stable #![allow(clippy::extra_unused_lifetimes)] use std::fmt::{self, Write}; diff --git a/src/sql-parser/Cargo.toml b/src/sql-parser/Cargo.toml index 3a2354a833c7e..7c7d3423ebbc3 100644 --- a/src/sql-parser/Cargo.toml +++ b/src/sql-parser/Cargo.toml @@ -8,6 +8,7 @@ rust-version = "1.63.0" publish = false [dependencies] +enum-kinds = "0.5.1" itertools = "0.10.3" mz-ore = { path = "../ore", default-features = false, features = ["stack"] } phf = { version = "0.11.1", features = ["uncased"] } diff --git a/src/sql-parser/src/ast/defs/statement.rs b/src/sql-parser/src/ast/defs/statement.rs index a965de0cd8306..438ce8c5ef431 100644 --- a/src/sql-parser/src/ast/defs/statement.rs +++ b/src/sql-parser/src/ast/defs/statement.rs @@ -18,8 +18,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +// `EnumKind` unconditionally introduces a lifetime. TODO: remove this once +// https://github.com/rust-lang/rust-clippy/pull/9037 makes it into stable +#![allow(clippy::extra_unused_lifetimes)] + use std::fmt; +use enum_kinds::EnumKind; + use crate::ast::display::{self, AstDisplay, AstFormatter}; use crate::ast::{ AstInfo, ColumnDef, CreateConnection, CreateSinkConnection, CreateSourceConnection, @@ -30,7 +36,8 @@ use crate::ast::{ /// A top-level statement (SELECT, INSERT, CREATE, etc.) #[allow(clippy::large_enum_variant)] -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, EnumKind)] +#[enum_kind(StatementKind)] pub enum Statement { Select(SelectStatement), Insert(InsertStatement), diff --git a/src/sql/Cargo.toml b/src/sql/Cargo.toml index b6b6f37101870..b53dad475d886 100644 --- a/src/sql/Cargo.toml +++ b/src/sql/Cargo.toml @@ -11,6 +11,7 @@ anyhow = "1.0.64" aws-arn = "0.3.1" aws-sdk-sts = { version = "0.18.0", default-features = false, features = ["native-tls", "rt-tokio"] } chrono = { version = "0.4.21", default-features = false, features = ["clock", "std"] } +enum-kinds = "0.5.1" globset = "0.4.9" hex = "0.4.3" itertools = "0.10.3" diff --git a/src/sql/src/plan.rs b/src/sql/src/plan.rs index a88bb5be53cd3..70840516bb55e 100644 --- a/src/sql/src/plan.rs +++ b/src/sql/src/plan.rs @@ -26,25 +26,31 @@ // `plan_root_query` and fanning out based on the contents of the `SELECT` // statement. +// `EnumKind` unconditionally introduces a lifetime. TODO: remove this once +// https://github.com/rust-lang/rust-clippy/pull/9037 makes it into stable +#![allow(clippy::extra_unused_lifetimes)] + use std::collections::{BTreeSet, HashMap, HashSet}; use std::num::NonZeroUsize; use std::time::Duration; use chrono::{DateTime, Utc}; -use mz_repr::explain_new::{ExplainConfig, ExplainFormat}; +use enum_kinds::EnumKind; use serde::{Deserialize, Serialize}; use mz_compute_client::controller::ComputeInstanceId; use mz_expr::{MirRelationExpr, MirScalarExpr, RowSetFinishing}; use mz_ore::now::{self, NOW_ZERO}; use mz_pgcopy::CopyFormatParams; +use mz_repr::explain_new::{ExplainConfig, ExplainFormat}; use mz_repr::{ColumnName, Diff, GlobalId, RelationDesc, Row, ScalarType}; use mz_storage::types::sinks::{SinkEnvelope, StorageSinkConnectionBuilder}; use mz_storage::types::sources::{SourceDesc, Timeline}; use crate::ast::{ ExplainOptions, ExplainStageNew, ExplainStageOld, Expr, FetchDirection, IndexOptionName, - NoticeSeverity, ObjectType, Raw, SetVariableValue, Statement, TransactionAccessMode, + NoticeSeverity, ObjectType, Raw, SetVariableValue, Statement, StatementKind, + TransactionAccessMode, }; use crate::catalog::{CatalogType, IdReference}; use crate::names::{ @@ -76,7 +82,8 @@ pub use query::{QueryContext, QueryLifetime}; pub use statement::{describe, plan, plan_copy_from, StatementContext, StatementDesc}; /// Instructions for executing a SQL query. -#[derive(Debug)] +#[derive(Debug, EnumKind)] +#[enum_kind(PlanKind)] pub enum Plan { CreateConnection(CreateConnectionPlan), CreateDatabase(CreateDatabasePlan), @@ -136,6 +143,86 @@ pub enum Plan { RotateKeys(RotateKeysPlan), } +impl Plan { + /// Expresses which [`StatementKind`] can generate which set of + /// [`PlanKind`]. + pub fn generated_from(stmt: StatementKind) -> Vec { + match dbg!(stmt) { + StatementKind::AlterConnection => vec![PlanKind::AlterNoop, PlanKind::RotateKeys], + StatementKind::AlterIndex => vec![ + PlanKind::AlterIndexResetOptions, + PlanKind::AlterIndexSetOptions, + PlanKind::AlterNoop, + ], + StatementKind::AlterObjectRename => { + vec![PlanKind::AlterItemRename, PlanKind::AlterNoop] + } + StatementKind::AlterSecret => vec![PlanKind::AlterNoop, PlanKind::AlterSecret], + StatementKind::AlterSource => vec![PlanKind::AlterNoop, PlanKind::AlterSource], + StatementKind::AlterSystemReset => { + vec![PlanKind::AlterNoop, PlanKind::AlterSystemReset] + } + StatementKind::AlterSystemResetAll => { + vec![PlanKind::AlterNoop, PlanKind::AlterSystemResetAll] + } + StatementKind::AlterSystemSet => vec![PlanKind::AlterNoop, PlanKind::AlterSystemSet], + StatementKind::Close => vec![PlanKind::Close], + StatementKind::Commit => vec![PlanKind::CommitTransaction], + StatementKind::Copy => vec![ + PlanKind::CopyFrom, + PlanKind::Peek, + PlanKind::SendDiffs, + PlanKind::Tail, + ], + StatementKind::CreateCluster => vec![PlanKind::CreateComputeInstance], + StatementKind::CreateClusterReplica => vec![PlanKind::CreateComputeInstanceReplica], + StatementKind::CreateConnection => vec![PlanKind::CreateConnection], + StatementKind::CreateDatabase => vec![PlanKind::CreateDatabase], + StatementKind::CreateIndex => vec![PlanKind::CreateIndex], + StatementKind::CreateMaterializedView => vec![PlanKind::CreateMaterializedView], + StatementKind::CreateRole => vec![PlanKind::CreateRole], + StatementKind::CreateSchema => vec![PlanKind::CreateSchema], + StatementKind::CreateSecret => vec![PlanKind::CreateSecret], + StatementKind::CreateSink => vec![PlanKind::CreateSink], + StatementKind::CreateSource => vec![PlanKind::CreateSource], + StatementKind::CreateTable => vec![PlanKind::CreateTable], + StatementKind::CreateType => vec![PlanKind::CreateType], + StatementKind::CreateView => vec![PlanKind::CreateView], + StatementKind::CreateViews => vec![PlanKind::CreateViews], + StatementKind::Deallocate => vec![PlanKind::Deallocate], + StatementKind::Declare => vec![PlanKind::Declare], + StatementKind::Delete => vec![PlanKind::ReadThenWrite], + StatementKind::Discard => vec![PlanKind::DiscardAll, PlanKind::DiscardTemp], + StatementKind::DropClusterReplicas => vec![PlanKind::DropComputeInstanceReplica], + StatementKind::DropClusters => vec![PlanKind::DropComputeInstances], + StatementKind::DropDatabase => vec![PlanKind::DropDatabase], + StatementKind::DropObjects => vec![PlanKind::DropItems], + StatementKind::DropRoles => vec![PlanKind::DropRoles], + StatementKind::DropSchema => vec![PlanKind::DropSchema], + StatementKind::Execute => vec![PlanKind::Execute], + StatementKind::Explain => vec![PlanKind::Explain], + StatementKind::Fetch => vec![PlanKind::Fetch], + StatementKind::Insert => vec![PlanKind::Insert], + StatementKind::Prepare => vec![PlanKind::Prepare], + StatementKind::Raise => vec![PlanKind::Raise], + StatementKind::ResetVariable => vec![PlanKind::ResetVariable], + StatementKind::Rollback => vec![PlanKind::AbortTransaction], + StatementKind::Select => vec![PlanKind::Peek], + StatementKind::SetTransaction => vec![], + StatementKind::SetVariable => vec![PlanKind::SetVariable], + StatementKind::Show => vec![ + PlanKind::Peek, + PlanKind::SendRows, + PlanKind::ShowVariable, + PlanKind::ShowAllVariables, + ], + StatementKind::StartTransaction => vec![PlanKind::StartTransaction], + StatementKind::Tail => vec![PlanKind::Tail], + StatementKind::Update => vec![PlanKind::ReadThenWrite, PlanKind::SendRows], + } + } +} + #[derive(Debug)] pub struct StartTransactionPlan { pub access: Option, diff --git a/src/sql/src/plan/statement.rs b/src/sql/src/plan/statement.rs index 2cf51dce4f50f..35db724a5a2f1 100644 --- a/src/sql/src/plan/statement.rs +++ b/src/sql/src/plan/statement.rs @@ -31,7 +31,7 @@ use crate::names::{ }; use crate::plan::error::PlanError; use crate::plan::query; -use crate::plan::{Params, Plan, PlanContext}; +use crate::plan::{Params, Plan, PlanContext, PlanKind}; use crate::{normalize, DEFAULT_SCHEMA}; mod ddl; @@ -236,13 +236,15 @@ pub fn plan( .map(|(i, ty)| (i + 1, ty.clone())) .collect(); + let permitted_plans = Plan::generated_from((&stmt).into()); + let scx = &mut StatementContext { pcx, catalog, param_types: RefCell::new(param_types), }; - match stmt { + let plan = match stmt { // DDL statements. Statement::AlterConnection(stmt) => ddl::plan_alter_connection(scx, stmt), Statement::AlterIndex(stmt) => ddl::plan_alter_index_options(scx, stmt), @@ -335,7 +337,18 @@ pub fn plan( // Other statements. Statement::Raise(stmt) => raise::plan_raise(scx, stmt), + }; + + if let Ok(plan) = &plan { + mz_ore::soft_assert!( + permitted_plans.contains(&PlanKind::from(plan)), + "plan {:?}, permitted plans {:?}", + plan, + permitted_plans + ); } + + plan } pub fn plan_copy_from(