Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/adapter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const_format = "0.2.26"
dec = "0.4.8"
derivative = "2.2.0"
differential-dataflow = { git = "https://github.com/TimelyDataflow/differential-dataflow.git" }
enum-kinds = "0.5.1"
fail = { version = "0.5.0", features = ["failpoints"] }
futures = "0.3.24"
itertools = "0.10.3"
Expand Down
38 changes: 35 additions & 3 deletions src/adapter/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,21 @@ use std::future::Future;
use std::sync::Arc;
use std::time::Instant;

use mz_sql::ast::display::AstDisplay;
use tokio::sync::{mpsc, oneshot, watch};
use uuid::Uuid;

use mz_ore::collections::CollectionExt;
use mz_ore::id_gen::IdAllocator;
use mz_ore::thread::JoinOnDropHandle;
use mz_repr::{GlobalId, Row, ScalarType};
use mz_sql::ast::{Raw, Statement};
use mz_sql::ast::{Raw, SelectStatement, Statement};
use mz_sql::plan::Plan;

use crate::catalog::SYSTEM_USER;
use crate::command::{
Canceled, Command, ExecuteResponse, Response, SimpleExecuteResponse, SimpleResult,
StartupResponse,
Canceled, Command, ExecuteResponse, ExecuteResponseKind, Response, SimpleExecuteResponse,
SimpleResult, StartupResponse,
};
use crate::coord::peek::PeekResponseUnary;
use crate::error::AdapterError;
Expand Down Expand Up @@ -535,6 +537,36 @@ impl SessionClient {
let mut results = vec![];

for stmt in stmts {
let execute_responses = Plan::generated_from((&stmt).into())
.into_iter()
.map(ExecuteResponse::generated_from)
.flatten()
.collect::<Vec<_>>();

if execute_responses.iter().any(|execute_response| {
matches!(
execute_response,
ExecuteResponseKind::Fetch
| ExecuteResponseKind::SetVariable
| ExecuteResponseKind::Tailing
| ExecuteResponseKind::CopyTo
| ExecuteResponseKind::CopyFrom
| ExecuteResponseKind::Raise
| ExecuteResponseKind::DeclaredCursor
| ExecuteResponseKind::ClosedCursor
)
}) && !matches!(
stmt,
// Both `SelectStatement` and `CopyStatement` generate
// `PeekPlan`, but `SELECT` should be permitted and `COPY` not.
Statement::Select(SelectStatement { query: _, as_of: _ })
) {
results.push(SimpleResult::err(format!(
"unsupported via this API: {}",
stmt.to_ast_string()
)));
}

if matches!(self.session().transaction(), TransactionStatus::Failed(_)) {
break;
}
Expand Down
99 changes: 98 additions & 1 deletion src/adapter/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,18 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

// `EnumKind` unconditionally introduces a lifetime. TODO: remove this once
// https://github.com/rust-lang/rust-clippy/pull/9037 makes it into stable
#![allow(clippy::extra_unused_lifetimes)]

use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

use derivative::Derivative;
use enum_kinds::EnumKind;
use mz_sql::plan::PlanKind;
use serde::Serialize;
use tokio::sync::oneshot;
use tokio::sync::watch;
Expand All @@ -29,6 +35,7 @@ use crate::coord::peek::PeekResponseUnary;
use crate::error::AdapterError;
use crate::session::ClientSeverity;
use crate::session::{EndTransactionAction, RowBatchStream, Session};
use crate::util::Transmittable;

#[derive(Debug)]
pub enum Command {
Expand Down Expand Up @@ -120,6 +127,15 @@ pub struct StartupResponse {
pub messages: Vec<StartupMessage>,
}

// Facile implementation for `StartupResponse`, which does not use the `allowed`
// feature of `ClientTransmitter`.
impl Transmittable for StartupResponse {
type Allowed = bool;
fn to_allowed(&self) -> Self::Allowed {
true
}
}

/// Messages in a [`StartupResponse`].
#[derive(Debug)]
pub enum StartupMessage {
Expand Down Expand Up @@ -165,8 +181,9 @@ pub struct ExecuteResponsePartialError {
}

/// The response to [`SessionClient::execute`](crate::SessionClient::execute).
#[derive(Derivative)]
#[derive(EnumKind, Derivative)]
#[derivative(Debug)]
#[enum_kind(ExecuteResponseKind)]
pub enum ExecuteResponse {
/// The active transaction was exited.
TransactionExited {
Expand Down Expand Up @@ -592,8 +609,88 @@ impl ExecuteResponse {

r
}

/// Expresses which [`PlanKind`] generate which set of
/// `ExecuteResponseKind`.
///
/// Empty results indicate that the type of response is not known.
pub fn generated_from(plan: PlanKind) -> Vec<ExecuteResponseKind> {
use ExecuteResponseKind::*;
use PlanKind::*;

match plan {
AbortTransaction | CommitTransaction => vec![TransactionExited],
AlterItemRename | AlterNoop | AlterSecret | AlterSource | RotateKeys => {
vec![AlteredObject]
}
AlterIndexSetOptions | AlterIndexResetOptions => {
vec![AlteredObject, AlteredIndexLogicalCompaction]
}
AlterSystemSet | AlterSystemReset | AlterSystemResetAll => {
vec![AlteredSystemConfiguraion]
}
Close => vec![ClosedCursor],
PlanKind::CopyFrom => vec![ExecuteResponseKind::CopyFrom],
CreateConnection => vec![CreatedConnection],
CreateDatabase => vec![CreatedDatabase],
CreateSchema => vec![CreatedSchema],
CreateRole => vec![CreatedRole],
CreateComputeInstance => vec![CreatedComputeInstance],
CreateComputeInstanceReplica => vec![CreatedComputeInstanceReplica],
CreateSource => vec![CreatedSource, CreatedSources],
CreateSecret => vec![CreatedSecret],
CreateSink => vec![CreatedSink],
CreateTable => vec![CreatedTable],
CreateView => vec![CreatedView],
CreateViews => vec![CreatedViews],
CreateMaterializedView => vec![CreatedMaterializedView],
CreateIndex => vec![CreatedIndex],
CreateType => vec![CreatedType],
PlanKind::Deallocate => vec![ExecuteResponseKind::Deallocate],
Declare => vec![DeclaredCursor],
DiscardTemp => vec![DiscardedTemp],
DiscardAll => vec![DiscardedAll],
DropDatabase => vec![DroppedDatabase],
DropSchema => vec![DroppedSchema],
DropRoles => vec![DroppedRole],
DropComputeInstances => vec![DroppedComputeInstance],
DropComputeInstanceReplica => vec![DroppedComputeInstanceReplicas],
DropItems => vec![
DroppedConnection,
DroppedSource,
DroppedTable,
DroppedView,
DroppedMaterializedView,
DroppedIndex,
DroppedSink,
DroppedType,
DroppedSecret,
],
PlanKind::EmptyQuery => vec![ExecuteResponseKind::EmptyQuery],
Explain | Peek | SendRows | ShowAllVariables | ShowVariable => {
vec![CopyTo, SendingRows]
}
Execute | ReadThenWrite | SendDiffs => vec![Deleted, Inserted, SendingRows, Updated],
PlanKind::Fetch => vec![ExecuteResponseKind::Fetch],
Insert => vec![Inserted, SendingRows],
PlanKind::Prepare => vec![ExecuteResponseKind::Prepare],
PlanKind::Raise => vec![ExecuteResponseKind::Raise],
PlanKind::SetVariable | ResetVariable => vec![ExecuteResponseKind::SetVariable],
Tail => vec![Tailing, CopyTo],
StartTransaction => vec![StartedTransaction],
}
}
}

/// This implementation is meant to ensure that we maintain updated information
/// about which types of `ExecuteResponse`s are permitted to be sent, which will
/// be a function of which plan we're executing.
impl Transmittable for ExecuteResponse {
type Allowed = ExecuteResponseKind;
fn to_allowed(&self) -> Self::Allowed {
ExecuteResponseKind::from(self)
}
}
/// The response to [`SessionClient::simple_execute`](crate::SessionClient::simple_execute).
#[derive(Debug, Serialize)]
pub struct SimpleExecuteResponse {
Expand Down
10 changes: 7 additions & 3 deletions src/adapter/src/coord/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ use mz_sql::plan::{
DropComputeInstanceReplicaPlan, DropComputeInstancesPlan, DropDatabasePlan, DropItemsPlan,
DropRolesPlan, DropSchemaPlan, ExecutePlan, ExplainPlan, ExplainPlanNew, ExplainPlanOld,
FetchPlan, HirRelationExpr, IndexOption, InsertPlan, MaterializedView, MutationKind,
OptimizerConfig, PeekPlan, Plan, QueryWhen, RaisePlan, ReadThenWritePlan, ResetVariablePlan,
RotateKeysPlan, SendDiffsPlan, SetVariablePlan, ShowVariablePlan, TailFrom, TailPlan, View,
OptimizerConfig, PeekPlan, Plan, PlanKind, QueryWhen, RaisePlan, ReadThenWritePlan,
ResetVariablePlan, RotateKeysPlan, SendDiffsPlan, SetVariablePlan, ShowVariablePlan, TailFrom,
TailPlan, View,
};
use mz_stash::Append;
use mz_storage::controller::{CollectionDescription, ReadPolicy, StorageError};
Expand Down Expand Up @@ -85,12 +86,15 @@ impl<S: Append + 'static> Coordinator<S> {
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) async fn sequence_plan(
&mut self,
tx: ClientTransmitter<ExecuteResponse>,
mut tx: ClientTransmitter<ExecuteResponse>,
mut session: Session,
plan: Plan,
depends_on: Vec<GlobalId>,
) {
event!(Level::TRACE, plan = format!("{:?}", plan));
let responses = ExecuteResponse::generated_from(PlanKind::from(&plan));
tx.set_allowed(responses);

match plan {
Plan::CreateSource(_) => unreachable!("handled separately"),
Plan::CreateConnection(plan) => {
Expand Down
52 changes: 47 additions & 5 deletions src/adapter/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot;

use mz_compute_client::controller::ComputeInstanceId;
use mz_ore::soft_assert;
use mz_repr::{RelationDesc, Row, ScalarType};
use mz_sql::names::FullObjectName;
use mz_sql::plan::StatementDesc;
Expand All @@ -29,12 +30,15 @@ use crate::{ExecuteResponse, PeekResponseUnary};

/// Handles responding to clients.
#[derive(Debug)]
pub struct ClientTransmitter<T> {
pub struct ClientTransmitter<T: Transmittable> {
tx: Option<oneshot::Sender<Response<T>>>,
internal_cmd_tx: UnboundedSender<Message>,
/// Expresses an optional [`soft_assert`] on the set of values allowed to be
/// sent from `self`.
allowed: Option<Vec<T::Allowed>>,
}

impl<T> ClientTransmitter<T> {
impl<T: Transmittable> ClientTransmitter<T> {
/// Creates a new client transmitter.
pub fn new(
tx: oneshot::Sender<Response<T>>,
Expand All @@ -43,12 +47,27 @@ impl<T> ClientTransmitter<T> {
ClientTransmitter {
tx: Some(tx),
internal_cmd_tx,
allowed: None,
}
}

/// Transmits `result` to the client, returning ownership of the session
/// `session` as well.
///
/// # Panics
/// - If in `soft_assert`, `result.is_ok()`, `self.allowed.is_some()`, and
/// the result value is not in the set of allowed values.
pub fn send(mut self, result: Result<T, AdapterError>, session: Session) {
// Guarantee that the value sent is
soft_assert!(
match (&result, self.allowed.take()) {
(Ok(ref t), Some(allowed)) => allowed.contains(&t.to_allowed()),
_ => true,
},
"tried to send disallowed value through ClientTransmitter; \
see ClientTransmitter::set_allowed"
);

// If we were not able to send a message, we must clean up the session
// ourselves. Return it to the caller for disposal.
if let Err(res) = self.tx.take().unwrap().send(Response { result, session }) {
Expand All @@ -63,18 +82,41 @@ impl<T> ClientTransmitter<T> {
pub fn take(mut self) -> oneshot::Sender<Response<T>> {
self.tx.take().unwrap()
}

/// Sets `self` so that the next call to [`Self::send`] will [`soft_assert`]
/// that, if `Ok`, the value is one of `allowed`, as determined by
/// [`Transmittable::to_allowed`].
pub fn set_allowed(&mut self, allowed: Vec<T::Allowed>) {
self.allowed = Some(allowed);
}
}

/// A helper trait for [`ClientTransmitter`].
pub trait Transmittable {
/// The type of values used to express which set of values are allowed.
type Allowed: Eq + PartialEq + std::fmt::Debug;
/// The conversion from the [`ClientTransmitter`]'s type to `Allowed`.
///
/// The benefit of this style of trait, rather than relying on a bound on
/// `Allowed`, are:
/// - Not requiring a clone
/// - The flexibility for facile implementations that do not plan to make
/// use of the `allowed` feature. Those types can simply implement this
/// trait for `bool`, and return `true`. However, it might not be
/// semantically appropriate to expose `From<&Self> for bool`.
fn to_allowed(&self) -> Self::Allowed;
}

/// `ClientTransmitter` with a response to send.
#[derive(Debug)]
pub struct CompletedClientTransmitter<T> {
pub struct CompletedClientTransmitter<T: Transmittable> {
client_transmitter: ClientTransmitter<T>,
response: Result<T, AdapterError>,
session: Session,
action: EndTransactionAction,
}

impl<T> CompletedClientTransmitter<T> {
impl<T: Transmittable> CompletedClientTransmitter<T> {
/// Creates a new completed client transmitter.
pub fn new(
client_transmitter: ClientTransmitter<T>,
Expand All @@ -98,7 +140,7 @@ impl<T> CompletedClientTransmitter<T> {
}
}

impl<T> Drop for ClientTransmitter<T> {
impl<T: Transmittable> Drop for ClientTransmitter<T> {
fn drop(&mut self) {
if self.tx.is_some() {
panic!("client transmitter dropped without send")
Expand Down
3 changes: 2 additions & 1 deletion src/avro/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@

//! Logic handling the intermediate representation of Avro values.

// `EnumKind` unconditionally introduces a lifetime.
// `EnumKind` unconditionally introduces a lifetime. TODO: remove this once
// https://github.com/rust-lang/rust-clippy/pull/9037 makes it into stable
#![allow(clippy::extra_unused_lifetimes)]

use std::collections::HashMap;
Expand Down
Loading