matttproud.com (blog)

Flowchart for Choosing gRPC Method Types

Zürich, Schweiz

Scenario

Let’s suppose your boss tasks you with the problem of building a data upload service for blobs. The only requirement is the system has to be built on gRPC. How do you go about doing this, and, in the end, does your solution look more like no. 1 or no. 2 below?

  1. Prototypical Unary RPC:

    message UploadRequest {
      string file_name = 1;
      bytes content = 2;
    }

    message UploadResponse { ... }

    service UploadService {
      rpc Upload(UploadRequest) returns (UploadResponse) {}
    }
  2. Prototypical Streaming RPC:

    message UploadFragment {
      string file_name = 1;
      bytes content = 2;
    }

    message UploadResponse { ... }

    service UploadService {
      rpc Upload(stream UploadFragment) returns (UploadResponse) {}
    }

These are two approaches to building the solution. But they have different costs to build, maintain, and reason about, and each has different scenarios in which one form would be more suitable than the other.

These are tradeoffs I want to explore with you.

Opening

Remote Procedure Call (RPC) infrastructures are a godsend in the space of distributed systems. One of the infrastructures in that ecosystem that I value for its versatility and extensibility is gRPC. This appeal has several ingredients:

Challenge: Choosing Which Method Structure

But, in spite of those things I like, a challenge lies in that last bullet point:

The reality is that there are not two but rather four main structural choices, which per the gRPC documentation are: unary RPCs, server-streaming RPCs, client-streaming RPCs, and bidirectional-streaming RPCs.

How do you know which one to choose? My observation is that the choice is not really all that clear, especially if you are a beginner in the ecosystem. To make matters worse, the choice marries a combination of considerations from domain-specific requirements to local practical matters.

One of the realities I’ve seen in my career is a preferential bias from less experienced engineers toward modeling a lot of operations as streaming RPC. I am not sure exactly what motivates it. I think curiosity and novelty and a desire to master something complex are certainly ingredients.

For me, I have nothing against streaming RPC, but it tends to be significantly more complicated to implement correctly both client- and server-side. And that's before considering the impact on adjacent ecosystem components used in a streaming system (e.g., implementing an interceptor correctly for it).

Summary: I tend to think most things should be modeled with unary methods unless there is a strong, compelling reason not to do so.

This is informed by a set of values that I prioritize (in order):

  1. simplicity: choose the least complex solution for the job and its requirements

    All of my career experience with maintaining and extending legacy systems leads me to essentially the same conclusions that Google's Go Style Guide reaches around simplicity and least mechanism.

    Use the clearest and least magical solution for the problem that you can get away with.

  2. correctness: don’t sacrifice it

    This is a fundamental invariant of design. What I will note is that there is seldom one correct solution to a problem. There can be multiple, and there are tradeoffs.

Enormity of the Decision Space

As a thought exercise, I attempted to make a flow chart of the high-level considerations I would apply when making a decision:

gRPC Method Type Decision Flowchart

As you can see, it’s a bit of a monster. How do we make sense of this?

Tip: You can click on the image to see a larger version.

The edges are colored and sized differently according to the vertical and horizontal position of their origin.

As I undertook this exercise, I did not want to approach this problem exhaustively but rather comprehensively. Namely, how would I go about giving advice to myself as a junior engineer with everything I have learned today?

I tried to make each node in the graph self-standing in terms of its text, but if something doesn't make sense, you should consult the flowchart explanation in the appendix.

Closing

I set off writing this article, thinking it was going to be a short exercise. It turned out I was wrong. What I wanted to write was a message in a bottle back to my past self. It’s not that I made a mistake with system design, but rather I’d have found the questions posed instructive.

As you can see, there are a lot of considerations that flow into this. My goal was never to be exhaustive in covering all of them but rather to give you a smattering of what I would consider myself. There are plenty of more advanced topics to consider that I did not have the energy to write about, so here is a taster:

If you’re looking for something more official and canonical, my colleague Eric Anderson from the gRPC team gave a talk about this at the 2018 CloudNativeCon entitled Using gRPC for Long-lived and Streaming RPCs:

There is admittedly a bit of a bias in my examination above: use the simplest solution that will deliver your goals, which most of the time is unary RPC methods. I hope that is something that Eric would agree with; he knows far more than I!

Aside: As I was labeling the diagram, I mostly did so in breadth-first order, whereas when I wrote the prose above I did so in a more depth-first manner. Whoops. I've spent so much time futzing with the labeling of content, identifiers, and such that I need a break.

I hope this exercise was as useful to you as it was to me.

Appendix: Walking the Flowchart

This part is optional to read. It provides an extended explanation of what you are seeing in the chart itself. Without further ado, let's walk through the chart.

Step: Start Designing the RPC Service (Node 1)

As long as we are considering modeling, we are already in the activity of designing. For the sake of argument, let's consider one RPC method we want to design and work through it iteratively. You'll work through this chart essentially once for each RPC method you design.

Tip: If that sounds taxing, this may be a good reason to settle on certain common service API verbs like CRUDL (create, read, update, delete, list) with a common contract.

Question: Shortly-Lived Requests (Node 2)

The question to ask here is whether the wall time for receiving, processing, and replying to the request is short. What does short mean? That’s left as an exercise to the reader:

My gut tells me that 30 seconds is a good default upper limit for shortly-lived requests. That is already a long time, but you can see that complexity ratchets up as requests become longer-lived (think: durability in case of interruption).

The other question to ask is not whether the operation could take 30 seconds but rather what typical successful operation durations look like. I'd consider the median duration and the 95th percentile duration. These metrics help you figure out how common a given duration is.

If you don’t know what the expected duration is, consider modeling the operation in an isolated function in a program instead of as an RPC method and seeing how it performs.

Try something like this using Go’s benchmark framework:

    // Environment contains everything to benchmark the system interaction you
    // want to observe.
    type Environment struct{ /* ... */ }

    // newEnvironment creates everything the environment needs to be benchmarked.
    // It amortizes setup cost and cleans up anything that needs to be.
    func newEnvironment(b *testing.B) *Environment {
        b.Helper()
        env := &Environment{ /* ... */ } // Set up dependencies, create environment, etc.
        b.Cleanup(func() { /* cleanup */ })
        return env
    }

    // Exercise runs one iteration of the benchmark process. It should strive to
    // do no setup insofar as possible.
    func (env *Environment) Exercise(b *testing.B) {
        b.Helper()
        // Exercise the actual system here: perform some SQL, do some POSIX I/O, etc.
    }

    // BenchmarkSystem is run by the Go testing harness.
    func BenchmarkSystem(b *testing.B) {
        env := newEnvironment(b)
        for b.Loop() {
            env.Exercise(b)
        }
    }

Step: Use Unary RPC Method (Node 3)

Here the case is rather easy: just build your unary RPC method per the gRPC instructions. You might end up with something like this prototypical echo service in your Protocol Buffer IDL definition.

    message EchoRequest {
      string message = 1;
    }

    message EchoResponse {
      string message = 1;
    }

    service EchoService {
      rpc Echo(EchoRequest) returns (EchoResponse) {}
    }

All that remains is creating the server implementation:

    package echoserver

    import (
        "context"

        "google.golang.org/protobuf/proto"

        echopb "my/echo_service_proto"
    )

    type Server struct{}

    func (Server) Echo(_ context.Context, req *echopb.EchoRequest) (*echopb.EchoResponse, error) {
        // Demonstrated using https://go.dev/blog/protobuf-opaque.
        resp := echopb.EchoResponse_builder{
            Message: proto.String(req.GetMessage()),
        }.Build()
        return resp, nil
    }

Question: Do Other Clients Want to Watch (Node 4)

Technically this question can also apply to shortly-lived requests (see the previous question), but that is rare. The main idea behind it is this:

  1. a client sends a request to your server to perform some operation that will take a long time

  2. other clients or parties (or potentially the original client at a later point) want to observe the state of the operation after it has begun

You can typically model operations that other parties can watch with some sort of ID that names the operation: either the client provides it to the server, or the server hands it back to the client. Either way, another service API mechanism looks up the status of the operation based on that ID.

Some of the later steps demonstrate a few patterns for doing this.
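As a toy sketch of the ID-based pattern (all names here are hypothetical, and a real server would persist this state rather than keep it in memory), the server hands back an ID at start time, and any party can later look the operation up by that ID:

```go
package main

import (
	"fmt"
	"sync"
)

// State mirrors the coarse lifecycle an outside watcher can observe.
type State int

const (
	StateUnknown State = iota
	StateRunning
	StateDone
)

// Registry is a hypothetical server-side table of in-flight operations,
// keyed by the ID handed back to the client at start time.
type Registry struct {
	mu     sync.Mutex
	nextID int
	states map[int]State
}

func NewRegistry() *Registry { return &Registry{states: make(map[int]State)} }

// Begin registers a new operation and returns the ID that the client (or any
// other party) can later use to look it up.
func (r *Registry) Begin() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.nextID++
	r.states[r.nextID] = StateRunning
	return r.nextID
}

// Finish marks an operation complete.
func (r *Registry) Finish(id int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.states[id] = StateDone
}

// Lookup is what a separate status RPC method would consult server-side.
func (r *Registry) Lookup(id int) State {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.states[id]
}

func main() {
	reg := NewRegistry()
	id := reg.Begin()
	fmt.Println(reg.Lookup(id) == StateRunning) // true
	reg.Finish(id)
	fmt.Println(reg.Lookup(id) == StateDone) // true
}
```

The LRO and Job API steps below are standardized, production-grade versions of this same idea.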

Question: Are Trailing Operations Opaque (Node 5)

This question flows from the previous about other watchers. The way I would think about this question is whether the lingering operation is expected to be deeply modified at any point later. If an ongoing operation is opaque, the most you’ll know about it is whether it is still running, not really anything deeply about its progress or its make-up. A good analogy for this is a simple tracking number for a piece of mail; that is a relatively opaque way of tracking progress.

Aside: In truth, what you can do with a simple piece of mail and a tracking number can vary significantly from locality to locality. In Switzerland, I can do a lot (e.g., request the post to hold the mail or deliver it to another address). Back in the United States with USPS, relatively little.

If an operation is not opaque, perhaps you have the ability to modify it later (think: mutating an ongoing workflow). A good analogy for such an operation is hiring a moving company to move your possessions to a new home. If the company is of repute, they’ll give you a tracking number for your move, and you can contact the company with that number to amend the delivery of your possessions to the new home (e.g., the contractors who are tearing out the old carpet are going to take two extra weeks, and the flooring can’t be covered in boxes).

Step: Use Long Running Operations (Node 12)

A Long-Running Operation (LRO) is a special form of a unary RPC. The principle behind it is that you have an ordinary unary RPC method, and it returns a google.longrunning.Operation message:

    import "google/longrunning/operations.proto";

    message CompactionRequest {
      string database_name = 1;
      string table_name = 2;
    }

    service DatabaseService {
      rpc Compact(CompactionRequest) returns (google.longrunning.Operation) {}
    }

The LRO pattern is common in intent-oriented management planes.

I highly recommend any user of LRO consult the embedded documentation in the IDL source code. Additional semantics for LRO are defined in the API Improvement Proposals (AIP), particularly AIP-151.

The google.longrunning.Operations RPC service interface is also something your server should implement and expose for management and interrogation of LRO status. Its API is rather crude, so it doesn’t really give you an ability to deeply inspect an operation or mutate one (outside of requesting cancellation).

In the case of data compaction in the example above, an LRO seems reasonable: it’s kind of a one-shot operation. It takes a long time to run (potentially). You tend to ignore it once you request it. You might cancel it if need be, but you seldom have the need to modify an outstanding compaction operation. Moreover, one would expect that a proper data storage system would handle compaction idempotently and transactionally should two compaction operations happen in rapid succession or race each other.

Step: Use Job API Convention (Node 13)

There is a more advanced form of LRO management where the system designer creates a custom service definition that conforms to the AIP-152 Job specification. You might also find this pattern in intent-oriented management planes.

We can adapt the example from Node 12 above using RunCompactionJob as a resource around which standard CRUDL methods are built, including an AIP-152-prescribed Run method. This CRUDL contract enables a RunCompactionJob to be modified, as opposed to the one-shot approach above. This has a lot of power if the operation might need reconsideration or be run with a separate authority:

    import "google/api/client.proto";
    import "google/api/field_behavior.proto";
    import "google/api/http.proto";
    import "google/api/resource.proto";
    import "google/longrunning/operations.proto";
    import "google/protobuf/duration.proto";
    import "google/protobuf/empty.proto";
    import "google/protobuf/field_mask.proto";
    import "google/protobuf/timestamp.proto";

    package com.matttproud.dbsaas;

    message RunCompactionJob {
      // Unlike CompactionRequest above, the resource name for the compaction job
      // includes the database and table name (it's hierarchical).
      option (google.api.resource) = { // See AIP-122.
        type: "dbsaas.matttproud.com/RunCompactionJob" // See AIP-123.
        pattern: "db/{db}/table/{table}/runCompactionJobs/{run_compaction_job}"
      };

      string name = 1 [(google.api.field_behavior) = IDENTIFIER]; // See AIP-122.
      RowRange row_range = 2;
      string etag = 3; // See AIP-154.
      State state = 4 [(google.api.field_behavior) = OUTPUT_ONLY]; // See AIP-203.
    }

    message RowRange {
      message OpenClosedInterval {
        string start = 1;
        string end = 2;
      }
      message OpenInterval {
        string start = 1;
      }
      message ClosedInterval {
        string end = 2;
      }
      message AllRows {} // Poor man's enum for a oneof.

      oneof range { // See AIP-146.
        OpenClosedInterval start_and_end = 1;
        OpenInterval start = 2;
        ClosedInterval end = 3;
        AllRows all_rows = 4;
      }
    }

    enum State {
      STATE_UNSPECIFIED = 0; // See AIP-126.
      STATE_PENDING = 1;
      STATE_RUNNING = 2;
      STATE_ERROR = 3;
      STATE_COMPLETE = 4;
    }

    service RunCompactionJobService {
      rpc RunCompactionJob(RunCompactionJobRequest) // See AIP-152.
          returns (google.longrunning.Operation) {
        option (google.api.http) = {
          post: "/v1/{name=db/*/table/*/runCompactionJobs/*}:run"
          body: "*"
        };
        option (google.longrunning.operation_info) = { // See AIP-151.
          response_type: "RunCompactionJobResponse"
          metadata_type: "RunCompactionJobMetadata"
        };
      }

      rpc GetRunCompactionJob(GetRunCompactionJobRequest) returns (RunCompactionJob) { // See AIP-131.
        option (google.api.http) = {
          get: "/v1/{name=db/*/table/*/runCompactionJobs/*}"
        };
        option (google.api.method_signature) = "name";
      }

      rpc ListRunCompactionJobs(ListRunCompactionJobsRequest) returns (ListRunCompactionJobsResponse) { // See AIP-132.
        option (google.api.http) = {
          get: "/v1/{parent=db/*/table/*}/runCompactionJobs"
        };
        option (google.api.method_signature) = "parent";
      }

      rpc CreateRunCompactionJob(CreateRunCompactionJobRequest) returns (RunCompactionJob) { // See AIP-133.
        option (google.api.http) = {
          post: "/v1/{parent=db/*/table/*}/runCompactionJobs"
          body: "runCompactionJob"
        };
        option (google.api.method_signature) = "parent,run_compaction_job";
      }

      rpc UpdateRunCompactionJob(UpdateRunCompactionJobRequest) returns (RunCompactionJob) { // See AIP-134.
        option (google.api.http) = {
          patch: "/v1/{name=db/*/table/*/runCompactionJobs/*}"
          body: "runCompactionJob"
        };
        option (google.api.method_signature) = "run_compaction_job,update_mask";
      }

      rpc DeleteRunCompactionJob(DeleteRunCompactionJobRequest) returns (google.protobuf.Empty) { // See AIP-135.
        option (google.api.http) = {
          delete: "/v1/{name=db/*/table/*/runCompactionJobs/*}"
        };
        option (google.api.method_signature) = "name";
      }
    }

    message RunCompactionJobRequest { // See AIP-152.
      string name = 1 [
        (google.api.field_behavior) = REQUIRED,
        (google.api.resource_reference) = {
          type: "dbsaas.matttproud.com/RunCompactionJob" // See AIP-123.
        }];
    }

    message RunCompactionJobResponse { // See AIP-151.
      google.protobuf.Timestamp finish_time = 1;
      google.protobuf.Duration wall_time = 2;
      int32 row_count_before = 3;
      int32 row_count_after = 4;
    }

    message RunCompactionJobMetadata { // See AIP-151.
      google.protobuf.Timestamp start_time = 1;
      int32 row_count_visited = 2;
      int32 row_count_compacted = 3;
    }

    message GetRunCompactionJobRequest { // See AIP-131.
      string name = 1 [
        (google.api.field_behavior) = REQUIRED,
        (google.api.resource_reference) = {
          type: "dbsaas.matttproud.com/RunCompactionJob" // See AIP-123.
        }];
    }

    message ListRunCompactionJobsRequest { // See AIP-132.
      string parent = 1 [
        (google.api.field_behavior) = REQUIRED,
        (google.api.resource_reference) = {
          child_type: "dbsaas.matttproud.com/RunCompactionJob" // See AIP-123.
        }];
      int32 page_size = 2;
      string page_token = 3;
    }

    message ListRunCompactionJobsResponse { // See AIP-132.
      repeated RunCompactionJob run_compaction_jobs = 1;
      string next_page_token = 2;
    }

    message CreateRunCompactionJobRequest { // See AIP-133.
      string parent = 1 [
        (google.api.field_behavior) = REQUIRED,
        (google.api.resource_reference) = {
          child_type: "dbsaas.matttproud.com/RunCompactionJob" // See AIP-123.
        }];
      string run_compaction_job_id = 2 [(google.api.field_behavior) = REQUIRED];
      RunCompactionJob run_compaction_job = 3 [(google.api.field_behavior) = REQUIRED];
    }

    message UpdateRunCompactionJobRequest { // See AIP-134.
      RunCompactionJob run_compaction_job = 1 [(google.api.field_behavior) = REQUIRED];
      google.protobuf.FieldMask update_mask = 2; // See AIP-161.
    }

    message DeleteRunCompactionJobRequest { // See AIP-135.
      string name = 1 [
        (google.api.field_behavior) = REQUIRED,
        (google.api.resource_reference) = {
          type: "dbsaas.matttproud.com/RunCompactionJob" // See AIP-123.
        }];
    }

Aside: I’ve gone to rather significant lengths to attempt to model the Job API sketch in as AIP-correct of a way as possible.

If you are not terribly familiar with gRPC or Google's APIs for its Cloud products, you might come away thinking: this is a lot of ceremony and complexity. To some extent, you might be right. But let's consider what AIP gives us:

  1. Well-known conventions and structures for both API producer and consumer alike.

  2. Producer and consumer do not need to consider and design many vagaries for common cases.

One critique I’ll levy against AIP is that I think some of the official examples could be improved (e.g., focus on the most minimally-viable correct exemplar and then show some advanced models). Far too many of the base case examples in the documentation are demonstrated with nested sub-resources in mind, an advanced design topic, which adds extra noise for the reader.

It’s left as an exercise to the implementer whether a RunCompactionJob is mutable after /RunCompactionJobService.RunCompactionJob has been called. Supporting such mutation certainly could be a use case for the Job API.

Question: Does the Client Send a Lot of Data (Node 6)

The crux of this question is how much data the client needs to send to accomplish what it wants. There is no one-size-fits-all answer, as many of the questions involved are answered in application- and technology stack-specific contexts.

Let’s think about the basic scenario of an RPC service that offers a method for file uploads:

    message UploadRequest {
      string file_name = 1;
      bytes content = 2;
      string quota_id = 3;
    }

    message UploadResponse { ... }

    service UploadService {
      rpc Upload(UploadRequest) returns (UploadResponse) {}
    }

If /UploadService.Upload is called exclusively on small text files like FILE_ID.DIZ, it’s easy to imagine this model working reasonably well.

Note: I just picked that file designation out of a hat since I expect the file definition to be commonly understood and also understood to be small.

But suppose /UploadService.Upload is to be general purpose and potentially handle large files sent to cold storage. In the current model, effectively the entire file needs to be sent by the client to the server before the server can even start processing it (this is how unary RPC methods work in gRPC). With large files, this means both client and server need to contend with buffering issues (kernel and hardware) and avoid memory or disk space exhaustion in user space code. There may even be network protocol-specific considerations, too.

So one could make the argument that large amounts of data need to be handled in a piecemeal way. Some of the solutions below explore this further.

Note: A well-formed API contract found in the wild for the scenario described above is the google.bytestream.ByteStream service.

Question: Does the Server Need to Periodically Communicate with Client (Node 7)

This question centers on application- or protocol-level needs for the server to send periodic communications to the client while the client sends a large amount of data to the server.

Let’s consider a scenario like this: the server reports back available quota budget to the client as data is sent to it. Node 14 provides an example of this.

Step: Use Bidirectional Streaming (Node 14)

In this case, you’ll be building a service definition using bidirectional (BiDi) streaming. Both client and server send fragments to each other over the course of a long-running RPC session.

We’ll build an example for the quota scenario from Node 7 in the Protocol Buffer IDL specification below:

    message UploadFragment {
      string file_name = 1;
      int32 chunk_id = 2; // Must be monotonically increasing for a given file name.
      bytes content = 3;
      string quota_id = 4;
      int32 estimated_remaining_file_size_bytes = 5;
      int32 estimated_remaining_files_count = 6;
    }

    message UploadStatus {
      int32 remaining_quota_bytes = 1;
    }

    service UploadService {
      rpc Upload(stream UploadFragment) returns (stream UploadStatus) {}
    }

The server weighs the user's available quota (known by the server and book-kept under UploadFragment.quota_id) against the remaining file size (known by the client under UploadFragment.estimated_remaining_file_size_bytes) and the number of remaining files (known by the client under UploadFragment.estimated_remaining_files_count). Based on this, the server periodically reports quota status to the client so that the upload session can gracefully upload the most important files and terminate before quota is exhausted. This semantic might seem strange, but there could be multiple operations happening against a given quota ID in the production system at any given time.
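A hypothetical client-side helper sketches how the client might act on such a quota report: given the server's last reported remaining quota and a priority-ordered list of file sizes, decide how many of the most important files can still be uploaded in full (the function name and the greedy policy are illustrative assumptions, not part of the contract above):

```go
package main

import "fmt"

// planUploads walks a priority-ordered list of file sizes and returns how
// many of the leading (most important) files fit entirely within the
// remaining quota, stopping at the first file that does not fit.
func planUploads(remainingQuota int, fileSizes []int) int {
	n := 0
	for _, size := range fileSizes {
		if size > remainingQuota {
			break
		}
		remainingQuota -= size
		n++
	}
	return n
}

func main() {
	// 100 bytes of quota left; files listed most-important first.
	fmt.Println(planUploads(100, []int{40, 50, 30})) // 2
}
```

With 100 bytes remaining, the first two files (40 + 50 bytes) fit, and the session would stop before the third rather than fail mid-file.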

Question: Can the Client Easily Chunk Work (Node 15)

By chunking work, we mean to understand the feasibility of the client breaking what it wants to send into batches to send to the server piecemeal. Here are some considerations:

Several things speak to batching:

And several things speak against it:

Some examples:

Question: Does each request entail high fixed costs server-side (Nodes 18 and 20)

There are two types of costs:

They are not independent of each other; rather, they coexist in the context of a system: §c = c_f + c_v \cdot u§. While each is worth considering on its own, we want to consider the fixed part here. There are situations where §c_f§ crowds out the effects of §c_v§ for many plausible sizes of unit of work §u§: §c_f > c_v \cdot u§.
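A small numeric sketch of §c = c_f + c_v \cdot u§ (the cost values are made up for illustration) shows why a dominating fixed cost argues for fewer, larger requests: the fixed cost is paid once per request, so batching amortizes it.

```go
package main

import "fmt"

// cost models c = c_f + c_v*u for a single request carrying u units of work.
func cost(fixed, variable float64, units int) float64 {
	return fixed + variable*float64(units)
}

// totalCost models sending n units split across requests of batch units each.
func totalCost(fixed, variable float64, n, batch int) float64 {
	requests := (n + batch - 1) / batch // Ceiling division.
	return float64(requests)*fixed + variable*float64(n)
}

func main() {
	// Hypothetical costs: c_f = 100 per request, c_v = 1 per unit, 1000 units.
	fmt.Println(totalCost(100, 1, 1000, 1))    // 101000 (one unit per request)
	fmt.Println(totalCost(100, 1, 1000, 1000)) // 1100 (one batched request)
}
```

With these made-up numbers, sending one unit per request costs almost a hundred times more than one batched request, entirely because §c_f§ dominates §c_v \cdot u§ at small §u§.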

Let’s consider one of the unary upload solutions from Node 6, except that it saves data to slower, cold storage. It could look something like this:

    type Severity int

    const (
        severityUnknown Severity = iota
        SeverityDefault
        SeverityBestEffort
        SeverityUrgent
    )

    type PagerService interface {
        Page(ctx context.Context, eventID int, msg string, severity Severity) error
        AwaitResolution(ctx context.Context, eventID int) error
    }

    type TapeBackupUploader struct {
        OpsNotifier PagerService // Ruin the lives of the poor saps.
        Device      string      // Example: /dev/st0
    }

    func pageAndAwait(ctx context.Context, pager PagerService, eventID int, op, msg string, s Severity) error {
        if err := pager.Page(ctx, eventID, msg, s); err != nil {
            return fmt.Errorf("pre-%v: %v", op, err)
        }
        if err := pager.AwaitResolution(ctx, eventID); err != nil {
            // TODO: Remember who was oncall and punish.
            return fmt.Errorf("%v: %v", op, err)
        }
        return nil
    }

    func (u *TapeBackupUploader) Upload(ctx context.Context, req *uploadpb.UploadRequest) (*uploadpb.UploadResponse, error) {
        { // Phase 1: Acquire tape.
            id := newEvent(ctx, req)
            msg := fmt.Sprintf("Load tape into %q, you lazy slag.", u.Device)
            if err := pageAndAwait(ctx, u.OpsNotifier, id, "provisioning tape", msg, SeverityUrgent); err != nil {
                return nil, err
            }
        }
        { // Phase 2: Write data.
            if err := writeData(ctx, u.Device, req); err != nil {
                return nil, fmt.Errorf("writing data to tape: %v", err)
            }
        }
        { // Phase 3: Return tape.
            id := newEvent(ctx, req)
            msg := fmt.Sprintf("Return tape from %q to the shelf, you lazy slag.", u.Device)
            if err := pageAndAwait(ctx, u.OpsNotifier, id, "returning tape", msg, SeverityUrgent); err != nil {
                return nil, err
            }
        }
        return &uploadpb.UploadResponse{}, nil
    }

Looking at the code above, can you guess which phases probably take the most time? writeData certainly won’t be fast, but we’re having to wait on a human to do some rather menial tasks in Phases 1 and 3. So in such a situation we certainly could have a large §c_f§. Obviously there are other cases where the fixed cost of setting up the operation is large, too, and these don’t involve human operator intervention.

Question: Does the client or server depend on complex interception? (Nodes 21 and 22)

gRPC offers a powerful feature for creation of various forms of behavior-altering middleware through something called interception APIs. I really encourage you to give the link a look if you are unfamiliar with this concept. Interception can power a whole bunch of things:

  1. Metrics and Telemetry
  2. Authorization, Authentication, and Auditing (AAA)
  3. Tracing
  4. Caching
  5. Load Reporting and Shedding

Moreover, interception is built through user-implemented interceptors. Interceptors can be run either client- or server-side. And most interestingly, interceptors can be chained.
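To illustrate how chaining composes without pulling in the real gRPC API, here is a simplified stand-in (the Interceptor type and chain helper are hypothetical and much narrower than gRPC's actual interceptor signatures) showing that the first interceptor listed runs outermost:

```go
package main

import "fmt"

// Interceptor is a simplified stand-in for gRPC's unary interceptor shape:
// it receives the request plus a next function that continues the chain.
type Interceptor func(req string, next func(string) string) string

// chain composes interceptors around a terminal handler so that the first
// interceptor listed runs outermost (it sees the request first and the
// response last).
func chain(handler func(string) string, ics ...Interceptor) func(string) string {
	out := handler
	for i := len(ics) - 1; i >= 0; i-- {
		ic, next := ics[i], out
		out = func(req string) string { return ic(req, next) }
	}
	return out
}

func main() {
	// tag wraps the downstream result in a label so the nesting is visible.
	tag := func(label string) Interceptor {
		return func(req string, next func(string) string) string {
			return label + "(" + next(req) + ")"
		}
	}
	h := chain(func(req string) string { return req }, tag("a"), tag("b"))
	fmt.Println(h("x")) // a(b(x))
}
```

The real grpc-go equivalent of this composition is provided by options such as grpc.ChainUnaryInterceptor, but the nesting behavior is the same idea.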

If you are familiar with Go’s common http.Handler API, it can be used through the process of wrapping and even currying to form behavioral chains server-side. Consider an example that provides elementary timing telemetry to a request handler:

    func instrument(m *expvar.Int, h http.Handler) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            defer func(start time.Time) {
                elapsed := time.Since(start)
                m.Add(int64(elapsed))
            }(time.Now())
            h.ServeHTTP(w, r)
        })
    }

What I described above for Go’s net/http is very simple compared to gRPC interception. Recall the dimensions I mentioned above; there’s another one: unary or streaming! So suddenly the situation is a lot more complex.

It’s not terribly uncommon for larger organizations to use interception to achieve middleware goals, and often the behaviors are extremely load-bearing.

Let’s look at the prototypical forms of the interception API.

Unary client-side implements grpc.UnaryClientInterceptor, which appears as this:

    type UnaryClientInterceptor func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error

Here’s an example:

    func interceptor(m *expvar.Int) grpc.UnaryClientInterceptor {
        return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
            defer func(start time.Time) {
                elapsed := time.Since(start)
                m.Add(int64(elapsed))
            }(time.Now())
            return invoker(ctx, method, req, reply, cc, opts...)
        }
    }

There are three phases:

  1. pre-processing: interception provides this logic, if any
  2. running the underlying call: the interceptor arranges this
  3. post-processing: interception provides this, if any

Now let’s look at the streaming client interceptor signature grpc.StreamClientInterceptor:

    type StreamClientInterceptor func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error)

Here’s an implementation:

    type wrapped struct {
        send, recv *expvar.Int
        grpc.ClientStream
    }

    func (w *wrapped) RecvMsg(m any) error {
        defer func(start time.Time) {
            elapsed := time.Since(start)
            w.recv.Add(int64(elapsed))
        }(time.Now())
        return w.ClientStream.RecvMsg(m)
    }

    func (w *wrapped) SendMsg(m any) error {
        defer func(start time.Time) {
            elapsed := time.Since(start)
            w.send.Add(int64(elapsed))
        }(time.Now())
        return w.ClientStream.SendMsg(m)
    }

    func interceptor(send, recv *expvar.Int) grpc.StreamClientInterceptor {
        return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
            // There could be additional business logic in here, too.
            s, err := streamer(ctx, desc, cc, method, opts...)
            if err != nil {
                return nil, err
            }
            return &wrapped{send, recv, s}, nil
        }
    }

grpc.StreamClientInterceptor is a lot more complex than the unary form in terms of phases:

  1. pre-processing: interception provides this logic, if any
  2. wrap the streamer, if at all
    1. sending, if at all
    2. receiving, if at all
  3. running the underlying stream: the interceptor arranges this
  4. for each send and receive of stream messages:
    1. running the send (or its wrapper) if at all
    2. running the receive (or its wrapper) if at all

Let’s suppose we are interested in reporting telemetry about operation successes and failures. With unary, this is relatively straightforward: the call succeeds or fails. There is a 1:1 mapping between the call and an outcome; increment the respective counter. But with streaming we suddenly have to consider the multiplicity of sends and receives. The streaming call may have §n§ sends and §m§ receives. How do you report that in terms of telemetry?

Telemetry might sound like an easy problem to solve, but it’s only the tip of the iceberg. To understand why, let’s look at server-side interception.

Server-side interception is achieved with unary methods using the grpc.UnaryServerInterceptor API:

type UnaryServerInterceptor func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error)

An example implementation:

func interceptor(m *expvar.Int) grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
		defer func(start time.Time) {
			elapsed := time.Since(start)
			m.Add(int64(elapsed))
		}(time.Now())
		return handler(ctx, req)
	}
}

And here again is what server-side streaming looks like with grpc.StreamServerInterceptor:

type StreamServerInterceptor func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error

And an implementation:

type wrapped struct {
	send, recv *expvar.Int
	grpc.ServerStream
}

func (w *wrapped) RecvMsg(m any) error {
	defer func(start time.Time) {
		elapsed := time.Since(start)
		w.recv.Add(int64(elapsed))
	}(time.Now())
	return w.ServerStream.RecvMsg(m)
}

func (w *wrapped) SendMsg(m any) error {
	defer func(start time.Time) {
		elapsed := time.Since(start)
		w.send.Add(int64(elapsed))
	}(time.Now())
	return w.ServerStream.SendMsg(m)
}

func intercept(send, recv *expvar.Int) grpc.StreamServerInterceptor {
	return func(srv any, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
		// There could be additional business logic in here, too.
		return handler(srv, &wrapped{send, recv, ss})
	}
}

I alluded to there being a tip of the iceberg with complexity and that it could lie server-side. The truth is not that the server side is more complex; it’s that the middleware often used there is even more complex.

Let’s consider middleware likely to be used universally in production-grade systems: query cost reporting. How you estimate query cost for unary versus streaming operations is very different. Take a look at the information that is supposed to be returned for cost estimation per Open Request Cost Aggregation (ORCA):

// n.b. --- I've excised fields I don't care about.

message OrcaLoadReport {
  // Application specific requests costs. Each value is an absolute cost (e.g. 3487 bytes of
  // storage) associated with the request.
  map<string, double> request_cost = 4;

  // Resource utilization values. Each value is expressed as a fraction of total resources
  // available, derived from the latest sample or measurement.
  map<string, double> utilization = 5;

  // Application specific opaque metrics.
  map<string, double> named_metrics = 8;

  // Application specific utilization expressed as a fraction of available
  // resources. For example, an application may report the max of CPU and memory
  // utilization for better load balancing if it is both CPU and memory bound.
  // This should be derived from the latest sample or measurement.
  // The value may be larger than 1.0 when the usage exceeds the reporter
  // dependent notion of soft limits.
  double application_utilization = 9;
}

That looks somewhat easy to compute unary-wise, but streaming? Get out of here. You have to sit down and think about application-level utilization semantics. Yes, that’s doable, but will your guess of how to do it work with the other services served by your binary? I can’t say, but this sounds precarious when building a system with multiple engineers.
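To see what the streaming case forces on you, here is a hedged sketch (not the real ORCA API; the types below are hypothetical) of accumulating application-specific costs across the n messages of a stream so that a single, unary-shaped report can be emitted when the stream finishes:

```go
package main

import "fmt"

// loadReport mimics the shape of OrcaLoadReport's request_cost map;
// the real type lives in the ORCA protos.
type loadReport struct {
	requestCost map[string]float64
}

// costAccumulator tallies application-specific costs across the many
// messages of a stream so one aggregate report can be emitted at the end.
type costAccumulator struct {
	costs map[string]float64
}

func newCostAccumulator() *costAccumulator {
	return &costAccumulator{costs: make(map[string]float64)}
}

// onMessage records the cost of handling one streamed message.
func (a *costAccumulator) onMessage(metric string, cost float64) {
	a.costs[metric] += cost
}

// report flattens the accumulated costs into a per-call report.
func (a *costAccumulator) report() loadReport {
	return loadReport{requestCost: a.costs}
}

func main() {
	acc := newCostAccumulator()
	// Three streamed messages, each with a storage cost in bytes.
	for _, c := range []float64{1024, 2048, 512} {
		acc.onMessage("storage-bytes", c)
	}
	fmt.Println(acc.report().requestCost["storage-bytes"]) // 3584
}
```

Summing is only one policy; whether a maximum, a mean, or a decayed sample is the right aggregation is exactly the kind of semantic question each team has to answer for itself.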

And this leads to the central observation of this section: your systems may rely on powerful interceptors (even unbeknownst to you). Imagine your organization has a platform team. Are you confident about the following:

  1. The interceptors exist with parity across unary and streaming forms?
  2. The interceptors are implemented correctly in both forms?
  3. The interceptors are semantically expressible in both forms?

Before you say “yes” without thinking about it, you need to deeply understand two things:

  1. The intended semantic behavior of the interceptor.
  2. The full complement of lifecycle phases for the given interceptor type.

I’ve worked with RPC systems like this for almost 20 years, including with gRPC’s predecessor, which shares many conceptually similar models with what I’ve just enumerated. I don’t think I could grok no. 2 (lifecycle phases) without building a hello-world server and client and testing things for myself, just to see how everything works in real life.

All I will say here is that I do not consider using streaming lightly, especially with load-bearing interception.

Step: Use Client-Side Streaming (Node 19)

This is essentially a trimmed-down version of Node 14: instead of the server providing periodic status updates to the client, only a terminal result is shared with the client once everything finishes.

message UploadFragment {
  string file_name = 1;
  bytes content = 2;
  string quota_id = 3;
}

message UploadStatus {
  int32 remaining_quota_bytes = 1;
}

service UploadService {
  rpc Upload(stream UploadFragment) returns (UploadStatus) {}
}
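On the client side, this schema implies a chunking loop. The sketch below fakes the generated client-streaming stub with a hypothetical fragmentSender interface so the chunking logic stands alone:

```go
package main

import "fmt"

// uploadFragment mirrors the UploadFragment message.
type uploadFragment struct {
	fileName string
	content  []byte
}

// fragmentSender is a hypothetical stand-in for the generated
// client-streaming stub's Send method.
type fragmentSender interface {
	Send(f uploadFragment) error
}

// upload slices the blob into chunkSize-byte fragments and sends each,
// bounding per-message size regardless of the blob's total size.
func upload(s fragmentSender, name string, blob []byte, chunkSize int) error {
	for off := 0; off < len(blob); off += chunkSize {
		end := off + chunkSize
		if end > len(blob) {
			end = len(blob)
		}
		if err := s.Send(uploadFragment{fileName: name, content: blob[off:end]}); err != nil {
			return err
		}
	}
	return nil
}

// collector records sent fragments for demonstration.
type collector struct{ frags []uploadFragment }

func (c *collector) Send(f uploadFragment) error {
	c.frags = append(c.frags, f)
	return nil
}

func main() {
	c := &collector{}
	if err := upload(c, "a.bin", make([]byte, 10), 4); err != nil {
		panic(err)
	}
	fmt.Println(len(c.frags)) // 3 fragments: 4+4+2 bytes
}
```

With a real stub, the loop would end with a CloseAndRecv-style call to obtain the terminal UploadStatus.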

A more realistic case of this is clients pushing telemetry to a central collection agent:

import "google/protobuf/struct.proto";

message Update {
  message Gauge {
    double value = 1;
  }
  message Counter {
    double value = 1;
  }
  message Histogram {
    message Bucket {
      double start = 1;
      int64 sum = 2;
    }
    repeated Bucket buckets = 1;
  }
  message Sample {
    map<string, string> labels = 1;
    oneof value {
      Gauge gauge = 2;
      Counter counter = 3;
      Histogram histogram = 4;
    }
  }
  string name = 1;
  google.protobuf.Struct source_schema = 2;
  repeated Sample sample = 3;
}

message IngestionResult {}

service TelemetryReceiver {
  rpc Push(stream Update) returns (IngestionResult) {}
}

We could also imagine this situation reversed whereby the collection service polls this data from the clients instead, but that’s not the topic of this section.

Question: Does the Client Receive a Lot of Data (Node 8)

This question is effectively a variation of Node 6, except that instead of the client sending a lot of data to the server it receives a lot of data from the server (e.g., instead of an upload service, we have a download service). I’d apply a similar mental rubric here.

Question: Does the Client Need to Periodically Communicate with Server (Node 9)

This question is a variation of Node 7, except that the client needs to inform the server of something periodically. We could imagine a scenario in which a client receives units of work from the server, performs them, and then tells the server it is ready for the next unit of work once the current ones are done.

This would not be too different in principle from how SETI@home worked way back in the day. If you don’t remember, SETI@home was an early public distributed computing activity, wherein clients would download signal data from the research project, perform expensive analysis on the data, and then send results back to the server.

A flow might look like this:

  1. Client advertises itself with SETI@home.
  2. SETI@home assigns a unit of work to the client.
  3. Client downloads the work.
  4. Client processes the work over the next minutes to hours (depending on client’s processing power).
  5. Client uploads work.
  6. Client receives new unit of work, and the process continues until the heat death of the universe.

This could be modeled with either streaming or unary operations, to be sure, but we’re imagining a long-term session being opened between client and server. We could also imagine the SETI@home story including flow control information to the server: Hey, I’m not ready yet for the next chunk of data; hold off!
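One hedged way to picture that session, with channels standing in for the bidirectional stream, is a loop in which the client explicitly signals readiness before the server releases the next unit of work:

```go
package main

import "fmt"

// session simulates the long-lived exchange with channels: the client
// signals readiness before the server hands out each unit of work.
func session(units int) int {
	ready := make(chan struct{}) // client → server: "send me work"
	work := make(chan int)       // server → client: units of work

	// Server: release a unit only after a readiness signal, mimicking
	// application-level flow control.
	go func() {
		for unit := 1; unit <= units; unit++ {
			<-ready
			work <- unit
		}
		<-ready // final readiness signal; nothing left to hand out
		close(work)
	}()

	// Client: advertise readiness, process, repeat.
	done := 0
	for {
		ready <- struct{}{}
		if _, ok := <-work; !ok {
			break
		}
		done++ // processing of the unit is elided
	}
	return done
}

func main() {
	fmt.Println(session(3)) // 3
}
```

The readiness channel is the "hold off!" signal: the server cannot push a fourth unit while the client is still crunching the third.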

Question: Can the Client Easily Use Pagination (Node 16)

Pagination’s considerations are complementary to Node 15.

There are also cases where the server-side data fundamentally is not paginatable without the server itself undertaking effort to make it so (e.g., real-time signal data that the server buffers).
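For contrast, a unary paginated listing, sketched here with a hypothetical page-token API, keeps each call small and independently retryable:

```go
package main

import "fmt"

// listPage mimics a typical paginated response: a batch of items plus a
// token naming the next page ("" when the listing is exhausted).
type listPage struct {
	items     []string
	nextToken string
}

// lister is a hypothetical unary List RPC.
type lister func(pageToken string, pageSize int) (listPage, error)

// listAll drives the standard pagination loop: call, collect, follow
// the token until it is empty.
func listAll(list lister, pageSize int) ([]string, error) {
	var all []string
	token := ""
	for {
		page, err := list(token, pageSize)
		if err != nil {
			return nil, err
		}
		all = append(all, page.items...)
		if page.nextToken == "" {
			return all, nil
		}
		token = page.nextToken
	}
}

func main() {
	// Fake server data, paged with the start index as the token.
	data := []string{"a", "b", "c", "d", "e"}
	fake := func(tok string, size int) (listPage, error) {
		start := 0
		fmt.Sscanf(tok, "%d", &start)
		end := start + size
		if end > len(data) {
			end = len(data)
		}
		next := ""
		if end < len(data) {
			next = fmt.Sprintf("%d", end)
		}
		return listPage{items: data[start:end], nextToken: next}, nil
	}
	got, _ := listAll(fake, 2)
	fmt.Println(got) // [a b c d e]
}
```

Each page fetch is an ordinary unary call, so all the unary interception and telemetry machinery discussed earlier applies to it unchanged.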

Step: Use Server-Side Streaming (Node 17)

Server-side streaming complements Node 19. Let’s reformulate it as a download service, instead:

message DownloadRequest {
  string file_name = 1;
}

message DownloadFragment {
  bytes content = 1;
}

service DownloadService {
  rpc Download(DownloadRequest) returns (stream DownloadFragment) {}
}
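A server handler for this schema is the mirror image of the client-side upload loop. The sketch below stands in a hypothetical fragmentSender for the generated server stream so the chunking logic is self-contained:

```go
package main

import "fmt"

// downloadFragment mirrors the DownloadFragment message.
type downloadFragment struct{ content []byte }

// fragmentSender is a hypothetical stand-in for the generated
// server-streaming stub's Send method.
type fragmentSender interface {
	Send(f downloadFragment) error
}

// serveDownload streams content to the client in chunkSize-byte pieces,
// bounding per-message size regardless of the blob's total size.
func serveDownload(s fragmentSender, content []byte, chunkSize int) error {
	for off := 0; off < len(content); off += chunkSize {
		end := off + chunkSize
		if end > len(content) {
			end = len(content)
		}
		if err := s.Send(downloadFragment{content: content[off:end]}); err != nil {
			return err
		}
	}
	return nil
}

// capture counts sent fragments for demonstration.
type capture struct{ n int }

func (c *capture) Send(downloadFragment) error { c.n++; return nil }

func main() {
	c := &capture{}
	if err := serveDownload(c, make([]byte, 9), 4); err != nil {
		panic(err)
	}
	fmt.Println(c.n) // 3 fragments: 4+4+1 bytes
}
```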

There are plenty of other twists on this, too: streaming real-time sensor readings, etc.

Question: Does the Server Model Something Event-Driven (Node 10)

By this point in the diagram, we are dealing with a system that sends and receives little data for some definition of “little.” We are dealing with something that could be moderately long-running. Something that might fall into that space is an event-driven system fed by real-time sources that could need relatively expedient processing.

Question: Would Periodic Polling be Incorrect or Unergonomic for Clients (Node 11)

This question is very tightly coupled with Node 10. When faced with how to pass event-driven data to clients, you need to think through the requirements: how fresh the data must be when it reaches the client, and whether clients may miss or coalesce intermediate updates.

It might seem weird to frame these considerations under the rubric of client polling, but they help inform whether the option is even on the table or not.

And then, even if the technical requirements are forgiving, how easily can the client manage polling the server for updates? Streaming entails a bit of coding overhead for both clients and servers. Periodic polling has some coding overhead of its own (e.g., determining the interval or poll policy, retry policy, etc.). These aren’t really hard problems, but perhaps they are enough to just bite the bullet and consider streaming.
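To make that polling overhead concrete, here is a hedged sketch of a poll loop with a fixed interval and a bounded per-call retry policy; the fetch function is hypothetical:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// pollOnce is a hypothetical unary call fetching the latest state.
type pollOnce func() (string, error)

// poll calls fetch every interval, retrying each failed call up to
// maxRetries times before giving up, and stops after rounds successes.
func poll(fetch pollOnce, interval time.Duration, maxRetries, rounds int) ([]string, error) {
	var got []string
	for len(got) < rounds {
		var (
			v   string
			err error
		)
		for attempt := 0; ; attempt++ {
			v, err = fetch()
			if err == nil {
				break
			}
			if attempt == maxRetries {
				return got, err
			}
		}
		got = append(got, v)
		time.Sleep(interval)
	}
	return got, nil
}

func main() {
	calls := 0
	flaky := func() (string, error) {
		calls++
		if calls%3 == 0 { // every third call fails transiently
			return "", errors.New("transient")
		}
		return fmt.Sprintf("update-%d", calls), nil
	}
	got, err := poll(flaky, time.Millisecond, 2, 4)
	fmt.Println(len(got), err == nil) // 4 true
}
```

Notice how much of the code is policy (interval, retry budget, stop condition) rather than business logic, which is precisely the overhead the paragraph above describes.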
