Skip to content
24 changes: 19 additions & 5 deletions bigframes/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,15 @@ def promote_offsets(self, col_id: str) -> ArrayValue:
"""
Convenience function to promote copy of column offsets to a value column. Can be used to reset index.
"""
if self.node.order_ambiguous and not self.session._strictly_ordered:
raise ValueError("Generating offsets not supported in unordered mode")
if self.node.order_ambiguous and not (self.session._strictly_ordered):
if not self.session.allow_ambiguity:
raise ValueError("Generating offsets not supported in unordered mode")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: partial ordered mode

Let's try to be consistent with our naming of the feature.

else:
warnings.warn(
"Window ordering may be ambiguous, this can cause unstable results.",
bigframes.exceptions.AmbiguousWindowWarning,
)

return ArrayValue(nodes.PromoteOffsetsNode(child=self.node, col_id=col_id))

def concat(self, other: typing.Sequence[ArrayValue]) -> ArrayValue:
Expand Down Expand Up @@ -347,9 +354,16 @@ def project_window_op(
# TODO: Support non-deterministic windowing
if window_spec.row_bounded or not op.order_independent:
if self.node.order_ambiguous and not self.session._strictly_ordered:
raise ValueError(
"Order-dependent windowed ops not supported in unordered mode"
)
if not self.session.allow_ambiguity:
raise ValueError(
"Generating offsets not supported in unordered mode"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: partial ordered mode

)
else:
warnings.warn(
"Window ordering may be ambiguous, this can cause unstable results.",
bigframes.exceptions.AmbiguousWindowWarning,
)

return ArrayValue(
nodes.WindowOpNode(
child=self.node,
Expand Down
4 changes: 4 additions & 0 deletions bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,10 @@ def index_name_to_col_id(self) -> typing.Mapping[Label, typing.Sequence[str]]:
mapping[label] = (*mapping.get(label, ()), id)
return mapping

@property
def explicitly_ordered(self) -> bool:
return self.expr.node.explicitly_ordered

def cols_matching_label(self, partial_label: Label) -> typing.Sequence[str]:
"""
Unlike label_to_col_id, this works with partial labels for multi-index.
Expand Down
38 changes: 19 additions & 19 deletions bigframes/core/groupby/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def __getitem__(
dropna=self._dropna,
)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def head(self, n: int = 5) -> df.DataFrame:
block = self._block
if self._dropna:
Expand Down Expand Up @@ -235,25 +235,25 @@ def count(self) -> df.DataFrame:
def nunique(self) -> df.DataFrame:
return self._aggregate_all(agg_ops.nunique_op)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def cumsum(self, *args, numeric_only: bool = False, **kwargs) -> df.DataFrame:
if not numeric_only:
self._raise_on_non_numeric("cumsum")
return self._apply_window_op(agg_ops.sum_op, numeric_only=True)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def cummin(self, *args, numeric_only: bool = False, **kwargs) -> df.DataFrame:
return self._apply_window_op(agg_ops.min_op, numeric_only=numeric_only)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def cummax(self, *args, numeric_only: bool = False, **kwargs) -> df.DataFrame:
return self._apply_window_op(agg_ops.max_op, numeric_only=numeric_only)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def cumprod(self, *args, **kwargs) -> df.DataFrame:
return self._apply_window_op(agg_ops.product_op, numeric_only=True)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def shift(self, periods=1) -> series.Series:
window = window_specs.rows(
grouping_keys=tuple(self._by_col_ids),
Expand All @@ -262,7 +262,7 @@ def shift(self, periods=1) -> series.Series:
)
return self._apply_window_op(agg_ops.ShiftOp(periods), window=window)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def diff(self, periods=1) -> series.Series:
window = window_specs.rows(
grouping_keys=tuple(self._by_col_ids),
Expand All @@ -271,7 +271,7 @@ def diff(self, periods=1) -> series.Series:
)
return self._apply_window_op(agg_ops.DiffOp(periods), window=window)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def rolling(self, window: int, min_periods=None) -> windows.Window:
# To get n size window, need current row and n-1 preceding rows.
window_spec = window_specs.rows(
Expand All @@ -287,7 +287,7 @@ def rolling(self, window: int, min_periods=None) -> windows.Window:
block, window_spec, self._selected_cols, drop_null_groups=self._dropna
)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def expanding(self, min_periods: int = 1) -> windows.Window:
window_spec = window_specs.cumulative_rows(
grouping_keys=tuple(self._by_col_ids),
Expand Down Expand Up @@ -532,7 +532,7 @@ def __init__(
def _session(self) -> core.Session:
return self._block.session

@validations.requires_strict_ordering()
@validations.requires_ordering()
def head(self, n: int = 5) -> series.Series:
block = self._block
if self._dropna:
Expand Down Expand Up @@ -650,31 +650,31 @@ def agg(self, func=None) -> typing.Union[df.DataFrame, series.Series]:

aggregate = agg

@validations.requires_strict_ordering()
@validations.requires_ordering()
def cumsum(self, *args, **kwargs) -> series.Series:
return self._apply_window_op(
agg_ops.sum_op,
)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def cumprod(self, *args, **kwargs) -> series.Series:
return self._apply_window_op(
agg_ops.product_op,
)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def cummax(self, *args, **kwargs) -> series.Series:
return self._apply_window_op(
agg_ops.max_op,
)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def cummin(self, *args, **kwargs) -> series.Series:
return self._apply_window_op(
agg_ops.min_op,
)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def cumcount(self, *args, **kwargs) -> series.Series:
return (
self._apply_window_op(
Expand All @@ -684,7 +684,7 @@ def cumcount(self, *args, **kwargs) -> series.Series:
- 1
)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def shift(self, periods=1) -> series.Series:
"""Shift index by desired number of periods."""
window = window_specs.rows(
Expand All @@ -694,7 +694,7 @@ def shift(self, periods=1) -> series.Series:
)
return self._apply_window_op(agg_ops.ShiftOp(periods), window=window)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def diff(self, periods=1) -> series.Series:
window = window_specs.rows(
grouping_keys=tuple(self._by_col_ids),
Expand All @@ -703,7 +703,7 @@ def diff(self, periods=1) -> series.Series:
)
return self._apply_window_op(agg_ops.DiffOp(periods), window=window)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def rolling(self, window: int, min_periods=None) -> windows.Window:
# To get n size window, need current row and n-1 preceding rows.
window_spec = window_specs.rows(
Expand All @@ -723,7 +723,7 @@ def rolling(self, window: int, min_periods=None) -> windows.Window:
is_series=True,
)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def expanding(self, min_periods: int = 1) -> windows.Window:
window_spec = window_specs.cumulative_rows(
grouping_keys=tuple(self._by_col_ids),
Expand Down
8 changes: 4 additions & 4 deletions bigframes/core/indexes/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def empty(self) -> bool:
return self.shape[0] == 0

@property
@validations.requires_strict_ordering()
@validations.requires_ordering()
def is_monotonic_increasing(self) -> bool:
"""
Return a boolean if the values are equal or increasing.
Expand All @@ -198,7 +198,7 @@ def is_monotonic_increasing(self) -> bool:
)

@property
@validations.requires_strict_ordering()
@validations.requires_ordering()
def is_monotonic_decreasing(self) -> bool:
"""
Return a boolean if the values are equal or decreasing.
Expand Down Expand Up @@ -348,7 +348,7 @@ def max(self) -> typing.Any:
def min(self) -> typing.Any:
return self._apply_aggregation(agg_ops.min_op)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def argmax(self) -> int:
block, row_nums = self._block.promote_offsets()
block = block.order_by(
Expand All @@ -361,7 +361,7 @@ def argmax(self) -> int:

return typing.cast(int, series.Series(block.select_column(row_nums)).iloc[0])

@validations.requires_strict_ordering()
@validations.requires_ordering()
def argmin(self) -> int:
block, row_nums = self._block.promote_offsets()
block = block.order_by(
Expand Down
42 changes: 42 additions & 0 deletions bigframes/core/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@ def order_ambiguous(self) -> bool:
"""
...

@property
@abc.abstractmethod
def explicitly_ordered(self) -> bool:
"""
Whether the rows have an explicitly defined ordering. For example, a node with no ordering columns is not explicitly ordered.
"""
...

@functools.cached_property
def total_variables(self) -> int:
return self.variables_introduced + sum(
Expand Down Expand Up @@ -180,6 +188,10 @@ def child_nodes(self) -> typing.Sequence[BigFrameNode]:
def schema(self) -> schemata.ArraySchema:
return self.child.schema

@property
def explicitly_ordered(self) -> bool:
return self.child.explicitly_ordered

def transform_children(
self, t: Callable[[BigFrameNode], BigFrameNode]
) -> BigFrameNode:
Expand Down Expand Up @@ -212,6 +224,10 @@ def child_nodes(self) -> typing.Sequence[BigFrameNode]:
def order_ambiguous(self) -> bool:
return True

@property
def explicitly_ordered(self) -> bool:
return False

def __hash__(self):
return self._node_hash

Expand Down Expand Up @@ -267,6 +283,10 @@ def child_nodes(self) -> typing.Sequence[BigFrameNode]:
def order_ambiguous(self) -> bool:
return any(child.order_ambiguous for child in self.children)

@property
def explicitly_ordered(self) -> bool:
return all(child.explicitly_ordered for child in self.children)

def __hash__(self):
return self._node_hash

Expand Down Expand Up @@ -317,6 +337,10 @@ def variables_introduced(self) -> int:
def order_ambiguous(self) -> bool:
return False

@property
def explicitly_ordered(self) -> bool:
return True

def transform_children(
self, t: Callable[[BigFrameNode], BigFrameNode]
) -> BigFrameNode:
Expand Down Expand Up @@ -378,6 +402,10 @@ def relation_ops_created(self) -> int:
def order_ambiguous(self) -> bool:
return len(self.total_order_cols) == 0

@property
def explicitly_ordered(self) -> bool:
return len(self.total_order_cols) > 0

@functools.cached_property
def variables_introduced(self) -> int:
return len(self.schema.items) + 1
Expand Down Expand Up @@ -449,6 +477,12 @@ def hidden_columns(self) -> typing.Tuple[str, ...]:
def order_ambiguous(self) -> bool:
return not isinstance(self.ordering, orderings.TotalOrdering)

@property
def explicitly_ordered(self) -> bool:
return (self.ordering is not None) and len(
self.ordering.all_ordering_columns
) > 0

def transform_children(
self, t: Callable[[BigFrameNode], BigFrameNode]
) -> BigFrameNode:
Expand Down Expand Up @@ -523,6 +557,10 @@ def relation_ops_created(self) -> int:
# Doesn't directly create any relational operations
return 0

@property
def explicitly_ordered(self) -> bool:
return True


@dataclass(frozen=True)
class ReversedNode(UnaryNode):
Expand Down Expand Up @@ -636,6 +674,10 @@ def variables_introduced(self) -> int:
def order_ambiguous(self) -> bool:
return False

@property
def explicitly_ordered(self) -> bool:
return True


@dataclass(frozen=True)
class WindowOpNode(UnaryNode):
Expand Down
17 changes: 15 additions & 2 deletions bigframes/core/validations.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,20 @@

if TYPE_CHECKING:
from bigframes import Session
from bigframes.core.blocks import Block


class HasSession(Protocol):
@property
def _session(self) -> Session:
...

@property
def _block(self) -> Block:
...


def requires_strict_ordering(suggestion: Optional[str] = None):
def requires_ordering(suggestion: Optional[str] = None):
def decorator(meth):
@functools.wraps(meth)
def guarded_meth(object: HasSession, *args, **kwargs):
Expand All @@ -47,8 +52,16 @@ def guarded_meth(object: HasSession, *args, **kwargs):
def enforce_ordered(
object: HasSession, opname: str, suggestion: Optional[str] = None
) -> None:
if not object._session._strictly_ordered:
session = object._session
if session._strictly_ordered or not object._block.expr.node.order_ambiguous:
# No ambiguity for how to calculate ordering, so no error or warning
return None
if not session.allow_ambiguity:
suggestion_substr = suggestion + " " if suggestion else ""
raise bigframes.exceptions.OrderRequiredError(
f"Op {opname} not supported when strict ordering is disabled. {suggestion_substr}{bigframes.constants.FEEDBACK_LINK}"
)
if not object._block.explicitly_ordered:
raise bigframes.exceptions.OrderRequiredError(
f"Op {opname} requires an ordering. Use .sort_values or .sort_index to provide an ordering. {bigframes.constants.FEEDBACK_LINK}"
)
Loading