Skip to content

Commit 2e49ce8

Browse files
authored
feat: update StreamWriterV2 to support trace id (#895)
* feat: StreamWriterV2 will attach stream name in the first request in the connection and remove stream name and schema in the following ones. * feat: StreamWriterV2 will attach stream name in the first request in the connection and remove stream name and schema in the following ones * feat: StreamWriterV2 to support traceId * Enforce traceId to be the format of A:B
1 parent 0870797 commit 2e49ce8

File tree

2 files changed

+61
-4
lines changed

2 files changed

+61
-4
lines changed

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@
4141
/**
4242
* A BigQuery Stream Writer that can be used to write data into BigQuery Table.
4343
*
44-
* <p>TODO: Attach traceId.
45-
*
4644
* <p>TODO: Support batching.
4745
*
4846
* <p>TODO: Support schema change.
@@ -74,6 +72,11 @@ public class StreamWriterV2 implements AutoCloseable {
7472
*/
7573
private final long maxInflightBytes;
7674

75+
/*
76+
* TraceId for debugging purpose.
77+
*/
78+
private final String traceId;
79+
7780
/*
7881
* Tracks current inflight requests in the stream.
7982
*/
@@ -143,6 +146,7 @@ private StreamWriterV2(Builder builder) throws IOException {
143146
this.writerSchema = builder.writerSchema;
144147
this.maxInflightRequests = builder.maxInflightRequest;
145148
this.maxInflightBytes = builder.maxInflightBytes;
149+
this.traceId = builder.traceId;
146150
this.waitingRequestQueue = new LinkedList<AppendRequestAndResponse>();
147151
this.inflightRequestQueue = new LinkedList<AppendRequestAndResponse>();
148152
if (builder.client == null) {
@@ -433,6 +437,9 @@ private AppendRowsRequest prepareRequestBasedOnPosition(
433437
requestBuilder.getProtoRowsBuilder().setWriterSchema(this.writerSchema);
434438
}
435439
requestBuilder.setWriteStream(this.streamName);
440+
if (this.traceId != null) {
441+
requestBuilder.setTraceId(this.traceId);
442+
}
436443
} else {
437444
requestBuilder.clearWriteStream();
438445
requestBuilder.getProtoRowsBuilder().clearWriterSchema();
@@ -539,6 +546,8 @@ public static final class Builder {
539546
private CredentialsProvider credentialsProvider =
540547
BigQueryWriteSettings.defaultCredentialsProviderBuilder().build();
541548

549+
private String traceId = null;
550+
542551
private Builder(String streamName) {
543552
this.streamName = Preconditions.checkNotNull(streamName);
544553
this.client = null;
@@ -591,6 +600,20 @@ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
591600
return this;
592601
}
593602

603+
/**
604+
* Sets traceId for debuging purpose. TraceId must follow the format of
605+
* CustomerDomain:DebugString, e.g. DATAFLOW:job_id_x.
606+
*/
607+
public Builder setTraceId(String traceId) {
608+
int colonIndex = traceId.indexOf(':');
609+
if (colonIndex == -1 || colonIndex == 0 || colonIndex == traceId.length() - 1) {
610+
throw new IllegalArgumentException(
611+
"TraceId must follow the format of A:B. Actual:" + traceId);
612+
}
613+
this.traceId = traceId;
614+
return this;
615+
}
616+
594617
/** Builds the {@code StreamWriterV2}. */
595618
public StreamWriterV2 build() throws IOException {
596619
return new StreamWriterV2(this);

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
public class StreamWriterV2Test {
5555
private static final Logger log = Logger.getLogger(StreamWriterV2Test.class.getName());
5656
private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s";
57+
private static final String TEST_TRACE_ID = "DATAFLOW:job_id";
5758
private FakeScheduledExecutorService fakeExecutor;
5859
private FakeBigQueryWrite testBigQueryWrite;
5960
private static MockServiceHelper serviceHelper;
@@ -84,7 +85,7 @@ public void tearDown() throws Exception {
8485
}
8586

8687
private StreamWriterV2 getTestStreamWriterV2() throws IOException {
87-
return StreamWriterV2.newBuilder(TEST_STREAM, client).build();
88+
return StreamWriterV2.newBuilder(TEST_STREAM, client).setTraceId(TEST_TRACE_ID).build();
8889
}
8990

9091
private ProtoSchema createProtoSchema() {
@@ -184,10 +185,12 @@ private void verifyAppendRequests(long appendCount) {
184185
// First request received by server should have schema and stream name.
185186
assertTrue(serverRequest.getProtoRows().hasWriterSchema());
186187
assertEquals(serverRequest.getWriteStream(), TEST_STREAM);
188+
assertEquals(serverRequest.getTraceId(), TEST_TRACE_ID);
187189
} else {
188190
// Following request should not have schema and stream name.
189191
assertFalse(serverRequest.getProtoRows().hasWriterSchema());
190192
assertEquals(serverRequest.getWriteStream(), "");
193+
assertEquals(serverRequest.getTraceId(), "");
191194
}
192195
}
193196
}
@@ -209,7 +212,10 @@ public void testBuildBigQueryWriteClientInWriter() throws Exception {
209212
@Test
210213
public void testAppendWithRowsSuccess() throws Exception {
211214
StreamWriterV2 writer =
212-
StreamWriterV2.newBuilder(TEST_STREAM, client).setWriterSchema(createProtoSchema()).build();
215+
StreamWriterV2.newBuilder(TEST_STREAM, client)
216+
.setWriterSchema(createProtoSchema())
217+
.setTraceId(TEST_TRACE_ID)
218+
.build();
213219

214220
long appendCount = 100;
215221
for (int i = 0; i < appendCount; i++) {
@@ -269,6 +275,34 @@ public void run() throws Throwable {
269275
assertTrue(ex.getStatus().getDescription().contains("Writer schema must be provided"));
270276
}
271277

278+
@Test
279+
public void testInvalidTraceId() throws Exception {
280+
assertThrows(
281+
IllegalArgumentException.class,
282+
new ThrowingRunnable() {
283+
@Override
284+
public void run() throws Throwable {
285+
StreamWriterV2.newBuilder(TEST_STREAM).setTraceId("abc");
286+
}
287+
});
288+
assertThrows(
289+
IllegalArgumentException.class,
290+
new ThrowingRunnable() {
291+
@Override
292+
public void run() throws Throwable {
293+
StreamWriterV2.newBuilder(TEST_STREAM).setTraceId("abc:");
294+
}
295+
});
296+
assertThrows(
297+
IllegalArgumentException.class,
298+
new ThrowingRunnable() {
299+
@Override
300+
public void run() throws Throwable {
301+
StreamWriterV2.newBuilder(TEST_STREAM).setTraceId(":abc");
302+
}
303+
});
304+
}
305+
272306
@Test
273307
public void testAppendSuccessAndConnectionError() throws Exception {
274308
StreamWriterV2 writer = getTestStreamWriterV2();

0 commit comments

Comments
 (0)