Skip to content

Commit cf1ab06

Browse files
author
Praful Makani
authored
fix: add retry logic for readrows v1beta2 (#315)
1 parent 00ddddd commit cf1ab06

File tree

6 files changed

+567
-4
lines changed

6 files changed

+567
-4
lines changed

google-cloud-bigquerystorage/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,6 @@
174174
<dependency>
175175
<groupId>com.google.api.grpc</groupId>
176176
<artifactId>grpc-google-cloud-bigquerystorage-v1beta2</artifactId>
177-
<scope>test</scope>
178177
</dependency>
179178
<dependency>
180179
<groupId>com.google.api.grpc</groupId>

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/EnhancedBigQueryReadStub.java

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,31 @@
1717

1818
import com.google.api.core.InternalApi;
1919
import com.google.api.gax.core.BackgroundResource;
20+
import com.google.api.gax.grpc.GrpcCallSettings;
21+
import com.google.api.gax.grpc.GrpcRawCallableFactory;
22+
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
23+
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
24+
import com.google.api.gax.retrying.StreamingRetryAlgorithm;
25+
import com.google.api.gax.rpc.Callables;
2026
import com.google.api.gax.rpc.ClientContext;
27+
import com.google.api.gax.rpc.RequestParamsExtractor;
28+
import com.google.api.gax.rpc.ServerStreamingCallSettings;
2129
import com.google.api.gax.rpc.ServerStreamingCallable;
2230
import com.google.api.gax.rpc.UnaryCallable;
31+
import com.google.api.gax.tracing.SpanName;
32+
import com.google.api.gax.tracing.TracedServerStreamingCallable;
33+
import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadGrpc;
2334
import com.google.cloud.bigquery.storage.v1beta2.CreateReadSessionRequest;
2435
import com.google.cloud.bigquery.storage.v1beta2.ReadRowsRequest;
2536
import com.google.cloud.bigquery.storage.v1beta2.ReadRowsResponse;
2637
import com.google.cloud.bigquery.storage.v1beta2.ReadSession;
2738
import com.google.cloud.bigquery.storage.v1beta2.SplitReadStreamRequest;
2839
import com.google.cloud.bigquery.storage.v1beta2.SplitReadStreamResponse;
40+
import com.google.cloud.bigquery.storage.v1beta2.stub.readrows.ApiResultRetryAlgorithm;
41+
import com.google.cloud.bigquery.storage.v1beta2.stub.readrows.ReadRowsRetryingCallable;
42+
import com.google.common.collect.ImmutableMap;
2943
import java.io.IOException;
44+
import java.util.Map;
3045
import java.util.concurrent.TimeUnit;
3146

3247
/**
@@ -35,7 +50,11 @@
3550
* <p>This class is for advanced usage and reflects the underlying API directly.
3651
*/
3752
public class EnhancedBigQueryReadStub implements BackgroundResource {
53+
54+
private static final String TRACING_OUTER_CLIENT_NAME = "BigQueryStorage";
3855
private final GrpcBigQueryReadStub stub;
56+
private final BigQueryReadStubSettings stubSettings;
57+
private final ClientContext context;
3958

4059
public static EnhancedBigQueryReadStub create(EnhancedBigQueryReadStubSettings settings)
4160
throws IOException {
@@ -69,20 +88,64 @@ public static EnhancedBigQueryReadStub create(EnhancedBigQueryReadStubSettings s
6988
BigQueryReadStubSettings baseSettings = baseSettingsBuilder.build();
7089
ClientContext clientContext = ClientContext.create(baseSettings);
7190
GrpcBigQueryReadStub stub = new GrpcBigQueryReadStub(baseSettings, clientContext);
72-
return new EnhancedBigQueryReadStub(stub);
91+
return new EnhancedBigQueryReadStub(stub, baseSettings, clientContext);
7392
}
7493

7594
@InternalApi("Visible for testing")
76-
EnhancedBigQueryReadStub(GrpcBigQueryReadStub stub) {
95+
EnhancedBigQueryReadStub(
96+
GrpcBigQueryReadStub stub, BigQueryReadStubSettings stubSettings, ClientContext context) {
7797
this.stub = stub;
98+
this.stubSettings = stubSettings;
99+
this.context = context;
78100
}
79101

80102
public UnaryCallable<CreateReadSessionRequest, ReadSession> createReadSessionCallable() {
81103
return stub.createReadSessionCallable();
82104
}
83105

84106
public ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> readRowsCallable() {
85-
return stub.readRowsCallable();
107+
ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> innerCallable =
108+
GrpcRawCallableFactory.createServerStreamingCallable(
109+
GrpcCallSettings.<ReadRowsRequest, ReadRowsResponse>newBuilder()
110+
.setMethodDescriptor(BigQueryReadGrpc.getReadRowsMethod())
111+
.setParamsExtractor(
112+
new RequestParamsExtractor<ReadRowsRequest>() {
113+
@Override
114+
public Map<String, String> extract(ReadRowsRequest request) {
115+
return ImmutableMap.of(
116+
"read_stream", String.valueOf(request.getReadStream()));
117+
}
118+
})
119+
.build(),
120+
stubSettings.readRowsSettings().getRetryableCodes());
121+
ServerStreamingCallSettings<ReadRowsRequest, ReadRowsResponse> callSettings =
122+
stubSettings.readRowsSettings();
123+
124+
StreamingRetryAlgorithm<Void> retryAlgorithm =
125+
new StreamingRetryAlgorithm<>(
126+
new ApiResultRetryAlgorithm<Void>(),
127+
new ExponentialRetryAlgorithm(callSettings.getRetrySettings(), context.getClock()));
128+
129+
ScheduledRetryingExecutor<Void> retryingExecutor =
130+
new ScheduledRetryingExecutor<>(retryAlgorithm, context.getExecutor());
131+
132+
if (context.getStreamWatchdog() != null) {
133+
innerCallable = Callables.watched(innerCallable, callSettings, context);
134+
}
135+
136+
ReadRowsRetryingCallable outerCallable =
137+
new ReadRowsRetryingCallable(
138+
context.getDefaultCallContext(),
139+
innerCallable,
140+
retryingExecutor,
141+
callSettings.getResumptionStrategy());
142+
143+
ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> traced =
144+
new TracedServerStreamingCallable<>(
145+
outerCallable,
146+
context.getTracerFactory(),
147+
SpanName.of(TRACING_OUTER_CLIENT_NAME, "ReadRows"));
148+
return traced.withDefaultCallContext(context.getDefaultCallContext());
86149
}
87150

88151
public UnaryCallable<SplitReadStreamRequest, SplitReadStreamResponse> splitReadStreamCallable() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.bigquery.storage.v1beta2.stub.readrows;
18+
19+
import com.google.api.core.InternalApi;
20+
import com.google.api.gax.retrying.ResultRetryAlgorithm;
21+
import com.google.api.gax.retrying.TimedAttemptSettings;
22+
import com.google.api.gax.rpc.ApiException;
23+
import io.grpc.Status;
24+
import org.threeten.bp.Duration;
25+
26+
/** For internal use, public for technical reasons. */
27+
@InternalApi
28+
public class ApiResultRetryAlgorithm<ResponseT> implements ResultRetryAlgorithm<ResponseT> {
29+
// Duration to sleep on if the error is DEADLINE_EXCEEDED.
30+
public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1);
31+
32+
@Override
33+
public TimedAttemptSettings createNextAttempt(
34+
Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) {
35+
if (prevThrowable != null) {
36+
Status status = Status.fromThrowable(prevThrowable);
37+
if (status.getCode() == Status.Code.INTERNAL
38+
&& status.getDescription() != null
39+
&& status.getDescription().equals("Received unexpected EOS on DATA frame from server")) {
40+
return TimedAttemptSettings.newBuilder()
41+
.setGlobalSettings(prevSettings.getGlobalSettings())
42+
.setRetryDelay(prevSettings.getRetryDelay())
43+
.setRpcTimeout(prevSettings.getRpcTimeout())
44+
.setRandomizedRetryDelay(DEADLINE_SLEEP_DURATION)
45+
.setAttemptCount(prevSettings.getAttemptCount() + 1)
46+
.setFirstAttemptStartTimeNanos(prevSettings.getFirstAttemptStartTimeNanos())
47+
.build();
48+
}
49+
}
50+
return null;
51+
}
52+
53+
@Override
54+
public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) {
55+
if (prevThrowable != null) {
56+
Status status = Status.fromThrowable(prevThrowable);
57+
if (status.getCode() == Status.Code.INTERNAL
58+
&& status.getDescription() != null
59+
&& status.getDescription().equals("Received unexpected EOS on DATA frame from server")) {
60+
return true;
61+
}
62+
}
63+
return (prevThrowable instanceof ApiException) && ((ApiException) prevThrowable).isRetryable();
64+
}
65+
}

0 commit comments

Comments
 (0)