3131import com .google .cloud .bigquery .storage .v1 .StreamConnection .RequestCallback ;
3232import com .google .common .annotations .VisibleForTesting ;
3333import com .google .common .base .Preconditions ;
34+ import com .google .common .collect .ImmutableList ;
3435import com .google .common .util .concurrent .Uninterruptibles ;
3536import com .google .protobuf .Int64Value ;
3637import io .grpc .Status ;
4041import io .opentelemetry .api .common .Attributes ;
4142import io .opentelemetry .api .common .AttributesBuilder ;
4243import io .opentelemetry .api .metrics .LongCounter ;
44+ import io .opentelemetry .api .metrics .LongHistogram ;
4345import io .opentelemetry .api .metrics .Meter ;
4446import io .opentelemetry .api .metrics .MeterProvider ;
4547import java .io .IOException ;
@@ -259,6 +261,7 @@ class ConnectionWorker implements AutoCloseable {
259261 private static Pattern streamPatternTable = Pattern .compile (tableMatching );
260262 private Meter writeMeter ;
261263 static AttributeKey <String > telemetryKeyTableId = AttributeKey .stringKey ("table_id" );
264+ static AttributeKey <String > telemetryKeyWriterId = AttributeKey .stringKey ("writer_id" );
262265 private static String dataflowPrefix = "dataflow:" ;
263266 static List <AttributeKey <String >> telemetryKeysTraceId =
264267 new ArrayList <AttributeKey <String >>() {
@@ -268,10 +271,25 @@ class ConnectionWorker implements AutoCloseable {
268271 add (AttributeKey .stringKey ("trace_field_3" ));
269272 }
270273 };
274+ static AttributeKey <String > telemetryKeyErrorCode = AttributeKey .stringKey ("error_code" );
275+ static AttributeKey <String > telemetryKeyIsRetry = AttributeKey .stringKey ("is_retry" );
271276 private Attributes telemetryAttributes ;
272- private LongCounter instrumentIncomingRequestCount ;
273- private LongCounter instrumentIncomingRequestSize ;
274- private LongCounter instrumentIncomingRequestRows ;
277+ // Latency buckets are based on a list of 1.5 ^ n
278+ private static final List <Long > METRICS_MILLISECONDS_LATENCY_BUCKETS =
279+ ImmutableList .of (
280+ 0L , 17L , 38L , 86L , 195L , 438L , 985L , 2217L , 4988L , 11223L , 25251L , 56815L , 127834L ,
281+ 287627L , 647160L );
282+
283+ private static final class OpenTelemetryMetrics {
284+ private LongCounter instrumentAckedRequestCount ;
285+ private LongCounter instrumentAckedRequestSize ;
286+ private LongCounter instrumentAckedRequestRows ;
287+ private LongHistogram instrumentNetworkResponseLatency ;
288+ private LongCounter instrumentConnectionStartCount ;
289+ private LongCounter instrumentConnectionEndCount ;
290+ }
291+
292+ private OpenTelemetryMetrics telemetryMetrics = new OpenTelemetryMetrics ();
275293
276294 public static Boolean isDefaultStreamName (String streamName ) {
277295 Matcher matcher = DEFAULT_STREAM_PATTERN .matcher (streamName );
@@ -327,16 +345,21 @@ private void setTraceIdAttributes(AttributesBuilder builder) {
327345 }
328346 }
329347
348+ // Specify common attributes for all metrics.
349+ // For example, table name and writer id.
350+ // Metrics dashboards can be filtered on available attributes.
330351 private Attributes buildOpenTelemetryAttributes () {
331352 AttributesBuilder builder = Attributes .builder ();
332353 String tableName = getTableName ();
333354 if (!tableName .isEmpty ()) {
334355 builder .put (telemetryKeyTableId , tableName );
335356 }
357+ builder .put (telemetryKeyWriterId , writerId );
336358 setTraceIdAttributes (builder );
337359 return builder .build ();
338360 }
339361
362+ // Refresh the table name attribute when multiplexing switches between tables.
340363 private void refreshOpenTelemetryTableNameAttributes () {
341364 String tableName = getTableName ();
342365 if (!tableName .isEmpty ()
@@ -347,6 +370,22 @@ private void refreshOpenTelemetryTableNameAttributes() {
347370 }
348371 }
349372
373+ // Build new attributes augmented with an error code string.
374+ private Attributes augmentAttributesWithErrorCode (Attributes attributes , String errorCode ) {
375+ AttributesBuilder builder = attributes .toBuilder ();
376+ if ((errorCode != null ) && !errorCode .isEmpty ()) {
377+ builder .put (telemetryKeyErrorCode , errorCode );
378+ }
379+ return builder .build ();
380+ }
381+
382+ // Build new attributes augmented with a flag indicating this was a retry.
383+ private Attributes augmentAttributesWithRetry (Attributes attributes ) {
384+ AttributesBuilder builder = attributes .toBuilder ();
385+ builder .put (telemetryKeyIsRetry , "1" );
386+ return builder .build ();
387+ }
388+
350389 @ VisibleForTesting
351390 Attributes getTelemetryAttributes () {
352391 return telemetryAttributes ;
@@ -360,20 +399,72 @@ private void registerOpenTelemetryMetrics() {
360399 .setInstrumentationVersion (
361400 ConnectionWorker .class .getPackage ().getImplementationVersion ())
362401 .build ();
363- instrumentIncomingRequestCount =
402+ telemetryMetrics .instrumentAckedRequestCount =
403+ writeMeter
404+ .counterBuilder ("append_requests_acked" )
405+ .setDescription ("Counts number of requests acked by the server" )
406+ .build ();
407+ telemetryMetrics .instrumentAckedRequestSize =
408+ writeMeter
409+ .counterBuilder ("append_request_bytes_acked" )
410+ .setDescription ("Counts byte size of requests acked by the server" )
411+ .build ();
412+ telemetryMetrics .instrumentAckedRequestRows =
413+ writeMeter
414+ .counterBuilder ("append_rows_acked" )
415+ .setDescription ("Counts number of request rows acked by the server" )
416+ .build ();
417+ writeMeter
418+ .gaugeBuilder ("active_connection_count" )
419+ .ofLongs ()
420+ .setDescription ("Reports number of active connections" )
421+ .buildWithCallback (
422+ measurement -> {
423+ int count = 0 ;
424+ this .lock .lock ();
425+ try {
426+ if (streamConnectionIsConnected ) {
427+ count = 1 ;
428+ }
429+ } finally {
430+ this .lock .unlock ();
431+ }
432+ measurement .record (count , getTelemetryAttributes ());
433+ });
434+ writeMeter
435+ .gaugeBuilder ("inflight_queue_length" )
436+ .ofLongs ()
437+ .setDescription (
438+ "Reports length of inflight queue. This queue contains sent append requests waiting for response from the server." )
439+ .buildWithCallback (
440+ measurement -> {
441+ int length = 0 ;
442+ this .lock .lock ();
443+ try {
444+ length = inflightRequestQueue .size ();
445+ } finally {
446+ this .lock .unlock ();
447+ }
448+ measurement .record (length , getTelemetryAttributes ());
449+ });
450+ telemetryMetrics .instrumentNetworkResponseLatency =
364451 writeMeter
365- .counterBuilder ("append_requests" )
366- .setDescription ("Counts number of incoming requests" )
452+ .histogramBuilder ("network_response_latency" )
453+ .ofLongs ()
454+ .setDescription (
455+ "Reports time taken in milliseconds for a response to arrive once a message has been sent over the network." )
456+ .setExplicitBucketBoundariesAdvice (METRICS_MILLISECONDS_LATENCY_BUCKETS )
367457 .build ();
368- instrumentIncomingRequestSize =
458+ telemetryMetrics . instrumentConnectionStartCount =
369459 writeMeter
370- .counterBuilder ("append_request_bytes" )
371- .setDescription ("Counts byte size of incoming requests" )
460+ .counterBuilder ("connection_start_count" )
461+ .setDescription (
462+ "Counts number of connection attempts made, regardless of whether these are initial or retry." )
372463 .build ();
373- instrumentIncomingRequestRows =
464+ telemetryMetrics . instrumentConnectionEndCount =
374465 writeMeter
375- .counterBuilder ("append_rows " )
376- .setDescription ("Counts number of incoming request rows " )
466+ .counterBuilder ("connection_end_count " )
467+ .setDescription ("Counts number of connection end events. " )
377468 .build ();
378469 }
379470
@@ -465,6 +556,7 @@ public void run() {
465556
466557 private void resetConnection () {
467558 log .info ("Start connecting stream: " + streamName + " id: " + writerId );
559+ telemetryMetrics .instrumentConnectionStartCount .add (1 , getTelemetryAttributes ());
468560 if (this .streamConnection != null ) {
469561 // It's safe to directly close the previous connection as the in flight messages
470562 // will be picked up by the next connection.
@@ -618,9 +710,6 @@ private ApiFuture<AppendRowsResponse> appendInternal(
618710 + requestWrapper .messageSize )));
619711 return requestWrapper .appendResult ;
620712 }
621- instrumentIncomingRequestCount .add (1 , getTelemetryAttributes ());
622- instrumentIncomingRequestSize .add (requestWrapper .messageSize , getTelemetryAttributes ());
623- instrumentIncomingRequestRows .add (message .getProtoRows ().getRows ().getSerializedRowsCount ());
624713 this .lock .lock ();
625714 try {
626715 if (userClosed ) {
@@ -1214,6 +1303,13 @@ private void requestCallback(AppendRowsResponse response) {
12141303 connectionRetryStartTime = 0 ;
12151304 }
12161305 if (!this .inflightRequestQueue .isEmpty ()) {
1306+ Instant sendInstant = inflightRequestQueue .getFirst ().requestSendTimeStamp ;
1307+ if (sendInstant != null ) {
1308+ Duration durationLatency = Duration .between (sendInstant , Instant .now ());
1309+ telemetryMetrics .instrumentNetworkResponseLatency .record (
1310+ durationLatency .toMillis (), getTelemetryAttributes ());
1311+ }
1312+
12171313 requestWrapper = pollFirstInflightRequestQueue ();
12181314 requestProfilerHook .endOperation (
12191315 RequestProfiler .OperationName .RESPONSE_LATENCY , requestWrapper .requestUniqueId );
@@ -1234,6 +1330,22 @@ private void requestCallback(AppendRowsResponse response) {
12341330 this .lock .unlock ();
12351331 }
12361332
1333+ Attributes augmentedTelemetryAttributes =
1334+ augmentAttributesWithErrorCode (
1335+ getTelemetryAttributes (),
1336+ Code .values ()[
1337+ response .hasError () ? response .getError ().getCode () : Status .Code .OK .ordinal ()]
1338+ .toString ());
1339+ if (requestWrapper .retryCount > 0 ) {
1340+ augmentedTelemetryAttributes = augmentAttributesWithRetry (augmentedTelemetryAttributes );
1341+ }
1342+ telemetryMetrics .instrumentAckedRequestCount .add (1 , augmentedTelemetryAttributes );
1343+ telemetryMetrics .instrumentAckedRequestSize .add (
1344+ requestWrapper .messageSize , augmentedTelemetryAttributes );
1345+ telemetryMetrics .instrumentAckedRequestRows .add (
1346+ requestWrapper .message .getProtoRows ().getRows ().getSerializedRowsCount (),
1347+ augmentedTelemetryAttributes );
1348+
12371349 // Retries need to happen on the same thread as queue locking may occur
12381350 if (response .hasError ()) {
12391351 if (retryOnRetryableError (Code .values ()[response .getError ().getCode ()], requestWrapper )) {
@@ -1316,6 +1428,11 @@ private void doneCallback(Throwable finalStatus) {
13161428 this .lock .lock ();
13171429 try {
13181430 this .streamConnectionIsConnected = false ;
1431+ this .telemetryMetrics .instrumentConnectionEndCount .add (
1432+ 1 ,
1433+ augmentAttributesWithErrorCode (
1434+ getTelemetryAttributes (),
1435+ Code .values ()[Status .fromThrowable (finalStatus ).getCode ().ordinal ()].toString ()));
13191436 if (connectionFinalStatus == null ) {
13201437 if (connectionRetryStartTime == 0 ) {
13211438 connectionRetryStartTime = System .currentTimeMillis ();
@@ -1327,6 +1444,8 @@ private void doneCallback(Throwable finalStatus) {
13271444 || System .currentTimeMillis () - connectionRetryStartTime
13281445 <= maxRetryDuration .toMillis ())) {
13291446 this .conectionRetryCountWithoutCallback ++;
1447+ this .telemetryMetrics .instrumentConnectionStartCount .add (
1448+ 1 , augmentAttributesWithRetry (getTelemetryAttributes ()));
13301449 log .info (
13311450 "Connection is going to be reestablished with the next request. Retriable error "
13321451 + finalStatus .toString ()
0 commit comments