Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 12 additions & 19 deletions bigframes/functions/_function_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ def _try_delattr(self, func: Callable, attr: str) -> None:
# https://github.com/ibis-project/ibis/blob/master/ibis/backends/bigquery/udf/__init__.py
def remote_function(
self,
*,
input_types: Union[None, type, Sequence[type]] = None,
output_type: Optional[type] = None,
session: Optional[Session] = None,
Expand All @@ -251,7 +252,7 @@ def remote_function(
reuse: bool = True,
name: Optional[str] = None,
packages: Optional[Sequence[str]] = None,
cloud_function_service_account: Optional[str] = None,
cloud_function_service_account: str,
cloud_function_kms_key_name: Optional[str] = None,
cloud_function_docker_repository: Optional[str] = None,
max_batching_rows: Optional[int] = 1000,
Expand Down Expand Up @@ -384,8 +385,8 @@ def remote_function(
Explicit name of the external package dependencies. Each dependency
is added to the `requirements.txt` as is, and can be of the form
supported in https://pip.pypa.io/en/stable/reference/requirements-file-format/.
cloud_function_service_account (str, Optional):
Service account to use for the cloud functions. If not provided then
cloud_function_service_account (str):
Service account to use for the cloud functions. If "default" is provided,
then the default service account is used. See
https://cloud.google.com/functions/docs/securing/function-identity
for more details. Please make sure the service account has the
Expand Down Expand Up @@ -455,22 +456,12 @@ def remote_function(
# Some defaults may be used from the session if not provided otherwise.
session = self._resolve_session(session)

# Emit a FutureWarning if the user does not explicitly set cloud_function_service_account to a
# user-managed service account or to "default".
msg = bfe.format_message(
"You have not explicitly set a user-managed `cloud_function_service_account`. "
"Using the default Compute Engine service account. "
"In BigFrames 2.0 onwards, you would have to explicitly set `cloud_function_service_account` "
'either to a user-managed service account (preferred) or to `"default"` '
"to use the default Compute Engine service account (discouraged). "
"See, https://cloud.google.com/functions/docs/securing/function-identity."
)

# If the user forces the cloud function service account argument to None,
# raise an exception.
if cloud_function_service_account is None:
warnings.warn(msg, stacklevel=2, category=FutureWarning)

if cloud_function_service_account == "default":
cloud_function_service_account = None
raise ValueError(
'You must provide a user managed cloud_function_service_account, or "default" if you would like to let the default service account be used.'
)

# A BigQuery client is required to perform BQ operations.
bigquery_client = self._resolve_bigquery_client(session, bigquery_client)
Expand Down Expand Up @@ -615,7 +606,9 @@ def wrapper(func):
bq_connection_manager,
cloud_function_region,
cloud_functions_client,
cloud_function_service_account,
None
if cloud_function_service_account == "default"
else cloud_function_service_account,
cloud_function_kms_key_name,
cloud_function_docker_repository,
session=session, # type: ignore
Expand Down
3 changes: 2 additions & 1 deletion bigframes/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,15 @@


def remote_function(
*,
input_types: Union[None, type, Sequence[type]] = None,
output_type: Optional[type] = None,
dataset: Optional[str] = None,
bigquery_connection: Optional[str] = None,
reuse: bool = True,
name: Optional[str] = None,
packages: Optional[Sequence[str]] = None,
cloud_function_service_account: Optional[str] = None,
cloud_function_service_account: str,
cloud_function_kms_key_name: Optional[str] = None,
cloud_function_docker_repository: Optional[str] = None,
max_batching_rows: Optional[int] = 1000,
Expand Down
19 changes: 10 additions & 9 deletions bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1202,14 +1202,15 @@ def _check_file_size(self, filepath: str):

def remote_function(
self,
*,
input_types: Union[None, type, Sequence[type]] = None,
output_type: Optional[type] = None,
dataset: Optional[str] = None,
bigquery_connection: Optional[str] = None,
reuse: bool = True,
name: Optional[str] = None,
packages: Optional[Sequence[str]] = None,
cloud_function_service_account: Optional[str] = None,
cloud_function_service_account: str,
cloud_function_kms_key_name: Optional[str] = None,
cloud_function_docker_repository: Optional[str] = None,
max_batching_rows: Optional[int] = 1000,
Expand Down Expand Up @@ -1327,8 +1328,8 @@ def remote_function(
Explicit name of the external package dependencies. Each dependency
is added to the `requirements.txt` as is, and can be of the form
supported in https://pip.pypa.io/en/stable/reference/requirements-file-format/.
cloud_function_service_account (str, Optional):
Service account to use for the cloud functions. If not provided
cloud_function_service_account (str):
Service account to use for the cloud functions. If "default" is provided,
then the default service account is used. See
https://cloud.google.com/functions/docs/securing/function-identity
for more details. Please make sure the service account has the
Expand Down Expand Up @@ -1406,8 +1407,8 @@ def remote_function(
`bigframes_remote_function` - The bigquery remote function capable of calling into `bigframes_cloud_function`.
"""
return self._function_session.remote_function(
input_types,
output_type,
input_types=input_types,
output_type=output_type,
session=self,
dataset=dataset,
bigquery_connection=bigquery_connection,
Expand Down Expand Up @@ -1499,8 +1500,8 @@ def udf(
deployed for the user defined code.
"""
return self._function_session.udf(
input_types,
output_type,
input_types=input_types,
output_type=output_type,
session=self,
dataset=dataset,
bigquery_connection=bigquery_connection,
Expand Down Expand Up @@ -1593,7 +1594,7 @@ def read_gbq_function(
Another use case is to define your own remote function and use it later.
For example, define the remote function:

>>> @bpd.remote_function()
>>> @bpd.remote_function(cloud_function_service_account="default")
... def tenfold(num: int) -> float:
... return num * 10

Expand All @@ -1620,7 +1621,7 @@ def read_gbq_function(
note, row processor implies that the function has only one input
parameter.

>>> @bpd.remote_function()
>>> @bpd.remote_function(cloud_function_service_account="default")
... def row_sum(s: bpd.Series) -> float:
... return s['a'] + s['b'] + s['c']

Expand Down
4 changes: 2 additions & 2 deletions notebooks/apps/synthetic_data_generation.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,8 @@
},
"outputs": [],
"source": [
"@bpd.remote_function([int], str, packages=['faker', 'pandas'])\n",
"def data_generator(id):\n",
"@bpd.remote_function(packages=['faker', 'pandas'], cloud_function_service_account=\"default\")\n",
"def data_generator(id: int) -> str:\n",
" context = {}\n",
" exec(code, context)\n",
" result_df = context.get(\"result_df\")\n",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -914,8 +914,8 @@
},
"outputs": [],
"source": [
"@bf.remote_function([str], str)\n",
"def extract_code(text: str):\n",
"@bf.remote_function(cloud_function_service_account=\"default\")\n",
"def extract_code(text: str) -> str:\n",
" try:\n",
" res = text[text.find('\\n')+1:text.find('```', 3)]\n",
" res = res.replace(\"import pandas as pd\", \"import bigframes.pandas as bf\")\n",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1485,8 +1485,8 @@
},
"outputs": [],
"source": [
"@bpd.remote_function([float], str)\n",
"def get_bucket(num):\n",
"@bpd.remote_function(cloud_function_service_account=\"default\")\n",
"def get_bucket(num: float) -> str:\n",
" if not num: return \"NA\"\n",
" boundary = 4000\n",
" return \"at_or_above_4000\" if num >= boundary else \"below_4000\""
Expand Down
4 changes: 2 additions & 2 deletions notebooks/location/regionalized.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -1475,8 +1475,8 @@
}
],
"source": [
"@bpd.remote_function([float], str, bigquery_connection='bigframes-rf-conn')\n",
"def get_bucket(num):\n",
"@bpd.remote_function(bigquery_connection='bigframes-rf-conn', cloud_function_service_account=\"default\")\n",
"def get_bucket(num: float) -> str:\n",
" if not num: return \"NA\"\n",
" boundary = 4000\n",
" return \"at_or_above_4000\" if num >= boundary else \"below_4000\""
Expand Down
8 changes: 4 additions & 4 deletions notebooks/remote_functions/remote_function.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@
"source": [
"# User defined function\n",
"# https://www.codespeedy.com/find-nth-prime-number-in-python/\n",
"def nth_prime(n):\n",
"def nth_prime(n: int) -> int:\n",
" prime_numbers = [2,3]\n",
" i=3\n",
" if(0<n<=2):\n",
Expand Down Expand Up @@ -627,8 +627,8 @@
"\n",
"# User defined function\n",
"# https://www.codespeedy.com/find-nth-prime-number-in-python/\n",
"@pd.remote_function([int], int, reuse=False)\n",
"def nth_prime(n):\n",
"@pd.remote_function(reuse=False, cloud_function_service_account=\"default\")\n",
"def nth_prime(n: int) -> int:\n",
" prime_numbers = [2,3]\n",
" i=3\n",
" if(0<n<=2):\n",
Expand Down Expand Up @@ -1179,7 +1179,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
"version": "3.11.4"
}
},
"nbformat": 4,
Expand Down
14 changes: 7 additions & 7 deletions notebooks/remote_functions/remote_function_usecases.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@
}
],
"source": [
"@bpd.remote_function(reuse=False)\n",
"@bpd.remote_function(reuse=False, cloud_function_service_account=\"default\")\n",
"def duration_category(duration_minutes: int) -> str:\n",
" if duration_minutes < 90:\n",
" return \"short\"\n",
Expand Down Expand Up @@ -466,7 +466,7 @@
}
],
"source": [
"@bpd.remote_function(reuse=False)\n",
"@bpd.remote_function(reuse=False, cloud_function_service_account=\"default\")\n",
"def duration_category(duration_minutes: int) -> str:\n",
" if duration_minutes < 90:\n",
" return DURATION_CATEGORY_SHORT\n",
Expand Down Expand Up @@ -675,7 +675,7 @@
}
],
"source": [
"@bpd.remote_function(reuse=False)\n",
"@bpd.remote_function(reuse=False, cloud_function_service_account=\"default\")\n",
"def duration_category(duration_minutes: int) -> str:\n",
" duration_hours = mymath.ceil(duration_minutes / 60)\n",
" return f\"{duration_hours}h\"\n",
Expand Down Expand Up @@ -886,7 +886,7 @@
}
],
"source": [
"@bpd.remote_function(reuse=False)\n",
"@bpd.remote_function(reuse=False, cloud_function_service_account=\"default\")\n",
"def duration_category(duration_minutes: int) -> str:\n",
" duration_hours = get_hour_ceiling(duration_minutes)\n",
" return f\"{duration_hours} hrs\"\n",
Expand Down Expand Up @@ -1068,7 +1068,7 @@
}
],
"source": [
"@bpd.remote_function(reuse=False, packages=[\"cryptography\"])\n",
"@bpd.remote_function(reuse=False, packages=[\"cryptography\"], cloud_function_service_account=\"default\")\n",
"def get_hash(input: str) -> str:\n",
" from cryptography.fernet import Fernet\n",
"\n",
Expand Down Expand Up @@ -1271,7 +1271,7 @@
}
],
"source": [
"@bpd.remote_function(reuse=False, packages=[\"humanize\"])\n",
"@bpd.remote_function(reuse=False, packages=[\"humanize\"], cloud_function_service_account=\"default\")\n",
"def duration_category(duration_minutes: int) -> str:\n",
" timedelta = dt.timedelta(minutes=duration_minutes)\n",
" return humanize.naturaldelta(timedelta)\n",
Expand Down Expand Up @@ -1442,7 +1442,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.19"
"version": "3.11.4"
}
},
"nbformat": 4,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,9 @@
"source": [
"@bpd.remote_function(packages=[\"anthropic[vertex]\", \"google-auth[requests]\"],\n",
" max_batching_rows=1, \n",
" bigquery_connection=\"bigframes-dev.us-east5.bigframes-rf-conn\") # replace with your connection\n",
" bigquery_connection=\"bigframes-dev.us-east5.bigframes-rf-conn\", # replace with your connection\n",
" cloud_function_service_account=\"default\",\n",
")\n",
"def anthropic_transformer(message: str) -> str:\n",
" from anthropic import AnthropicVertex\n",
" client = AnthropicVertex(region=LOCATION, project_id=PROJECT)\n",
Expand Down
6 changes: 2 additions & 4 deletions samples/snippets/remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,8 @@ def run_remote_function_and_read_gbq_function(project_id: str) -> None:
# of the penguins, which is a real number, into a category, which is a
# string.
@bpd.remote_function(
float,
str,
reuse=False,
cloud_function_service_account="default",
)
def get_bucket(num: float) -> str:
if not num:
Expand Down Expand Up @@ -91,10 +90,9 @@ def get_bucket(num: float) -> str:
# as a remote function. The custom function in this example has external
# package dependency, which can be specified via `packages` parameter.
@bpd.remote_function(
str,
str,
reuse=False,
packages=["cryptography"],
cloud_function_service_account="default",
)
def get_hash(input: str) -> str:
from cryptography.fernet import Fernet
Expand Down
Loading