Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions bigframes/session/read_api_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from google.cloud import bigquery_storage_v1
import pyarrow as pa

from bigframes.core import bigframe_node, rewrite
from bigframes.core import bigframe_node, nodes, pyarrow_utils, rewrite
from bigframes.session import executor, semi_executor


Expand All @@ -39,14 +39,11 @@ def execute(
ordered: bool,
peek: Optional[int] = None,
) -> Optional[executor.ExecuteResult]:
node = rewrite.try_reduce_to_table_scan(plan)
node = self._try_adapt_plan(plan, ordered)
if not node:
return None
if node.explicitly_ordered and ordered:
return None
if peek:
# TODO: Support peeking
return None

import google.cloud.bigquery_storage_v1.types as bq_storage_types
from google.protobuf import timestamp_pb2
Expand Down Expand Up @@ -92,16 +89,36 @@ def execute(

def process_page(page):
pa_batch = page.to_arrow()
pa_batch = pa_batch.select(
[item.source_id for item in node.scan_list.items]
)
return pa.RecordBatch.from_arrays(
pa_batch.columns, names=[id.sql for id in node.ids]
)

batches = map(process_page, rowstream.pages)

if peek:
batches = pyarrow_utils.truncate_pyarrow_iterable(batches, max_results=peek)

rows = node.source.n_rows
if peek and rows:
rows = min(peek, rows)

return executor.ExecuteResult(
arrow_batches=batches,
schema=plan.schema,
query_job=None,
total_bytes=None,
total_rows=node.source.n_rows,
total_rows=rows,
)

def _try_adapt_plan(
    self,
    plan: bigframe_node.BigFrameNode,
    ordered: bool,
) -> Optional[nodes.ReadTableNode]:
    """Try to simplify ``plan`` to a bare table scan servable by the read API.

    Args:
        plan:
            The plan tree to adapt.
        ordered:
            Whether the caller requires the plan's ordering to be preserved.
            When False, ordering operations may be discarded, which lets
            more plans collapse to a plain table scan.

    Returns:
        A ``ReadTableNode`` if the plan reduces to a plain table scan,
        otherwise ``None`` (signalling the caller to fall back to another
        executor).
    """
    if not ordered:
        # Caller doesn't need ordering, so strip order_by ops before
        # attempting the reduction; ordered plans can't be served by a
        # bare table scan.
        plan = rewrite.bake_order(plan)
    return rewrite.try_reduce_to_table_scan(plan)
14 changes: 14 additions & 0 deletions tests/system/large/test_dataframe_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,20 @@ def test_to_pandas_batches_raise_when_large_result_not_allowed(session):
next(df.to_pandas_batches(page_size=500, max_results=1500))


def test_large_df_peek_no_job(session):
    """Peeking a large table must not start a BigQuery query job.

    The session's execution counter is sampled before and after the peek;
    an unchanged count means the rows were served directly by the storage
    read API instead of a query job.
    """
    jobs_before = session._metrics.execution_count

    # Null index is required here: a sequential default index would need a
    # row_number over a full table scan, which forces a query job.
    wiki_df = session.read_gbq(
        WIKIPEDIA_TABLE, index_col=bigframes.enums.DefaultIndexKind.NULL
    )
    peeked = wiki_df.peek(50)
    jobs_after = session._metrics.execution_count

    assert len(peeked) == 50
    assert jobs_after == jobs_before


def test_to_pandas_batches_override_global_option(
session,
):
Expand Down