Skip to content

perf: Generate SQL with fewer CTEs #877

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Aug 6, 2024
135 changes: 79 additions & 56 deletions bigframes/core/compile/compiled.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import ibis.backends.bigquery as ibis_bigquery
import ibis.common.deferred # type: ignore
import ibis.expr.datatypes as ibis_dtypes
import ibis.expr.operations as ibis_ops
import ibis.expr.types as ibis_types
import pandas

Expand All @@ -36,7 +37,6 @@
from bigframes.core.ordering import (
ascending_over,
encode_order_string,
IntegerEncoding,
join_orderings,
OrderingExpression,
RowOrdering,
Expand Down Expand Up @@ -71,19 +71,16 @@ def __init__(
# Allow creating a DataFrame directly from an Ibis table expression.
# TODO(swast): Validate that each column references the same table (or
# no table for literal values).
self._columns = tuple(columns)
self._columns = tuple(
column.resolve(table)
# TODO(https://github.com/ibis-project/ibis/issues/7613): use
# public API to refer to Deferred type.
if isinstance(column, ibis.common.deferred.Deferred) else column
for column in columns
)
# To allow for more efficient lookup by column name, create a
# dictionary mapping names to column values.
self._column_names = {
(
column.resolve(table)
# TODO(https://github.com/ibis-project/ibis/issues/7613): use
# public API to refer to Deferred type.
if isinstance(column, ibis.common.deferred.Deferred)
else column
).get_name(): column
for column in self._columns
}
self._column_names = {column.get_name(): column for column in self._columns}

@property
def columns(self) -> typing.Tuple[ibis_types.Value, ...]:
Expand Down Expand Up @@ -139,10 +136,6 @@ def projection(
for expression, id in expression_id_pairs
]
result = self._select(tuple(values)) # type: ignore

# Need to reproject to convert ibis Scalar to ibis Column object
if any(exp_id[0].is_const for exp_id in expression_id_pairs):
result = result._reproject_to_table()
return result

@abc.abstractmethod
Expand Down Expand Up @@ -300,8 +293,6 @@ def _to_ibis_expr(
ArrayValue objects are sorted, so the following options are available
to reflect this in the ibis expression.

* "offset_col": Zero-based offsets are generated as a column, this will
not sort the rows however.
* "string_encoded": An ordered string column is provided in output table.
* "unordered": No ordering information will be provided in output. Only
value columns are projected.
Expand Down Expand Up @@ -355,6 +346,10 @@ def _to_ibis_expr(
return table

def filter(self, predicate: ex.Expression) -> UnorderedIR:
    """Apply a boolean predicate to this expression.

    If the predicate references any column backed by an analytic (window)
    expression, the expression is first materialized as a CTE, since ibis
    cannot emit a QUALIFY clause.
    https://github.com/ibis-project/ibis/issues/9775
    """
    references_window = any(
        is_window(self._get_ibis_column(var))
        for var in predicate.unbound_variables
    )
    if references_window:
        return self._reproject_to_table().filter(predicate)
    bindings = {col: self._get_ibis_column(col) for col in self.column_ids}
    condition = op_compiler.compile_expression(predicate, bindings)
    return self._filter(condition)
Expand Down Expand Up @@ -785,15 +780,33 @@ def promote_offsets(self, col_id: str) -> OrderedIR:
"""
# Special case: offsets already exist
ordering = self._ordering
# Case 1, already have offsets, just create column from them
if ordering.is_sequential and (ordering.total_order_col is not None):
expr_builder = self.builder()
expr_builder.columns = [
self._compile_expression(
ordering.total_order_col.scalar_expression
).name(col_id),
*self.columns,
]
return expr_builder.build()
# Cannot nest analytic expressions, so reproject to cte first if needed.
# Also ibis cannot window literals, so need to reproject those (even though this is legal in googlesql)
# See: https://github.com/ibis-project/ibis/issues/9773
can_directly_window = not any(
map(lambda x: is_literal(x) or is_window(x), self._ibis_order)
)
if not can_directly_window:
return self._reproject_to_table().promote_offsets(col_id)

if (not ordering.is_sequential) or (not ordering.total_order_col):
return self._project_offsets().promote_offsets(col_id)
window = ibis.window(order_by=self._ibis_order)
if self._predicates:
window = window.group_by(self._reduced_predicate)
offsets = ibis.row_number().over(window)
expr_builder = self.builder()
expr_builder.columns = [
self._compile_expression(ordering.total_order_col.scalar_expression).name(
col_id
),
*self.columns,
offsets.name(col_id),
]
return expr_builder.build()

Expand All @@ -806,7 +819,6 @@ def project_window_op(
output_name=None,
*,
never_skip_nulls=False,
skip_reproject_unsafe: bool = False,
) -> OrderedIR:
"""
Creates a new expression based on this expression with unary operation applied to one column.
Expand All @@ -815,8 +827,25 @@ def project_window_op(
window_spec: a specification of the window over which to apply the operator
output_name: the id to assign to the output of the operator, by default will replace input col if distinct output id not provided
never_skip_nulls: will disable null skipping for operators that would otherwise do so
skip_reproject_unsafe: skips the reprojection step, can be used when performing many non-dependent window operations, user responsible for not nesting window expressions, or using outputs as join, filter or aggregation keys before a reprojection
"""
# Cannot nest analytic expressions, so reproject to cte first if needed.
# Also ibis cannot window literals, so need to reproject those (even though this is legal in googlesql)
# See: https://github.com/ibis-project/ibis/issues/9773
used_exprs = map(
self._get_any_column, [column_name, *window_spec.all_referenced_columns]
)
can_directly_window = not any(
map(lambda x: is_literal(x) or is_window(x), used_exprs)
)
if not can_directly_window:
return self._reproject_to_table().project_window_op(
column_name,
op,
window_spec,
output_name,
never_skip_nulls=never_skip_nulls,
)

column = typing.cast(ibis_types.Column, self._get_ibis_column(column_name))
window = self._ibis_window_from_spec(
window_spec, require_total_order=op.uses_total_row_ordering
Expand Down Expand Up @@ -861,8 +890,7 @@ def project_window_op(
window_op = case_statement

result = self._set_or_replace_by_id(output_name or column_name, window_op)
# TODO(tbergeron): Automatically track analytic expression usage and defer reprojection until required for valid query generation.
return result._reproject_to_table() if not skip_reproject_unsafe else result
return result

def _reproject_to_table(self) -> OrderedIR:
table = self._to_ibis_expr(
Expand Down Expand Up @@ -944,7 +972,7 @@ def _to_ibis_expr(
expose_hidden_cols: bool = False,
fraction: Optional[float] = None,
col_id_overrides: typing.Mapping[str, str] = {},
ordering_mode: Literal["string_encoded", "offset_col", "unordered"],
ordering_mode: Literal["string_encoded", "unordered"],
order_col_name: Optional[str] = ORDER_ID_COLUMN,
):
"""
Expand All @@ -953,8 +981,7 @@ def _to_ibis_expr(
ArrayValue objects are sorted, so the following options are available
to reflect this in the ibis expression.

* "offset_col": Zero-based offsets are generated as a column, this will
not sort the rows however.

* "string_encoded": An ordered string column is provided in output table.
* "unordered": No ordering information will be provided in output. Only
value columns are projected.
Expand All @@ -981,10 +1008,9 @@ def _to_ibis_expr(
"""
assert ordering_mode in (
"string_encoded",
"offset_col",
"unordered",
)
if expose_hidden_cols and ordering_mode in ("ordered_col", "offset_col"):
if expose_hidden_cols and ordering_mode in ("ordered_col"):
raise ValueError(
f"Cannot expose hidden ordering columns with ordering_mode {ordering_mode}"
)
Expand Down Expand Up @@ -1034,6 +1060,10 @@ def _to_ibis_expr(
return table

def filter(self, predicate: ex.Expression) -> OrderedIR:
    """Apply a boolean predicate to this ordered expression.

    Filtering over a window-derived column requires a separate query layer
    (CTE) because ibis has no QUALIFY support.
    https://github.com/ibis-project/ibis/issues/9775
    """
    for var in predicate.unbound_variables:
        if is_window(self._get_ibis_column(var)):
            return self._reproject_to_table().filter(predicate)
    bindings = {col: self._get_ibis_column(col) for col in self.column_ids}
    condition = op_compiler.compile_expression(predicate, bindings)
    return self._filter(condition)
Expand Down Expand Up @@ -1174,37 +1204,14 @@ def _bake_ordering(self) -> OrderedIR:
predicates=self._predicates,
)

def _project_offsets(self) -> OrderedIR:
"""Create a new expression that contains offsets. Should only be executed when
offsets are needed for an operations. Has no effect on expression semantics."""
if self._ordering.is_sequential:
return self
table = self._to_ibis_expr(
ordering_mode="offset_col", order_col_name=ORDER_ID_COLUMN
)
columns = [table[column_name] for column_name in self._column_names]
ordering = TotalOrdering(
ordering_value_columns=tuple([ascending_over(ORDER_ID_COLUMN)]),
total_ordering_columns=frozenset([ORDER_ID_COLUMN]),
integer_encoding=IntegerEncoding(True, is_sequential=True),
)
return OrderedIR(
table,
columns=columns,
hidden_ordering_columns=[table[ORDER_ID_COLUMN]],
ordering=ordering,
)

def _create_order_columns(
self,
ordering_mode: str,
order_col_name: Optional[str],
expose_hidden_cols: bool,
) -> typing.Sequence[ibis_types.Value]:
# Generate offsets if current ordering id semantics are not sufficiently strict
if ordering_mode == "offset_col":
return (self._create_offset_column().name(order_col_name),)
elif ordering_mode == "string_encoded":
if ordering_mode == "string_encoded":
return (self._create_string_ordering_column().name(order_col_name),)
elif expose_hidden_cols:
return self._hidden_ordering_columns
Expand Down Expand Up @@ -1328,6 +1335,22 @@ def build(self) -> OrderedIR:
)


def is_literal(column: ibis_types.Value) -> bool:
    """Whether ``column`` is a scalar literal rather than a true column.

    Ibis literals are not ``Column`` instances, so they cannot be
    aggregated or windowed directly.
    """
    is_column = isinstance(column, ibis_types.Column)
    return not is_column


def is_window(column: ibis_types.Value) -> bool:
    """Whether the expression contains an analytic (window) function.

    Searches the expression tree top-down, stopping at relation boundaries,
    and reports True if any topmost match is a window function.
    """

    def _is_boundary(node) -> bool:
        # Stop descending at either a window function or a relation node.
        return isinstance(node, (ibis_ops.WindowFunction, ibis_ops.Relation))

    topmost_nodes = column.op().find_topmost(_is_boundary)
    return any(isinstance(node, ibis_ops.WindowFunction) for node in topmost_nodes)


def _reduce_predicate_list(
predicate_list: typing.Collection[ibis_types.BooleanValue],
) -> ibis_types.BooleanValue:
Expand Down
1 change: 0 additions & 1 deletion bigframes/core/compile/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,6 @@ def compile_window(self, node: nodes.WindowOpNode, ordered: bool = True):
node.window_spec,
node.output_name,
never_skip_nulls=node.never_skip_nulls,
skip_reproject_unsafe=node.skip_reproject_unsafe,
)
return result if ordered else result.to_unordered()

Expand Down
13 changes: 12 additions & 1 deletion bigframes/core/window_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional, Tuple, Union
import itertools
from typing import Optional, Set, Tuple, Union

import bigframes.core.ordering as orderings

Expand Down Expand Up @@ -162,3 +163,13 @@ def row_bounded(self):
to calculate deterministically.
"""
return isinstance(self.bounds, RowsWindowBounds)

@property
def all_referenced_columns(self) -> Set[str]:
    """Return the set of all variable names referenced in the window.

    Includes the grouping keys plus every unbound variable used by the
    ordering expressions.
    """
    # Flatten the variables referenced by each ordering expression into
    # a single stream before combining with the grouping keys.
    ordering_vars = itertools.chain.from_iterable(
        item.scalar_expression.unbound_variables for item in self.ordering
    )
    return set(itertools.chain(self.grouping_keys, ordering_vars))
3 changes: 2 additions & 1 deletion tests/unit/session/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ def test_default_index_warning_not_raised_by_read_gbq_index_col_sequential_int64
index_col=bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64,
)

# We expect a window operation because we specifically requested a sequential index.
# We expect a window operation because we specifically requested a sequential index and named it.
df.index.name = "named_index"
df.index.name = "named_index"
generated_sql = df.sql.casefold()
assert "OVER".casefold() in generated_sql
assert "ROW_NUMBER()".casefold() in generated_sql
Expand Down