Skip to content

Commit 586777f

Browse files
authored
fix: migrate json writer to use StreamWriterV2 (#1058)
* . * . * fix: move JsonWriter to use StreamWriterV2. * . * . * . * . * . * . * . * .
1 parent 157a897 commit 586777f

File tree

7 files changed

+48
-101
lines changed

7 files changed

+48
-101
lines changed

google-cloud-bigquerystorage/clirr-ignored-differences.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,9 @@
2626
<differenceType>8001</differenceType>
2727
<className>com/google/cloud/bigquery/storage/v1alpha2/stub/GrpcBigQueryWriteStub</className>
2828
</difference>
29+
<difference>
30+
<differenceType>7002</differenceType>
31+
<className>com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter$Builder</className>
32+
<method>com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter$Builder createDefaultStream()</method>
33+
</difference>
2934
</differences>

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.java

Lines changed: 42 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,17 @@
1616
package com.google.cloud.bigquery.storage.v1beta2;
1717

1818
import com.google.api.core.ApiFuture;
19-
import com.google.api.gax.batching.BatchingSettings;
2019
import com.google.api.gax.batching.FlowControlSettings;
2120
import com.google.api.gax.core.CredentialsProvider;
2221
import com.google.api.gax.rpc.TransportChannelProvider;
2322
import com.google.cloud.bigquery.Schema;
2423
import com.google.common.base.Preconditions;
2524
import com.google.protobuf.Descriptors;
2625
import com.google.protobuf.Descriptors.Descriptor;
27-
import com.google.protobuf.Int64Value;
2826
import com.google.protobuf.Message;
2927
import java.io.IOException;
3028
import java.util.logging.Logger;
29+
import java.util.regex.Matcher;
3130
import java.util.regex.Pattern;
3231
import javax.annotation.Nullable;
3332
import org.json.JSONArray;
@@ -46,8 +45,8 @@ public class JsonStreamWriter implements AutoCloseable {
4645

4746
private BigQueryWriteClient client;
4847
private String streamName;
49-
private StreamWriter streamWriter;
50-
private StreamWriter.Builder streamWriterBuilder;
48+
private StreamWriterV2 streamWriter;
49+
private StreamWriterV2.Builder streamWriterBuilder;
5150
private Descriptor descriptor;
5251
private TableSchema tableSchema;
5352

@@ -64,18 +63,18 @@ private JsonStreamWriter(Builder builder)
6463
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(builder.tableSchema);
6564

6665
if (this.client == null) {
67-
streamWriterBuilder = StreamWriter.newBuilder(builder.streamOrTableName);
66+
streamWriterBuilder = StreamWriterV2.newBuilder(builder.streamName);
6867
} else {
69-
streamWriterBuilder = StreamWriter.newBuilder(builder.streamOrTableName, builder.client);
68+
streamWriterBuilder = StreamWriterV2.newBuilder(builder.streamName, builder.client);
7069
}
70+
streamWriterBuilder.setWriterSchema(ProtoSchemaConverter.convert(this.descriptor));
7171
setStreamWriterSettings(
7272
builder.channelProvider,
7373
builder.credentialsProvider,
7474
builder.endpoint,
75-
builder.flowControlSettings,
76-
builder.createDefaultStream);
75+
builder.flowControlSettings);
7776
this.streamWriter = streamWriterBuilder.build();
78-
this.streamName = this.streamWriter.getStreamNameString();
77+
this.streamName = builder.streamName;
7978
}
8079

8180
/**
@@ -109,17 +108,10 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset) {
109108
Message protoMessage = JsonToProtoMessage.convertJsonToProtoMessage(this.descriptor, json);
110109
rowsBuilder.addSerializedRows(protoMessage.toByteString());
111110
}
112-
AppendRowsRequest.ProtoData.Builder data = AppendRowsRequest.ProtoData.newBuilder();
113111
// Need to make sure refreshAppendAndSetDescriptor finish first before this can run
114112
synchronized (this) {
115-
data.setWriterSchema(ProtoSchemaConverter.convert(this.descriptor));
116-
data.setRows(rowsBuilder.build());
117-
AppendRowsRequest.Builder request = AppendRowsRequest.newBuilder().setProtoRows(data.build());
118-
if (offset >= 0) {
119-
request.setOffset(Int64Value.of(offset));
120-
}
121113
final ApiFuture<AppendRowsResponse> appendResponseFuture =
122-
this.streamWriter.append(request.build());
114+
this.streamWriter.append(rowsBuilder.build(), offset);
123115
return appendResponseFuture;
124116
}
125117
}
@@ -134,7 +126,7 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset) {
134126
void refreshConnection()
135127
throws IOException, InterruptedException, Descriptors.DescriptorValidationException {
136128
synchronized (this) {
137-
this.streamWriter.shutdown();
129+
this.streamWriter.close();
138130
this.streamWriter = streamWriterBuilder.build();
139131
this.descriptor =
140132
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(this.tableSchema);
@@ -164,41 +156,28 @@ private void setStreamWriterSettings(
164156
@Nullable TransportChannelProvider channelProvider,
165157
@Nullable CredentialsProvider credentialsProvider,
166158
@Nullable String endpoint,
167-
@Nullable FlowControlSettings flowControlSettings,
168-
Boolean createDefaultStream) {
159+
@Nullable FlowControlSettings flowControlSettings) {
169160
if (channelProvider != null) {
170161
streamWriterBuilder.setChannelProvider(channelProvider);
171162
}
172163
if (credentialsProvider != null) {
173164
streamWriterBuilder.setCredentialsProvider(credentialsProvider);
174165
}
175-
BatchingSettings.Builder batchSettingBuilder =
176-
BatchingSettings.newBuilder()
177-
.setElementCountThreshold(1L)
178-
.setRequestByteThreshold(4 * 1024 * 1024L);
179-
if (flowControlSettings != null) {
180-
streamWriterBuilder.setBatchingSettings(
181-
batchSettingBuilder.setFlowControlSettings(flowControlSettings).build());
182-
} else {
183-
streamWriterBuilder.setBatchingSettings(batchSettingBuilder.build());
184-
}
185166
if (endpoint != null) {
186167
streamWriterBuilder.setEndpoint(endpoint);
187168
}
188-
if (createDefaultStream) {
189-
streamWriterBuilder.createDefaultStream();
169+
if (flowControlSettings != null) {
170+
if (flowControlSettings.getMaxOutstandingRequestBytes() != null) {
171+
streamWriterBuilder.setMaxInflightBytes(
172+
flowControlSettings.getMaxOutstandingRequestBytes());
173+
}
174+
if (flowControlSettings.getMaxOutstandingElementCount() != null) {
175+
streamWriterBuilder.setMaxInflightRequests(
176+
flowControlSettings.getMaxOutstandingElementCount());
177+
}
190178
}
191179
}
192180

193-
/**
194-
* Setter for table schema. Used for schema updates.
195-
*
196-
* @param tableSchema
197-
*/
198-
void setTableSchema(TableSchema tableSchema) {
199-
this.tableSchema = tableSchema;
200-
}
201-
202181
/**
203182
* newBuilder that constructs a JsonStreamWriter builder with BigQuery client being initialized by
204183
* StreamWriter by default.
@@ -259,7 +238,7 @@ public void close() {
259238
}
260239

261240
public static final class Builder {
262-
private String streamOrTableName;
241+
private String streamName;
263242
private BigQueryWriteClient client;
264243
private TableSchema tableSchema;
265244

@@ -269,6 +248,13 @@ public static final class Builder {
269248
private String endpoint;
270249
private boolean createDefaultStream = false;
271250

251+
private static String streamPatternString =
252+
"(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+";
253+
private static String tablePatternString = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)";
254+
255+
private static Pattern streamPattern = Pattern.compile(streamPatternString);
256+
private static Pattern tablePattern = Pattern.compile(tablePatternString);
257+
272258
/**
273259
* Constructor for JsonStreamWriter's Builder
274260
*
@@ -279,7 +265,17 @@ public static final class Builder {
279265
* @param client
280266
*/
281267
private Builder(String streamOrTableName, TableSchema tableSchema, BigQueryWriteClient client) {
282-
this.streamOrTableName = streamOrTableName;
268+
Matcher streamMatcher = streamPattern.matcher(streamOrTableName);
269+
if (!streamMatcher.matches()) {
270+
Matcher tableMatcher = tablePattern.matcher(streamOrTableName);
271+
if (!tableMatcher.matches()) {
272+
throw new IllegalArgumentException("Invalid name: " + streamOrTableName);
273+
} else {
274+
this.streamName = streamOrTableName + "/_default";
275+
}
276+
} else {
277+
this.streamName = streamOrTableName;
278+
}
283279
this.tableSchema = tableSchema;
284280
this.client = client;
285281
}
@@ -322,13 +318,12 @@ public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) {
322318
}
323319

324320
/**
325-
* If it is writing to a default stream.
321+
* Stream name on the builder.
326322
*
327323
* @return Builder
328324
*/
329-
public Builder createDefaultStream() {
330-
this.createDefaultStream = true;
331-
return this;
325+
public String getStreamName() {
326+
return streamName;
332327
}
333328

334329
/**

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -529,7 +529,6 @@ public void testCreateDefaultStream() throws Exception {
529529
.build());
530530
try (JsonStreamWriter writer =
531531
JsonStreamWriter.newBuilder(TEST_TABLE, v2Schema)
532-
.createDefaultStream()
533532
.setChannelProvider(channelProvider)
534533
.setCredentialsProvider(NoCredentialsProvider.create())
535534
.build()) {

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryBigDecimalByteStringEncoderTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,6 @@ public void TestBigDecimalEncoding()
105105
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, TABLE);
106106
try (JsonStreamWriter jsonStreamWriter =
107107
JsonStreamWriter.newBuilder(parent.toString(), tableInfo.getDefinition().getSchema())
108-
.createDefaultStream()
109108
.build()) {
110109
JSONObject row = new JSONObject();
111110
row.put(

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryTimeEncoderTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,6 @@ public void TestTimeEncoding()
104104
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, TABLE);
105105
try (JsonStreamWriter jsonStreamWriter =
106106
JsonStreamWriter.newBuilder(parent.toString(), tableInfo.getDefinition().getSchema())
107-
.createDefaultStream()
108107
.build()) {
109108
JSONObject row = new JSONObject();
110109
row.put("test_str", "Start of the day");

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java

Lines changed: 0 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import org.junit.Assert;
4242
import org.junit.BeforeClass;
4343
import org.junit.Test;
44-
import org.threeten.bp.Duration;
4544
import org.threeten.bp.LocalDateTime;
4645

4746
/** Integration tests for BigQuery Write API. */
@@ -180,54 +179,6 @@ private AppendRowsRequest.Builder createAppendRequestComplicateType(
180179
return requestBuilder.setProtoRows(dataBuilder.build()).setWriteStream(streamName);
181180
}
182181

183-
@Test
184-
public void testBatchWriteWithCommittedStream()
185-
throws IOException, InterruptedException, ExecutionException {
186-
WriteStream writeStream =
187-
client.createWriteStream(
188-
CreateWriteStreamRequest.newBuilder()
189-
.setParent(tableId)
190-
.setWriteStream(
191-
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
192-
.build());
193-
try (StreamWriter streamWriter =
194-
StreamWriter.newBuilder(writeStream.getName())
195-
.setBatchingSettings(
196-
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
197-
.toBuilder()
198-
.setRequestByteThreshold(1024 * 1024L) // 1 Mb
199-
.setElementCountThreshold(2L)
200-
.setDelayThreshold(Duration.ofSeconds(2))
201-
.build())
202-
.build()) {
203-
LOG.info("Sending one message");
204-
ApiFuture<AppendRowsResponse> response =
205-
streamWriter.append(
206-
createAppendRequest(writeStream.getName(), new String[] {"aaa"}).build());
207-
assertEquals(0, response.get().getAppendResult().getOffset().getValue());
208-
209-
LOG.info("Sending two more messages");
210-
ApiFuture<AppendRowsResponse> response1 =
211-
streamWriter.append(
212-
createAppendRequest(writeStream.getName(), new String[] {"bbb", "ccc"}).build());
213-
ApiFuture<AppendRowsResponse> response2 =
214-
streamWriter.append(
215-
createAppendRequest(writeStream.getName(), new String[] {"ddd"}).build());
216-
assertEquals(1, response1.get().getAppendResult().getOffset().getValue());
217-
assertEquals(3, response2.get().getAppendResult().getOffset().getValue());
218-
219-
TableResult result =
220-
bigquery.listTableData(
221-
tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
222-
Iterator<FieldValueList> iter = result.getValues().iterator();
223-
assertEquals("aaa", iter.next().get(0).getStringValue());
224-
assertEquals("bbb", iter.next().get(0).getStringValue());
225-
assertEquals("ccc", iter.next().get(0).getStringValue());
226-
assertEquals("ddd", iter.next().get(0).getStringValue());
227-
assertEquals(false, iter.hasNext());
228-
}
229-
}
230-
231182
ProtoRows CreateProtoRows(String[] messages) {
232183
ProtoRows.Builder rows = ProtoRows.newBuilder();
233184
for (String message : messages) {
@@ -389,7 +340,6 @@ public void testJsonStreamWriterWithDefaultStream()
389340
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
390341
try (JsonStreamWriter jsonStreamWriter =
391342
JsonStreamWriter.newBuilder(parent.toString(), tableInfo.getDefinition().getSchema())
392-
.createDefaultStream()
393343
.build()) {
394344
LOG.info("Sending one message");
395345
JSONObject row1 = new JSONObject();

samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public static void writeToDefaultStream(String projectId, String datasetName, St
5454
// For more information about JsonStreamWriter, see:
5555
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JstreamWriter.html
5656
try (JsonStreamWriter writer =
57-
JsonStreamWriter.newBuilder(parentTable.toString(), schema).createDefaultStream().build()) {
57+
JsonStreamWriter.newBuilder(parentTable.toString(), schema).build()) {
5858
// Append 10 JSON objects to the stream.
5959
for (int i = 0; i < 10; i++) {
6060
// Create a JSON object that is compatible with the table schema.

0 commit comments

Comments
 (0)