Skip to content

Commit fff9408

Browse files
perf: Generate SQL with fewer CTEs
1 parent 30aaae5 commit fff9408

File tree

3 files changed

+63
-21
lines changed

3 files changed

+63
-21
lines changed

bigframes/core/compile/compiled.py

Lines changed: 50 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import ibis.backends.bigquery as ibis_bigquery
2525
import ibis.common.deferred # type: ignore
2626
import ibis.expr.datatypes as ibis_dtypes
27+
import ibis.expr.operations as ibis_ops
2728
import ibis.expr.types as ibis_types
2829
import pandas
2930

@@ -71,19 +72,16 @@ def __init__(
7172
# Allow creating a DataFrame directly from an Ibis table expression.
7273
# TODO(swast): Validate that each column references the same table (or
7374
# no table for literal values).
74-
self._columns = tuple(columns)
75+
self._columns = tuple(
76+
column.resolve(table)
77+
# TODO(https://github.com/ibis-project/ibis/issues/7613): use
78+
# public API to refer to Deferred type.
79+
if isinstance(column, ibis.common.deferred.Deferred) else column
80+
for column in columns
81+
)
7582
# To allow for more efficient lookup by column name, create a
7683
# dictionary mapping names to column values.
77-
self._column_names = {
78-
(
79-
column.resolve(table)
80-
# TODO(https://github.com/ibis-project/ibis/issues/7613): use
81-
# public API to refer to Deferred type.
82-
if isinstance(column, ibis.common.deferred.Deferred)
83-
else column
84-
).get_name(): column
85-
for column in self._columns
86-
}
84+
self._column_names = {column.get_name(): column for column in self._columns}
8785

8886
@property
8987
def columns(self) -> typing.Tuple[ibis_types.Value, ...]:
@@ -139,10 +137,6 @@ def projection(
139137
for expression, id in expression_id_pairs
140138
]
141139
result = self._select(tuple(values)) # type: ignore
142-
143-
# Need to reproject to convert ibis Scalar to ibis Column object
144-
if any(exp_id[0].is_const for exp_id in expression_id_pairs):
145-
result = result._reproject_to_table()
146140
return result
147141

148142
@abc.abstractmethod
@@ -166,6 +160,13 @@ def _get_ibis_column(self, key: str) -> ibis_types.Value:
166160
),
167161
)
168162

163+
def is_scalar_expr(self, key: str) -> bool:
164+
# sometimes need to determine if column expression is a scalar expression.
165+
# For instance, cannot filter on an analytic expression, or nest analytic expressions.
166+
# Literals are excluded because ibis itself doesn't work well with them, not because of sql limitations.
167+
ibis_expr = self._get_ibis_column(key)
168+
return not is_literal(ibis_expr) and not is_window(ibis_expr)
169+
169170
def get_column_type(self, key: str) -> bigframes.dtypes.Dtype:
170171
ibis_type = typing.cast(
171172
bigframes.core.compile.ibis_types.IbisDtype,
@@ -355,6 +356,9 @@ def _to_ibis_expr(
355356
return table
356357

357358
def filter(self, predicate: ex.Expression) -> UnorderedIR:
359+
if any(map(is_window, map(self._get_ibis_column, predicate.unbound_variables))):
360+
# ibis doesn't support qualify syntax
361+
return self._reproject_to_table().filter(predicate)
358362
bindings = {col: self._get_ibis_column(col) for col in self.column_ids}
359363
condition = op_compiler.compile_expression(predicate, bindings)
360364
return self._filter(condition)
@@ -806,7 +810,6 @@ def project_window_op(
806810
output_name=None,
807811
*,
808812
never_skip_nulls=False,
809-
skip_reproject_unsafe: bool = False,
810813
) -> OrderedIR:
811814
"""
812815
Creates a new expression based on this expression with unary operation applied to one column.
@@ -815,8 +818,18 @@ def project_window_op(
815818
window_spec: a specification of the window over which to apply the operator
816819
output_name: the id to assign to the output of the operator, by default will replace input col if distinct output id not provided
817820
never_skip_nulls: will disable null skipping for operators that would otherwise do so
818-
skip_reproject_unsafe: skips the reprojection step, can be used when performing many non-dependent window operations, user responsible for not nesting window expressions, or using outputs as join, filter or aggregation keys before a reprojection
819821
"""
822+
used_vars = [column_name, *window_spec.all_referenced_columns]
823+
# Cannot nest analytic expressions, so reproject to cte first if needed.
824+
if not all(map(self.is_scalar_expr, used_vars)):
825+
return self._reproject_to_table().project_window_op(
826+
column_name,
827+
op,
828+
window_spec,
829+
output_name,
830+
never_skip_nulls=never_skip_nulls,
831+
)
832+
820833
column = typing.cast(ibis_types.Column, self._get_ibis_column(column_name))
821834
window = self._ibis_window_from_spec(
822835
window_spec, require_total_order=op.uses_total_row_ordering
@@ -861,8 +874,7 @@ def project_window_op(
861874
window_op = case_statement
862875

863876
result = self._set_or_replace_by_id(output_name or column_name, window_op)
864-
# TODO(tbergeron): Automatically track analytic expression usage and defer reprojection until required for valid query generation.
865-
return result._reproject_to_table() if not skip_reproject_unsafe else result
877+
return result
866878

867879
def _reproject_to_table(self) -> OrderedIR:
868880
table = self._to_ibis_expr(
@@ -1034,6 +1046,9 @@ def _to_ibis_expr(
10341046
return table
10351047

10361048
def filter(self, predicate: ex.Expression) -> OrderedIR:
1049+
if any(map(is_window, map(self._get_ibis_column, predicate.unbound_variables))):
1050+
# ibis doesn't support qualify syntax
1051+
return self._reproject_to_table().filter(predicate)
10371052
bindings = {col: self._get_ibis_column(col) for col in self.column_ids}
10381053
condition = op_compiler.compile_expression(predicate, bindings)
10391054
return self._filter(condition)
@@ -1328,6 +1343,22 @@ def build(self) -> OrderedIR:
13281343
)
13291344

13301345

1346+
def is_literal(column: ibis_types.Value) -> bool:
1347+
# Unfortunately, Literals in ibis are not "Columns"s and therefore can't be aggregated.
1348+
return not isinstance(column, ibis_types.Column)
1349+
1350+
1351+
def is_window(column: ibis_types.Value) -> bool:
1352+
matches = (
1353+
(column)
1354+
.op()
1355+
.find_topmost(
1356+
lambda x: isinstance(x, (ibis_ops.WindowFunction, ibis_ops.Relation))
1357+
)
1358+
)
1359+
return any(isinstance(op, ibis_ops.WindowFunction) for op in matches)
1360+
1361+
13311362
def _reduce_predicate_list(
13321363
predicate_list: typing.Collection[ibis_types.BooleanValue],
13331364
) -> ibis_types.BooleanValue:

bigframes/core/compile/compiler.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,6 @@ def compile_window(self, node: nodes.WindowOpNode, ordered: bool = True):
304304
node.window_spec,
305305
node.output_name,
306306
never_skip_nulls=node.never_skip_nulls,
307-
skip_reproject_unsafe=node.skip_reproject_unsafe,
308307
)
309308
return result if ordered else result.to_unordered()
310309

bigframes/core/window_spec.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
from __future__ import annotations
1515

1616
from dataclasses import dataclass
17-
from typing import Optional, Tuple, Union
17+
import itertools
18+
from typing import Optional, Set, Tuple, Union
1819

1920
import bigframes.core.ordering as orderings
2021

@@ -162,3 +163,14 @@ def row_bounded(self):
162163
to calculate deterministically.
163164
"""
164165
return isinstance(self.bounds, RowsWindowBounds)
166+
167+
@property
168+
def all_referenced_columns(self) -> Set[str]:
169+
"""
170+
Return list of all variables reference ind the window.
171+
"""
172+
ordering_vars = itertools.chain.from_iterable(
173+
item.scalar_expression.unbound_variables for item in self.ordering
174+
)
175+
itertools.chain(self.grouping_keys, ordering_vars)
176+
return set(itertools.chain(self.grouping_keys, ordering_vars))

0 commit comments

Comments
 (0)