
I am trying to write to a HashMap using the Arc<Mutex<T>> pattern as part of a website-scraping exercise inspired by the Rust cookbook.

This first part uses the tokio runtime. The tasks never seem to complete and return the HashMap; the program just hangs.

    type Db = Arc<Mutex<HashMap<String, bool>>>;

    pub async fn handle_async_tasks(db: Db) -> BoxResult<HashMap<String, bool>> {
        let links = NodeUrl::new("https://www.inverness-courier.co.uk/")
            .await
            .unwrap();
        let arc = db.clone();
        let mut handles = Vec::new();

        for link in links.links_with_paths {
            let x = arc.clone();
            handles.push(tokio::spawn(async move {
                process(x, link).await;
            }));
        }

        // for handle in handles {
        //     handle.await.expect("Task panicked!");
        // }  <-- I tried this as well
        futures::future::join_all(handles).await;

        let readables = arc.lock().await;
        for (key, value) in readables.clone().into_iter() {
            println!("Checking db: k, v ==>{} / {}", key, value);
        }
        let clone_db = readables.clone();
        return Ok(clone_db);
    }

    async fn process(db: Db, url: Url) {
        let mut db = db.lock().await;
        println!("checking {}", url);
        if check_link(&url).await.is_ok() {
            db.insert(url.to_string(), true);
        } else {
            db.insert(url.to_string(), false);
        }
    }

    async fn check_link(url: &Url) -> BoxResult<bool> {
        let res = reqwest::get(url.as_ref()).await?;
        Ok(res.status() != StatusCode::NOT_FOUND)
    }

    pub struct NodeUrl {
        domain: String,
        pub links_with_paths: Vec<Url>,
    }

    #[tokio::main]
    async fn main() {
        let db: Db = Arc::new(Mutex::new(HashMap::new()));
        let db = futures::executor::block_on(task::handle_async_tasks(db));
    }

I would like to return the HashMap to main(), where the main thread is blocked. How can I wait for all the spawned async tasks to complete and then return the HashMap?

  • Can you provide your main() function? Otherwise no one can tell you how to use that HashMap in there Commented Nov 29, 2021 at 12:12
  • In addition to my answer I would suggest not jumping straight into async stuff without any prior Rust experience. Commented Nov 29, 2021 at 12:12
  • @orlp I updated my question with the actual url I was using and the main() function. @sebpuetz Commented Nov 29, 2021 at 12:25
  • @godhar What is NodeUrl? Commented Nov 29, 2021 at 12:26
  • @orlp it returns a Vec of URLs, with relative paths. It is working fine. Commented Nov 29, 2021 at 12:30

2 Answers

    let links = NodeUrl::new("https://www.some-site.com/.co.uk/").await.unwrap();

This doesn't seem like a valid URL to me.

    async fn process(db: Db, url: Url) {
        let mut db = db.lock().await;
        println!("checking {}", url);
        if check_link(&url).await.is_ok() {
            db.insert(url.to_string(), true);
        } else {
            db.insert(url.to_string(), false);
        }
    }

This is highly problematic. You hold the exclusive lock on the database for the duration of the entire request, which makes your application effectively serial. The default timeout in reqwest is 30 seconds, so if the server isn't responsive and you have a lot of links to go through, the program may just appear to hang.

You should hold the database lock for as short a time as possible: just long enough to do the insert.

    async fn process(db: Db, url: Url) {
        println!("checking {}", url);
        if check_link(&url).await.is_ok() {
            let mut db = db.lock().await;
            db.insert(url.to_string(), true);
        } else {
            let mut db = db.lock().await;
            db.insert(url.to_string(), false);
        }
    }

Or even better, eliminating the useless if:

    async fn process(db: Db, url: Url) {
        println!("checking {}", url);
        let valid = check_link(&url).await.is_ok();
        db.lock().await.insert(url.to_string(), valid);
    }

Finally, you didn't show your main function; the way you invoke handle_async_tasks, or other things running alongside it, might be problematic.


1 Comment

I see your point about the lock open during the request. Thanks for this.

My main issue was how to handle the MutexGuard, which I solved in the end by cloning the guarded value and returning the clone.

There was no need to use a futures::executor in main: since we are already inside the tokio runtime, calling .await was enough to obtain the final value.

Cloning the Arc once was enough; I had cloned it twice before passing it into the multi-threaded context.

Thanks to @orlp for pointing out bad logic where it concerned the check_link function.

    pub async fn handle_async_tasks() -> BoxResult<HashMap<String, bool>> {
        let get_links = NodeUrl::new("https://www.invernesscourier.co.uk/")
            .await
            .unwrap();
        let db: Db = Arc::new(Mutex::new(HashMap::new()));
        let mut handles = Vec::new();

        for link in get_links.links_with_paths {
            let x = db.clone();
            handles.push(tokio::spawn(async move {
                process(x, link).await;
            }));
        }
        futures::future::join_all(handles).await;

        let guard = db.lock().await;
        let cloned = guard.clone();
        Ok(cloned)
    }

    #[tokio::main]
    async fn main() {
        let db = task::handle_async_tasks().await.unwrap();
        for (key, value) in db.into_iter() {
            println!("Checking db: {} / {}", key, value);
        }
    }

This is by no means the best Rust code, but I wanted to share how I tackled things in the end.
