am I giving the work to the core in the wrong way?
Yes, you are giving one request to Tokio and requiring that it complete before starting the next request. You've taken asynchronous code and forced it to be sequential.
You need to give the reactor a single future that will perform different kinds of concurrent work.
Hyper 0.14
use futures::prelude::*;
use hyper::{body, client::Client};
use std::{
    io::{self, Write},
    iter,
};
use tokio;

/// Maximum number of requests allowed to be in flight at the same time.
const N_CONCURRENT: usize = 1;

/// Issues 50 GET requests against `http://httpbin.org/ip`, keeping at most
/// `N_CONCURRENT` of them in flight, and writes every response body to stdout.
///
/// The whole pipeline is a single stream handed to the runtime, so the
/// requests are not forced to complete one after another.
///
/// # Panics
///
/// Panics if the URI fails to parse, a request fails, a body cannot be read,
/// or stdout cannot be written to.
#[tokio::main]
async fn main() {
    let client = Client::new();

    let uri = "http://httpbin.org/ip".parse().unwrap();
    let uris = iter::repeat(uri).take(50);

    stream::iter(uris)
        // Turn each URI into a (not yet started) request future.
        .map(move |uri| client.get(uri))
        // Drive up to N_CONCURRENT request futures at once.
        .buffer_unordered(N_CONCURRENT)
        .then(|res| async {
            // `expect` takes a plain message and does not interpolate `{}`,
            // so format the underlying error into the panic explicitly.
            let res = res.unwrap_or_else(|e| panic!("Error making request: {}", e));
            println!("Response: {}", res.status());

            body::to_bytes(res).await.expect("Error reading body")
        })
        .for_each(|body| async move {
            io::stdout().write_all(&body).expect("Error writing body");
        })
        .await;
}
With N_CONCURRENT set to 1:
real 1.119 1119085us user 0.012 12021us sys 0.011 11459us
And set to 10:
real 0.216 216285us user 0.014 13596us sys 0.021 20640us
Cargo.toml
[dependencies]
futures = "0.3.17"
hyper = { version = "0.14.13", features = ["client", "http1", "tcp"] }
tokio = { version = "1.12.0", features = ["full"] }
Hyper 0.12
use futures::{stream, Future, Stream}; // 0.1.25 use hyper::Client; // 0.12.23 use std::{ io::{self, Write}, iter, }; use tokio; // 0.1.15 const N_CONCURRENT: usize = 1; fn main() { let client = Client::new(); let uri = "http://httpbin.org/ip".parse().unwrap(); let uris = iter::repeat(uri).take(50); let work = stream::iter_ok(uris) .map(move |uri| client.get(uri)) .buffer_unordered(N_CONCURRENT) .and_then(|res| { println!("Response: {}", res.status()); res.into_body() .concat2() .map_err(|e| panic!("Error collecting body: {}", e)) }) .for_each(|body| { io::stdout() .write_all(&body) .map_err(|e| panic!("Error writing: {}", e)) }) .map_err(|e| panic!("Error making request: {}", e)); tokio::run(work); }
With N_CONCURRENT set to 1:
real 0m2.279s user 0m0.193s sys 0m0.065s
And set to 10:
real 0m0.529s user 0m0.186s sys 0m0.075s
See also: