3 changes: 2 additions & 1 deletion bigframes/core/indexers.py
@@ -21,6 +21,7 @@
import pandas as pd

import bigframes.constants as constants
import bigframes.core.blocks
import bigframes.core.guid as guid
import bigframes.core.indexes as indexes
import bigframes.core.scalar
@@ -214,7 +215,7 @@ def __getitem__(self, key: tuple) -> bigframes.core.scalar.Scalar:
            raise ValueError(error_message)
        if len(key) != 2:
            raise TypeError(error_message)
-       block = self._dataframe._block
+       block: bigframes.core.blocks.Block = self._dataframe._block
        column_block = block.select_columns([block.value_columns[key[1]]])
        column = bigframes.series.Series(column_block)
        return column.iloc[key[0]]
110 changes: 95 additions & 15 deletions bigframes/session.py
@@ -498,6 +498,8 @@ def read_gbq_query(

        See also: :meth:`Session.read_gbq`.
        """
+       # NOTE: This method doesn't (yet) exist in pandas or pandas-gbq, so
+       # these docstrings are inline.
        return self._read_gbq_query(
            query=query,
            index_col=index_col,
@@ -515,8 +517,6 @@ def _read_gbq_query(
        max_results: Optional[int] = None,
        api_name: str,
    ) -> dataframe.DataFrame:
-       # NOTE: This method doesn't (yet) exist in pandas or pandas-gbq, so
-       # these docstrings are inline.
        if isinstance(index_col, str):
            index_cols = [index_col]
        else:
@@ -561,6 +561,8 @@ def read_gbq_table(

        See also: :meth:`Session.read_gbq`.
        """
+       # NOTE: This method doesn't (yet) exist in pandas or pandas-gbq, so
+       # these docstrings are inline.
        return self._read_gbq_table(
            query=query,
            index_col=index_col,
@@ -569,6 +571,62 @@ def read_gbq_table(
api_name="read_gbq_table",
)

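A usage sketch, not part of the diff: how the wrapper above is typically called. This assumes `bigframes.Session` can be constructed with default credentials and project; the table path is a placeholder.

import bigframes

session = bigframes.Session()
# A table with (unenforced) primary keys takes the new fast path added below:
# the key columns provide a total ordering, so no ordering job is generated.
df = session.read_gbq_table("my-project.my_dataset.table_with_primary_keys")
print(df.head(5).to_pandas())
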
    def _read_gbq_table_to_ibis_with_total_ordering(
        self,
        table_ref: bigquery.table.TableReference,
        *,
        api_name: str,
    ) -> Tuple[ibis_types.Table, Optional[Sequence[str]]]:
        """Create a read-only Ibis table expression representing a table.

        If we can get a total ordering from the table, such as via primary key
        column(s), then return those too so that ordering generation can be
        avoided.
        """
        if table_ref.dataset_id.upper() == "_SESSION":
            # _SESSION tables aren't supported by the tables.get REST API.
            return (
                self.ibis_client.sql(
                    f"SELECT * FROM `_SESSION`.`{table_ref.table_id}`"
                ),
                None,
            )

        table_expression = self.ibis_client.table(
            table_ref.table_id,
            database=f"{table_ref.project}.{table_ref.dataset_id}",
        )

        # If there are primary keys defined, the query engine assumes these
        # columns are unique, even if the constraint is not enforced. We make
        # the same assumption and use these columns as the total ordering keys.
        table = self.bqclient.get_table(table_ref)

        # TODO(b/305264153): Use public properties to fetch primary keys once
        # added to google-cloud-bigquery.
        primary_keys = (
            table._properties.get("tableConstraints", {})
            .get("primaryKey", {})
            .get("columns")
        )

        if not primary_keys:
            return table_expression, None
        else:
            # Read from a snapshot so that we don't have to copy the table
            # data to create a total ordering.
            job_config = bigquery.QueryJobConfig()
            job_config.labels["bigframes-api"] = api_name
            current_timestamp = list(
                self.bqclient.query(
                    "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`",
                    job_config=job_config,
                ).result()
            )[0][0]
            table_expression = self.ibis_client.sql(
                bigframes_io.create_snapshot_sql(table_ref, current_timestamp)
            )
            return table_expression, primary_keys

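The `create_snapshot_sql` helper is not shown in this diff. As a sketch, the time-travel SQL it is expected to emit looks roughly like the following; the test at the bottom of this change asserts only on the `FOR SYSTEM_TIME AS OF TIMESTAMP` clause, so the exact quoting and timestamp formatting here are assumptions.

import datetime

import google.cloud.bigquery as bigquery


def create_snapshot_sql_sketch(
    table_ref: bigquery.TableReference,
    current_timestamp: datetime.datetime,
) -> str:
    """Builds a query pinned to the table's state at `current_timestamp`."""
    return (
        f"SELECT * FROM `{table_ref.project}`.`{table_ref.dataset_id}`"
        f".`{table_ref.table_id}` FOR SYSTEM_TIME AS OF "
        f"TIMESTAMP('{current_timestamp.isoformat()}')"
    )
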
    def _read_gbq_table(
        self,
        query: str,
@@ -581,24 +639,19 @@ def _read_gbq_table(
        if max_results and max_results <= 0:
            raise ValueError("`max_results` should be a positive number.")

-       # NOTE: This method doesn't (yet) exist in pandas or pandas-gbq, so
-       # these docstrings are inline.
        # TODO(swast): Can we re-use the temp table from other reads in the
        # session, if the original table wasn't modified?
        table_ref = bigquery.table.TableReference.from_string(
            query, default_project=self.bqclient.project
        )

-       if table_ref.dataset_id.upper() == "_SESSION":
-           # _SESSION tables aren't supported by the tables.get REST API.
-           table_expression = self.ibis_client.sql(
-               f"SELECT * FROM `_SESSION`.`{table_ref.table_id}`"
-           )
-       else:
-           table_expression = self.ibis_client.table(
-               table_ref.table_id,
-               database=f"{table_ref.project}.{table_ref.dataset_id}",
-           )
+       (
+           table_expression,
+           total_ordering_cols,
+       ) = self._read_gbq_table_to_ibis_with_total_ordering(
+           table_ref,
+           api_name=api_name,
+       )

        for key in col_order:
            if key not in table_expression.columns:
@@ -624,7 +677,34 @@
        ordering = None
        is_total_ordering = False

-       if len(index_cols) != 0:
+       if total_ordering_cols is not None:
+           # Note: currently, a table has a total ordering only when the
+           # primary key(s) are set on a table. The query engine assumes such
+           # columns are unique, even if not enforced.
+           is_total_ordering = True
+           ordering = core.ExpressionOrdering(
+               ordering_value_columns=[
+                   core.OrderingColumnReference(column_id)
+                   for column_id in total_ordering_cols
+               ],
+               total_ordering_columns=frozenset(total_ordering_cols),
+           )
+
+           if len(index_cols) != 0:
+               index_labels = typing.cast(List[Optional[str]], index_cols)
+           else:
+               # Use the total_ordering_cols to project offsets to use as the
+               # default index.
+               table_expression = table_expression.order_by(total_ordering_cols)
+               default_index_id = guid.generate_guid("bigframes_index_")
+               default_index_col = (
+                   ibis.row_number().cast(ibis_dtypes.int64).name(default_index_id)
+               )
+               table_expression = table_expression.mutate(
+                   **{default_index_id: default_index_col}
+               )
+               index_cols = [default_index_id]
+               index_labels = [None]
+       elif len(index_cols) != 0:
            index_labels = typing.cast(List[Optional[str]], index_cols)
            distinct_table = table_expression.select(*index_cols).distinct()
            is_unique_sql = f"""WITH full_table AS (
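For illustration, a standalone sketch of the default-index step above, using the ibis API directly. The connection, project, dataset, and column names are hypothetical, and `ibis.bigquery.connect` is assumed to be available in the pinned ibis version.

import ibis
import ibis.expr.datatypes as ibis_dtypes

con = ibis.bigquery.connect(project_id="my-project")  # hypothetical project
table = con.table("usa_names_grouped", database="my-project.my_dataset")

# Order by the primary-key columns so that row_number() is deterministic,
# then materialize the offsets as a hidden default-index column.
ordered = table.order_by(["name", "gender", "year"])
offsets = ibis.row_number().cast(ibis_dtypes.int64).name("bigframes_index_0")
with_offsets = ordered.mutate(offsets)
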
31 changes: 31 additions & 0 deletions tests/system/conftest.py
@@ -17,6 +17,7 @@
import logging
import math
import pathlib
import textwrap
import typing
from typing import Dict, Optional

@@ -795,6 +796,36 @@ def penguins_randomforest_classifier_model_name(
    return model_name


@pytest.fixture(scope="session")
def usa_names_grouped_table(
session: bigframes.Session, dataset_id_permanent
) -> bigquery.Table:
"""Provides a table with primary key(s) set."""
table_id = f"{dataset_id_permanent}.usa_names_grouped"
try:
return session.bqclient.get_table(table_id)
except google.cloud.exceptions.NotFound:
query = textwrap.dedent(
f"""
CREATE TABLE `{dataset_id_permanent}.usa_names_grouped`
(
total_people INT64,
name STRING,
gender STRING,
year INT64,
PRIMARY KEY(name, gender, year) NOT ENFORCED
)
AS
SELECT SUM(`number`) AS total_people, name, gender, year
FROM `bigquery-public-data.usa_names.usa_1910_2013`
GROUP BY name, gender, year
"""
)
job = session.bqclient.query(query)
job.result()
return session.bqclient.get_table(table_id)

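The `_properties` lookups in session.py and in the test below depend on the tables.get REST response shape. A sketch of the relevant fragment for this fixture's key follows; the field names match the BigQuery API, while the payload literal is illustrative.

# Field names follow the BigQuery tables.get response; the values are just
# this fixture's key columns.
properties = {
    "tableConstraints": {
        "primaryKey": {"columns": ["name", "gender", "year"]},
    },
}
primary_keys = (
    properties.get("tableConstraints", {}).get("primaryKey", {}).get("columns")
)
assert primary_keys == ["name", "gender", "year"]
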

@pytest.fixture()
def deferred_repr():
    bigframes.options.display.repr_mode = "deferred"
25 changes: 25 additions & 0 deletions tests/system/small/test_session.py
@@ -20,6 +20,7 @@
from typing import List

import google.api_core.exceptions
import google.cloud.bigquery as bigquery
import numpy as np
import pandas as pd
import pytest
@@ -231,6 +232,30 @@ def test_read_gbq_w_anonymous_query_results_table(session: bigframes.Session):
    pd.testing.assert_frame_equal(result, expected, check_dtype=False)


def test_read_gbq_w_primary_keys_table(
    session: bigframes.Session, usa_names_grouped_table: bigquery.Table
):
    table = usa_names_grouped_table
    # TODO(b/305264153): Use public properties to fetch primary keys once
    # added to google-cloud-bigquery.
    primary_keys = (
        table._properties.get("tableConstraints", {})
        .get("primaryKey", {})
        .get("columns")
    )
    assert len(primary_keys) != 0

    df = session.read_gbq(f"{table.project}.{table.dataset_id}.{table.table_id}")
    result = df.head(100).to_pandas()

    # Verify that the DataFrame is already sorted by primary keys.
    sorted_result = result.sort_values(primary_keys)
    pd.testing.assert_frame_equal(result, sorted_result)

    # Verify that we're working from a snapshot rather than a copy of the table.
    assert "FOR SYSTEM_TIME AS OF TIMESTAMP" in df.sql


@pytest.mark.parametrize(
("query_or_table", "max_results"),
[