
There's an example of downloading a file with Rusoto S3 here: How to save a file downloaded from S3 with Rusoto to my hard drive?

The problem is that it appears to download the whole file into memory and then write it to disk, because it uses the write_all method, which takes a byte slice rather than a stream. How can I use the StreamingBody, which implements futures::Stream, to stream the file to disk?

  • for x in stream { file.write_all(&x) } something like that... Commented Nov 11, 2018 at 4:04
  • That would require StreamingBody to be an iterator, which it is not. Commented Nov 11, 2018 at 4:09

1 Answer


Since StreamingBody implements Stream<Item = Vec<u8>, Error = Error>, we can construct an MCVE (minimal, complete, verifiable example) that represents it:

```rust
extern crate futures; // 0.1.25

use futures::{prelude::*, stream};

// `dyn` is required for trait objects in newer Rust editions.
type Error = Box<dyn std::error::Error>;

// Stand-in for Rusoto's StreamingBody: a stream of owned byte chunks.
fn streaming_body() -> impl Stream<Item = Vec<u8>, Error = Error> {
    const DUMMY_DATA: &[&[u8]] = &[b"0123", b"4567", b"89AB", b"CDEF"];
    let iter_of_owned_bytes = DUMMY_DATA.iter().map(|&b| b.to_owned());
    stream::iter_ok(iter_of_owned_bytes)
}
```

We can then obtain a streaming body (in real code, from the S3 response) and use Stream::for_each to process each chunk of the Stream as it arrives. Here, we just call write_all on a provided output location:

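```rust
use std::io::Write;

fn save_to_disk(mut file: impl Write) -> impl Future<Item = (), Error = Error> {
    // Write each chunk as soon as the stream yields it, instead of
    // collecting the whole body in memory first.
    streaming_body().for_each(move |chunk| file.write_all(&chunk).map_err(Into::into))
}
```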

We can then write a small main to test it:

```rust
fn main() {
    // An in-memory Vec<u8> stands in for the file so the result can be checked.
    let mut file = Vec::new();
    {
        let fut = save_to_disk(&mut file);
        fut.wait().expect("Could not drive future");
    }
    assert_eq!(file, b"0123456789ABCDEF");
}
```

Important notes about the quality of this naïve implementation:

  1. The call to write_all may block, which you should avoid in an asynchronous program. It would be better to hand that blocking work off to a threadpool.

  2. The usage of Future::wait forces the thread to block until the future is done, which is great for tests but may not be correct for your real use case.
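Note 1 suggests moving the blocking writes off the async thread. The sketch below shows that idea using only std. It uses a single dedicated writer thread in place of a threadpool. The producer side only sends chunks over a bounded channel, and the writer thread makes the write_all calls. The spawn_writer helper and the capacity value are hypothetical, chosen for illustration.

```rust
use std::io::{self, Write};
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread::{self, JoinHandle};

/// Hypothetical helper: spawns a dedicated writer thread. Callers only `send`
/// chunks; the potentially blocking `write_all` calls run on the writer thread.
/// The channel is bounded, so at most `capacity` chunks wait in memory.
fn spawn_writer<W>(mut out: W, capacity: usize) -> (SyncSender<Vec<u8>>, JoinHandle<io::Result<W>>)
where
    W: Write + Send + 'static,
{
    let (tx, rx) = sync_channel::<Vec<u8>>(capacity);
    let handle = thread::spawn(move || -> io::Result<W> {
        // The loop ends once every sender has been dropped and the buffer is drained.
        for chunk in rx {
            out.write_all(&chunk)?;
        }
        out.flush()?;
        Ok(out)
    });
    (tx, handle)
}

fn main() {
    // A Vec<u8> stands in for the file so the result can be checked.
    let (tx, handle) = spawn_writer(Vec::new(), 2);
    for chunk in [b"0123", b"4567", b"89AB", b"CDEF"] {
        // In the async version, this send would happen inside for_each.
        tx.send(chunk.to_vec()).expect("writer thread hung up");
    }
    drop(tx); // signal the end of the stream
    let written = handle.join().expect("writer thread panicked").expect("write failed");
    assert_eq!(written, b"0123456789ABCDEF");
}
```

The bounded sync_channel caps memory use at capacity chunks. However, send itself blocks while the buffer is full, so inside a real event loop you would want an async-aware channel instead.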



Comments

One question about this. Where you call streaming_body().for_each(...), is that more or less equivalent to doing for chunk in streaming_body().wait() { ... }, other than one using a closure and the other using an iterator?
@NicholasBishop There is a relevant difference between the two. The for loop you suggest blocks the current thread, so the thread can't do any other work until the complete stream is resolved. The stream combinator for_each(), on the other hand, yields control to the event loop whenever it would block. (Of course the test code in this answer does not use an event loop, and also blocks until the future is resolved. However, the whole point of asynchronous code is not to unnecessarily block the current thread, so you wouldn't do this in real code.)
For my use case I do actually want to block the thread -- just because the API provided is async doesn't mean that the calling code is.
@NicholasBishop Then both versions are fine. I'd probably use Stream::wait() in that case, since working with stream combinators can be cumbersome.
Stream::wait is fine for now, but it's being removed in the futures rework (just like Future::wait). There will be a direct replacement for Future::wait, but I don't know of one for Stream::wait.
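For the blocking case discussed in these comments: in futures 0.1, Stream::wait() turns a stream into an iterator whose items are Result<Item, Error>, so the for-loop from the question's comments becomes possible. The sketch below uses only std and assumes the chunks arrive as Result<Vec<u8>, E> values. The write_chunks helper is hypothetical and is not part of Rusoto or futures. With the real crates you would pass something like streaming_body().wait() as chunks.

```rust
use std::error::Error;
use std::io::Write;

/// Hypothetical helper: writes each chunk to `out` as soon as it arrives,
/// so only one chunk is held at a time. Returns the number of bytes written.
fn write_chunks<I, E, W>(chunks: I, out: &mut W) -> Result<u64, Box<dyn Error>>
where
    I: IntoIterator<Item = Result<Vec<u8>, E>>,
    E: Into<Box<dyn Error>>,
    W: Write,
{
    let mut written = 0u64;
    for chunk in chunks {
        // Stop at the first stream error.
        let chunk = match chunk {
            Ok(bytes) => bytes,
            Err(e) => return Err(e.into()),
        };
        out.write_all(&chunk)?;
        written += chunk.len() as u64;
    }
    out.flush()?;
    Ok(written)
}

fn main() -> Result<(), Box<dyn Error>> {
    // Stand-in for the stream's items, using the same dummy data as the answer.
    let chunks: Vec<Result<Vec<u8>, std::io::Error>> = vec![
        Ok(b"0123".to_vec()),
        Ok(b"4567".to_vec()),
        Ok(b"89AB".to_vec()),
        Ok(b"CDEF".to_vec()),
    ];
    let mut out = Vec::new();
    let n = write_chunks(chunks, &mut out)?;
    assert_eq!(n, 16);
    assert_eq!(out, b"0123456789ABCDEF");
    Ok(())
}
```

This still avoids buffering the whole body, which was the concern in the question. As the comments note, the trade-off is that the calling thread is blocked until the stream ends.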