Skip to content

Commit eb60804

Browse files
perf: Generate SQL with fewer CTEs (#877)
1 parent 6e6f9df commit eb60804

File tree

4 files changed

+93
-59
lines changed

4 files changed

+93
-59
lines changed

bigframes/core/compile/compiled.py

+79-56
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import ibis.backends.bigquery as ibis_bigquery
2525
import ibis.common.deferred # type: ignore
2626
import ibis.expr.datatypes as ibis_dtypes
27+
import ibis.expr.operations as ibis_ops
2728
import ibis.expr.types as ibis_types
2829
import pandas
2930

@@ -36,7 +37,6 @@
3637
from bigframes.core.ordering import (
3738
ascending_over,
3839
encode_order_string,
39-
IntegerEncoding,
4040
join_orderings,
4141
OrderingExpression,
4242
RowOrdering,
@@ -71,19 +71,16 @@ def __init__(
7171
# Allow creating a DataFrame directly from an Ibis table expression.
7272
# TODO(swast): Validate that each column references the same table (or
7373
# no table for literal values).
74-
self._columns = tuple(columns)
74+
self._columns = tuple(
75+
column.resolve(table)
76+
# TODO(https://github.com/ibis-project/ibis/issues/7613): use
77+
# public API to refer to Deferred type.
78+
if isinstance(column, ibis.common.deferred.Deferred) else column
79+
for column in columns
80+
)
7581
# To allow for more efficient lookup by column name, create a
7682
# dictionary mapping names to column values.
77-
self._column_names = {
78-
(
79-
column.resolve(table)
80-
# TODO(https://github.com/ibis-project/ibis/issues/7613): use
81-
# public API to refer to Deferred type.
82-
if isinstance(column, ibis.common.deferred.Deferred)
83-
else column
84-
).get_name(): column
85-
for column in self._columns
86-
}
83+
self._column_names = {column.get_name(): column for column in self._columns}
8784

8885
@property
8986
def columns(self) -> typing.Tuple[ibis_types.Value, ...]:
@@ -139,10 +136,6 @@ def projection(
139136
for expression, id in expression_id_pairs
140137
]
141138
result = self._select(tuple(values)) # type: ignore
142-
143-
# Need to reproject to convert ibis Scalar to ibis Column object
144-
if any(exp_id[0].is_const for exp_id in expression_id_pairs):
145-
result = result._reproject_to_table()
146139
return result
147140

148141
@abc.abstractmethod
@@ -300,8 +293,6 @@ def _to_ibis_expr(
300293
ArrayValue objects are sorted, so the following options are available
301294
to reflect this in the ibis expression.
302295
303-
* "offset_col": Zero-based offsets are generated as a column, this will
304-
not sort the rows however.
305296
* "string_encoded": An ordered string column is provided in output table.
306297
* "unordered": No ordering information will be provided in output. Only
307298
value columns are projected.
@@ -355,6 +346,10 @@ def _to_ibis_expr(
355346
return table
356347

357348
def filter(self, predicate: ex.Expression) -> UnorderedIR:
349+
if any(map(is_window, map(self._get_ibis_column, predicate.unbound_variables))):
350+
# ibis doesn't support qualify syntax, so create CTE if filtering over window expression
351+
# https://github.com/ibis-project/ibis/issues/9775
352+
return self._reproject_to_table().filter(predicate)
358353
bindings = {col: self._get_ibis_column(col) for col in self.column_ids}
359354
condition = op_compiler.compile_expression(predicate, bindings)
360355
return self._filter(condition)
@@ -785,15 +780,33 @@ def promote_offsets(self, col_id: str) -> OrderedIR:
785780
"""
786781
# Special case: offsets already exist
787782
ordering = self._ordering
783+
# Case 1, already have offsets, just create column from them
784+
if ordering.is_sequential and (ordering.total_order_col is not None):
785+
expr_builder = self.builder()
786+
expr_builder.columns = [
787+
self._compile_expression(
788+
ordering.total_order_col.scalar_expression
789+
).name(col_id),
790+
*self.columns,
791+
]
792+
return expr_builder.build()
793+
# Cannot nest analytic expressions, so reproject to cte first if needed.
794+
# Also ibis cannot window literals, so need to reproject those (even though this is legal in googlesql)
795+
# Seee: https://github.com/ibis-project/ibis/issues/9773
796+
can_directly_window = not any(
797+
map(lambda x: is_literal(x) or is_window(x), self._ibis_order)
798+
)
799+
if not can_directly_window:
800+
return self._reproject_to_table().promote_offsets(col_id)
788801

789-
if (not ordering.is_sequential) or (not ordering.total_order_col):
790-
return self._project_offsets().promote_offsets(col_id)
802+
window = ibis.window(order_by=self._ibis_order)
803+
if self._predicates:
804+
window = window.group_by(self._reduced_predicate)
805+
offsets = ibis.row_number().over(window)
791806
expr_builder = self.builder()
792807
expr_builder.columns = [
793-
self._compile_expression(ordering.total_order_col.scalar_expression).name(
794-
col_id
795-
),
796808
*self.columns,
809+
offsets.name(col_id),
797810
]
798811
return expr_builder.build()
799812

@@ -806,7 +819,6 @@ def project_window_op(
806819
output_name=None,
807820
*,
808821
never_skip_nulls=False,
809-
skip_reproject_unsafe: bool = False,
810822
) -> OrderedIR:
811823
"""
812824
Creates a new expression based on this expression with unary operation applied to one column.
@@ -815,8 +827,25 @@ def project_window_op(
815827
window_spec: a specification of the window over which to apply the operator
816828
output_name: the id to assign to the output of the operator, by default will replace input col if distinct output id not provided
817829
never_skip_nulls: will disable null skipping for operators that would otherwise do so
818-
skip_reproject_unsafe: skips the reprojection step, can be used when performing many non-dependent window operations, user responsible for not nesting window expressions, or using outputs as join, filter or aggregation keys before a reprojection
819830
"""
831+
# Cannot nest analytic expressions, so reproject to cte first if needed.
832+
# Also ibis cannot window literals, so need to reproject those (even though this is legal in googlesql)
833+
# See: https://github.com/ibis-project/ibis/issues/9773
834+
used_exprs = map(
835+
self._get_any_column, [column_name, *window_spec.all_referenced_columns]
836+
)
837+
can_directly_window = not any(
838+
map(lambda x: is_literal(x) or is_window(x), used_exprs)
839+
)
840+
if not can_directly_window:
841+
return self._reproject_to_table().project_window_op(
842+
column_name,
843+
op,
844+
window_spec,
845+
output_name,
846+
never_skip_nulls=never_skip_nulls,
847+
)
848+
820849
column = typing.cast(ibis_types.Column, self._get_ibis_column(column_name))
821850
window = self._ibis_window_from_spec(
822851
window_spec, require_total_order=op.uses_total_row_ordering
@@ -861,8 +890,7 @@ def project_window_op(
861890
window_op = case_statement
862891

863892
result = self._set_or_replace_by_id(output_name or column_name, window_op)
864-
# TODO(tbergeron): Automatically track analytic expression usage and defer reprojection until required for valid query generation.
865-
return result._reproject_to_table() if not skip_reproject_unsafe else result
893+
return result
866894

867895
def _reproject_to_table(self) -> OrderedIR:
868896
table = self._to_ibis_expr(
@@ -944,7 +972,7 @@ def _to_ibis_expr(
944972
expose_hidden_cols: bool = False,
945973
fraction: Optional[float] = None,
946974
col_id_overrides: typing.Mapping[str, str] = {},
947-
ordering_mode: Literal["string_encoded", "offset_col", "unordered"],
975+
ordering_mode: Literal["string_encoded", "unordered"],
948976
order_col_name: Optional[str] = ORDER_ID_COLUMN,
949977
):
950978
"""
@@ -953,8 +981,7 @@ def _to_ibis_expr(
953981
ArrayValue objects are sorted, so the following options are available
954982
to reflect this in the ibis expression.
955983
956-
* "offset_col": Zero-based offsets are generated as a column, this will
957-
not sort the rows however.
984+
958985
* "string_encoded": An ordered string column is provided in output table.
959986
* "unordered": No ordering information will be provided in output. Only
960987
value columns are projected.
@@ -981,10 +1008,9 @@ def _to_ibis_expr(
9811008
"""
9821009
assert ordering_mode in (
9831010
"string_encoded",
984-
"offset_col",
9851011
"unordered",
9861012
)
987-
if expose_hidden_cols and ordering_mode in ("ordered_col", "offset_col"):
1013+
if expose_hidden_cols and ordering_mode in ("ordered_col"):
9881014
raise ValueError(
9891015
f"Cannot expose hidden ordering columns with ordering_mode {ordering_mode}"
9901016
)
@@ -1034,6 +1060,10 @@ def _to_ibis_expr(
10341060
return table
10351061

10361062
def filter(self, predicate: ex.Expression) -> OrderedIR:
1063+
if any(map(is_window, map(self._get_ibis_column, predicate.unbound_variables))):
1064+
# ibis doesn't support qualify syntax, so create CTE if filtering over window expression
1065+
# https://github.com/ibis-project/ibis/issues/9775
1066+
return self._reproject_to_table().filter(predicate)
10371067
bindings = {col: self._get_ibis_column(col) for col in self.column_ids}
10381068
condition = op_compiler.compile_expression(predicate, bindings)
10391069
return self._filter(condition)
@@ -1174,37 +1204,14 @@ def _bake_ordering(self) -> OrderedIR:
11741204
predicates=self._predicates,
11751205
)
11761206

1177-
def _project_offsets(self) -> OrderedIR:
1178-
"""Create a new expression that contains offsets. Should only be executed when
1179-
offsets are needed for an operations. Has no effect on expression semantics."""
1180-
if self._ordering.is_sequential:
1181-
return self
1182-
table = self._to_ibis_expr(
1183-
ordering_mode="offset_col", order_col_name=ORDER_ID_COLUMN
1184-
)
1185-
columns = [table[column_name] for column_name in self._column_names]
1186-
ordering = TotalOrdering(
1187-
ordering_value_columns=tuple([ascending_over(ORDER_ID_COLUMN)]),
1188-
total_ordering_columns=frozenset([ORDER_ID_COLUMN]),
1189-
integer_encoding=IntegerEncoding(True, is_sequential=True),
1190-
)
1191-
return OrderedIR(
1192-
table,
1193-
columns=columns,
1194-
hidden_ordering_columns=[table[ORDER_ID_COLUMN]],
1195-
ordering=ordering,
1196-
)
1197-
11981207
def _create_order_columns(
11991208
self,
12001209
ordering_mode: str,
12011210
order_col_name: Optional[str],
12021211
expose_hidden_cols: bool,
12031212
) -> typing.Sequence[ibis_types.Value]:
12041213
# Generate offsets if current ordering id semantics are not sufficiently strict
1205-
if ordering_mode == "offset_col":
1206-
return (self._create_offset_column().name(order_col_name),)
1207-
elif ordering_mode == "string_encoded":
1214+
if ordering_mode == "string_encoded":
12081215
return (self._create_string_ordering_column().name(order_col_name),)
12091216
elif expose_hidden_cols:
12101217
return self._hidden_ordering_columns
@@ -1328,6 +1335,22 @@ def build(self) -> OrderedIR:
13281335
)
13291336

13301337

1338+
def is_literal(column: ibis_types.Value) -> bool:
1339+
# Unfortunately, Literals in ibis are not "Columns"s and therefore can't be aggregated.
1340+
return not isinstance(column, ibis_types.Column)
1341+
1342+
1343+
def is_window(column: ibis_types.Value) -> bool:
1344+
matches = (
1345+
(column)
1346+
.op()
1347+
.find_topmost(
1348+
lambda x: isinstance(x, (ibis_ops.WindowFunction, ibis_ops.Relation))
1349+
)
1350+
)
1351+
return any(isinstance(op, ibis_ops.WindowFunction) for op in matches)
1352+
1353+
13311354
def _reduce_predicate_list(
13321355
predicate_list: typing.Collection[ibis_types.BooleanValue],
13331356
) -> ibis_types.BooleanValue:

bigframes/core/compile/compiler.py

-1
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,6 @@ def compile_window(self, node: nodes.WindowOpNode, ordered: bool = True):
304304
node.window_spec,
305305
node.output_name,
306306
never_skip_nulls=node.never_skip_nulls,
307-
skip_reproject_unsafe=node.skip_reproject_unsafe,
308307
)
309308
return result if ordered else result.to_unordered()
310309

bigframes/core/window_spec.py

+12-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
from __future__ import annotations
1515

1616
from dataclasses import dataclass
17-
from typing import Optional, Tuple, Union
17+
import itertools
18+
from typing import Optional, Set, Tuple, Union
1819

1920
import bigframes.core.ordering as orderings
2021

@@ -162,3 +163,13 @@ def row_bounded(self):
162163
to calculate deterministically.
163164
"""
164165
return isinstance(self.bounds, RowsWindowBounds)
166+
167+
@property
168+
def all_referenced_columns(self) -> Set[str]:
169+
"""
170+
Return list of all variables reference ind the window.
171+
"""
172+
ordering_vars = itertools.chain.from_iterable(
173+
item.scalar_expression.unbound_variables for item in self.ordering
174+
)
175+
return set(itertools.chain(self.grouping_keys, ordering_vars))

tests/unit/session/test_session.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,8 @@ def test_default_index_warning_not_raised_by_read_gbq_index_col_sequential_int64
246246
index_col=bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64,
247247
)
248248

249-
# We expect a window operation because we specificaly requested a sequential index.
249+
# We expect a window operation because we specificaly requested a sequential index and named it.
250+
df.index.name = "named_index"
250251
generated_sql = df.sql.casefold()
251252
assert "OVER".casefold() in generated_sql
252253
assert "ROW_NUMBER()".casefold() in generated_sql

0 commit comments

Comments
 (0)