Skip to content

Commit 7c01f45

Browse files
authored
feat: Create Stream writer v2 - starting with core logics (#867)
* Add a new StreamWriterV2. Compared to existing StreamWriter, its locking mechanism is much simpler. * Stop using Java8 features as we still need to support Java7 * Do not hold lock while sending requests, and some minor refactoring.
1 parent 9c103df commit 7c01f45

File tree

5 files changed

+779
-0
lines changed

5 files changed

+779
-0
lines changed
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigquery.storage.v1beta2;
17+
18+
import com.google.api.gax.rpc.BidiStreamingCallable;
19+
import com.google.api.gax.rpc.ClientStream;
20+
import com.google.api.gax.rpc.ResponseObserver;
21+
import com.google.api.gax.rpc.StreamController;
22+
import io.grpc.Status;
23+
import io.grpc.Status.Code;
24+
import io.grpc.StatusRuntimeException;
25+
26+
/**
27+
* StreamConnection is responsible for writing requests to a GRPC bidirecional connection.
28+
*
29+
* <p>StreamWriter creates a connection. Two callback functions are necessary: request_callback and
30+
* done_callback. Request callback is used for every request, and done callback is used to notify
31+
* the user that the connection is closed and no more callbacks will be received from this
32+
* connection.
33+
*
34+
* <p>The stream writer will accept all the requests without flow control, and makes the callbacks
35+
* in receiving order.
36+
*
37+
* <p>It's user's responsibility to do the flow control and maintain the lifetime of the requests.
38+
*/
39+
public class StreamConnection {
40+
private BidiStreamingCallable<AppendRowsRequest, AppendRowsResponse> bidiStreamingCallable;
41+
private ClientStream<AppendRowsRequest> clientStream;
42+
43+
private RequestCallback requestCallback;
44+
private DoneCallback doneCallback;
45+
46+
public StreamConnection(
47+
BigQueryWriteClient client, RequestCallback requestCallback, DoneCallback doneCallback) {
48+
this.requestCallback = requestCallback;
49+
this.doneCallback = doneCallback;
50+
51+
bidiStreamingCallable = client.appendRowsCallable();
52+
clientStream =
53+
bidiStreamingCallable.splitCall(
54+
new ResponseObserver<AppendRowsResponse>() {
55+
56+
@Override
57+
public void onStart(StreamController controller) {
58+
// no-op
59+
}
60+
61+
@Override
62+
public void onResponse(AppendRowsResponse response) {
63+
StreamConnection.this.requestCallback.run(response);
64+
}
65+
66+
@Override
67+
public void onError(Throwable t) {
68+
StreamConnection.this.doneCallback.run(t);
69+
}
70+
71+
@Override
72+
public void onComplete() {
73+
StreamConnection.this.doneCallback.run(
74+
new StatusRuntimeException(
75+
Status.fromCode(Code.CANCELLED)
76+
.withDescription("Stream is closed by user.")));
77+
}
78+
});
79+
}
80+
81+
/**
82+
* Sends a request to the bi-directional stream connection.
83+
*
84+
* @param request request to send.
85+
*/
86+
public void send(AppendRowsRequest request) {
87+
clientStream.send(request);
88+
}
89+
90+
/** Close the bi-directional stream connection. */
91+
public void close() {
92+
clientStream.closeSend();
93+
}
94+
95+
/** Invoked when a response is received from the server. */
96+
public static interface RequestCallback {
97+
public void run(AppendRowsResponse response);
98+
}
99+
100+
/** Invoked when server closes the connection. */
101+
public static interface DoneCallback {
102+
public void run(Throwable finalStatus);
103+
}
104+
}

0 commit comments

Comments
 (0)