46 changes: 40 additions & 6 deletions bigframes/functions/remote_function.py
@@ -145,7 +145,13 @@ def __init__(
self._cloud_function_docker_repository = cloud_function_docker_repository

def create_bq_remote_function(
self, input_args, input_types, output_type, endpoint, bq_function_name
self,
input_args,
input_types,
output_type,
endpoint,
bq_function_name,
max_batching_rows,
):
"""Create a BigQuery remote function given the artifacts of a user defined
function and the http endpoint of a corresponding cloud function."""
@@ -169,14 +175,25 @@ def create_bq_remote_function(
bq_function_args.append(
f"{name} {third_party_ibis_bqtypes.BigQueryType.from_ibis(input_types[idx])}"
)

remote_function_options = {
"endpoint": endpoint,
"max_batching_rows": max_batching_rows,
}

remote_function_options_str = ", ".join(
[
f'{key}="{val}"' if isinstance(val, str) else f"{key}={val}"
for key, val in remote_function_options.items()
if val is not None
]
)

create_function_ddl = f"""
CREATE OR REPLACE FUNCTION `{self._gcp_project_id}.{self._bq_dataset}`.{bq_function_name}({','.join(bq_function_args)})
RETURNS {bq_function_return_type}
REMOTE WITH CONNECTION `{self._gcp_project_id}.{self._bq_location}.{self._bq_connection_id}`
OPTIONS (
endpoint = "{endpoint}",
max_batching_rows = 1000
)"""
OPTIONS ({remote_function_options_str})"""

logger.info(f"Creating BQ remote function: {create_function_ddl}")
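For reference, a minimal sketch (not part of this change) of what the option serialization above produces; the endpoint URL is hypothetical. String values are quoted, numeric values are not, and `None` values are dropped so the option is omitted from the DDL entirely:

```python
# Illustrative only: mirrors the join logic in create_bq_remote_function.
remote_function_options = {
    "endpoint": "https://us-central1-my-project.cloudfunctions.net/square",  # hypothetical
    "max_batching_rows": 1000,
}

options_str = ", ".join(
    f'{key}="{val}"' if isinstance(val, str) else f"{key}={val}"
    for key, val in remote_function_options.items()
    if val is not None
)

print(options_str)
# endpoint="https://us-central1-my-project.cloudfunctions.net/square", max_batching_rows=1000
# With max_batching_rows=None the key is filtered out, leaving only the endpoint.
```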

@@ -438,6 +455,7 @@ def provision_bq_remote_function(
reuse,
name,
package_requirements,
max_batching_rows,
):
"""Provision a BigQuery remote function."""
# If reuse of any existing function with the same name (indicated by the
@@ -485,7 +503,12 @@
"Exactly one type should be provided for every input arg."
)
self.create_bq_remote_function(
input_args, input_types, output_type, cf_endpoint, remote_function_name
input_args,
input_types,
output_type,
cf_endpoint,
remote_function_name,
max_batching_rows,
)
else:
logger.info(f"Remote function {remote_function_name} already exists.")
@@ -607,6 +630,7 @@ def remote_function(
cloud_function_service_account: Optional[str] = None,
cloud_function_kms_key_name: Optional[str] = None,
cloud_function_docker_repository: Optional[str] = None,
max_batching_rows: Optional[int] = 1000,
):
"""Decorator to turn a user defined function into a BigQuery remote function.

@@ -723,6 +747,15 @@ def remote_function(
projects/PROJECT_ID/locations/LOCATION/repositories/REPOSITORY_NAME.
For more details see
https://cloud.google.com/functions/docs/securing/cmek#before_you_begin.
max_batching_rows (int, Optional):
The maximum number of rows to be batched for processing in the
BQ remote function. Default value is 1000. A lower number can be
passed to avoid timeouts in case the user code is too complex to
process a large number of rows fast enough. A higher number can be
used to increase throughput when the user code is fast enough.
`None` can be passed to let the BQ remote functions service apply
default batching. For more details, see
https://cloud.google.com/bigquery/docs/remote-functions#limiting_number_of_rows_in_a_batch_request.
"""
import bigframes.pandas as bpd

@@ -846,6 +879,7 @@ def wrapper(f):
reuse,
name,
packages,
max_batching_rows,
)

# TODO: Move ibis logic to compiler step
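A hedged usage sketch of the new parameter through the decorator documented above; the function body, the batch size of 50, and the session setup are illustrative assumptions, not part of this change:

```python
import bigframes.pandas as bpd

def square(x):
    return x * x

# Cap each batch at 50 rows, e.g. to avoid timeouts for a slow user function.
# reuse=False forces a fresh deployment; names and values here are illustrative.
square_remote = bpd.remote_function(
    [int], int, reuse=False, max_batching_rows=50
)(square)
```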
2 changes: 2 additions & 0 deletions bigframes/pandas/__init__.py
@@ -643,6 +643,7 @@ def remote_function(
cloud_function_service_account: Optional[str] = None,
cloud_function_kms_key_name: Optional[str] = None,
cloud_function_docker_repository: Optional[str] = None,
max_batching_rows: Optional[int] = 1000,
):
return global_session.with_default_session(
bigframes.session.Session.remote_function,
@@ -656,6 +657,7 @@
cloud_function_service_account=cloud_function_service_account,
cloud_function_kms_key_name=cloud_function_kms_key_name,
cloud_function_docker_repository=cloud_function_docker_repository,
max_batching_rows=max_batching_rows,
)


11 changes: 11 additions & 0 deletions bigframes/session/__init__.py
@@ -1541,6 +1541,7 @@ def remote_function(
cloud_function_service_account: Optional[str] = None,
cloud_function_kms_key_name: Optional[str] = None,
cloud_function_docker_repository: Optional[str] = None,
max_batching_rows: Optional[int] = 1000,
):
"""Decorator to turn a user defined function into a BigQuery remote function. Check out
the code samples at: https://cloud.google.com/bigquery/docs/remote-functions#bigquery-dataframes.
@@ -1635,6 +1636,15 @@ def remote_function(
projects/PROJECT_ID/locations/LOCATION/repositories/REPOSITORY_NAME.
For more details see
https://cloud.google.com/functions/docs/securing/cmek#before_you_begin.
max_batching_rows (int, Optional):
The maximum number of rows to be batched for processing in the
BQ remote function. Default value is 1000. A lower number can be
passed to avoid timeouts in case the user code is too complex to
process a large number of rows fast enough. A higher number can be
used to increase throughput when the user code is fast enough.
`None` can be passed to let the BQ remote functions service apply
default batching. For more details, see
https://cloud.google.com/bigquery/docs/remote-functions#limiting_number_of_rows_in_a_batch_request.
Returns:
callable: A remote function object pointing to the cloud assets created
in the background to support the remote execution. The cloud assets can be
@@ -1656,6 +1666,7 @@
cloud_function_service_account=cloud_function_service_account,
cloud_function_kms_key_name=cloud_function_kms_key_name,
cloud_function_docker_repository=cloud_function_docker_repository,
max_batching_rows=max_batching_rows,
)

def read_gbq_function(
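The `None` case documented above can also be exercised directly on a `Session`; a sketch mirroring the system test added below, assuming `session` is an already-constructed bigframes `Session`:

```python
def square(x):
    return x * x

# max_batching_rows=None omits the option from the generated DDL, so the
# BQ remote functions service applies its own default batch sizing.
square_remote = session.remote_function(
    [int], int, reuse=False, max_batching_rows=None
)(square)
```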
31 changes: 31 additions & 0 deletions tests/system/large/test_remote_function.py
@@ -1300,3 +1300,34 @@ def square_num(x):
cleanup_remote_function_assets(
session.bqclient, session.cloudfunctionsclient, square_num
)


@pytest.mark.parametrize(
("max_batching_rows"),
[
10_000,
None,
],
)
@pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_max_batching_rows(session, scalars_dfs, max_batching_rows):
try:

def square(x):
return x * x

square_remote = session.remote_function(
[int], int, reuse=False, max_batching_rows=max_batching_rows
)(square)

scalars_df, scalars_pandas_df = scalars_dfs

bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas()
pd_result = scalars_pandas_df["int64_too"].apply(square)

pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)
finally:
# clean up the gcp assets created for the remote function
cleanup_remote_function_assets(
session.bqclient, session.cloudfunctionsclient, square_remote
)