1616
1717from __future__ import annotations
1818
19+ import copy
1920import datetime
2021import itertools
2122import logging
@@ -283,9 +284,10 @@ def read_gbq(
283284 * ,
284285 index_col : Iterable [str ] | str = (),
285286 columns : Iterable [str ] = (),
287+ configuration : Optional [Dict ] = None ,
286288 max_results : Optional [int ] = None ,
287289 filters : third_party_pandas_gbq .FiltersType = (),
288- use_cache : bool = True ,
290+ use_cache : Optional [ bool ] = None ,
289291 col_order : Iterable [str ] = (),
290292 # Add a verify index argument that fails if the index is not unique.
291293 ) -> dataframe .DataFrame :
@@ -306,6 +308,7 @@ def read_gbq(
306308 query_or_table ,
307309 index_col = index_col ,
308310 columns = columns ,
311+ configuration = configuration ,
309312 max_results = max_results ,
310313 api_name = "read_gbq" ,
311314 use_cache = use_cache ,
@@ -314,13 +317,20 @@ def read_gbq(
314317 # TODO(swast): Query the snapshot table but mark it as a
315318 # deterministic query so we can avoid serializing if we have a
316319 # unique index.
320+ if configuration is not None :
321+ raise ValueError (
322+ "The 'configuration' argument is not allowed when "
323+ "directly reading from a table. Please remove "
324+ "'configuration' or use a query."
325+ )
326+
317327 return self ._read_gbq_table (
318328 query_or_table ,
319329 index_col = index_col ,
320330 columns = columns ,
321331 max_results = max_results ,
322332 api_name = "read_gbq" ,
323- use_cache = use_cache ,
333+ use_cache = use_cache if use_cache is not None else True ,
324334 )
325335
326336 def _to_query (
@@ -405,7 +415,7 @@ def _query_to_destination(
405415 query : str ,
406416 index_cols : List [str ],
407417 api_name : str ,
408- use_cache : bool = True ,
418+ configuration : dict = { "query" : { "useQueryCache" : True }} ,
409419 ) -> Tuple [Optional [bigquery .TableReference ], Optional [bigquery .QueryJob ]]:
410420 # If a dry_run indicates this is not a query type job, then don't
411421 # bother trying to do a CREATE TEMP TABLE ... AS SELECT ... statement.
@@ -427,23 +437,35 @@ def _query_to_destination(
427437 ][:_MAX_CLUSTER_COLUMNS ]
428438 temp_table = self ._create_empty_temp_table (schema , cluster_cols )
429439
430- job_config = bigquery .QueryJobConfig ()
440+ timeout_ms = configuration .get ("jobTimeoutMs" ) or configuration ["query" ].get (
441+ "timeoutMs"
442+ )
443+
444+ # Convert timeout_ms to seconds, ensuring a minimum of 0.1 seconds to avoid
445+ # the program getting stuck on too-short timeouts.
446+ timeout = max (int (timeout_ms ) * 1e-3 , 0.1 ) if timeout_ms else None
447+
448+ job_config = typing .cast (
449+ bigquery .QueryJobConfig ,
450+ bigquery .QueryJobConfig .from_api_repr (configuration ),
451+ )
431452 job_config .labels ["bigframes-api" ] = api_name
432453 job_config .destination = temp_table
433- job_config .use_query_cache = use_cache
434454
435455 try :
436456 # Write to temp table to workaround BigQuery 10 GB query results
437457 # limit. See: internal issue 303057336.
438458 job_config .labels ["error_caught" ] = "true"
439- _ , query_job = self ._start_query (query , job_config = job_config )
459+ _ , query_job = self ._start_query (
460+ query , job_config = job_config , timeout = timeout
461+ )
440462 return query_job .destination , query_job
441463 except google .api_core .exceptions .BadRequest :
442464 # Some SELECT statements still aren't compatible with cluster
443465 # tables as the destination. For example, if the query has a
444466 # top-level ORDER BY, this conflicts with our ability to cluster
445467 # the table by the index column(s).
446- _ , query_job = self ._start_query (query )
468+ _ , query_job = self ._start_query (query , timeout = timeout )
447469 return query_job .destination , query_job
448470
449471 def read_gbq_query (
@@ -452,8 +474,9 @@ def read_gbq_query(
452474 * ,
453475 index_col : Iterable [str ] | str = (),
454476 columns : Iterable [str ] = (),
477+ configuration : Optional [Dict ] = None ,
455478 max_results : Optional [int ] = None ,
456- use_cache : bool = True ,
479+ use_cache : Optional [ bool ] = None ,
457480 col_order : Iterable [str ] = (),
458481 ) -> dataframe .DataFrame :
459482 """Turn a SQL query into a DataFrame.
@@ -517,6 +540,7 @@ def read_gbq_query(
517540 query = query ,
518541 index_col = index_col ,
519542 columns = columns ,
543+ configuration = configuration ,
520544 max_results = max_results ,
521545 api_name = "read_gbq_query" ,
522546 use_cache = use_cache ,
@@ -528,10 +552,34 @@ def _read_gbq_query(
528552 * ,
529553 index_col : Iterable [str ] | str = (),
530554 columns : Iterable [str ] = (),
555+ configuration : Optional [Dict ] = None ,
531556 max_results : Optional [int ] = None ,
532557 api_name : str = "read_gbq_query" ,
533- use_cache : bool = True ,
558+ use_cache : Optional [ bool ] = None ,
534559 ) -> dataframe .DataFrame :
560+ configuration = _transform_read_gbq_configuration (configuration )
561+
562+ if "query" not in configuration :
563+ configuration ["query" ] = {}
564+
565+ if "query" in configuration ["query" ]:
566+ raise ValueError (
567+ "The query statement must not be included in the " ,
568+ "'configuration' because it is already provided as" ,
569+ " a separate parameter." ,
570+ )
571+
572+ if "useQueryCache" in configuration ["query" ]:
573+ if use_cache is not None :
574+ raise ValueError (
575+ "'useQueryCache' in 'configuration' conflicts with"
576+ " 'use_cache' parameter. Please specify only one."
577+ )
578+ else :
579+ configuration ["query" ]["useQueryCache" ] = (
580+ True if use_cache is None else use_cache
581+ )
582+
535583 if isinstance (index_col , str ):
536584 index_cols = [index_col ]
537585 else :
@@ -541,7 +589,7 @@ def _read_gbq_query(
541589 query ,
542590 index_cols ,
543591 api_name = api_name ,
544- use_cache = use_cache ,
592+ configuration = configuration ,
545593 )
546594
547595 # If there was no destination table, that means the query must have
@@ -565,7 +613,7 @@ def _read_gbq_query(
565613 index_col = index_cols ,
566614 columns = columns ,
567615 max_results = max_results ,
568- use_cache = use_cache ,
616+ use_cache = configuration [ "query" ][ "useQueryCache" ] ,
569617 )
570618
571619 def read_gbq_table (
@@ -1656,13 +1704,14 @@ def _start_query(
16561704 sql : str ,
16571705 job_config : Optional [bigquery .job .QueryJobConfig ] = None ,
16581706 max_results : Optional [int ] = None ,
1707+ timeout : Optional [float ] = None ,
16591708 ) -> Tuple [bigquery .table .RowIterator , bigquery .QueryJob ]:
16601709 """
16611710 Starts BigQuery query job and waits for results.
16621711 """
16631712 job_config = self ._prepare_query_job_config (job_config )
16641713 return bigframes .session ._io .bigquery .start_query_with_client (
1665- self .bqclient , sql , job_config , max_results
1714+ self .bqclient , sql , job_config , max_results , timeout
16661715 )
16671716
16681717 def _start_query_ml_ddl (
@@ -1876,3 +1925,25 @@ def _convert_to_nonnull_string(column: ibis_types.Column) -> ibis_types.StringVa
18761925 # Escape backslashes and use backslash as delineator
18771926 escaped = typing .cast (ibis_types .StringColumn , result .fillna ("" )).replace ("\\ " , "\\ \\ " ) # type: ignore
18781927 return typing .cast (ibis_types .StringColumn , ibis .literal ("\\ " )).concat (escaped )
1928+
1929+
1930+ def _transform_read_gbq_configuration (configuration : Optional [dict ]) -> dict :
1931+ """
1932+ For backwards-compatibility, convert any previously client-side only
1933+ parameters such as timeoutMs to the property name expected by the REST API.
1934+
1935+ Makes a copy of configuration if changes are needed.
1936+ """
1937+
1938+ if configuration is None :
1939+ return {}
1940+
1941+ timeout_ms = configuration .get ("query" , {}).get ("timeoutMs" )
1942+ if timeout_ms is not None :
1943+ # Transform timeoutMs to an actual server-side configuration.
1944+ # https://github.com/googleapis/python-bigquery-pandas/issues/479
1945+ configuration = copy .deepcopy (configuration )
1946+ del configuration ["query" ]["timeoutMs" ]
1947+ configuration ["jobTimeoutMs" ] = timeout_ms
1948+
1949+ return configuration
0 commit comments