Skip to content

Commit 6243ad5

Browse files
fix: update storageError support due to server side enhancement (#1456)
* fix: update storageError support due to server side enhancement client-side changes corresponding to cl/408735437 * update
1 parent 4616adb commit 6243ad5

File tree

4 files changed

+9
-98
lines changed

4 files changed

+9
-98
lines changed

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java

Lines changed: 3 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,7 @@
1919
import com.google.common.collect.ImmutableMap;
2020
import com.google.protobuf.Any;
2121
import com.google.protobuf.InvalidProtocolBufferException;
22-
import io.grpc.Status;
23-
import io.grpc.Status.Code;
24-
import java.util.regex.Matcher;
25-
import java.util.regex.Pattern;
22+
import io.grpc.protobuf.StatusProto;
2623
import javax.annotation.Nullable;
2724

2825
/** Exceptions for Storage Client Libraries. */
@@ -124,30 +121,8 @@ public static StorageException toStorageException(
124121
*/
125122
@Nullable
126123
public static StorageException toStorageException(Throwable exception) {
127-
// TODO: switch to using rpcStatus when cl/408735437 is rolled out
128-
// com.google.rpc.Status rpcStatus = StatusProto.fromThrowable(exception);
129-
Status grpcStatus = Status.fromThrowable(exception);
130-
String message = exception.getMessage();
131-
String streamPatternString = "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+";
132-
Pattern streamPattern = Pattern.compile(streamPatternString);
133-
if (message == null) {
134-
return null;
135-
}
136-
// TODO: SWTICH TO CHECK SCHEMA_MISMATCH_EXTRA_FIELDS IN THE ERROR CODE
137-
if (grpcStatus.getCode().equals(Code.INVALID_ARGUMENT)
138-
&& message.toLowerCase().contains("input schema has more fields than bigquery schema")) {
139-
Matcher streamMatcher = streamPattern.matcher(message);
140-
String entity = streamMatcher.find() ? streamMatcher.group() : "streamName unkown";
141-
return new SchemaMismatchedException(entity, message, exception);
142-
}
143-
// TODO: SWTICH TO CHECK STREAM_FINALIZED IN THE ERROR CODE
144-
if (grpcStatus.getCode().equals(Code.INVALID_ARGUMENT)
145-
&& message.toLowerCase().contains("stream has been finalized and cannot be appended")) {
146-
Matcher streamMatcher = streamPattern.matcher(message);
147-
String entity = streamMatcher.find() ? streamMatcher.group() : "streamName unkown";
148-
return new StreamFinalizedException(entity, message, exception);
149-
}
150-
return null;
124+
com.google.rpc.Status rpcStatus = StatusProto.fromThrowable(exception);
125+
return toStorageException(rpcStatus, exception);
151126
}
152127

153128
private Exceptions() {}

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -590,18 +590,15 @@ private void doneCallback(Throwable finalStatus) {
590590
+ " for stream "
591591
+ streamName);
592592
} else {
593-
this.connectionFinalStatus = finalStatus;
593+
Exceptions.StorageException storageException = Exceptions.toStorageException(finalStatus);
594+
this.connectionFinalStatus = storageException != null ? storageException : finalStatus;
594595
log.info(
595596
"Stream finished with error " + finalStatus.toString() + " for stream " + streamName);
596597
}
597598
}
598599
} finally {
599600
this.lock.unlock();
600601
}
601-
Exceptions.StorageException storageException = Exceptions.toStorageException(finalStatus);
602-
if (storageException != null) {
603-
this.connectionFinalStatus = storageException;
604-
}
605602
}
606603

607604
@GuardedBy("lock")

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java

Lines changed: 0 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -343,64 +343,6 @@ public void testAppendFailedSchemaError() throws Exception {
343343
writer.close();
344344
}
345345

346-
@Test
347-
public void testAppendFailedOnDone() throws Exception {
348-
StreamWriter writer = getTestStreamWriter();
349-
350-
StatusRuntimeException exception =
351-
new StatusRuntimeException(
352-
io.grpc.Status.INVALID_ARGUMENT.withDescription(
353-
"io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Input schema has more fields than BigQuery schema"));
354-
355-
testBigQueryWrite.addResponse(createAppendResponse(0));
356-
testBigQueryWrite.addException(exception);
357-
358-
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
359-
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
360-
361-
assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
362-
Exceptions.SchemaMismatchedException actualError =
363-
assertFutureException(Exceptions.SchemaMismatchedException.class, appendFuture2);
364-
assertTrue(
365-
actualError
366-
.getMessage()
367-
.contains(
368-
"io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Input schema has more fields than BigQuery schema"));
369-
370-
writer.close();
371-
}
372-
373-
// TODO(stephwang): update test case to below when toStorageException is updated
374-
// @Test
375-
// public void testAppendFailedOnDone2() throws Exception {
376-
// StreamWriter writer = getTestStreamWriter();
377-
//
378-
// StorageError storageError =
379-
// StorageError.newBuilder()
380-
// .setCode(StorageErrorCode.SCHEMA_MISMATCH_EXTRA_FIELDS)
381-
// .setEntity("foobar")
382-
// .build();
383-
// com.google.rpc.Status statusProto =
384-
// com.google.rpc.Status.newBuilder()
385-
// .addDetails(Any.pack(storageError))
386-
// .build();
387-
//
388-
// StatusRuntimeException exception = StatusProto.toStatusRuntimeException(statusProto);
389-
//
390-
// testBigQueryWrite.addResponse(createAppendResponse(0));
391-
// testBigQueryWrite.addException(exception);
392-
//
393-
// ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
394-
// ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
395-
//
396-
// assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
397-
// Exceptions.SchemaMismatchedException actualError =
398-
// assertFutureException(Exceptions.SchemaMismatchedException.class, appendFuture2);
399-
// assertEquals("foobar", actualError.getStreamName());
400-
//
401-
// writer.close();
402-
// }
403-
404346
@Test
405347
public void longIdleBetweenAppends() throws Exception {
406348
StreamWriter writer = getTestStreamWriter();

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -745,10 +745,9 @@ public void testStreamSchemaMisMatchError() throws IOException, InterruptedExcep
745745
response.get();
746746
Assert.fail("Should fail");
747747
} catch (ExecutionException e) {
748-
// TODO(stephwang): update test case when toStroageException is updated
748+
assertEquals(Exceptions.SchemaMismatchedException.class, e.getCause().getClass());
749749
assertThat(e.getCause().getMessage())
750-
.contains(
751-
"io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Input schema has more fields than BigQuery schema");
750+
.contains("Schema mismatch due to extra fields in user schema");
752751
}
753752
}
754753
}
@@ -777,10 +776,8 @@ public void testStreamFinalizedError()
777776
response.get();
778777
Assert.fail("Should fail");
779778
} catch (ExecutionException e) {
780-
// //TODO(stephwang): update test case when toStroageException is updated
781-
assertThat(e.getCause().getMessage())
782-
.contains(
783-
"io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Stream has been finalized and cannot be appended");
779+
assertEquals(Exceptions.StreamFinalizedException.class, e.getCause().getClass());
780+
assertThat(e.getCause().getMessage()).contains("Stream is finalized");
784781
}
785782
}
786783
}

0 commit comments

Comments
 (0)