diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index 538789f9d7..cae527931c 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -24,6 +24,7 @@ import ibis.backends.bigquery as ibis_bigquery import ibis.common.deferred # type: ignore import ibis.expr.datatypes as ibis_dtypes +import ibis.expr.operations as ibis_ops import ibis.expr.types as ibis_types import pandas @@ -36,7 +37,6 @@ from bigframes.core.ordering import ( ascending_over, encode_order_string, - IntegerEncoding, join_orderings, OrderingExpression, RowOrdering, @@ -71,19 +71,16 @@ def __init__( # Allow creating a DataFrame directly from an Ibis table expression. # TODO(swast): Validate that each column references the same table (or # no table for literal values). - self._columns = tuple(columns) + self._columns = tuple( + column.resolve(table) + # TODO(https://github.com/ibis-project/ibis/issues/7613): use + # public API to refer to Deferred type. + if isinstance(column, ibis.common.deferred.Deferred) else column + for column in columns + ) # To allow for more efficient lookup by column name, create a # dictionary mapping names to column values. - self._column_names = { - ( - column.resolve(table) - # TODO(https://github.com/ibis-project/ibis/issues/7613): use - # public API to refer to Deferred type. - if isinstance(column, ibis.common.deferred.Deferred) - else column - ).get_name(): column - for column in self._columns - } + self._column_names = {column.get_name(): column for column in self._columns} @property def columns(self) -> typing.Tuple[ibis_types.Value, ...]: @@ -139,10 +136,6 @@ def projection( for expression, id in expression_id_pairs ] result = self._select(tuple(values)) # type: ignore - - # Need to reproject to convert ibis Scalar to ibis Column object - if any(exp_id[0].is_const for exp_id in expression_id_pairs): - result = result._reproject_to_table() return result @abc.abstractmethod @@ -300,8 +293,6 @@ def _to_ibis_expr( ArrayValue objects are sorted, so the following options are available to reflect this in the ibis expression. - * "offset_col": Zero-based offsets are generated as a column, this will - not sort the rows however. * "string_encoded": An ordered string column is provided in output table. * "unordered": No ordering information will be provided in output. Only value columns are projected. @@ -355,6 +346,10 @@ def _to_ibis_expr( return table def filter(self, predicate: ex.Expression) -> UnorderedIR: + if any(map(is_window, map(self._get_ibis_column, predicate.unbound_variables))): + # ibis doesn't support qualify syntax, so create CTE if filtering over window expression + # https://github.com/ibis-project/ibis/issues/9775 + return self._reproject_to_table().filter(predicate) bindings = {col: self._get_ibis_column(col) for col in self.column_ids} condition = op_compiler.compile_expression(predicate, bindings) return self._filter(condition) @@ -785,15 +780,33 @@ def promote_offsets(self, col_id: str) -> OrderedIR: """ # Special case: offsets already exist ordering = self._ordering + # Case 1, already have offsets, just create column from them + if ordering.is_sequential and (ordering.total_order_col is not None): + expr_builder = self.builder() + expr_builder.columns = [ + self._compile_expression( + ordering.total_order_col.scalar_expression + ).name(col_id), + *self.columns, + ] + return expr_builder.build() + # Cannot nest analytic expressions, so reproject to cte first if needed. + # Also ibis cannot window literals, so need to reproject those (even though this is legal in googlesql) + # Seee: https://github.com/ibis-project/ibis/issues/9773 + can_directly_window = not any( + map(lambda x: is_literal(x) or is_window(x), self._ibis_order) + ) + if not can_directly_window: + return self._reproject_to_table().promote_offsets(col_id) - if (not ordering.is_sequential) or (not ordering.total_order_col): - return self._project_offsets().promote_offsets(col_id) + window = ibis.window(order_by=self._ibis_order) + if self._predicates: + window = window.group_by(self._reduced_predicate) + offsets = ibis.row_number().over(window) expr_builder = self.builder() expr_builder.columns = [ - self._compile_expression(ordering.total_order_col.scalar_expression).name( - col_id - ), *self.columns, + offsets.name(col_id), ] return expr_builder.build() @@ -806,7 +819,6 @@ def project_window_op( output_name=None, *, never_skip_nulls=False, - skip_reproject_unsafe: bool = False, ) -> OrderedIR: """ Creates a new expression based on this expression with unary operation applied to one column. @@ -815,8 +827,25 @@ def project_window_op( window_spec: a specification of the window over which to apply the operator output_name: the id to assign to the output of the operator, by default will replace input col if distinct output id not provided never_skip_nulls: will disable null skipping for operators that would otherwise do so - skip_reproject_unsafe: skips the reprojection step, can be used when performing many non-dependent window operations, user responsible for not nesting window expressions, or using outputs as join, filter or aggregation keys before a reprojection """ + # Cannot nest analytic expressions, so reproject to cte first if needed. + # Also ibis cannot window literals, so need to reproject those (even though this is legal in googlesql) + # See: https://github.com/ibis-project/ibis/issues/9773 + used_exprs = map( + self._get_any_column, [column_name, *window_spec.all_referenced_columns] + ) + can_directly_window = not any( + map(lambda x: is_literal(x) or is_window(x), used_exprs) + ) + if not can_directly_window: + return self._reproject_to_table().project_window_op( + column_name, + op, + window_spec, + output_name, + never_skip_nulls=never_skip_nulls, + ) + column = typing.cast(ibis_types.Column, self._get_ibis_column(column_name)) window = self._ibis_window_from_spec( window_spec, require_total_order=op.uses_total_row_ordering @@ -861,8 +890,7 @@ def project_window_op( window_op = case_statement result = self._set_or_replace_by_id(output_name or column_name, window_op) - # TODO(tbergeron): Automatically track analytic expression usage and defer reprojection until required for valid query generation. - return result._reproject_to_table() if not skip_reproject_unsafe else result + return result def _reproject_to_table(self) -> OrderedIR: table = self._to_ibis_expr( @@ -944,7 +972,7 @@ def _to_ibis_expr( expose_hidden_cols: bool = False, fraction: Optional[float] = None, col_id_overrides: typing.Mapping[str, str] = {}, - ordering_mode: Literal["string_encoded", "offset_col", "unordered"], + ordering_mode: Literal["string_encoded", "unordered"], order_col_name: Optional[str] = ORDER_ID_COLUMN, ): """ @@ -953,8 +981,7 @@ def _to_ibis_expr( ArrayValue objects are sorted, so the following options are available to reflect this in the ibis expression. - * "offset_col": Zero-based offsets are generated as a column, this will - not sort the rows however. + * "string_encoded": An ordered string column is provided in output table. * "unordered": No ordering information will be provided in output. Only value columns are projected. @@ -981,10 +1008,9 @@ def _to_ibis_expr( """ assert ordering_mode in ( "string_encoded", - "offset_col", "unordered", ) - if expose_hidden_cols and ordering_mode in ("ordered_col", "offset_col"): + if expose_hidden_cols and ordering_mode in ("ordered_col"): raise ValueError( f"Cannot expose hidden ordering columns with ordering_mode {ordering_mode}" ) @@ -1034,6 +1060,10 @@ def _to_ibis_expr( return table def filter(self, predicate: ex.Expression) -> OrderedIR: + if any(map(is_window, map(self._get_ibis_column, predicate.unbound_variables))): + # ibis doesn't support qualify syntax, so create CTE if filtering over window expression + # https://github.com/ibis-project/ibis/issues/9775 + return self._reproject_to_table().filter(predicate) bindings = {col: self._get_ibis_column(col) for col in self.column_ids} condition = op_compiler.compile_expression(predicate, bindings) return self._filter(condition) @@ -1174,27 +1204,6 @@ def _bake_ordering(self) -> OrderedIR: predicates=self._predicates, ) - def _project_offsets(self) -> OrderedIR: - """Create a new expression that contains offsets. Should only be executed when - offsets are needed for an operations. Has no effect on expression semantics.""" - if self._ordering.is_sequential: - return self - table = self._to_ibis_expr( - ordering_mode="offset_col", order_col_name=ORDER_ID_COLUMN - ) - columns = [table[column_name] for column_name in self._column_names] - ordering = TotalOrdering( - ordering_value_columns=tuple([ascending_over(ORDER_ID_COLUMN)]), - total_ordering_columns=frozenset([ORDER_ID_COLUMN]), - integer_encoding=IntegerEncoding(True, is_sequential=True), - ) - return OrderedIR( - table, - columns=columns, - hidden_ordering_columns=[table[ORDER_ID_COLUMN]], - ordering=ordering, - ) - def _create_order_columns( self, ordering_mode: str, @@ -1202,9 +1211,7 @@ def _create_order_columns( expose_hidden_cols: bool, ) -> typing.Sequence[ibis_types.Value]: # Generate offsets if current ordering id semantics are not sufficiently strict - if ordering_mode == "offset_col": - return (self._create_offset_column().name(order_col_name),) - elif ordering_mode == "string_encoded": + if ordering_mode == "string_encoded": return (self._create_string_ordering_column().name(order_col_name),) elif expose_hidden_cols: return self._hidden_ordering_columns @@ -1328,6 +1335,22 @@ def build(self) -> OrderedIR: ) +def is_literal(column: ibis_types.Value) -> bool: + # Unfortunately, Literals in ibis are not "Columns"s and therefore can't be aggregated. + return not isinstance(column, ibis_types.Column) + + +def is_window(column: ibis_types.Value) -> bool: + matches = ( + (column) + .op() + .find_topmost( + lambda x: isinstance(x, (ibis_ops.WindowFunction, ibis_ops.Relation)) + ) + ) + return any(isinstance(op, ibis_ops.WindowFunction) for op in matches) + + def _reduce_predicate_list( predicate_list: typing.Collection[ibis_types.BooleanValue], ) -> ibis_types.BooleanValue: diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index c7f8c5ab59..8fb1f7ab3a 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -304,7 +304,6 @@ def compile_window(self, node: nodes.WindowOpNode, ordered: bool = True): node.window_spec, node.output_name, never_skip_nulls=node.never_skip_nulls, - skip_reproject_unsafe=node.skip_reproject_unsafe, ) return result if ordered else result.to_unordered() diff --git a/bigframes/core/window_spec.py b/bigframes/core/window_spec.py index 57c57b451a..f011e2848d 100644 --- a/bigframes/core/window_spec.py +++ b/bigframes/core/window_spec.py @@ -14,7 +14,8 @@ from __future__ import annotations from dataclasses import dataclass -from typing import Optional, Tuple, Union +import itertools +from typing import Optional, Set, Tuple, Union import bigframes.core.ordering as orderings @@ -162,3 +163,13 @@ def row_bounded(self): to calculate deterministically. """ return isinstance(self.bounds, RowsWindowBounds) + + @property + def all_referenced_columns(self) -> Set[str]: + """ + Return list of all variables reference ind the window. + """ + ordering_vars = itertools.chain.from_iterable( + item.scalar_expression.unbound_variables for item in self.ordering + ) + return set(itertools.chain(self.grouping_keys, ordering_vars)) diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py index 31029abd67..2f7eaa567a 100644 --- a/tests/unit/session/test_session.py +++ b/tests/unit/session/test_session.py @@ -246,7 +246,8 @@ def test_default_index_warning_not_raised_by_read_gbq_index_col_sequential_int64 index_col=bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64, ) - # We expect a window operation because we specificaly requested a sequential index. + # We expect a window operation because we specificaly requested a sequential index and named it. + df.index.name = "named_index" generated_sql = df.sql.casefold() assert "OVER".casefold() in generated_sql assert "ROW_NUMBER()".casefold() in generated_sql