 import typing
 from typing import (
     Dict,
+    Generator,
     Hashable,
     IO,
     Iterable,

 import bigframes_vendored.constants as constants
 import bigframes_vendored.pandas.io.gbq as third_party_pandas_gbq
 import google.api_core.exceptions
+from google.cloud import bigquery_storage_v1
 import google.cloud.bigquery as bigquery
-import google.cloud.bigquery.table
+from google.cloud.bigquery_storage_v1 import types as bq_storage_types
 import pandas
 import pyarrow as pa

-from bigframes.core import local_data, utils
+from bigframes.core import guid, local_data, utils
 import bigframes.core as core
 import bigframes.core.blocks as blocks
 import bigframes.core.schema as schemata
@@ -142,13 +144,15 @@ def __init__(
         self,
         session: bigframes.session.Session,
         bqclient: bigquery.Client,
+        write_client: bigquery_storage_v1.BigQueryWriteClient,
         storage_manager: bigframes.session.temporary_storage.TemporaryStorageManager,
         default_index_type: bigframes.enums.DefaultIndexKind,
         scan_index_uniqueness: bool,
         force_total_order: bool,
         metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
     ):
         self._bqclient = bqclient
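+        # Storage Write API client used by write_data to append Arrow record batches.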
+        self._write_client = write_client
         self._storage_manager = storage_manager
         self._default_index_type = default_index_type
         self._scan_index_uniqueness = scan_index_uniqueness
@@ -165,7 +169,7 @@ def __init__(
     def read_pandas(
         self,
         pandas_dataframe: pandas.DataFrame,
-        method: Literal["load", "stream"],
+        method: Literal["load", "stream", "write"],
         api_name: str,
     ) -> dataframe.DataFrame:
         # TODO: Push this into from_pandas, along with index flag
@@ -183,6 +187,8 @@ def read_pandas(
             array_value = self.load_data(managed_data, api_name=api_name)
         elif method == "stream":
             array_value = self.stream_data(managed_data)
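+        # "write" appends rows through the BigQuery Storage Write API (see write_data below).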
+        elif method == "write":
+            array_value = self.write_data(managed_data)
         else:
             raise ValueError(f"Unsupported read method {method}")

@@ -198,7 +204,7 @@ def load_data(
         self, data: local_data.ManagedArrowTable, api_name: Optional[str] = None
     ) -> core.ArrayValue:
         """Load managed data into bigquery"""
-        ordering_col = "bf_load_job_offsets"
+        ordering_col = guid.generate_guid("load_offsets_")
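+        # A generated name keeps the hidden offsets column from colliding with user columns.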

         # JSON support incomplete
         for item in data.schema.items:
@@ -244,7 +250,7 @@ def load_data(

     def stream_data(self, data: local_data.ManagedArrowTable) -> core.ArrayValue:
         """Load managed data into bigquery"""
-        ordering_col = "bf_stream_job_offsets"
+        ordering_col = guid.generate_guid("stream_offsets_")
         schema_w_offsets = data.schema.append(
             schemata.SchemaItem(ordering_col, bigframes.dtypes.INT_DTYPE)
         )
@@ -277,6 +283,53 @@ def stream_data(self, data: local_data.ManagedArrowTable) -> core.ArrayValue:
             n_rows=data.data.num_rows,
         ).drop_columns([ordering_col])

+    def write_data(self, data: local_data.ManagedArrowTable) -> core.ArrayValue:
+        """Load managed data into bigquery with the BigQuery Storage Write API"""
+        ordering_col = guid.generate_guid("stream_offsets_")
+        schema_w_offsets = data.schema.append(
+            schemata.SchemaItem(ordering_col, bigframes.dtypes.INT_DTYPE)
+        )
+        bq_schema = schema_w_offsets.to_bigquery(_STREAM_JOB_TYPE_OVERRIDES)
+        bq_table_ref = self._storage_manager.create_temp_table(
+            bq_schema, [ordering_col]
+        )
+
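+        # A COMMITTED write stream makes appended rows readable as soon as the
+        # server acknowledges them, so no separate finalize/commit step is needed.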
+        requested_stream = bq_storage_types.stream.WriteStream()
+        requested_stream.type_ = bq_storage_types.stream.WriteStream.Type.COMMITTED  # type: ignore
+
+        stream_request = bq_storage_types.CreateWriteStreamRequest(
+            parent=bq_table_ref.to_bqstorage(), write_stream=requested_stream
+        )
+        stream = self._write_client.create_write_stream(request=stream_request)
+
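+        # Each Arrow record batch is serialized into an AppendRowsRequest; the Arrow
+        # writer schema is attached to every request so the service can decode the batches.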
+        def request_gen() -> Generator[bq_storage_types.AppendRowsRequest, None, None]:
+            schema, batches = data.to_arrow(
+                offsets_col=ordering_col, duration_type="int"
+            )
+            for batch in batches:
+                request = bq_storage_types.AppendRowsRequest(write_stream=stream.name)
+                request.arrow_rows.writer_schema.serialized_schema = (
+                    schema.serialize().to_pybytes()
+                )
+                request.arrow_rows.rows.serialized_record_batch = (
+                    batch.serialize().to_pybytes()
+                )
+                yield request
+
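+        # append_rows streams the requests and yields one response per request;
+        # surface any per-row failures instead of silently dropping data.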
+        for response in self._write_client.append_rows(requests=request_gen()):
+            if response.row_errors:
+                raise ValueError(
+                    f"Problem loading at least one row from DataFrame: {response.row_errors}. {constants.FEEDBACK_LINK}"
+                )
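+        # Re-fetch the destination table for its final metadata, order by the hidden
+        # offsets column, then drop that column from the visible schema.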
+        destination_table = self._bqclient.get_table(bq_table_ref)
+        return core.ArrayValue.from_table(
+            table=destination_table,
+            schema=schema_w_offsets,
+            session=self._session,
+            offsets_col=ordering_col,
+            n_rows=data.data.num_rows,
+        ).drop_columns([ordering_col])
+
     def _start_generic_job(self, job: formatting_helpers.GenericJob):
         if bigframes.options.display.progress_bar is not None:
             formatting_helpers.wait_for_job(