Skip to content

Commit caf1e76

Browse files
fix: fix deadlock issue in ConnectionWorkerPool (#1938)
* feat: Split writer into connection worker and wrapper, this is a prerequisite for multiplexing client * feat: add connection worker pool skeleton, used for multiplexing client * feat: add Load api for connection worker for multiplexing client * feat: add multiplexing support to connection worker. We will treat every new stream name as a switch of destination * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: port the multiplexing client core algorithm and basic tests also fixed a tiny bug inside fake bigquery write impl for getting the response from offset * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: wire multiplexing connection pool to stream writer * feat: some fixes for multiplexing client * feat: fix some todos, and reject the mixed behavior of passed in client or not * feat: fix the bug that we may peek into the write_stream field but it's possible the proto schema does not contain this field * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: fix the bug that we may peek into the write_stream field but it's possible the proto schema does not contain this field * feat: add getInflightWaitSeconds implementation * feat: Add schema comparison in connection loop to ensure schema update for the same stream name can be notified * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: add schema update support to multiplexing * fix: fix windows build bug: windows 
Instant resolution is different with linux * fix: fix another failing tests for windows build * fix: fix another test failure for Windows build * feat: Change new thread for each retry to be a thread pool to avoid create/tear down too much threads if lots of retries happens * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix: add back the background executor provider that's accidentally removed * feat: throw error when use connection pool for explicit stream * fix: Add precision truncation to the passed in value from JSON float and double type. * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * modify the bom version * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix deadlock issue in ConnectionWorkerPool * fix: fix deadlock issue during close + append for multiplexing * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 23ec7fa commit caf1e76

File tree

3 files changed

+61
-17
lines changed

3 files changed

+61
-17
lines changed

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -642,6 +642,7 @@ private void requestCallback(AppendRowsResponse response) {
642642
if (response.hasError()) {
643643
Exceptions.StorageException storageException =
644644
Exceptions.toStorageException(response.getError(), null);
645+
log.fine(String.format("Got error message: %s", response.toString()));
645646
if (storageException != null) {
646647
requestWrapper.appendResult.setException(storageException);
647648
} else if (response.getRowErrorsCount() > 0) {

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.io.IOException;
2929
import java.util.Collections;
3030
import java.util.Comparator;
31+
import java.util.HashMap;
3132
import java.util.HashSet;
3233
import java.util.List;
3334
import java.util.Map;
@@ -48,7 +49,7 @@ public class ConnectionWorkerPool {
4849

4950
private static final Logger log = Logger.getLogger(ConnectionWorkerPool.class.getName());
5051
/*
51-
* Max allowed inflight requests in the stream. Method append is blocked at this.
52+
* Max allowed inflight requests in the stream. Method append is blocked at this.
5253
*/
5354
private final long maxInflightRequests;
5455

@@ -68,12 +69,10 @@ public class ConnectionWorkerPool {
6869
private final FlowController.LimitExceededBehavior limitExceededBehavior;
6970

7071
/** Map from write stream to corresponding connection. */
71-
private final Map<StreamWriter, ConnectionWorker> streamWriterToConnection =
72-
new ConcurrentHashMap<>();
72+
private final Map<StreamWriter, ConnectionWorker> streamWriterToConnection = new HashMap<>();
7373

7474
/** Map from a connection to a set of write stream that have sent requests onto it. */
75-
private final Map<ConnectionWorker, Set<StreamWriter>> connectionToWriteStream =
76-
new ConcurrentHashMap<>();
75+
private final Map<ConnectionWorker, Set<StreamWriter>> connectionToWriteStream = new HashMap<>();
7776

7877
/** Collection of all the created connections. */
7978
private final Set<ConnectionWorker> connectionWorkerPool =
@@ -227,14 +226,13 @@ public ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows
227226
public ApiFuture<AppendRowsResponse> append(
228227
StreamWriter streamWriter, ProtoRows rows, long offset) {
229228
// We are in multiplexing mode after entering the following logic.
230-
ConnectionWorker connectionWorker =
231-
streamWriterToConnection.compute(
232-
streamWriter,
233-
(key, existingStream) -> {
234-
// Though compute on concurrent map is atomic, we still do explicit locking as we
235-
// may have concurrent close(...) triggered.
236-
lock.lock();
237-
try {
229+
ConnectionWorker connectionWorker;
230+
lock.lock();
231+
try {
232+
connectionWorker =
233+
streamWriterToConnection.compute(
234+
streamWriter,
235+
(key, existingStream) -> {
238236
// Stick to the existing stream if it's not overwhelmed.
239237
if (existingStream != null && !existingStream.getLoad().isOverwhelmed()) {
240238
return existingStream;
@@ -252,10 +250,10 @@ public ApiFuture<AppendRowsResponse> append(
252250
createdOrExistingConnection, (ConnectionWorker k) -> new HashSet<>());
253251
connectionToWriteStream.get(createdOrExistingConnection).add(streamWriter);
254252
return createdOrExistingConnection;
255-
} finally {
256-
lock.unlock();
257-
}
258-
});
253+
});
254+
} finally {
255+
lock.unlock();
256+
}
259257
Stopwatch stopwatch = Stopwatch.createStarted();
260258
ApiFuture<AppendRowsResponse> responseFuture =
261259
connectionWorker.append(

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525
import com.google.api.gax.grpc.testing.MockServiceHelper;
2626
import com.google.cloud.bigquery.storage.test.Test.FooType;
2727
import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool.Settings;
28+
import com.google.common.util.concurrent.ListeningExecutorService;
29+
import com.google.common.util.concurrent.MoreExecutors;
30+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
2831
import com.google.common.util.concurrent.Uninterruptibles;
2932
import com.google.protobuf.DescriptorProtos;
3033
import com.google.protobuf.Int64Value;
@@ -35,6 +38,8 @@
3538
import java.util.List;
3639
import java.util.UUID;
3740
import java.util.concurrent.ExecutionException;
41+
import java.util.concurrent.Executors;
42+
import java.util.concurrent.Future;
3843
import java.util.concurrent.TimeUnit;
3944
import org.junit.Before;
4045
import org.junit.Test;
@@ -314,6 +319,46 @@ public void testMultiStreamAppend_appendWhileClosing() throws Exception {
314319
assertThat(connectionWorkerPool.getTotalConnectionCount()).isEqualTo(0);
315320
}
316321

322+
@Test
323+
public void testCloseWhileAppending_noDeadlockHappen() throws Exception {
324+
ConnectionWorkerPool.setOptions(
325+
Settings.builder().setMaxConnectionsPerRegion(10).setMinConnectionsPerRegion(5).build());
326+
ConnectionWorkerPool connectionWorkerPool =
327+
createConnectionWorkerPool(
328+
/*maxRequests=*/ 1500, /*maxBytes=*/ 100000, java.time.Duration.ofSeconds(5));
329+
330+
// Sets the sleep time to simulate requests stuck in connection.
331+
testBigQueryWrite.setResponseSleep(Duration.ofMillis(20L));
332+
StreamWriter writeStream1 = getTestStreamWriter(TEST_STREAM_1);
333+
334+
ListeningExecutorService threadPool =
335+
MoreExecutors.listeningDecorator(
336+
Executors.newCachedThreadPool(
337+
new ThreadFactoryBuilder()
338+
.setDaemon(true)
339+
.setNameFormat("AsyncStreamReadThread")
340+
.build()));
341+
342+
long appendCount = 10;
343+
for (long i = 0; i < appendCount; i++) {
344+
testBigQueryWrite.addResponse(createAppendResponse(i));
345+
}
346+
List<Future<?>> futures = new ArrayList<>();
347+
348+
for (int i = 0; i < 500; i++) {
349+
futures.add(
350+
threadPool.submit(
351+
() -> {
352+
sendFooStringTestMessage(
353+
writeStream1, connectionWorkerPool, new String[] {String.valueOf(0)}, 0);
354+
}));
355+
}
356+
connectionWorkerPool.close(writeStream1);
357+
for (int i = 0; i < 500; i++) {
358+
futures.get(i).get();
359+
}
360+
}
361+
317362
@Test
318363
public void testToTableName() {
319364
assertThat(ConnectionWorkerPool.toTableName("projects/p/datasets/d/tables/t/streams/s"))

0 commit comments

Comments
 (0)