1616
1717package com .example .bigquerystorage ;
1818
19+ import com .google .api .client .util .*;
1920import com .google .api .core .ApiFuture ;
2021import com .google .cloud .bigquery .*;
2122import com .google .cloud .bigquery .storage .v1beta2 .*;
2728
2829public class WriteCommittedStream {
2930
30- public static Status .Code getStatusCode (StatusRuntimeException e ) {
31+ static Status .Code getStatusCode (StatusRuntimeException e ) {
3132 return e .getStatus ().getCode ();
3233 }
3334
35+ // Returns true if the operation should be retried.
36+ static Boolean isRetryable (ExecutionException e ) {
37+ Throwable cause = e .getCause ();
38+ if (cause instanceof StatusRuntimeException ) {
39+ Status status = ((StatusRuntimeException )cause ).getStatus ();
40+ return (status == Status .ABORTED );
41+ }
42+ return false ;
43+ }
44+
3445 public static void runWriteCommittedStream () {
3546 // TODO(developer): Replace these variables before running the sample.
3647 String projectId = "MY_PROJECT_ID" ;
@@ -85,7 +96,7 @@ public static void writeCommittedStream(String projectId, String datasetName, St
8596 System .out .println ("Appended records successfully." );
8697
8798 } catch (Exception e ) {
88- System .out .println ("Failed to append records. \n " + e .toString ());
99+ System .out .println ("Failed to append records.\n " + e .toString ());
89100 }
90101 }
91102
@@ -100,17 +111,41 @@ public static void writeToDefaultStream(String projectId, String datasetName, St
100111 try (JsonStreamWriter writer =
101112 JsonStreamWriter .newBuilder (parent .toString (), schema ).createDefaultStream ().build ()) {
102113
114+ ExponentialBackOff backoff = new ExponentialBackOff ();
115+
103116 for (int i = 0 ; i < 10 ; i ++) {
104117 JSONObject record = new JSONObject ();
105118 record .put ("col1" , String .format ("record %03d" , i ));
106119 JSONArray jsonArr = new JSONArray ();
107120 jsonArr .put (record );
108121
109- ApiFuture <AppendRowsResponse > future = writer .append (jsonArr );
110- AppendRowsResponse response = future .get ();
122+ backoff .reset ();
123+ Boolean retry = true ;
124+ while (retry ) {
125+ try {
126+
127+ ApiFuture <AppendRowsResponse > future = writer .append (jsonArr );
128+ AppendRowsResponse response = future .get ();
129+ retry = false ;
130+
131+ } catch (ExecutionException ex ) {
132+ // If the error is retryable, retry the operation with exponential backoff.
133+ // Don't retry past the maximum retry interval.
134+ long backOffMillis = backoff .nextBackOffMillis ();
135+ if (isRetryable (ex ) && backOffMillis != BackOff .STOP ) {
136+ Thread .sleep (backOffMillis );
137+ }
138+ else {
139+ throw ex ;
140+ }
141+ }
142+ }
111143 }
144+
145+ System .out .println ("Appended records successfully." );
146+
112147 } catch (Exception e ) {
113- System .out .println (e );
148+ System .out .println ("Failed to append records. \n " + e . toString () );
114149 }
115150 }
116151}
0 commit comments