I am trying to process some files asynchronously, with the ability to choose the number of threads the program should use. However, I want processFiles() to return only after all the files have been processed, so I am looking for a way to keep the function from returning until every Future has finished executing. Any ideas on how to approach this would be very helpful. Here is my sample code.

    object FileReader {
      def processFiles(files: Array[File]) = {
        val execService = Executors.newFixedThreadPool(5)
        implicit val execContext = ExecutionContext.fromExecutorService(execService)
        val processed = files.map { f =>
          Future {
            val name = f.getAbsolutePath()
            val fp = Source.fromFile(name)
            var data = ""
            fp.getLines().foreach(x => {
              data = data ++ s"$x\n"
            })
            fp.close()
            // process the data.
            println("Processing ....")
            data
          }
        }
        execContext.shutdown()
      }

      def main(args: Array[String]): Unit = {
        println("Start")
        val tmp = new File("/path/to/files")
        val files = tmp.listFiles()
        val result = processFiles(files)
        println("done processing")
        println("done work")
      }
    }

I suspect my usage of Future here may be wrong; please correct me if it is.

My expected output:

    Start
    Processing ....
    Processing ....
    Processing ....
    Processing ....
    Processing ....
    Processing ....
    Processing ....
    Processing ....
    Processing ....
    Processing ....
    done processing
    done work

My current output:

    Start
    done processing
    done work
    Processing ....
    Processing ....
    Processing ....
    Processing ....
    Processing ....
    Processing ....
    Processing ....
    Processing ....
    Processing ....
    Processing ....

2 Answers


You need to use Future.traverse to combine the Futures for the individual files into a single Future, and then call Await.result on it:

    import java.io.File
    import java.util.concurrent.Executors
    import scala.io.Source
    import scala.concurrent._
    import scala.concurrent.duration._

    object FileReader {
      def processFiles(files: Array[File]) = {
        val execService = Executors.newFixedThreadPool(5)
        implicit val execContext = ExecutionContext.fromExecutorService(execService)
        // Turn `List[Future[String]]` into `Future[List[String]]`
        val processed = Future.traverse(files.toList) { f =>
          Future {
            val name = f.getAbsolutePath()
            val fp = Source.fromFile(name)
            var data = ""
            fp.getLines().foreach(x => {
              data = data ++ s"$x\n"
            })
            fp.close()
            // process the data.
            println("Processing ....")
            data
          }
        }
        // TODO: Put proper timeout here
        // Execution will be blocked until all futures are completed
        Await.result(processed, 30.minutes)
        execContext.shutdown()
      }

      def main(args: Array[String]): Unit = {
        println("Start")
        val tmp = new File("/path/to/file")
        val files = tmp.listFiles()
        val result = processFiles(files)
        println("done processing")
        println("done work")
      }
    }
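For anyone who wants to try the pattern without real files, here is a minimal sketch of the same idea. The names TraverseDemo, slowLength and processAll are made up for illustration, and the 10 second timeout is an arbitrary assumption. It also moves the shutdown into a finally block so the pool is released even if Await.result throws.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object TraverseDemo {
  // Hypothetical stand-in for the per-file work: sleeps briefly, then returns a length.
  def slowLength(s: String)(implicit ec: ExecutionContext): Future[Int] =
    Future {
      Thread.sleep(50)
      s.length
    }

  // Runs the work on a fixed pool and blocks until every Future has completed.
  def processAll(items: List[String], threads: Int): List[Int] = {
    val pool = Executors.newFixedThreadPool(threads)
    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(pool)
    try {
      // One Future that completes when all per-item Futures have completed.
      val combined: Future[List[Int]] = Future.traverse(items)(slowLength(_))
      Await.result(combined, 10.seconds) // assumed timeout, adjust as needed
    } finally ec.shutdown() // release the threads even if the wait fails
  }

  def main(args: Array[String]): Unit = {
    println("Start")
    val lengths = processAll(List("a", "bb", "ccc"), threads = 2)
    println(lengths)
    println("done processing")
  }
}
```

Because the function blocks on the combined Future, "done processing" is printed only after all the work has finished, and the thread count is just a parameter.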

5 Comments

Thanks, Ivan, for your response. Even though this looks correct, I am getting the following error: inferred type arguments [String,Nothing,Array] do not conform to method traverse's type parameter bounds [A,B,M[X] <: IterableOnce[X]] at val processed = Future.traverse(files) { f => Any idea what might be the reason? I am trying to change the type of files.
@Manikishan thanks for the comment, I've edited the answer. My bad, I'm sorry. Short answer: files needs to be converted to a list.
Changing files type to Seq[File] worked.
@Manikishan listFiles, as far as I remember. Anyway, traverse accepts scala.collection.IterableOnce, and Array does not extend it.
@Manikishan upvote also appreciated. Thank you for question!
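The type issue from the comments above can be shown in isolation. This is only a sketch: ArrayTraverseDemo and doubled are hypothetical names, and it uses the global execution context purely to keep the example short. Converting the Array with toSeq (or toList) gives Future.traverse a collection type it can work with.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ArrayTraverseDemo {
  def doubled(xs: Array[Int]): Seq[Int] = {
    // Per the error quoted in the comments, passing the Array directly
    // (Future.traverse(xs) { ... }) fails the IterableOnce bound on Scala 2.13.
    // Converting to a Seq first satisfies it.
    val combined: Future[Seq[Int]] =
      Future.traverse(xs.toSeq)(x => Future(x * 2))
    Await.result(combined, 5.seconds) // assumed timeout for the sketch
  }

  def main(args: Array[String]): Unit =
    println(doubled(Array(1, 2, 3)))
}
```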

Thanks to @Ivan Kurchenko, the solution worked. I am posting the final version of the code that worked for me.

    object FileReader {
      def processFiles(files: Seq[File]) = {
        val execService = Executors.newFixedThreadPool(5)
        implicit val execContext = ExecutionContext.fromExecutorService(execService)
        // Turn `Seq[Future[File]]` into `Future[Seq[File]]`
        val processed = Future.traverse(files) { f =>
          Future {
            val name = f.getAbsolutePath()
            val fp = Source.fromFile(name)
            var data = ""
            fp.getLines().foreach(x => {
              data = augmentString(data) ++ s"$x\n"
            })
            fp.close()
            // process the data.
            println("Processing ....")
            f
          }
        }
        // TODO: Put proper timeout here
        // Execution will be blocked until all futures are completed
        Await.result(processed, 30.minute)
        execContext.shutdown()
      }

      def main(args: Array[String]): Unit = {
        println("Start")
        val tmp = new File("/path/to/file")
        val files = tmp.listFiles.toSeq
        val result = processFiles(files)
        println("done processing")
        println("done work")
      }
    }
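As a side note on the file-reading part inside each Future, here is one possible tidier variant, assuming Scala 2.13 or later (where scala.util.Using is available). ReadFileDemo and readAll are hypothetical names. It builds the same newline-terminated string without a var and closes the Source even if reading throws.

```scala
import java.io.File
import java.nio.file.Files
import scala.io.Source
import scala.util.Using

object ReadFileDemo {
  // Reads the whole file, ending every line with "\n", and always closes the Source.
  def readAll(f: File): String =
    Using.resource(Source.fromFile(f)) { src =>
      src.getLines().map(_ + "\n").mkString
    }

  def main(args: Array[String]): Unit = {
    // Temporary file used only to make the sketch runnable on its own.
    val tmp = File.createTempFile("demo", ".txt")
    Files.write(tmp.toPath, "first\nsecond\n".getBytes("UTF-8"))
    print(readAll(tmp))
    tmp.delete()
  }
}
```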
