0

I have to compute asynchronously a set of features that can have multiple dependencies between each other (no loops). For example

 class FeatureEncoderMock(val n:String, val deps: List[String] = List.empty) { def compute = { println(s"starting computation feature $n") Thread.sleep(r.nextInt(2500)) println(s"end computation feature $n") } } val registry = Map( "feat1" -> new FeatureEncoderMock("feat1", List("factLogA", "factLogB")), "factLogA" -> new FeatureEncoderMock("factLogA"), "factLogB" -> new FeatureEncoderMock("factLogB"), "feat1" -> new FeatureEncoderMock("feat1", List("factLogA", "factLogB")), "feat2" -> new FeatureEncoderMock("feat2", List("factLogA")), "feat3" -> new FeatureEncoderMock("feat3", List("feat1")), "feat4" -> new FeatureEncoderMock("feat4", List("feat3", "factLogB")) ) 

What I want to achieve is call a single function on feat4 that will trigger the computation of all dependent features and will take care of dependencies among them. I tried with this

def run(): Unit = { val requested = "feat4" val allFeatures = getChainOfDependencies(requested) val promises = allFeatures.zip(Seq.fill(allFeatures.size)(Promise[Unit])).toMap def computeWithDependencies(f: String) = Future { println(s"computing $f") val encoder = registry(f) if(encoder.deps.isEmpty) { promises(f).success(registry(f).compute) } else { val depTasks = promises.filterKeys(encoder.deps.contains) val depTasksFuture = Future.sequence(depTasks.map(_._2.future)) depTasksFuture.onSuccess({ case _ => println(s"all deps for $f has been computed") promises(f).success(registry(f).compute) println(s"done for $f") }) } } computeWithDependencies(requested) } 

But I cannot understand why the order of execution is not as expected. I am not sure what is the proper way to feed the future inside a promise. I am quite sure that this piece of code is wrong on that part.

1 Answer 1

1

I think you're overthinking it with the promises; Future composition is probably all that you need. Something like this:

import scala.collection.mutable def computeWithDependencies(s: String, cache: mutable.Map[String, Future[Unit]] = mutable.Map.empty) (implicit ec: ExecutionContext): Future[Unit] = { cache.get(s) match { case Some(f) => f case None => { val encoder = registry(s) val depsFutures = encoder.deps.map(d => computeWithDependencies(d, cache)) val result = Future.sequence(depsFutures).flatMap(_ => Future { encoder.compute }) cache += s -> result result } } } 

The call to flatMap ensures that all of the dependency futures complete before the "current" future executes, even if the result (a List[Unit]) is ignored. The business with the cache is just to prevent recomputation if the dependency graph has a "diamond" in it, but could be left out if it won't or if you're ok with recomputing. Anyway, when running this:

val futureResult = computeWithDependencies("feat4") Await.result(futureResult, 30 seconds) 

I see this output:

starting computation feature factLogB starting computation feature factLogA end computation feature factLogB end computation feature factLogA starting computation feature feat1 end computation feature feat1 starting computation feature feat3 end computation feature feat3 starting computation feature feat4 end computation feature feat4 

Which seems correct to me.

Sign up to request clarification or add additional context in comments.

2 Comments

this is perfect! Thanks a lot. Yes, my main problem was that every time I defined a Future, it was starting to compute it, so the composition is what I was missing. Thanks!
One more thing: how would you modify the code if I want to run simultaneously the computeWithDependencies of multiple features?

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.