Skip to content

Commit d56e1ca

Browse files
authored
feat: retry certain RESOURCE_EXHAUSTED errors observed during ReadRows and report retry attempts (#1257)
The BigQuery Storage Read service will start returning a retryable RESOURCE_EXHAUSTED error in the next few weeks when a read session's parallelism is considered to be excessive, so this PR expands the retry handling logic for ReadRows with 2 changes: 1. If a ReadRows request fails with a RESOURCE_EXHAUSTED error and the error has an associated RetryInfo, it is now considered to be retryable and the retry delay is set according to the RetryInfo. 2. If the client decides to retry, it now notifies the user through the provided RetryAttemptListener object. This will be useful as a negative feedback mechanism for future SplitReadStream requests, which in turn will reduce the likelihood of receiving the new retryable RESOURCE_EXHAUSTED error.
1 parent 0edb25d commit d56e1ca

File tree

17 files changed

+731
-26
lines changed

17 files changed

+731
-26
lines changed

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/util/Errors.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,53 @@
1515
*/
1616
package com.google.cloud.bigquery.storage.util;
1717

18+
import com.google.rpc.RetryInfo;
19+
import io.grpc.Metadata;
1820
import io.grpc.Status;
21+
import io.grpc.protobuf.ProtoUtils;
22+
import org.threeten.bp.Duration;
1923

2024
/** Static utility methods for working with Errors returned from the service. */
2125
public class Errors {
2226
private Errors() {};
2327

28+
public static class IsRetryableStatusResult {
29+
public boolean isRetryable = false;
30+
public Duration retryDelay = null;
31+
}
32+
33+
private static final Metadata.Key<RetryInfo> KEY_RETRY_INFO =
34+
ProtoUtils.keyForProto(RetryInfo.getDefaultInstance());
35+
36+
/**
37+
* Returns a result whose {@code isRetryable} is true iff the Status indicates a retryable error.
38+
*
39+
* <p>Generally, internal errors are not considered retryable, however there are certain transient
40+
* network issues that appear as internal but are in fact retryable.
41+
*
42+
* <p>Resource exhausted errors are only considered retryable if metadata contains a serialized
43+
* RetryInfo object that specifies a retry delay; that delay is returned in {@code retryDelay}.
44+
*/
45+
public static IsRetryableStatusResult isRetryableStatus(Status status, Metadata metadata) {
46+
IsRetryableStatusResult result = new IsRetryableStatusResult();
47+
48+
result.isRetryable = isRetryableInternalStatus(status);
49+
if (!result.isRetryable
50+
&& status.getCode() == Status.Code.RESOURCE_EXHAUSTED
51+
&& metadata != null
52+
&& metadata.containsKey(KEY_RETRY_INFO)) {
53+
RetryInfo retryInfo = metadata.get(KEY_RETRY_INFO);
54+
if (retryInfo.hasRetryDelay()) {
55+
result.isRetryable = true;
56+
result.retryDelay =
57+
Duration.ofSeconds(
58+
retryInfo.getRetryDelay().getSeconds(), retryInfo.getRetryDelay().getNanos());
59+
}
60+
}
61+
62+
return result;
63+
}
64+
2465
/**
2566
* Returns true iff the Status indicates an internal error that is retryable.
2667
*

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClient.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,9 @@ public static final BigQueryReadClient create(EnhancedBigQueryReadStub stub) {
126126
*/
127127
protected BigQueryReadClient(BigQueryReadSettings settings) throws IOException {
128128
this.settings = settings;
129-
this.stub = EnhancedBigQueryReadStub.create(settings.getTypedStubSettings());
129+
this.stub =
130+
EnhancedBigQueryReadStub.create(
131+
settings.getTypedStubSettings(), settings.getReadRowsRetryAttemptListener());
130132
}
131133

132134
@BetaApi("A restructuring of stub classes is planned, so this may break in the future")

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadSettings.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import com.google.api.gax.rpc.TransportChannelProvider;
2828
import com.google.api.gax.rpc.UnaryCallSettings;
2929
import com.google.cloud.bigquery.storage.v1.stub.EnhancedBigQueryReadStubSettings;
30+
import io.grpc.Metadata;
31+
import io.grpc.Status;
3032
import java.io.IOException;
3133
import java.util.List;
3234

@@ -69,6 +71,26 @@ public ServerStreamingCallSettings<ReadRowsRequest, ReadRowsResponse> readRowsSe
6971
return getTypedStubSettings().readRowsSettings();
7072
}
7173

74+
public static interface RetryAttemptListener {
75+
public void onRetryAttempt(Status prevStatus, Metadata prevMetadata);
76+
}
77+
78+
private RetryAttemptListener readRowsRetryAttemptListener = null;
79+
80+
/**
81+
* If a non-null readRowsRetryAttemptListener is provided, the client calls its onRetryAttempt
82+
* method before a failed ReadRows request is retried. This can be used as a negative feedback
83+
* mechanism for future decisions to split read streams, because some retried failures are due to
84+
* resource exhaustion, which increased parallelism only makes worse.
85+
*/
86+
public void setReadRowsRetryAttemptListener(RetryAttemptListener readRowsRetryAttemptListener) {
87+
this.readRowsRetryAttemptListener = readRowsRetryAttemptListener;
88+
}
89+
90+
public RetryAttemptListener getReadRowsRetryAttemptListener() {
91+
return readRowsRetryAttemptListener;
92+
}
93+
7294
/** Returns the object with the settings used for calls to splitReadStream. */
7395
public UnaryCallSettings<SplitReadStreamRequest, SplitReadStreamResponse>
7496
splitReadStreamSettings() {
@@ -176,6 +198,14 @@ public Builder applyToAllUnaryMethods(
176198
return this;
177199
}
178200

201+
private RetryAttemptListener readRowsRetryAttemptListener = null;
202+
203+
public Builder setReadRowsRetryAttemptListener(
204+
RetryAttemptListener readRowsRetryAttemptListener) {
205+
this.readRowsRetryAttemptListener = readRowsRetryAttemptListener;
206+
return this;
207+
}
208+
179209
/** Returns the builder for the settings used for calls to createReadSession. */
180210
public UnaryCallSettings.Builder<CreateReadSessionRequest, ReadSession>
181211
createReadSessionSettings() {
@@ -196,7 +226,9 @@ public Builder applyToAllUnaryMethods(
196226

197227
@Override
198228
public BigQueryReadSettings build() throws IOException {
199-
return new BigQueryReadSettings(this);
229+
BigQueryReadSettings settings = new BigQueryReadSettings(this);
230+
settings.setReadRowsRetryAttemptListener(readRowsRetryAttemptListener);
231+
return settings;
200232
}
201233
}
202234
}

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStub.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.google.api.gax.tracing.SpanName;
3232
import com.google.api.gax.tracing.TracedServerStreamingCallable;
3333
import com.google.cloud.bigquery.storage.v1.BigQueryReadGrpc;
34+
import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings;
3435
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
3536
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
3637
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
@@ -54,10 +55,18 @@ public class EnhancedBigQueryReadStub implements BackgroundResource {
5455
private static final String TRACING_OUTER_CLIENT_NAME = "BigQueryStorage";
5556
private final GrpcBigQueryReadStub stub;
5657
private final BigQueryReadStubSettings stubSettings;
58+
private final BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener;
5759
private final ClientContext context;
5860

5961
public static EnhancedBigQueryReadStub create(EnhancedBigQueryReadStubSettings settings)
6062
throws IOException {
63+
return create(settings, null);
64+
}
65+
66+
public static EnhancedBigQueryReadStub create(
67+
EnhancedBigQueryReadStubSettings settings,
68+
BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener)
69+
throws IOException {
6170
// Configure the base settings.
6271
BigQueryReadStubSettings.Builder baseSettingsBuilder =
6372
BigQueryReadStubSettings.newBuilder()
@@ -88,14 +97,19 @@ public static EnhancedBigQueryReadStub create(EnhancedBigQueryReadStubSettings s
8897
BigQueryReadStubSettings baseSettings = baseSettingsBuilder.build();
8998
ClientContext clientContext = ClientContext.create(baseSettings);
9099
GrpcBigQueryReadStub stub = new GrpcBigQueryReadStub(baseSettings, clientContext);
91-
return new EnhancedBigQueryReadStub(stub, baseSettings, clientContext);
100+
return new EnhancedBigQueryReadStub(
101+
stub, baseSettings, readRowsRetryAttemptListener, clientContext);
92102
}
93103

94104
@InternalApi("Visible for testing")
95105
EnhancedBigQueryReadStub(
96-
GrpcBigQueryReadStub stub, BigQueryReadStubSettings stubSettings, ClientContext context) {
106+
GrpcBigQueryReadStub stub,
107+
BigQueryReadStubSettings stubSettings,
108+
BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener,
109+
ClientContext context) {
97110
this.stub = stub;
98111
this.stubSettings = stubSettings;
112+
this.readRowsRetryAttemptListener = readRowsRetryAttemptListener;
99113
this.context = context;
100114
}
101115

@@ -123,7 +137,7 @@ public Map<String, String> extract(ReadRowsRequest request) {
123137

124138
StreamingRetryAlgorithm<Void> retryAlgorithm =
125139
new StreamingRetryAlgorithm<>(
126-
new ApiResultRetryAlgorithm<Void>(),
140+
new ApiResultRetryAlgorithm<Void>(readRowsRetryAttemptListener),
127141
new ExponentialRetryAlgorithm(callSettings.getRetrySettings(), context.getClock()));
128142

129143
ScheduledRetryingExecutor<Void> retryingExecutor =

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ApiResultRetryAlgorithm.java

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import com.google.api.gax.retrying.TimedAttemptSettings;
2222
import com.google.api.gax.rpc.ApiException;
2323
import com.google.cloud.bigquery.storage.util.Errors;
24+
import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings;
25+
import io.grpc.Metadata;
2426
import io.grpc.Status;
2527
import org.threeten.bp.Duration;
2628

@@ -30,17 +32,41 @@ public class ApiResultRetryAlgorithm<ResponseT> implements ResultRetryAlgorithm<
3032
// Duration to sleep on if the error is DEADLINE_EXCEEDED.
3133
public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1);
3234

35+
private final BigQueryReadSettings.RetryAttemptListener retryAttemptListener;
36+
37+
public ApiResultRetryAlgorithm() {
38+
this(null);
39+
}
40+
41+
public ApiResultRetryAlgorithm(BigQueryReadSettings.RetryAttemptListener retryAttemptListener) {
42+
super();
43+
this.retryAttemptListener = retryAttemptListener;
44+
}
45+
3346
@Override
3447
public TimedAttemptSettings createNextAttempt(
3548
Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) {
3649
if (prevThrowable != null) {
3750
Status status = Status.fromThrowable(prevThrowable);
38-
if (Errors.isRetryableInternalStatus(status)) {
51+
Metadata metadata = Status.trailersFromThrowable(prevThrowable);
52+
Errors.IsRetryableStatusResult result = Errors.isRetryableStatus(status, metadata);
53+
if (result.isRetryable) {
54+
// If result.retryDelay isn't null, we know exactly how long we must wait, so both regular
55+
// and randomized delays are the same.
56+
Duration retryDelay = result.retryDelay;
57+
Duration randomizedRetryDelay = result.retryDelay;
58+
if (retryDelay == null) {
59+
retryDelay = prevSettings.getRetryDelay();
60+
randomizedRetryDelay = DEADLINE_SLEEP_DURATION;
61+
}
62+
if (retryAttemptListener != null) {
63+
retryAttemptListener.onRetryAttempt(status, metadata);
64+
}
3965
return TimedAttemptSettings.newBuilder()
4066
.setGlobalSettings(prevSettings.getGlobalSettings())
41-
.setRetryDelay(prevSettings.getRetryDelay())
67+
.setRetryDelay(retryDelay)
4268
.setRpcTimeout(prevSettings.getRpcTimeout())
43-
.setRandomizedRetryDelay(DEADLINE_SLEEP_DURATION)
69+
.setRandomizedRetryDelay(randomizedRetryDelay)
4470
.setAttemptCount(prevSettings.getAttemptCount() + 1)
4571
.setFirstAttemptStartTimeNanos(prevSettings.getFirstAttemptStartTimeNanos())
4672
.build();
@@ -53,7 +79,8 @@ public TimedAttemptSettings createNextAttempt(
5379
public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) {
5480
if (prevThrowable != null) {
5581
Status status = Status.fromThrowable(prevThrowable);
56-
if (Errors.isRetryableInternalStatus(status)) {
82+
Metadata metadata = Status.trailersFromThrowable(prevThrowable);
83+
if (Errors.isRetryableStatus(status, metadata).isRetryable) {
5784
return true;
5885
}
5986
}

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClient.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,9 @@ public static final BigQueryStorageClient create(EnhancedBigQueryStorageStub stu
141141
*/
142142
protected BigQueryStorageClient(BigQueryStorageSettings settings) throws IOException {
143143
this.settings = settings;
144-
this.stub = EnhancedBigQueryStorageStub.create(settings.getTypedStubSettings());
144+
this.stub =
145+
EnhancedBigQueryStorageStub.create(
146+
settings.getTypedStubSettings(), settings.getReadRowsRetryAttemptListener());
145147
}
146148

147149
@BetaApi("A restructuring of stub classes is planned, so this may break in the future")

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageSettings.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import com.google.cloud.bigquery.storage.v1beta1.Storage.SplitReadStreamResponse;
3838
import com.google.cloud.bigquery.storage.v1beta1.stub.EnhancedBigQueryStorageStubSettings;
3939
import com.google.protobuf.Empty;
40+
import io.grpc.Metadata;
41+
import io.grpc.Status;
4042
import java.io.IOException;
4143
import java.util.List;
4244

@@ -78,6 +80,26 @@ public ServerStreamingCallSettings<ReadRowsRequest, ReadRowsResponse> readRowsSe
7880
return getTypedStubSettings().readRowsSettings();
7981
}
8082

83+
public static interface RetryAttemptListener {
84+
public void onRetryAttempt(Status prevStatus, Metadata prevMetadata);
85+
}
86+
87+
private RetryAttemptListener readRowsRetryAttemptListener = null;
88+
89+
/**
90+
* If a non-null readRowsRetryAttemptListener is provided, the client calls its onRetryAttempt
91+
* method before a failed ReadRows request is retried. This can be used as a negative feedback
92+
* mechanism for future decisions to split read streams, because some retried failures are due to
93+
* resource exhaustion, which increased parallelism only makes worse.
94+
*/
95+
public void setReadRowsRetryAttemptListener(RetryAttemptListener readRowsRetryAttemptListener) {
96+
this.readRowsRetryAttemptListener = readRowsRetryAttemptListener;
97+
}
98+
99+
public RetryAttemptListener getReadRowsRetryAttemptListener() {
100+
return readRowsRetryAttemptListener;
101+
}
102+
81103
/** Returns the object with the settings used for calls to batchCreateReadSessionStreams. */
82104
public UnaryCallSettings<
83105
BatchCreateReadSessionStreamsRequest, BatchCreateReadSessionStreamsResponse>
@@ -197,6 +219,14 @@ public Builder applyToAllUnaryMethods(
197219
return this;
198220
}
199221

222+
private RetryAttemptListener readRowsRetryAttemptListener = null;
223+
224+
public Builder setReadRowsRetryAttemptListener(
225+
RetryAttemptListener readRowsRetryAttemptListener) {
226+
this.readRowsRetryAttemptListener = readRowsRetryAttemptListener;
227+
return this;
228+
}
229+
200230
/** Returns the builder for the settings used for calls to createReadSession. */
201231
public UnaryCallSettings.Builder<CreateReadSessionRequest, ReadSession>
202232
createReadSessionSettings() {
@@ -229,7 +259,9 @@ public UnaryCallSettings.Builder<FinalizeStreamRequest, Empty> finalizeStreamSet
229259

230260
@Override
231261
public BigQueryStorageSettings build() throws IOException {
232-
return new BigQueryStorageSettings(this);
262+
BigQueryStorageSettings settings = new BigQueryStorageSettings(this);
263+
settings.setReadRowsRetryAttemptListener(readRowsRetryAttemptListener);
264+
return settings;
233265
}
234266
}
235267
}

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/EnhancedBigQueryStorageStub.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.google.api.gax.tracing.SpanName;
3232
import com.google.api.gax.tracing.TracedServerStreamingCallable;
3333
import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageGrpc;
34+
import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageSettings;
3435
import com.google.cloud.bigquery.storage.v1beta1.Storage.BatchCreateReadSessionStreamsRequest;
3536
import com.google.cloud.bigquery.storage.v1beta1.Storage.BatchCreateReadSessionStreamsResponse;
3637
import com.google.cloud.bigquery.storage.v1beta1.Storage.CreateReadSessionRequest;
@@ -58,10 +59,18 @@ public class EnhancedBigQueryStorageStub implements BackgroundResource {
5859
private static final String TRACING_OUTER_CLIENT_NAME = "BigQueryStorage";
5960
private final GrpcBigQueryStorageStub stub;
6061
private final BigQueryStorageStubSettings stubSettings;
62+
private final BigQueryStorageSettings.RetryAttemptListener readRowsRetryAttemptListener;
6163
private final ClientContext context;
6264

6365
public static EnhancedBigQueryStorageStub create(EnhancedBigQueryStorageStubSettings settings)
6466
throws IOException {
67+
return create(settings, null);
68+
}
69+
70+
public static EnhancedBigQueryStorageStub create(
71+
EnhancedBigQueryStorageStubSettings settings,
72+
BigQueryStorageSettings.RetryAttemptListener readRowsRetryAttemptListener)
73+
throws IOException {
6574
// Configure the base settings.
6675
BigQueryStorageStubSettings.Builder baseSettingsBuilder =
6776
BigQueryStorageStubSettings.newBuilder()
@@ -107,16 +116,19 @@ public static EnhancedBigQueryStorageStub create(EnhancedBigQueryStorageStubSett
107116
BigQueryStorageStubSettings baseSettings = baseSettingsBuilder.build();
108117
ClientContext clientContext = ClientContext.create(baseSettings);
109118
GrpcBigQueryStorageStub stub = new GrpcBigQueryStorageStub(baseSettings, clientContext);
110-
return new EnhancedBigQueryStorageStub(stub, baseSettings, clientContext);
119+
return new EnhancedBigQueryStorageStub(
120+
stub, baseSettings, readRowsRetryAttemptListener, clientContext);
111121
}
112122

113123
@InternalApi("Visible for testing")
114124
EnhancedBigQueryStorageStub(
115125
GrpcBigQueryStorageStub stub,
116126
BigQueryStorageStubSettings stubSettings,
127+
BigQueryStorageSettings.RetryAttemptListener readRowsRetryAttemptListener,
117128
ClientContext context) {
118129
this.stub = stub;
119130
this.stubSettings = stubSettings;
131+
this.readRowsRetryAttemptListener = readRowsRetryAttemptListener;
120132
this.context = context;
121133
}
122134

@@ -145,7 +157,7 @@ public Map<String, String> extract(ReadRowsRequest request) {
145157

146158
StreamingRetryAlgorithm<Void> retryAlgorithm =
147159
new StreamingRetryAlgorithm<>(
148-
new ApiResultRetryAlgorithm<Void>(),
160+
new ApiResultRetryAlgorithm<Void>(readRowsRetryAttemptListener),
149161
new ExponentialRetryAlgorithm(callSettings.getRetrySettings(), context.getClock()));
150162

151163
ScheduledRetryingExecutor<Void> retryingExecutor =

0 commit comments

Comments
 (0)