3838import java .util .concurrent .locks .Condition ;
3939import java .util .concurrent .locks .Lock ;
4040import java .util .concurrent .locks .ReentrantLock ;
41+ import java .util .logging .Level ;
4142import java .util .logging .Logger ;
4243import javax .annotation .concurrent .GuardedBy ;
4344
@@ -105,6 +106,12 @@ public class StreamWriter implements AutoCloseable {
105106 @ GuardedBy ("lock" )
106107 private boolean streamConnectionIsConnected = false ;
107108
109+ /*
110+ * A boolean to track if we cleaned up inflight queue.
111+ */
112+ @ GuardedBy ("lock" )
113+ private boolean inflightCleanuped = false ;
114+
108115 /*
109116 * Retry threshold, limits how often the connection is retried before processing halts.
110117 */
@@ -376,7 +383,8 @@ public void close() {
376383 if (this .ownsBigQueryWriteClient ) {
377384 this .client .close ();
378385 try {
379- this .client .awaitTermination (1 , TimeUnit .MINUTES );
386+ // Backend request has a 2 minute timeout, so wait a little longer than that.
387+ this .client .awaitTermination (150 , TimeUnit .SECONDS );
380388 } catch (InterruptedException ignored ) {
381389 }
382390 }
@@ -465,7 +473,7 @@ private void appendLoop() {
465473 // We can close the stream connection and handle the remaining inflight requests.
466474 if (streamConnection != null ) {
467475 this .streamConnection .close ();
468- waitForDoneCallback (1 , TimeUnit .MINUTES );
476+ waitForDoneCallback (2 , TimeUnit .MINUTES );
469477 }
470478
471479 // At this point, there cannot be more callback. It is safe to clean up all inflight requests.
@@ -550,6 +558,7 @@ private void cleanupInflightRequests() {
550558 while (!this .inflightRequestQueue .isEmpty ()) {
551559 localQueue .addLast (pollInflightRequestQueue ());
552560 }
561+ this .inflightCleanuped = true ;
553562 } finally {
554563 this .lock .unlock ();
555564 }
@@ -572,7 +581,21 @@ private void requestCallback(AppendRowsResponse response) {
572581 if (conectionRetryCountWithoutCallback != 0 ) {
573582 conectionRetryCountWithoutCallback = 0 ;
574583 }
575- requestWrapper = pollInflightRequestQueue ();
584+ if (!this .inflightRequestQueue .isEmpty ()) {
585+ requestWrapper = pollInflightRequestQueue ();
586+ } else if (inflightCleanuped ) {
587+ // It is possible when requestCallback is called, the inflight queue is already drained
588+ // because we timed out waiting for done.
589+ return ;
590+ } else {
591+ // This is something not expected, we shouldn't have an empty inflight queue otherwise.
592+ log .log (Level .WARNING , "Unexpected: request callback called on an empty inflight queue." );
593+ connectionFinalStatus =
594+ new StatusRuntimeException (
595+ Status .fromCode (Code .FAILED_PRECONDITION )
596+ .withDescription ("Request callback called on an empty inflight queue." ));
597+ return ;
598+ }
576599 } finally {
577600 this .lock .unlock ();
578601 }
0 commit comments