Skip to content

Commit 584bc93

Browse files
committed
fix: include index_col when selecting columns and filters in read_gbq_table
Fixes internal issue 339430305.
test: refactor `read_gbq` / `read_gbq_table` tests to test with all parameters combined.
refactor: move query generation code to BigQuery I/O module.
1 parent c6c487f commit 584bc93

File tree

5 files changed

+306
-212
lines changed

5 files changed

+306
-212
lines changed

bigframes/session/__init__.py

Lines changed: 32 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import datetime
2121
import logging
2222
import os
23-
import re
2423
import secrets
2524
import typing
2625
from typing import (
@@ -89,7 +88,7 @@
8988
import bigframes.formatting_helpers as formatting_helpers
9089
from bigframes.functions.remote_function import read_gbq_function as bigframes_rgf
9190
from bigframes.functions.remote_function import remote_function as bigframes_rf
92-
import bigframes.session._io.bigquery as bigframes_io
91+
import bigframes.session._io.bigquery as bf_io_bigquery
9392
import bigframes.session._io.bigquery.read_gbq_table as bf_read_gbq_table
9493
import bigframes.session.clients
9594
import bigframes.version
@@ -145,14 +144,18 @@
145144
)
146145

147146

148-
def _is_query(query_or_table: str) -> bool:
149-
"""Determine if `query_or_table` is a table ID or a SQL string"""
150-
return re.search(r"\s", query_or_table.strip(), re.MULTILINE) is not None
147+
def _to_index_cols(
    index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (),
) -> List[str]:
    """Normalize ``index_col`` into a list of column names.

    Args:
        index_col:
            Either a ``DefaultIndexKind`` value, a single column name, or an
            iterable of column names.

    Returns:
        An empty list for a ``DefaultIndexKind`` value, a one-element list
        for a single string, otherwise a new list holding the items of the
        iterable in order.
    """
    # A DefaultIndexKind selects an index strategy rather than naming columns.
    if isinstance(index_col, bigframes.enums.DefaultIndexKind):
        return []

    # Check for str before the generic iterable case so that a single name
    # is not split into its individual characters.
    if isinstance(index_col, str):
        return [index_col]

    return list(index_col)
156159

157160

158161
class Session(
@@ -322,13 +325,19 @@ def read_gbq(
322325
columns = col_order
323326

324327
filters = list(filters)
325-
if len(filters) != 0 or _is_table_with_wildcard_suffix(query_or_table):
328+
if len(filters) != 0 or bf_io_bigquery.is_table_with_wildcard_suffix(
329+
query_or_table
330+
):
326331
# TODO(b/338111344): This appears to be missing index_cols, which
327332
# are necessary to be selected.
328-
# TODO(b/338039517): Also, need to account for primary keys.
329-
query_or_table = self._to_query(query_or_table, columns, filters)
333+
# TODO(b/338039517): Refactor this to be called inside both
334+
# _read_gbq_query and _read_gbq_table (after detecting primary keys)
335+
# so we can make sure index_col/index_cols reflects primary keys.
336+
query_or_table = bf_io_bigquery.to_query(
337+
query_or_table, _to_index_cols(index_col), columns, filters
338+
)
330339

331-
if _is_query(query_or_table):
340+
if bf_io_bigquery.is_query(query_or_table):
332341
return self._read_gbq_query(
333342
query_or_table,
334343
index_col=index_col,
@@ -355,85 +364,6 @@ def read_gbq(
355364
use_cache=use_cache if use_cache is not None else True,
356365
)
357366

358-
def _to_query(
359-
self,
360-
query_or_table: str,
361-
columns: Iterable[str],
362-
filters: third_party_pandas_gbq.FiltersType,
363-
) -> str:
364-
"""Compile query_or_table with conditions(filters, wildcards) to query."""
365-
filters = list(filters)
366-
sub_query = (
367-
f"({query_or_table})"
368-
if _is_query(query_or_table)
369-
else f"`{query_or_table}`"
370-
)
371-
372-
# TODO(b/338111344): Generate an index based on DefaultIndexKind if we
373-
# don't have index columns specified.
374-
select_clause = "SELECT " + (
375-
", ".join(f"`{column}`" for column in columns) if columns else "*"
376-
)
377-
378-
where_clause = ""
379-
if filters:
380-
valid_operators: Mapping[third_party_pandas_gbq.FilterOps, str] = {
381-
"in": "IN",
382-
"not in": "NOT IN",
383-
"LIKE": "LIKE",
384-
"==": "=",
385-
">": ">",
386-
"<": "<",
387-
">=": ">=",
388-
"<=": "<=",
389-
"!=": "!=",
390-
}
391-
392-
# If single layer filter, add another pseudo layer. So the single layer represents "and" logic.
393-
if isinstance(filters[0], tuple) and (
394-
len(filters[0]) == 0 or not isinstance(list(filters[0])[0], tuple)
395-
):
396-
filters = typing.cast(third_party_pandas_gbq.FiltersType, [filters])
397-
398-
or_expressions = []
399-
for group in filters:
400-
if not isinstance(group, Iterable):
401-
group = [group]
402-
403-
and_expressions = []
404-
for filter_item in group:
405-
if not isinstance(filter_item, tuple) or (len(filter_item) != 3):
406-
raise ValueError(
407-
f"Filter condition should be a tuple of length 3, {filter_item} is not valid."
408-
)
409-
410-
column, operator, value = filter_item
411-
412-
if not isinstance(column, str):
413-
raise ValueError(
414-
f"Column name should be a string, but received '{column}' of type {type(column).__name__}."
415-
)
416-
417-
if operator not in valid_operators:
418-
raise ValueError(f"Operator {operator} is not valid.")
419-
420-
operator_str = valid_operators[operator]
421-
422-
if operator_str in ["IN", "NOT IN"]:
423-
value_list = ", ".join([repr(v) for v in value])
424-
expression = f"`{column}` {operator_str} ({value_list})"
425-
else:
426-
expression = f"`{column}` {operator_str} {repr(value)}"
427-
and_expressions.append(expression)
428-
429-
or_expressions.append(" AND ".join(and_expressions))
430-
431-
if or_expressions:
432-
where_clause = " WHERE " + " OR ".join(or_expressions)
433-
434-
full_query = f"{select_clause} FROM {sub_query} AS sub{where_clause}"
435-
return full_query
436-
437367
def _query_to_destination(
438368
self,
439369
query: str,
@@ -610,12 +540,7 @@ def _read_gbq_query(
610540
True if use_cache is None else use_cache
611541
)
612542

613-
if isinstance(index_col, bigframes.enums.DefaultIndexKind):
614-
index_cols = []
615-
elif isinstance(index_col, str):
616-
index_cols = [index_col]
617-
else:
618-
index_cols = list(index_col)
543+
index_cols = _to_index_cols(index_col)
619544

620545
destination, query_job = self._query_to_destination(
621546
query,
@@ -682,8 +607,13 @@ def read_gbq_table(
682607
columns = col_order
683608

684609
filters = list(filters)
685-
if len(filters) != 0 or _is_table_with_wildcard_suffix(query):
686-
query = self._to_query(query, columns, filters)
610+
if len(filters) != 0 or bf_io_bigquery.is_table_with_wildcard_suffix(query):
611+
# TODO(b/338039517): Refactor this to be called inside both
612+
# _read_gbq_query and _read_gbq_table (after detecting primary keys)
613+
# so we can make sure index_col/index_cols reflects primary keys.
614+
query = bf_io_bigquery.to_query(
615+
query, _to_index_cols(index_col), columns, filters
616+
)
687617

688618
return self._read_gbq_query(
689619
query,
@@ -838,12 +768,7 @@ def _read_bigquery_load_job(
838768
index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (),
839769
columns: Iterable[str] = (),
840770
) -> dataframe.DataFrame:
841-
if isinstance(index_col, bigframes.enums.DefaultIndexKind):
842-
index_cols = []
843-
elif isinstance(index_col, str):
844-
index_cols = [index_col]
845-
else:
846-
index_cols = list(index_col)
771+
index_cols = _to_index_cols(index_col)
847772

848773
if not job_config.clustering_fields and index_cols:
849774
job_config.clustering_fields = index_cols[:_MAX_CLUSTER_COLUMNS]
@@ -1430,7 +1355,7 @@ def _create_empty_temp_table(
14301355
datetime.datetime.now(datetime.timezone.utc) + constants.DEFAULT_EXPIRATION
14311356
)
14321357

1433-
table = bigframes_io.create_temp_table(
1358+
table = bf_io_bigquery.create_temp_table(
14341359
self,
14351360
expiration,
14361361
schema=schema,

bigframes/session/_io/bigquery/__init__.py

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,13 @@
1919
import datetime
2020
import itertools
2121
import os
22+
import re
2223
import textwrap
2324
import types
24-
from typing import Dict, Iterable, Optional, Sequence, Tuple, Union
25+
import typing
26+
from typing import Dict, Iterable, Mapping, Optional, Sequence, Tuple, Union
2527

28+
import bigframes_vendored.pandas.io.gbq as third_party_pandas_gbq
2629
import google.api_core.exceptions
2730
import google.cloud.bigquery as bigquery
2831

@@ -311,3 +314,95 @@ def create_bq_dataset_reference(
311314
query_destination.project,
312315
query_destination.dataset_id,
313316
)
317+
318+
319+
def is_query(query_or_table: str) -> bool:
    """Tell a SQL string apart from a table ID.

    The input is treated as SQL when, after removing leading and trailing
    whitespace, it still contains at least one whitespace character.

    Args:
        query_or_table: A SQL string or a table ID.

    Returns:
        True if the input looks like a SQL string, False otherwise.
    """
    trimmed = query_or_table.strip()
    return bool(re.search(r"\s", trimmed, re.MULTILINE))
322+
323+
324+
def is_table_with_wildcard_suffix(query_or_table: str) -> bool:
    """Report whether the input is a table ID that ends with a ``*`` wildcard.

    Args:
        query_or_table: A SQL string or a table ID.

    Returns:
        True only when the input ends with ``"*"`` and ``is_query`` does not
        classify it as a SQL string.
    """
    if not query_or_table.endswith("*"):
        return False
    return not is_query(query_or_table)
327+
328+
329+
def to_query(
    query_or_table: str,
    index_cols: Iterable[str],
    columns: Iterable[str],
    filters: third_party_pandas_gbq.FiltersType,
) -> str:
    """Compile query_or_table with conditions(filters, wildcards) to query.

    The result has the shape
    ``SELECT <columns> FROM <source> AS sub [WHERE <conditions>]``.

    Args:
        query_or_table:
            A SQL string (wrapped in parentheses as a sub-query) or a table
            ID (wrapped in backticks).
        index_cols:
            Column names that must always be selected whenever ``columns``
            narrows the selection.
        columns:
            Column names to select. If empty, ``SELECT *`` is used and
            ``index_cols`` is not consulted.
        filters:
            Either a flat sequence of ``(column, operator, value)`` tuples,
            which are combined with AND, or a sequence of such groups, where
            the groups are combined with OR.

    Returns:
        The compiled SQL string.

    Raises:
        ValueError:
            If a filter is not a 3-tuple, its column is not a string, or its
            operator is not one of the supported operators.
    """
    filters = list(filters)
    # Materialize both so that one-shot iterables (e.g. generators) are
    # truth-tested by content instead of always being considered non-empty.
    index_cols = list(index_cols)
    columns = list(columns)

    sub_query = (
        f"({query_or_table})" if is_query(query_or_table) else f"`{query_or_table}`"
    )

    # TODO(b/338111344): Generate an index based on DefaultIndexKind if we
    # don't have index columns specified.
    if columns:
        # We only reduce the selection if columns is set, but we always
        # want to make sure index_cols is also included. A column listed in
        # both must appear only once, otherwise the SELECT list would produce
        # duplicate output column names; dict.fromkeys drops repeats while
        # keeping first-seen order (index columns first).
        all_columns = dict.fromkeys(itertools.chain(index_cols, columns))
        select_clause = "SELECT " + ", ".join(f"`{column}`" for column in all_columns)
    else:
        select_clause = "SELECT *"

    where_clause = ""
    if filters:
        # Maps the accepted filter operators to their SQL spelling.
        valid_operators: Mapping[third_party_pandas_gbq.FilterOps, str] = {
            "in": "IN",
            "not in": "NOT IN",
            "LIKE": "LIKE",
            "==": "=",
            ">": ">",
            "<": "<",
            ">=": ">=",
            "<=": "<=",
            "!=": "!=",
        }

        # If single layer filter, add another pseudo layer. So the single layer represents "and" logic.
        if isinstance(filters[0], tuple) and (
            len(filters[0]) == 0 or not isinstance(list(filters[0])[0], tuple)
        ):
            filters = typing.cast(third_party_pandas_gbq.FiltersType, [filters])

        or_expressions = []
        for group in filters:
            if not isinstance(group, Iterable):
                group = [group]

            and_expressions = []
            for filter_item in group:
                if not isinstance(filter_item, tuple) or (len(filter_item) != 3):
                    raise ValueError(
                        f"Filter condition should be a tuple of length 3, {filter_item} is not valid."
                    )

                column, operator, value = filter_item

                if not isinstance(column, str):
                    raise ValueError(
                        f"Column name should be a string, but received '{column}' of type {type(column).__name__}."
                    )

                if operator not in valid_operators:
                    raise ValueError(f"Operator {operator} is not valid.")

                operator_str = valid_operators[operator]

                # NOTE(review): literals are rendered with Python's repr();
                # confirm this is acceptable for every value type callers
                # pass (e.g. None, or strings from untrusted input).
                if operator_str in ["IN", "NOT IN"]:
                    value_list = ", ".join([repr(v) for v in value])
                    expression = f"`{column}` {operator_str} ({value_list})"
                else:
                    expression = f"`{column}` {operator_str} {repr(value)}"
                and_expressions.append(expression)

            or_expressions.append(" AND ".join(and_expressions))

        if or_expressions:
            where_clause = " WHERE " + " OR ".join(or_expressions)

    full_query = f"{select_clause} FROM {sub_query} AS sub{where_clause}"
    return full_query

0 commit comments

Comments
 (0)