|
70 | 70 | import com.google.storage.v2.StorageSettings; |
71 | 71 | import com.google.storage.v2.stub.GrpcStorageCallableFactory; |
72 | 72 | import com.google.storage.v2.stub.GrpcStorageStub; |
73 | | -import com.google.storage.v2.stub.StorageStub; |
74 | 73 | import com.google.storage.v2.stub.StorageStubSettings; |
75 | 74 | import io.grpc.ClientInterceptor; |
76 | 75 | import io.grpc.Detachable; |
|
89 | 88 | import java.time.Clock; |
90 | 89 | import java.util.ArrayList; |
91 | 90 | import java.util.Arrays; |
92 | | -import java.util.Collection; |
93 | 91 | import java.util.Collections; |
94 | 92 | import java.util.IdentityHashMap; |
| 93 | +import java.util.Iterator; |
95 | 94 | import java.util.List; |
96 | 95 | import java.util.Locale; |
97 | 96 | import java.util.Map; |
@@ -908,9 +907,27 @@ private Object readResolve() { |
908 | 907 |
|
909 | 908 | private static final class InternalStorageClient extends StorageClient { |
910 | 909 |
|
911 | | - private InternalStorageClient(StorageStub stub) { |
| 910 | + private InternalStorageClient(InternalZeroCopyGrpcStorageStub stub) { |
912 | 911 | super(stub); |
913 | 912 | } |
| 913 | + |
| 914 | + @Override |
| 915 | + public void shutdownNow() { |
| 916 | + try { |
| 917 | + // GrpcStorageStub#close() is final and we can't override it |
| 918 | + // instead hook in here to close out the zero-copy marshaller |
| 919 | + getStub().getObjectMediaResponseMarshaller.close(); |
| 920 | + } catch (IOException e) { |
| 921 | + throw new RuntimeException(e); |
| 922 | + } finally { |
| 923 | + super.shutdownNow(); |
| 924 | + } |
| 925 | + } |
| 926 | + |
| 927 | + @Override |
| 928 | + public InternalZeroCopyGrpcStorageStub getStub() { |
| 929 | + return (InternalZeroCopyGrpcStorageStub) super.getStub(); |
| 930 | + } |
914 | 931 | } |
915 | 932 |
|
916 | 933 | private static final class InternalZeroCopyGrpcStorageStub extends GrpcStorageStub |
@@ -1071,30 +1088,21 @@ public void close() throws IOException { |
1071 | 1088 | * them all as suppressed exceptions on the first occurrence. |
1072 | 1089 | */ |
1073 | 1090 | @VisibleForTesting |
1074 | | - static void closeAllStreams(Collection<InputStream> inputStreams) throws IOException { |
1075 | | - IOException ioException = |
1076 | | - inputStreams.stream() |
1077 | | - .map( |
1078 | | - stream -> { |
1079 | | - try { |
1080 | | - stream.close(); |
1081 | | - return null; |
1082 | | - } catch (IOException e) { |
1083 | | - return e; |
1084 | | - } |
1085 | | - }) |
1086 | | - .filter(Objects::nonNull) |
1087 | | - .reduce( |
1088 | | - null, |
1089 | | - (l, r) -> { |
1090 | | - if (l != null) { |
1091 | | - l.addSuppressed(r); |
1092 | | - return l; |
1093 | | - } else { |
1094 | | - return r; |
1095 | | - } |
1096 | | - }, |
1097 | | - (l, r) -> l); |
| 1091 | + static void closeAllStreams(Iterable<InputStream> inputStreams) throws IOException { |
| 1092 | + Iterator<InputStream> iterator = inputStreams.iterator(); |
| 1093 | + IOException ioException = null; |
| 1094 | + while (iterator.hasNext()) { |
| 1095 | + InputStream next = iterator.next(); |
| 1096 | + try { |
| 1097 | + next.close(); |
| 1098 | + } catch (IOException e) { |
| 1099 | + if (ioException == null) { |
| 1100 | + ioException = e; |
| 1101 | + } else { |
| 1102 | + ioException.addSuppressed(e); |
| 1103 | + } |
| 1104 | + } |
| 1105 | + } |
1098 | 1106 |
|
1099 | 1107 | if (ioException != null) { |
1100 | 1108 | throw ioException; |
|
0 commit comments