
I'm updating code to the newest versions of hyper and futures, but everything I've tried fails because some trait or other is not implemented.

A non-working example (playground):

    extern crate futures; // 0.3.5
    extern crate hyper; // 0.13.6

    use futures::{future, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
    use hyper::body;

    fn get_body_as_vec<'a>(b: body::Body) -> future::BoxFuture<'a, Result<Vec<String>, hyper::Error>> {
        let f = b.and_then(|bytes| {
            let s = std::str::from_utf8(&bytes).expect("sends no utf-8");
            let mut lines: Vec<String> = Vec::new();
            for l in s.lines() {
                lines.push(l.to_string());
            }
            future::ok(lines)
        });

        Box::pin(f)
    }

This produces the error:

    error[E0277]: the trait bound `futures::stream::AndThen<hyper::Body, futures::future::Ready<std::result::Result<std::vec::Vec<std::string::String>, hyper::Error>>, [closure@src/lib.rs:8:24: 15:6]>: futures::Future` is not satisfied
      --> src/lib.rs:17:5
       |
    17 |     Box::pin(f)
       |     ^^^^^^^^^^^ the trait `futures::Future` is not implemented for `futures::stream::AndThen<hyper::Body, futures::future::Ready<std::result::Result<std::vec::Vec<std::string::String>, hyper::Error>>, [closure@src/lib.rs:8:24: 15:6]>`
       |
       = note: required for the cast to the object type `dyn futures::Future<Output = std::result::Result<std::vec::Vec<std::string::String>, hyper::Error>> + std::marker::Send`

I'm unable to create a compatible future. Body is a stream and I can't find any "converter" function with the required traits implemented.

With hyper 0.12, I used concat2().

4 Comments
  • You cannot use and_then from StreamExt on a reference to a Stream. Commented Jul 20, 2020 at 11:41
  • StreamExt is not used in this code so far. And the compiler has no problem with a reference to Body (which is not a standard stream). Commented Jul 20, 2020 at 13:16
  • It looks like it comes from TryStreamExt, which is fine. What I meant is this: play.rust-lang.org/… Commented Jul 20, 2020 at 13:25
  • Ah, OK: the missing Copy trait. I wrote this fn only for this question; in my real code I don't reference or move the body around. I've changed it above. Commented Jul 20, 2020 at 22:01

2 Answers


From the documentation of and_then:

Note that this function consumes the receiving stream and returns a wrapped version of it.

To process the entire stream and return a single future representing success or error, use try_for_each instead.

Yes, your f is still a Stream. try_for_each would work, as the documentation suggests, but try_fold is a better choice for turning the bytes into a vector of lines. However, as @Shepmaster points out in the comments, converting each chunk to UTF-8 directly can break multi-byte characters that are split across chunks of the response.

For the sake of data consistency, the easiest solution might be to collect all the bytes before converting them to UTF-8.

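    use futures::{future, FutureExt, TryStreamExt};
    use hyper::body;

    // Assumed alias: the snippet uses a one-parameter Result but omits its
    // definition. A boxed std::error::Error fits the note further down.
    type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

    fn get_body_as_vec<'a>(b: body::Body) -> future::BoxFuture<'a, Result<Vec<String>>> {
        let f = b
            // Collect every chunk into one byte vector first.
            .try_fold(vec![], |mut vec, bytes| {
                vec.extend_from_slice(&bytes);
                future::ok(vec)
            })
            // Then decode once and split into lines.
            .map(|x| {
                Ok(std::str::from_utf8(&x?)?
                    .lines()
                    .map(ToString::to_string)
                    .collect())
            });

        Box::pin(f)
    }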

Playground
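
To illustrate the multi-byte concern without any network code, here is a minimal std-only sketch. The function names `decode_per_chunk` and `decode_after_concat` are hypothetical and only for this illustration, and the chunks are plain byte slices standing in for the chunks of a Body.

```rust
use std::str::Utf8Error;

/// Decode each chunk on its own. This fails when a chunk boundary
/// cuts a multi-byte character in half.
fn decode_per_chunk(chunks: &[&[u8]]) -> Result<String, Utf8Error> {
    let mut out = String::new();
    for chunk in chunks {
        out.push_str(std::str::from_utf8(chunk)?);
    }
    Ok(out)
}

/// Concatenate all chunks first, then decode once.
fn decode_after_concat(chunks: &[&[u8]]) -> Result<String, Utf8Error> {
    let all: Vec<u8> = chunks.concat();
    Ok(std::str::from_utf8(&all)?.to_owned())
}

fn main() {
    // U+00E9 is encoded as the two bytes 0xC3 0xA9; split_at(4) separates them.
    let bytes = "caf\u{e9}\nbar".as_bytes();
    let (first, second) = bytes.split_at(4);
    let chunks = [first, second];

    println!("per chunk ok?    {}", decode_per_chunk(&chunks).is_ok());
    println!("after concat ok? {}", decode_after_concat(&chunks).is_ok());
}
```

Decoding per chunk reports an error here, while decoding after concatenation succeeds, which is the reason for collecting the bytes first.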


You can test the multi-chunk behavior by using channel from hyper's Body. Here I've created a scenario in which a line is split across chunks. It works fine with the code above, but if you process the chunks directly you lose that consistency.

    let (mut sender, body) = body::Body::channel();

    tokio::spawn(async move {
        sender
            .send_data("Line1\nLine2\nLine3\nLine4\nLine5".into())
            .await;
        sender
            .send_data("next bytes of Line5\nLine6\nLine7\nLine8\n----".into())
            .await;
    });

    println!("{:?}", get_body_as_vec(body).await);

  • Playground (success scenario)
  • Playground (failure scenario: "next bytes of Line5" will be represented as a new line in the Vec)

Note: I've used std::error::Error in the return type since both hyper::Error and Utf8Error implement it. You may still use your expect strategy with hyper::Error.


5 Comments

Doesn't help me. This returns a Vec with one element holding the body as one String. The Result-Vec should have one element per line.
play.rust-lang.org/… the trait futures::Future is not implemented for &std::vec::Vec<std::string::String>
This solution seems brittle — what if the returned data contains multibyte UTF-8 characters that are split across chunks of the body?
@Shepmaster Well, I focused on the Future and Stream part, but yes, we should collect all the chunks from the stream into a Vec before converting the bytes to UTF-8. Thanks for the warning; I'll edit the code.
A more efficient (but way more complicated) solution could probably take it chunk-by-chunk and keep some state around between calls, but I agree that getting all the data at once is the easiest way to get it right.
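
A rough, std-only sketch of the chunk-by-chunk idea from the last comment, assuming lines are terminated by `\n`. `LineSplitter` is a hypothetical helper invented for this illustration, not part of hyper or futures. It splits on the raw byte `\n` before decoding, which is safe because that byte never occurs inside a multi-byte UTF-8 sequence.

```rust
use std::string::FromUtf8Error;

/// Hypothetical helper: buffers raw bytes and yields only complete lines.
struct LineSplitter {
    /// Bytes received after the last '\n' seen so far.
    pending: Vec<u8>,
}

impl LineSplitter {
    fn new() -> Self {
        LineSplitter { pending: Vec::new() }
    }

    /// Feed one chunk; returns every line that this chunk completes.
    fn push(&mut self, chunk: &[u8]) -> Result<Vec<String>, FromUtf8Error> {
        self.pending.extend_from_slice(chunk);
        let mut lines = Vec::new();
        while let Some(pos) = self.pending.iter().position(|&b| b == b'\n') {
            // Take the line including its '\n', then drop the terminator.
            let mut line: Vec<u8> = self.pending.drain(..=pos).collect();
            line.pop();
            lines.push(String::from_utf8(line)?);
        }
        Ok(lines)
    }

    /// Call once the stream has ended to flush a final unterminated line.
    fn finish(self) -> Result<Option<String>, FromUtf8Error> {
        if self.pending.is_empty() {
            Ok(None)
        } else {
            String::from_utf8(self.pending).map(Some)
        }
    }
}

fn main() {
    let mut splitter = LineSplitter::new();
    let mut lines = Vec::new();
    // Two chunks with a line split across the boundary.
    for chunk in [&b"Line1\nLi"[..], &b"ne2\nLine3"[..]].iter() {
        lines.extend(splitter.push(chunk).unwrap());
    }
    lines.extend(splitter.finish().unwrap());
    println!("{:?}", lines);
}
```

Unlike `str::lines`, this sketch does not strip a trailing `\r`, and draining from the front of a Vec is not the most efficient choice; both are simplifications.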

I found two solutions, both of them pretty simple:

    /* WARNING for beginners! This use statement is important: it brings the
       HttpBody trait into scope so that the .data() method can be used later. */
    use hyper::body::{to_bytes, HttpBody};

    // Solution 1: takes only a single chunk of data!
    let my_vector: Vec<u8> = request.into_body().data().await.unwrap().unwrap().to_vec();

    // Solution 2: takes all data chunks, not just the first one:
    let my_bytes = to_bytes(res.into_body()).await?;

    let my_string = String::from_utf8(my_vector).unwrap();

This example doesn't handle errors properly; make sure your code does.
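
As one possible way to avoid the unwrap calls, here is a small std-only sketch. It assumes the whole body has already been collected into bytes (for instance with to_bytes as above; the result dereferences to a byte slice). `body_bytes_to_lines` is a hypothetical name chosen for this example.

```rust
use std::str::Utf8Error;

/// Split a fully collected body into lines, returning invalid UTF-8
/// as an error instead of panicking.
fn body_bytes_to_lines(bytes: &[u8]) -> Result<Vec<String>, Utf8Error> {
    let text = std::str::from_utf8(bytes)?;
    Ok(text.lines().map(str::to_owned).collect())
}

fn main() {
    match body_bytes_to_lines(b"Line1\nLine2\n") {
        Ok(lines) => println!("{:?}", lines),
        Err(e) => eprintln!("body is not valid UTF-8: {}", e),
    }
}
```

This also yields one Vec element per line, which is what the question asks for.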
