File tree Expand file tree Collapse file tree 2 files changed +15
-7
lines changed
google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1 Expand file tree Collapse file tree 2 files changed +15
-7
lines changed Original file line number Diff line number Diff line change @@ -149,8 +149,7 @@ protected void doStart() {
149149
150150 @ Override
151151 protected void doStop () {
152- messageDispatcher .stop ();
153- ackOperationsWaiter .waitComplete ();
152+ runShutdown ();
154153
155154 lock .lock ();
156155 try {
@@ -161,6 +160,11 @@ protected void doStop() {
161160 }
162161 }
163162
163+ private void runShutdown () {
164+ messageDispatcher .stop ();
165+ ackOperationsWaiter .waitComplete ();
166+ }
167+
164168 private class StreamingPullResponseObserver implements ResponseObserver <StreamingPullResponse > {
165169
166170 final SettableApiFuture <Void > errorFuture ;
@@ -282,6 +286,7 @@ public void onFailure(Throwable cause) {
282286 ApiExceptionFactory .createException (
283287 cause , GrpcStatusCode .of (Status .fromThrowable (cause ).getCode ()), false );
284288 logger .log (Level .SEVERE , "terminated streaming with exception" , gaxException );
289+ runShutdown ();
285290 notifyFailed (gaxException );
286291 return ;
287292 }
Original file line number Diff line number Diff line change @@ -308,9 +308,7 @@ protected void doStop() {
308308 public void run () {
309309 try {
310310 // stop connection is no-op if connections haven't been started.
311- stopAllStreamingConnections ();
312- shutdownBackgroundResources ();
313- subStub .shutdownNow ();
311+ runShutdown ();
314312 notifyStopped ();
315313 } catch (Exception e ) {
316314 notifyFailed (e );
@@ -320,6 +318,12 @@ public void run() {
320318 .start ();
321319 }
322320
321+ private void runShutdown () {
322+ stopAllStreamingConnections ();
323+ shutdownBackgroundResources ();
324+ subStub .shutdownNow ();
325+ }
326+
323327 private void startStreamingConnections () {
324328 synchronized (streamingSubscriberConnections ) {
325329 for (int i = 0 ; i < numPullers ; i ++) {
@@ -352,8 +356,7 @@ private void startStreamingConnections() {
352356 public void failed (State from , Throwable failure ) {
353357 // If a connection failed is because of a fatal error, we should fail the
354358 // whole subscriber.
355- stopAllStreamingConnections ();
356- shutdownBackgroundResources ();
359+ runShutdown ();
357360 try {
358361 notifyFailed (failure );
359362 } catch (IllegalStateException e ) {
You can’t perform that action at this time.
0 commit comments