Skip to content
37 changes: 26 additions & 11 deletions bigframes/remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,22 @@ def create_bq_remote_function(
OPTIONS (
endpoint = "{endpoint}"
)"""

logger.info(f"Creating BQ remote function: {create_function_ddl}")

# Make sure the dataset exists
dataset = bigquery.Dataset(
bigquery.DatasetReference.from_string(
self._bq_dataset, default_project=self._gcp_project_id
)
)
dataset.location = self._bq_location
self._bq_client.create_dataset(dataset, exists_ok=True)

# TODO: Use session._start_query() so we get progress bar
query_job = self._bq_client.query(create_function_ddl) # Make an API request.
query_job.result() # Wait for the job to complete.

logger.info(f"Created remote function {query_job.ddl_target_routine}")

def get_cloud_function_fully_qualified_parent(self):
Expand Down Expand Up @@ -465,17 +477,20 @@ def get_remote_function_specs(self, remote_function_name):
routines = self._bq_client.list_routines(
f"{self._gcp_project_id}.{self._bq_dataset}"
)
for routine in routines:
if routine.reference.routine_id == remote_function_name:
# TODO(shobs): Use first class properties when they are available
# https://github.com/googleapis/python-bigquery/issues/1552
rf_options = routine._properties.get("remoteFunctionOptions")
if rf_options:
http_endpoint = rf_options.get("endpoint")
bq_connection = rf_options.get("connection")
if bq_connection:
bq_connection = os.path.basename(bq_connection)
break
try:
for routine in routines:
if routine.reference.routine_id == remote_function_name:
# TODO(shobs): Use first class properties when they are available
# https://github.com/googleapis/python-bigquery/issues/1552
rf_options = routine._properties.get("remoteFunctionOptions")
if rf_options:
http_endpoint = rf_options.get("endpoint")
bq_connection = rf_options.get("connection")
if bq_connection:
bq_connection = os.path.basename(bq_connection)
break
except google.api_core.exceptions.NotFound:
pass
return (http_endpoint, bq_connection)


Expand Down
7 changes: 1 addition & 6 deletions bigframes/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,17 +377,12 @@ def _create_and_bind_bq_session(self):
]
)

# Dataset for storing BQML models and remote functions, which don't yet
# Dataset for storing remote functions, which don't yet
# support proper session temporary storage
self._session_dataset = bigquery.Dataset(
f"{self.bqclient.project}.bigframes_temp_{self._location.lower().replace('-', '_')}"
)
self._session_dataset.location = self._location
self._session_dataset.default_table_expiration_ms = 24 * 60 * 60 * 1000

# TODO: handle case when the dataset does not exist and the user does
# not have permission to create one (bigquery.datasets.create IAM)
self.bqclient.create_dataset(self._session_dataset, exists_ok=True)

def close(self):
"""Terminate the BQ session; otherwise the session will be terminated automatically after
Expand Down
5 changes: 0 additions & 5 deletions tests/system/small/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -894,11 +894,6 @@ def test_session_id(session):
# TODO(chelsealin): Verify the session id can be bound to a load job.


def test_session_dataset_exists_and_configured(session: bigframes.Session):
    """Fetch the session dataset and check its default table expiration is 24 hours."""
    # Expected expiration, expressed in milliseconds (24 h * 60 min * 60 s * 1000 ms).
    one_day_ms = 24 * 60 * 60 * 1000
    session_dataset = session.bqclient.get_dataset(session._session_dataset_id)
    assert session_dataset.default_table_expiration_ms == one_day_ms


@pytest.mark.flaky(retries=2)
def test_to_close_session():
session = bigframes.Session()
Expand Down