Skip to content

Commit b1b62b1

Browse files
fix: reset send timestamp each time a request is sent (#2499)
Each time a request is removed from the inflight queue its send timestamp should be removed by setting it to null as we are no longer waiting for a response to arrive for it.
1 parent 4600976 commit b1b62b1

File tree

2 files changed

+41
-9
lines changed

2 files changed

+41
-9
lines changed

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -699,7 +699,10 @@ private void appendLoop() {
699699
hasMessageInWaitingQueue.await(100, TimeUnit.MILLISECONDS);
700700
// Check whether we should error out the current append loop.
701701
if (inflightRequestQueue.size() > 0) {
702-
throwIfWaitCallbackTooLong(inflightRequestQueue.getFirst().requestCreationTimeStamp);
702+
Instant sendInstant = inflightRequestQueue.getFirst().requestSendTimeStamp;
703+
if (sendInstant != null) {
704+
throwIfWaitCallbackTooLong(sendInstant);
705+
}
703706
}
704707

705708
// Copy the streamConnectionIsConnected guarded by lock to a local variable.
@@ -711,7 +714,9 @@ private void appendLoop() {
711714
// from inflightRequestQueue and prepent them onto the waitinRequestQueue. They need to be
712715
// prepended as they need to be sent before new requests.
713716
while (!inflightRequestQueue.isEmpty()) {
714-
waitingRequestQueue.addFirst(inflightRequestQueue.pollLast());
717+
AppendRequestAndResponse requestWrapper = inflightRequestQueue.pollLast();
718+
requestWrapper.requestSendTimeStamp = null;
719+
waitingRequestQueue.addFirst(requestWrapper);
715720
}
716721

717722
// If any of the inflight messages were meant to be ignored during requestCallback, they
@@ -721,7 +726,6 @@ private void appendLoop() {
721726
while (!this.waitingRequestQueue.isEmpty()) {
722727
AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst();
723728
waitForBackoffIfNecessary(requestWrapper);
724-
requestWrapper.trySetRequestInsertQueueTime();
725729
this.inflightRequestQueue.add(requestWrapper);
726730
localQueue.addLast(requestWrapper);
727731
}
@@ -760,6 +764,7 @@ private void appendLoop() {
760764
firstRequestForTableOrSchemaSwitch = true;
761765
}
762766
while (!localQueue.isEmpty()) {
767+
localQueue.peekFirst().setRequestSendQueueTime();
763768
AppendRowsRequest originalRequest = localQueue.pollFirst().message;
764769
AppendRowsRequest.Builder originalRequestBuilder = originalRequest.toBuilder();
765770
// Always respect the first writer schema seen by the loop.
@@ -1217,6 +1222,7 @@ private void doneCallback(Throwable finalStatus) {
12171222
private AppendRequestAndResponse pollInflightRequestQueue(boolean pollLast) {
12181223
AppendRequestAndResponse requestWrapper =
12191224
pollLast ? inflightRequestQueue.pollLast() : inflightRequestQueue.poll();
1225+
requestWrapper.requestSendTimeStamp = null;
12201226
--this.inflightRequests;
12211227
this.inflightBytes -= requestWrapper.messageSize;
12221228
this.inflightReduced.signal();
@@ -1256,7 +1262,9 @@ static final class AppendRequestAndResponse {
12561262

12571263
TimedAttemptSettings attemptSettings;
12581264

1259-
Instant requestCreationTimeStamp;
1265+
// Time at which request was last sent over the network.
1266+
// If a response is no longer expected this is set back to null.
1267+
Instant requestSendTimeStamp;
12601268

12611269
AppendRequestAndResponse(
12621270
AppendRowsRequest message, StreamWriter streamWriter, RetrySettings retrySettings) {
@@ -1276,11 +1284,8 @@ static final class AppendRequestAndResponse {
12761284
}
12771285
}
12781286

1279-
void trySetRequestInsertQueueTime() {
1280-
// Only set the first time the caller tries to set the timestamp.
1281-
if (requestCreationTimeStamp == null) {
1282-
requestCreationTimeStamp = Instant.now();
1283-
}
1287+
void setRequestSendQueueTime() {
1288+
requestSendTimeStamp = Instant.now();
12841289
}
12851290
}
12861291

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1809,6 +1809,33 @@ public void testAppendSuccessAndInternalQuotaErrorRetrySuccess() throws Exceptio
18091809
writer.close();
18101810
}
18111811

1812+
@Test
1813+
public void testInternalQuotaError_MaxWaitTimeExceed_RetrySuccess() throws Exception {
1814+
// In order for the test to succeed, the given request must complete successfully even after all
1815+
// the retries. The fake server is configured to fail 3 times with a quota error. This means the
1816+
// client will perform retry with exponential backoff. The fake server injects 1 second of delay
1817+
// for each response. In addition, the exponential backoff injects a couple of seconds of delay.
1818+
// This yields an overall delay of about 5 seconds before the request succeeds. If the request
1819+
// send timestamp was being set only once, this would eventually exceed the 4 second timeout
1820+
// limit, and throw an exception. With the current behavior, the request send timestamp is reset
1821+
// each time a retry is performed, so we never exceed the 4 second timeout limit.
1822+
StreamWriter.setMaxRequestCallbackWaitTime(java.time.Duration.ofSeconds(4));
1823+
testBigQueryWrite.setResponseSleep(Duration.ofSeconds(1));
1824+
StreamWriter writer = getTestStreamWriterRetryEnabled();
1825+
testBigQueryWrite.addStatusException(
1826+
com.google.rpc.Status.newBuilder().setCode(Code.RESOURCE_EXHAUSTED.ordinal()).build());
1827+
testBigQueryWrite.addStatusException(
1828+
com.google.rpc.Status.newBuilder().setCode(Code.RESOURCE_EXHAUSTED.ordinal()).build());
1829+
testBigQueryWrite.addStatusException(
1830+
com.google.rpc.Status.newBuilder().setCode(Code.RESOURCE_EXHAUSTED.ordinal()).build());
1831+
testBigQueryWrite.addResponse(createAppendResponse(0));
1832+
1833+
ApiFuture<AppendRowsResponse> appendFuture1 =
1834+
writer.append(createProtoRows(new String[] {"A"}));
1835+
assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
1836+
writer.close();
1837+
}
1838+
18121839
@Test
18131840
public void testAppendSuccessAndInternalErrorRetrySuccessExclusive() throws Exception {
18141841
// Ensure we return an error from the fake server when a retry is in progress

0 commit comments

Comments
 (0)