2323import com .google .api .core .ApiFutures ;
2424import com .google .cloud .bigquery .BigQuery ;
2525import com .google .cloud .bigquery .BigQueryOptions ;
26+ import com .google .cloud .bigquery .QueryJobConfiguration ;
2627import com .google .cloud .bigquery .Schema ;
2728import com .google .cloud .bigquery .Table ;
29+ import com .google .cloud .bigquery .TableResult ;
2830import com .google .cloud .bigquery .storage .v1 .AppendRowsResponse ;
2931import com .google .cloud .bigquery .storage .v1 .Exceptions ;
32+ import com .google .cloud .bigquery .storage .v1 .Exceptions .AppendSerializtionError ;
3033import com .google .cloud .bigquery .storage .v1 .Exceptions .StorageException ;
3134import com .google .cloud .bigquery .storage .v1 .JsonStreamWriter ;
3235import com .google .cloud .bigquery .storage .v1 .TableName ;
3740import io .grpc .Status ;
3841import io .grpc .Status .Code ;
3942import java .io .IOException ;
43+ import java .util .Map ;
4044import java .util .concurrent .Phaser ;
4145import javax .annotation .concurrent .GuardedBy ;
4246import org .json .JSONArray ;
@@ -69,7 +73,11 @@ public static void writeToDefaultStream(String projectId, String datasetName, St
6973 JSONArray jsonArr = new JSONArray ();
7074 for (int j = 0 ; j < 10 ; j ++) {
7175 JSONObject record = new JSONObject ();
72- record .put ("test_string" , String .format ("record %03d-%03d" , i , j ));
76+ StringBuilder sbSuffix = new StringBuilder ();
77+ for (int k = 0 ; k < j ; k ++) {
78+ sbSuffix .append (k );
79+ }
80+ record .put ("test_string" , String .format ("record %03d-%03d %s" , i , j , sbSuffix .toString ()));
7381 jsonArr .put (record );
7482 }
7583
@@ -78,9 +86,31 @@ public static void writeToDefaultStream(String projectId, String datasetName, St
7886
7987 // Final cleanup for the stream during worker teardown.
8088 writer .cleanup ();
89+ verifyExpectedRowCount (parentTable , 12 );
8190 System .out .println ("Appended records successfully." );
8291 }
8392
93+ private static void verifyExpectedRowCount (TableName parentTable , int expectedRowCount )
94+ throws InterruptedException {
95+ String queryRowCount =
96+ "SELECT COUNT(*) FROM `"
97+ + parentTable .getProject ()
98+ + "."
99+ + parentTable .getDataset ()
100+ + "."
101+ + parentTable .getTable ()
102+ + "`" ;
103+ QueryJobConfiguration queryConfig = QueryJobConfiguration .newBuilder (queryRowCount ).build ();
104+ BigQuery bigquery = BigQueryOptions .getDefaultInstance ().getService ();
105+ TableResult results = bigquery .query (queryConfig );
106+ int countRowsActual =
107+ Integer .parseInt (results .getValues ().iterator ().next ().get ("f0_" ).getStringValue ());
108+ if (countRowsActual != expectedRowCount ) {
109+ throw new RuntimeException (
110+ "Unexpected row count. Expected: " + expectedRowCount + ". Actual: " + countRowsActual );
111+ }
112+ }
113+
84114 private static class AppendContext {
85115
86116 JSONArray data ;
@@ -170,7 +200,7 @@ public AppendCompleteCallback(DataWriter parent, AppendContext appendContext) {
170200 }
171201
172202 public void onSuccess (AppendRowsResponse response ) {
173- System .out .format ("Append success\ n " );
203+ System .out .format ("Append success% n" );
174204 done ();
175205 }
176206
@@ -182,16 +212,56 @@ public void onFailure(Throwable throwable) {
182212 if (appendContext .retryCount < MAX_RETRY_COUNT
183213 && RETRIABLE_ERROR_CODES .contains (status .getCode ())) {
184214 appendContext .retryCount ++;
185- try {
186- // Since default stream appends are not ordered, we can simply retry the appends.
187- // Retrying with exclusive streams requires more careful consideration.
188- this .parent .append (appendContext );
189- // Mark the existing attempt as done since it's being retried.
215+ // Use a separate thread to avoid potentially blocking while we are in a callback.
216+ new Thread (
217+ () -> {
218+ try {
219+ // Since default stream appends are not ordered, we can simply retry the
220+ // appends.
221+ // Retrying with exclusive streams requires more careful consideration.
222+ this .parent .append (appendContext );
223+ } catch (Exception e ) {
224+ // Fall through to return error.
225+ System .out .format ("Failed to retry append: %s%n" , e );
226+ }
227+ })
228+ .start ();
229+ // Mark the existing attempt as done since it's being retried.
230+ done ();
231+ return ;
232+ }
233+
234+ if (throwable instanceof AppendSerializtionError ) {
235+ AppendSerializtionError ase = (AppendSerializtionError ) throwable ;
236+ Map <Integer , String > rowIndexToErrorMessage = ase .getRowIndexToErrorMessage ();
237+ if (rowIndexToErrorMessage .size () > 0 ) {
238+ // Omit the faulty rows
239+ JSONArray dataNew = new JSONArray ();
240+ for (int i = 0 ; i < appendContext .data .length (); i ++) {
241+ if (!rowIndexToErrorMessage .containsKey (i )) {
242+ dataNew .put (appendContext .data .get (i ));
243+ } else {
244+ // process faulty rows by placing them on a dead-letter-queue, for instance
245+ }
246+ }
247+
248+ // Mark the existing attempt as done since we got a response for it
190249 done ();
250+
251+ // Retry the remaining valid rows, but using a separate thread to
252+ // avoid potentially blocking while we are in a callback.
253+ if (dataNew .length () > 0 ) {
254+ new Thread (
255+ () -> {
256+ try {
257+ this .parent .append (new AppendContext (dataNew , 0 ));
258+ } catch (Exception e2 ) {
259+ System .out .format ("Failed to retry append with filtered rows: %s%n" , e2 );
260+ }
261+ })
262+ .start ();
263+ }
191264 return ;
192- } catch (Exception e ) {
193- // Fall through to return error.
194- System .out .format ("Failed to retry append: %s\n " , e );
195265 }
196266 }
197267
@@ -202,7 +272,7 @@ public void onFailure(Throwable throwable) {
202272 (storageException != null ) ? storageException : new RuntimeException (throwable );
203273 }
204274 }
205- System .out .format ("Error: %s\ n " , throwable );
275+ System .out .format ("Error that arrived : %s% n" , throwable );
206276 done ();
207277 }
208278
0 commit comments