Skip to content

Commit 9a29abe

Browse files
committed
.
1 parent 45295c0 commit 9a29abe

File tree

3 files changed

+20
-5
lines changed

3 files changed

+20
-5
lines changed

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWrite.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ public FakeBigQueryWrite() {
3434
serviceImpl = new FakeBigQueryWriteImpl();
3535
}
3636

37+
public void waitForResponseScheduled() throws InterruptedException {
38+
serviceImpl.waitResponseScheduled();
39+
}
40+
3741
@Override
3842
public List<AbstractMessage> getRequests() {
3943
return new LinkedList<AbstractMessage>(serviceImpl.getCapturedRequests());

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWriteImpl.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.List;
2222
import java.util.concurrent.LinkedBlockingQueue;
2323
import java.util.concurrent.ScheduledExecutorService;
24+
import java.util.concurrent.Semaphore;
2425
import java.util.concurrent.TimeUnit;
2526
import java.util.concurrent.atomic.AtomicInteger;
2627
import java.util.logging.Logger;
@@ -44,6 +45,7 @@ class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase {
4445
private boolean autoPublishResponse;
4546
private ScheduledExecutorService executor = null;
4647
private Duration responseDelay = Duration.ZERO;
48+
Semaphore responseCount = new Semaphore(0, true);
4749

4850
/** Class used to save the state of a possible response. */
4951
private static class Response {
@@ -111,6 +113,11 @@ public void flushRows(
111113
}
112114
}
113115

116+
// Wait for n responses to be scheduled to sent.
117+
public void waitResponseScheduled() throws InterruptedException {
118+
responseCount.acquire();
119+
}
120+
114121
@Override
115122
public StreamObserver<AppendRowsRequest> appendRows(
116123
final StreamObserver<AppendRowsResponse> responseObserver) {
@@ -136,6 +143,7 @@ public void run() {
136143
responseDelay.toMillis(),
137144
TimeUnit.MILLISECONDS);
138145
}
146+
responseCount.release();
139147
}
140148

141149
@Override

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -452,11 +452,11 @@ public void run() {
452452
assertEquals(true, t.isAlive());
453453
assertEquals(false, appendFuture1.isDone());
454454
// Wait is necessary for response to be scheduled before timer is advanced.
455-
Thread.sleep(5000L);
455+
testBigQueryWrite.waitForResponseScheduled();
456456
fakeExecutor.advanceTime(Duration.ofSeconds(10));
457457
// The first requests gets back while the second one is blocked.
458458
assertEquals(2L, appendFuture1.get().getAppendResult().getOffset().getValue());
459-
Thread.sleep(5000L);
459+
testBigQueryWrite.waitForResponseScheduled();
460460
// Wait is necessary for response to be scheduled before timer is advanced.
461461
fakeExecutor.advanceTime(Duration.ofSeconds(10));
462462
t.join();
@@ -496,7 +496,7 @@ public void testFlowControlBehaviorException() throws Exception {
496496
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
497497
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
498498
// Wait is necessary for response to be scheduled before timer is advanced.
499-
Thread.sleep(5000L);
499+
testBigQueryWrite.waitForResponseScheduled();
500500
fakeExecutor.advanceTime(Duration.ofSeconds(10));
501501
try {
502502
appendFuture2.get();
@@ -944,8 +944,11 @@ public void testShutdownWithConnectionError() throws Exception {
944944

945945
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
946946
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
947-
ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"B"});
948-
Thread.sleep(5000L);
947+
ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"C"});
948+
// Wait for all 3 response arrives.
949+
testBigQueryWrite.waitForResponseScheduled();
950+
testBigQueryWrite.waitForResponseScheduled();
951+
testBigQueryWrite.waitForResponseScheduled();
949952
// Move the needle for responses to be sent.
950953
fakeExecutor.advanceTime(Duration.ofSeconds(20));
951954
// Shutdown writer immediately and there will be some error happened when flushing the queue.

0 commit comments

Comments
 (0)