Skip to content

Commit 54874be

Browse files
authored
feat: add a indicator of how much time a request is waiting for inflight limit (#1514)
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/java-bigquerystorage/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Fixes #<issue_number_goes_here> ☕️
1 parent 5eaf38e commit 54874be

File tree

3 files changed

+32
-3
lines changed

3 files changed

+32
-3
lines changed

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,17 @@ public Descriptor getDescriptor() {
167167
return this.descriptor;
168168
}
169169

170+
/**
171+
* Returns the wait of a request in Client side before sending to the Server. Request could wait
172+
* in Client because it reached the client side inflight request limit (adjustable when
173+
* constructing the Writer). The value is the wait time for the last sent request. A constant high
174+
* wait value indicates a need for more throughput, you can create a new Stream for to increase
175+
* the throughput in exclusive stream case, or create a new Writer in the default stream case.
176+
*/
177+
public long getInflightWaitSeconds() {
178+
return streamWriter.getInflightWaitSeconds();
179+
}
180+
170181
/** Sets all StreamWriter settings. */
171182
private void setStreamWriterSettings(
172183
@Nullable TransportChannelProvider channelProvider,

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.Deque;
3535
import java.util.LinkedList;
3636
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.atomic.AtomicLong;
3738
import java.util.concurrent.locks.Condition;
3839
import java.util.concurrent.locks.Lock;
3940
import java.util.concurrent.locks.ReentrantLock;
@@ -159,6 +160,11 @@ public class StreamWriter implements AutoCloseable {
159160
*/
160161
private Thread appendThread;
161162

163+
/*
164+
* The inflight wait time for the previous sent request.
165+
*/
166+
private final AtomicLong inflightWaitSec = new AtomicLong(0);
167+
162168
/** The maximum size of one request. Defined by the API. */
163169
public static long getApiMaxRequestBytes() {
164170
return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
@@ -316,6 +322,7 @@ private ApiFuture<AppendRowsResponse> appendInternal(AppendRowsRequest message)
316322

317323
@GuardedBy("lock")
318324
private void maybeWaitForInflightQuota() {
325+
long start_time = System.currentTimeMillis();
319326
while (this.inflightRequests >= this.maxInflightRequests
320327
|| this.inflightBytes >= this.maxInflightBytes) {
321328
try {
@@ -332,6 +339,19 @@ private void maybeWaitForInflightQuota() {
332339
.withDescription("Interrupted while waiting for quota."));
333340
}
334341
}
342+
inflightWaitSec.set((System.currentTimeMillis() - start_time) / 1000);
343+
}
344+
345+
/**
346+
* Returns the wait of a request in Client side before sending to the Server. Request could wait
347+
* in Client because it reached the client side inflight request limit (adjustable when
348+
* constructing the StreamWriter). The value is the wait time for the last sent request. A
349+
* constant high wait value indicates a need for more throughput, you can create a new Stream for
350+
* to increase the throughput in exclusive stream case, or create a new Writer in the default
351+
* stream case.
352+
*/
353+
public long getInflightWaitSeconds() {
354+
return inflightWaitSec.longValue();
335355
}
336356

337357
/** Close the stream writer. Shut down all resources. */

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -489,10 +489,8 @@ public void testOneMaxInflightRequests() throws Exception {
489489
testBigQueryWrite.setResponseSleep(Duration.ofSeconds(1));
490490
testBigQueryWrite.addResponse(createAppendResponse(0));
491491

492-
long appendStartTimeMs = System.currentTimeMillis();
493492
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
494-
long appendElapsedMs = System.currentTimeMillis() - appendStartTimeMs;
495-
assertTrue(appendElapsedMs >= 1000);
493+
assertTrue(writer.getInflightWaitSeconds() >= 1);
496494
assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
497495
writer.close();
498496
}

0 commit comments

Comments
 (0)