|
21 | 21 | import static com.google.common.base.Preconditions.checkArgument; |
22 | 22 | import static com.google.common.base.Preconditions.checkState; |
23 | 23 | import static java.nio.charset.StandardCharsets.UTF_8; |
24 | | -import static java.util.concurrent.Executors.callable; |
25 | 24 |
|
26 | 25 | import com.google.api.core.ApiFuture; |
27 | 26 | import com.google.api.gax.paging.Page; |
|
37 | 36 | import com.google.cloud.Policy; |
38 | 37 | import com.google.cloud.WriteChannel; |
39 | 38 | import com.google.cloud.storage.Acl.Entity; |
| 39 | +import com.google.cloud.storage.ApiaryUnbufferedReadableByteChannel.ApiaryReadRequest; |
40 | 40 | import com.google.cloud.storage.BlobReadChannelV2.BlobReadChannelContext; |
41 | 41 | import com.google.cloud.storage.HmacKey.HmacKeyMetadata; |
42 | 42 | import com.google.cloud.storage.PostPolicyV4.ConditionV4Type; |
43 | 43 | import com.google.cloud.storage.PostPolicyV4.PostConditionsV4; |
44 | 44 | import com.google.cloud.storage.PostPolicyV4.PostFieldsV4; |
45 | 45 | import com.google.cloud.storage.PostPolicyV4.PostPolicyV4Document; |
| 46 | +import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel; |
46 | 47 | import com.google.cloud.storage.UnifiedOpts.ObjectSourceOpt; |
47 | 48 | import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt; |
48 | 49 | import com.google.cloud.storage.UnifiedOpts.Opts; |
|
59 | 60 | import com.google.common.collect.Maps; |
60 | 61 | import com.google.common.hash.Hashing; |
61 | 62 | import com.google.common.io.BaseEncoding; |
62 | | -import com.google.common.io.CountingOutputStream; |
| 63 | +import com.google.common.io.ByteStreams; |
63 | 64 | import com.google.common.primitives.Ints; |
64 | 65 | import java.io.ByteArrayInputStream; |
| 66 | +import java.io.ByteArrayOutputStream; |
65 | 67 | import java.io.IOException; |
66 | 68 | import java.io.InputStream; |
67 | 69 | import java.io.OutputStream; |
|
73 | 75 | import java.nio.ByteBuffer; |
74 | 76 | import java.nio.channels.Channels; |
75 | 77 | import java.nio.channels.ReadableByteChannel; |
| 78 | +import java.nio.channels.WritableByteChannel; |
76 | 79 | import java.nio.file.Files; |
77 | 80 | import java.nio.file.Path; |
78 | 81 | import java.text.SimpleDateFormat; |
@@ -604,9 +607,25 @@ public byte[] readAllBytes(BlobId blob, BlobSourceOption... options) { |
604 | 607 | Opts<ObjectSourceOpt> unwrap = Opts.unwrap(options); |
605 | 608 | Opts<ObjectSourceOpt> resolve = unwrap.resolveFrom(blob); |
606 | 609 | ImmutableMap<StorageRpc.Option, ?> optionsMap = resolve.getRpcOptions(); |
607 | | - ResultRetryAlgorithm<?> algorithm = |
608 | | - retryAlgorithmManager.getForObjectsGet(storageObject, optionsMap); |
609 | | - return run(algorithm, () -> storageRpc.load(storageObject, optionsMap), Function.identity()); |
| 610 | + boolean autoGzipDecompression = |
| 611 | + Utils.isAutoGzipDecompression(resolve, /*defaultWhenUndefined=*/ true); |
| 612 | + UnbufferedReadableByteChannelSession<StorageObject> session = |
| 613 | + ResumableMedia.http() |
| 614 | + .read() |
| 615 | + .byteChannel(BlobReadChannelContext.from(this)) |
| 616 | + .setAutoGzipDecompression(autoGzipDecompression) |
| 617 | + .unbuffered() |
| 618 | + .setApiaryReadRequest( |
| 619 | + new ApiaryReadRequest(storageObject, optionsMap, ByteRangeSpec.nullRange())) |
| 620 | + .build(); |
| 621 | + ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| 622 | + try (UnbufferedReadableByteChannel r = session.open(); |
| 623 | + WritableByteChannel w = Channels.newChannel(baos)) { |
| 624 | + ByteStreams.copy(r, w); |
| 625 | + } catch (IOException e) { |
| 626 | + throw StorageException.translate(e); |
| 627 | + } |
| 628 | + return baos.toByteArray(); |
610 | 629 | } |
611 | 630 |
|
612 | 631 | @Override |
@@ -638,19 +657,26 @@ public void downloadTo(BlobId blob, Path path, BlobSourceOption... options) { |
638 | 657 |
|
639 | 658 | @Override |
640 | 659 | public void downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption... options) { |
641 | | - final CountingOutputStream countingOutputStream = new CountingOutputStream(outputStream); |
642 | 660 | final StorageObject pb = codecs.blobId().encode(blob); |
643 | | - ImmutableMap<StorageRpc.Option, ?> optionsMap = |
644 | | - Opts.unwrap(options).resolveFrom(blob).getRpcOptions(); |
645 | | - ResultRetryAlgorithm<?> algorithm = retryAlgorithmManager.getForObjectsGet(pb, optionsMap); |
646 | | - run( |
647 | | - algorithm, |
648 | | - callable( |
649 | | - () -> { |
650 | | - storageRpc.read( |
651 | | - pb, optionsMap, countingOutputStream.getCount(), countingOutputStream); |
652 | | - }), |
653 | | - Function.identity()); |
| 661 | + Opts<ObjectSourceOpt> resolve = Opts.unwrap(options).resolveFrom(blob); |
| 662 | + ImmutableMap<StorageRpc.Option, ?> optionsMap = resolve.getRpcOptions(); |
| 663 | + boolean autoGzipDecompression = |
| 664 | + Utils.isAutoGzipDecompression(resolve, /*defaultWhenUndefined=*/ true); |
| 665 | + UnbufferedReadableByteChannelSession<StorageObject> session = |
| 666 | + ResumableMedia.http() |
| 667 | + .read() |
| 668 | + .byteChannel(BlobReadChannelContext.from(this)) |
| 669 | + .setAutoGzipDecompression(autoGzipDecompression) |
| 670 | + .unbuffered() |
| 671 | + .setApiaryReadRequest(new ApiaryReadRequest(pb, optionsMap, ByteRangeSpec.nullRange())) |
| 672 | + .build(); |
| 673 | + // don't close the provided stream |
| 674 | + WritableByteChannel w = Channels.newChannel(outputStream); |
| 675 | + try (UnbufferedReadableByteChannel r = session.open()) { |
| 676 | + ByteStreams.copy(r, w); |
| 677 | + } catch (IOException e) { |
| 678 | + throw StorageException.translate(e); |
| 679 | + } |
654 | 680 | } |
655 | 681 |
|
656 | 682 | @Override |
|
0 commit comments