4040 *
4141 * <p>TODO: Attach schema.
4242 *
43- * <p>TODO: Add inflight control.
44- *
4543 * <p>TODO: Attach traceId.
4644 *
4745 * <p>TODO: Support batching.
@@ -53,12 +51,35 @@ public class StreamWriterV2 implements AutoCloseable {
5351
5452 private Lock lock ;
5553 private Condition hasMessageInWaitingQueue ;
54+ private Condition inflightReduced ;
5655
5756 /*
5857 * The identifier of stream to write to.
5958 */
6059 private final String streamName ;
6160
61+ /*
62+ * Max allowed inflight requests in the stream. Method append is blocked at this.
63+ */
64+ private final long maxInflightRequests ;
65+
66+ /*
67+ * Max allowed inflight bytes in the stream. Method append is blocked at this.
68+ */
69+ private final long maxInflightBytes ;
70+
71+ /*
72+ * Tracks current inflight requests in the stream.
73+ */
74+ @ GuardedBy ("lock" )
75+ private long inflightRequests = 0 ;
76+
77+ /*
78+ * Tracks current inflight bytes in the stream.
79+ */
80+ @ GuardedBy ("lock" )
81+ private long inflightBytes = 0 ;
82+
6283 /*
6384 * Indicates whether user has called Close() or not.
6485 */
@@ -101,7 +122,10 @@ public static long getApiMaxRequestBytes() {
101122 private StreamWriterV2 (Builder builder ) {
102123 this .lock = new ReentrantLock ();
103124 this .hasMessageInWaitingQueue = lock .newCondition ();
125+ this .inflightReduced = lock .newCondition ();
104126 this .streamName = builder .streamName ;
127+ this .maxInflightRequests = builder .maxInflightRequest ;
128+ this .maxInflightBytes = builder .maxInflightBytes ;
105129 this .waitingRequestQueue = new LinkedList <AppendRequestAndResponse >();
106130 this .inflightRequestQueue = new LinkedList <AppendRequestAndResponse >();
107131 this .streamConnection =
@@ -186,14 +210,38 @@ public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
186210 "Stream is closed due to " + connectionFinalStatus .toString ())));
187211 return requestWrapper .appendResult ;
188212 }
213+
214+ ++this .inflightRequests ;
215+ this .inflightBytes += requestWrapper .messageSize ;
189216 waitingRequestQueue .addLast (requestWrapper );
190217 hasMessageInWaitingQueue .signal ();
218+ maybeWaitForInflightQuota ();
191219 return requestWrapper .appendResult ;
192220 } finally {
193221 this .lock .unlock ();
194222 }
195223 }
196224
225+ @ GuardedBy ("lock" )
226+ private void maybeWaitForInflightQuota () {
227+ while (this .inflightRequests >= this .maxInflightRequests
228+ || this .inflightBytes >= this .maxInflightBytes ) {
229+ try {
230+ inflightReduced .await (100 , TimeUnit .MILLISECONDS );
231+ } catch (InterruptedException e ) {
232+ log .warning (
233+ "Interrupted while waiting for inflight quota. Stream: "
234+ + streamName
235+ + " Error: "
236+ + e .toString ());
237+ throw new StatusRuntimeException (
238+ Status .fromCode (Code .CANCELLED )
239+ .withCause (e )
240+ .withDescription ("Interrupted while waiting for quota." ));
241+ }
242+ }
243+ }
244+
197245 /** Close the stream writer. Shut down all resources. */
198246 @ Override
199247 public void close () {
@@ -303,7 +351,7 @@ private void cleanupInflightRequests() {
303351 try {
304352 finalStatus = this .connectionFinalStatus ;
305353 while (!this .inflightRequestQueue .isEmpty ()) {
306- localQueue .addLast (this . inflightRequestQueue . pollFirst ());
354+ localQueue .addLast (pollInflightRequestQueue ());
307355 }
308356 } finally {
309357 this .lock .unlock ();
@@ -322,7 +370,7 @@ private void requestCallback(AppendRowsResponse response) {
322370 AppendRequestAndResponse requestWrapper ;
323371 this .lock .lock ();
324372 try {
325- requestWrapper = this . inflightRequestQueue . pollFirst ();
373+ requestWrapper = pollInflightRequestQueue ();
326374 } finally {
327375 this .lock .unlock ();
328376 }
@@ -343,6 +391,15 @@ private void doneCallback(Throwable finalStatus) {
343391 }
344392 }
345393
394+ @ GuardedBy ("lock" )
395+ private AppendRequestAndResponse pollInflightRequestQueue () {
396+ AppendRequestAndResponse requestWrapper = this .inflightRequestQueue .pollFirst ();
397+ --this .inflightRequests ;
398+ this .inflightBytes -= requestWrapper .messageSize ;
399+ this .inflightReduced .signal ();
400+ return requestWrapper ;
401+ }
402+
346403 /** Constructs a new {@link StreamWriterV2.Builder} using the given stream and client. */
347404 public static StreamWriterV2 .Builder newBuilder (String streamName , BigQueryWriteClient client ) {
348405 return new StreamWriterV2 .Builder (streamName , client );
@@ -351,15 +408,33 @@ public static StreamWriterV2.Builder newBuilder(String streamName, BigQueryWrite
351408 /** A builder of {@link StreamWriterV2}s. */
352409 public static final class Builder {
353410
411+ private static final long DEFAULT_MAX_INFLIGHT_REQUESTS = 1000L ;
412+
413+ private static final long DEFAULT_MAX_INFLIGHT_BYTES = 100 * 1024 * 1024 ; // 100Mb.
414+
354415 private String streamName ;
355416
356417 private BigQueryWriteClient client ;
357418
419+ private long maxInflightRequest = DEFAULT_MAX_INFLIGHT_REQUESTS ;
420+
421+ private long maxInflightBytes = DEFAULT_MAX_INFLIGHT_BYTES ;
422+
358423 private Builder (String streamName , BigQueryWriteClient client ) {
359424 this .streamName = Preconditions .checkNotNull (streamName );
360425 this .client = Preconditions .checkNotNull (client );
361426 }
362427
428+ public Builder setMaxInflightRequests (long value ) {
429+ this .maxInflightRequest = value ;
430+ return this ;
431+ }
432+
433+ public Builder setMaxInflightBytes (long value ) {
434+ this .maxInflightBytes = value ;
435+ return this ;
436+ }
437+
363438 /** Builds the {@code StreamWriterV2}. */
364439 public StreamWriterV2 build () {
365440 return new StreamWriterV2 (this );
0 commit comments