Skip to content

Commit 036d2e6

Browse files
authored
fix: Persist missingValueInterpretationMap in StreamWriter's Builder (#2587)
* Persist missingValueInterpretationMap in StreamWriter's Builder. If the StreamWriter is recreated, the map is used in the new StreamWriter too.
* Fix format.
* Address review comments: revive the getMissingValueInterpretationMap method and add an integration test.
1 parent 95a9977 commit 036d2e6

File tree

7 files changed

+328
-74
lines changed

7 files changed

+328
-74
lines changed

google-cloud-bigquerystorage/clirr-ignored-differences.xml

Lines changed: 15 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -197,5 +197,20 @@
197197
<differenceType>1001</differenceType>
198198
<className>com/google/cloud/bigquery/storage/v1/StreamWriter$SingleConnectionOrConnectionPool</className>
199199
</difference>
200+
<difference>
201+
<differenceType>7002</differenceType>
202+
<className>com/google/cloud/bigquery/storage/v1/JsonStreamWriter</className>
203+
<method>void setMissingValueInterpretationMap(java.util.Map)</method>
204+
</difference>
205+
<difference>
206+
<differenceType>7002</differenceType>
207+
<className>com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter</className>
208+
<method>void setMissingValueInterpretationMap(java.util.Map)</method>
209+
</difference>
210+
<difference>
211+
<differenceType>7002</differenceType>
212+
<className>com/google/cloud/bigquery/storage/v1/StreamWriter</className>
213+
<method>void setMissingValueInterpretationMap(java.util.Map)</method>
214+
</difference>
200215
</differences>
201216

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java

Lines changed: 15 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -119,18 +119,6 @@ public long getInflightWaitSeconds() {
119119
return this.schemaAwareStreamWriter.getInflightWaitSeconds();
120120
}
121121

122-
/**
123-
* Sets the missing value interpretation map for the JsonStreamWriter. The input
124-
* missingValueInterpretationMap is used for all append requests unless otherwise changed.
125-
*
126-
* @param missingValueInterpretationMap the missing value interpretation map used by the
127-
* JsonStreamWriter.
128-
*/
129-
public void setMissingValueInterpretationMap(
130-
Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueInterpretationMap) {
131-
this.schemaAwareStreamWriter.setMissingValueInterpretationMap(missingValueInterpretationMap);
132-
}
133-
134122
/** @return the missing value interpretation map used for the writer. */
135123
public Map<String, AppendRowsRequest.MissingValueInterpretation>
136124
getMissingValueInterpretationMap() {
@@ -414,6 +402,21 @@ public Builder setDefaultMissingValueInterpretation(
414402
return this;
415403
}
416404

405+
/**
406+
* Sets the missing value interpretation map for the JsonStreamWriter. The input
407+
* missingValueInterpretationMap is used for all append requests unless otherwise changed.
408+
*
409+
* @param missingValueInterpretationMap the missing value interpretation map used by the
410+
* JsonStreamWriter.
411+
* @return Builder
412+
*/
413+
public Builder setMissingValueInterpretationMap(
414+
Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueInterpretationMap) {
415+
this.schemaAwareStreamWriterBuilder.setMissingValueInterpretationMap(
416+
missingValueInterpretationMap);
417+
return this;
418+
}
419+
417420
/**
418421
* Builds JsonStreamWriter
419422
*

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java

Lines changed: 17 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -105,6 +105,7 @@ private SchemaAwareStreamWriter(Builder<T> builder)
105105
streamWriterBuilder.setLocation(builder.location);
106106
streamWriterBuilder.setDefaultMissingValueInterpretation(
107107
builder.defaultMissingValueInterpretation);
108+
streamWriterBuilder.setMissingValueInterpretationMap(builder.missingValueInterpretationMap);
108109
streamWriterBuilder.setClientId(builder.clientId);
109110
streamWriterBuilder.setEnableLatencyProfiler(builder.enableRequestProfiler);
110111
requestProfilerHook = new RequestProfiler.RequestProfilerHook(builder.enableRequestProfiler);
@@ -298,18 +299,6 @@ public long getInflightWaitSeconds() {
298299
return streamWriter.getInflightWaitSeconds();
299300
}
300301

301-
/**
302-
* Sets the missing value interpretation map for the SchemaAwareStreamWriter. The input
303-
* missingValueInterpretationMap is used for all append requests unless otherwise changed.
304-
*
305-
* @param missingValueInterpretationMap the missing value interpretation map used by the
306-
* SchemaAwareStreamWriter.
307-
*/
308-
public void setMissingValueInterpretationMap(
309-
Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueInterpretationMap) {
310-
streamWriter.setMissingValueInterpretationMap(missingValueInterpretationMap);
311-
}
312-
313302
/** @return the missing value interpretation map used for the writer. */
314303
public Map<String, AppendRowsRequest.MissingValueInterpretation>
315304
getMissingValueInterpretationMap() {
@@ -475,6 +464,8 @@ public static final class Builder<T> {
475464

476465
private AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation =
477466
MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED;
467+
private Map<String, AppendRowsRequest.MissingValueInterpretation>
468+
missingValueInterpretationMap = new HashMap();
478469
private String clientId;
479470

480471
private boolean enableRequestProfiler = false;
@@ -684,6 +675,20 @@ public Builder setDefaultMissingValueInterpretation(
684675
return this;
685676
}
686677

678+
/**
679+
* Sets the missing value interpretation map for the SchemaAwareStreamWriter. The input
680+
* missingValueInterpretationMap is used for all append requests unless otherwise changed.
681+
*
682+
* @param missingValueInterpretationMap the missing value interpretation map used by the
683+
* SchemaAwareStreamWriter.
684+
* @return Builder
685+
*/
686+
public Builder setMissingValueInterpretationMap(
687+
Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueInterpretationMap) {
688+
this.missingValueInterpretationMap = missingValueInterpretationMap;
689+
return this;
690+
}
691+
687692
/**
688693
* Sets the RetrySettings to use for in-stream error retry.
689694
*

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java

Lines changed: 23 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -68,11 +68,6 @@ public class StreamWriter implements AutoCloseable {
6868
// Cache of location info for a given dataset.
6969
private static Map<String, String> projectAndDatasetToLocation = new ConcurrentHashMap<>();
7070

71-
// Map of fields to their MissingValueInterpretation, which dictates how a field should be
72-
// populated when it is missing from an input user row.
73-
private Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueInterpretationMap =
74-
new HashMap();
75-
7671
/*
7772
* The identifier of stream to write to.
7873
*/
@@ -103,6 +98,11 @@ public class StreamWriter implements AutoCloseable {
10398
private AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation =
10499
MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED;
105100

101+
// Map of fields to their MissingValueInterpretation, which dictates how a field should be
102+
// populated when it is missing from an input user row.
103+
private Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueInterpretationMap =
104+
new HashMap();
105+
106106
/**
107107
* Stream can access a single connection or a pool of connection depending on whether multiplexing
108108
* is enabled.
@@ -229,6 +229,7 @@ private StreamWriter(Builder builder) throws IOException {
229229
this.streamName = builder.streamName;
230230
this.writerSchema = builder.writerSchema;
231231
this.defaultMissingValueInterpretation = builder.defaultMissingValueInterpretation;
232+
this.missingValueInterpretationMap = builder.missingValueInterpretationMap;
232233
BigQueryWriteSettings clientSettings = getBigQueryWriteSettings(builder);
233234
this.requestProfilerHook =
234235
new RequestProfiler.RequestProfilerHook(builder.enableRequestProfiler);
@@ -420,18 +421,6 @@ private void validateFetchedConnectonPool(StreamWriter.Builder builder) {
420421
}
421422
}
422423

423-
/**
424-
* Sets the missing value interpretation map for the stream writer. The input
425-
* missingValueInterpretationMap is used for all write requests unless otherwise changed.
426-
*
427-
* @param missingValueInterpretationMap the missing value interpretation map used by stream
428-
* writer.
429-
*/
430-
public void setMissingValueInterpretationMap(
431-
Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueInterpretationMap) {
432-
this.missingValueInterpretationMap = missingValueInterpretationMap;
433-
}
434-
435424
/**
436425
* Schedules the writing of rows at the end of current stream.
437426
*
@@ -700,6 +689,9 @@ public static final class Builder {
700689
private AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation =
701690
MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED;
702691

692+
private Map<String, AppendRowsRequest.MissingValueInterpretation>
693+
missingValueInterpretationMap = new HashMap();
694+
703695
private boolean enableRequestProfiler = false;
704696
private boolean enableOpenTelemetry = false;
705697

@@ -851,6 +843,20 @@ public Builder setDefaultMissingValueInterpretation(
851843
return this;
852844
}
853845

846+
/**
847+
* Sets the missing value interpretation map for the stream writer. The input
848+
* missingValueInterpretationMap is used for all write requests unless otherwise changed.
849+
*
850+
* @param missingValueInterpretationMap the missing value interpretation map used by stream
851+
* writer.
852+
* @return Builder
853+
*/
854+
public Builder setMissingValueInterpretationMap(
855+
Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueInterpretationMap) {
856+
this.missingValueInterpretationMap = missingValueInterpretationMap;
857+
return this;
858+
}
859+
854860
/**
855861
* Enable a latency profiler that would periodically generate a detailed latency report for the
856862
* top latency requests. This is currently an experimental API.

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java

Lines changed: 97 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -868,6 +868,95 @@ public void testSimpleSchemaUpdate_skipRefreshWriterIfSchemaProvided() throws Ex
868868
}
869869
}
870870

871+
@Test
872+
public void testSimpleSchemaUpdate_withInterpretationMap() throws Exception {
873+
testBigQueryWrite.addResponse(
874+
WriteStream.newBuilder()
875+
.setName(TEST_STREAM)
876+
.setTableSchema(TABLE_SCHEMA)
877+
.setLocation("us")
878+
.build());
879+
Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueMap = new HashMap<>();
880+
missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE);
881+
missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
882+
883+
try (JsonStreamWriter writer =
884+
getTestJsonStreamWriterBuilder(TEST_STREAM)
885+
.setDefaultMissingValueInterpretation(MissingValueInterpretation.DEFAULT_VALUE)
886+
.setMissingValueInterpretationMap(missingValueMap)
887+
.build()) {
888+
889+
testBigQueryWrite.addResponse(
890+
AppendRowsResponse.newBuilder()
891+
.setAppendResult(
892+
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
893+
.setUpdatedSchema(UPDATED_TABLE_SCHEMA)
894+
.build());
895+
testBigQueryWrite.addResponse(createAppendResponse(1));
896+
// Verify the map before the writer is refreshed
897+
assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());
898+
testBigQueryWrite.addResponse(createAppendResponse(2));
899+
testBigQueryWrite.addResponse(createAppendResponse(3));
900+
901+
// First batch of appends. The first append request returns an updated schema, but the second
902+
// and possibly the third append may be processed before the first response refreshes the
903+
// StreamWriter.
904+
JSONObject foo = new JSONObject();
905+
foo.put("foo", "aaa");
906+
JSONArray jsonArr = new JSONArray();
907+
jsonArr.put(foo);
908+
909+
ApiFuture<AppendRowsResponse> appendFuture1 = writer.append(jsonArr);
910+
ApiFuture<AppendRowsResponse> appendFuture2 = writer.append(jsonArr);
911+
ApiFuture<AppendRowsResponse> appendFuture3 = writer.append(jsonArr);
912+
913+
assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue());
914+
assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue());
915+
assertEquals(2L, appendFuture3.get().getAppendResult().getOffset().getValue());
916+
917+
// Another append, this time with columns to match the updated schema.
918+
JSONObject updatedFoo = new JSONObject();
919+
updatedFoo.put("foo", "aaa");
920+
updatedFoo.put("bar", "bbb");
921+
JSONArray updatedJsonArr = new JSONArray();
922+
updatedJsonArr.put(updatedFoo);
923+
ApiFuture<AppendRowsResponse> appendFuture4 = writer.append(updatedJsonArr);
924+
925+
assertEquals(3L, appendFuture4.get().getAppendResult().getOffset().getValue());
926+
assertEquals(4, testBigQueryWrite.getAppendRequests().size());
927+
assertEquals(
928+
1,
929+
testBigQueryWrite
930+
.getAppendRequests()
931+
.get(3)
932+
.getProtoRows()
933+
.getRows()
934+
.getSerializedRowsCount());
935+
assertEquals(
936+
testBigQueryWrite
937+
.getAppendRequests()
938+
.get(3)
939+
.getProtoRows()
940+
.getRows()
941+
.getSerializedRows(0),
942+
UpdatedFooType.newBuilder().setFoo("aaa").setBar("bbb").build().toByteString());
943+
944+
assertTrue(testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema());
945+
assertTrue(
946+
testBigQueryWrite.getAppendRequests().get(2).getProtoRows().hasWriterSchema()
947+
|| testBigQueryWrite.getAppendRequests().get(3).getProtoRows().hasWriterSchema());
948+
949+
// Verify the map after the writer is refreshed
950+
assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());
951+
assertEquals(
952+
testBigQueryWrite.getAppendRequests().get(3).getDefaultMissingValueInterpretation(),
953+
MissingValueInterpretation.DEFAULT_VALUE);
954+
assertEquals(
955+
testBigQueryWrite.getAppendRequests().get(3).getMissingValueInterpretations(),
956+
missingValueMap);
957+
}
958+
}
959+
871960
@Test
872961
public void testWithoutIgnoreUnknownFieldsUpdateImmeidateSuccess() throws Exception {
873962
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build();
@@ -1523,13 +1612,16 @@ public void testAppendWithMissingValueMap() throws Exception {
15231612
JSONArray jsonArr = new JSONArray();
15241613
jsonArr.put(flexible);
15251614

1615+
Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueMap = new HashMap<>();
1616+
missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE);
1617+
missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
1618+
15261619
try (JsonStreamWriter writer =
1527-
getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).setTraceId("test:empty").build()) {
1620+
getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema)
1621+
.setMissingValueInterpretationMap(missingValueMap)
1622+
.setTraceId("test:empty")
1623+
.build()) {
15281624

1529-
Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueMap = new HashMap<>();
1530-
missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE);
1531-
missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
1532-
writer.setMissingValueInterpretationMap(missingValueMap);
15331625
assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());
15341626

15351627
testBigQueryWrite.addResponse(

0 commit comments

Comments
 (0)