5

I have a number of (unevaluated) expressions held in a vector; [ expr1 expr2 expr3 ... ]

What I wish to do is hand each expression to a separate thread and wait until one returns a value. At that point I'm not interested in the results from the other threads and would like to cancel them to save CPU resource.

( I realise that this could cause non-determinism in that different runs of the program might cause different expressions to be evaluated first. I have this in hand. )

Is there a standard / idiomatic way of achieving the above?

5 Answers

5

Here's my take on it.

Basically, you deliver a shared promise from inside each of your futures, return a vector containing the list of futures and the delivered value, and then cancel all the futures in the list:

    (defn run-and-cancel [& exprs]
      (let [p (promise)
            ;; start one future per expression; the first to finish delivers p
            run-futures (fn [& exprs]
                          [(doall (map #(future (deliver p (eval %))) exprs)) @p])
            [fs res] (apply run-futures exprs)]
        ;; map is lazy, so use doseq to make sure the cancellations actually run
        (doseq [f fs] (future-cancel f))
        res))

1 Comment

Thank you soulcheck (and everyone else that contributed). I was gradually moving towards the same solution as above, but my code wasn't as concise or as elegant; I'm new to the language and need some practice ;)
2

It hasn't reached an official release yet, but core.async looks like an interesting and neat way of solving your problem, and other asynchronous problems too.

The leiningen incantation for core.async is (currently) as follows:

[org.clojure/core.async "0.1.0-SNAPSHOT"] 

And here's some code to make a function that will take a number of time-consuming functions, and block until one of them returns.

    (require '[clojure.core.async :refer [>!! chan alts!! thread]])

    (defn return-first [& ops]
      (let [v (map vector ops (repeatedly chan))]
        ;; run each op on its own thread and put its result on its own channel
        (doseq [[op c] v]
          (thread (>!! c (op))))
        ;; alts!! is documented as taking a vector of channels, hence mapv
        (let [[value channel] (alts!! (mapv second v))]
          value)))

    ;; Make sure the function returns what we expect with a simple Thread/sleep
    (assert (= (return-first (fn [] (Thread/sleep 3000) 3000)
                             (fn [] (Thread/sleep 2000) 2000)
                             (fn [] (Thread/sleep 5000) 5000))
               2000))

In the sample above:

  • chan creates an asynchronous channel
  • >!! puts a value onto a channel
  • thread executes the body in another thread
  • alts!! takes a vector of channels, and returns when a value appears on any of them

There's way more to it than this, and I'm still getting my head round it, but there's a walkthrough here: https://github.com/clojure/core.async/blob/master/examples/walkthrough.clj

And David Nolen's blog has some great, if mind-boggling, posts on it (http://swannodette.github.io/)

Edit

Just seen that Michał Marczyk has answered a very similar question, but better, in "Clojure threading long running processes and comparing their returns", and his answer allows you to cancel/short-circuit.

1

What you want is Java's CompletionService. I don't know of any wrapper around this in Clojure, but it wouldn't be hard to do with interop. The example below is loosely based on the example in the JavaDoc page for ExecutorCompletionService.

    (defn f [col]
      (let [cs (ExecutorCompletionService. (Executors/newCachedThreadPool))
            futures (map #(.submit cs %) col)
            result (.get (.take cs))]
        (map #(.cancel % true) futures)
        result))

3 Comments

Using ExecutorCompletionService is a good suggestion. On the other hand your code would not work, because it ignores the laziness of map: none of the tasks will get submitted and the .take call will block forever.
Thanks for the input regarding map; I hadn't noticed that. I suppose I could just wrap the map call in doall?
Exactly, you have to wrap them in doall. In the case of the .cancel calls, using doseq would be even more idiomatic.
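
Putting the comments above together, a corrected version might look like the sketch below. The name `first-result` is a hypothetical one chosen for illustration, and shutting the pool down with `shutdownNow` is an added assumption that is not part of the original answer. If the first task to complete threw an exception, `.get` will rethrow it wrapped in an ExecutionException.

```clojure
(import '(java.util.concurrent ExecutorCompletionService Executors))

(defn first-result
  "Runs each zero-argument fn in fns on a pool thread and returns the value
  of the first one to complete, cancelling the rest. (Hypothetical name.)"
  [fns]
  (let [pool    (Executors/newCachedThreadPool)
        cs      (ExecutorCompletionService. pool)
        ;; doall forces the lazy seq so every task is submitted before .take
        futures (doall (map #(.submit cs %) fns))]
    (try
      ;; .take blocks until some task has completed; .get extracts its value
      (.get (.take cs))
      (finally
        ;; doseq runs the cancellations eagerly, for their side effects
        (doseq [f futures] (.cancel f true))
        ;; assumption: the pool is not reused, so shut it down
        (.shutdownNow pool)))))

;; usage: the 50 ms task finishes first, so this evaluates to :fast
(first-result [(fn [] (Thread/sleep 500) :slow)
               (fn [] (Thread/sleep 50) :fast)])
```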
0

You could use future-call to get a list of all futures, storing them in an atom. Then compose each running future with a "shoot the other ones in the head" function, so the first one to finish will terminate all the remaining ones. Here is an example:

    (defn first-out [& fns]
      (let [fs (atom [])
            terminate (fn []
                        (println "cancelling..")
                        (doall (map future-cancel @fs)))]
        ;; do runs (x) and then (terminate); ((x) (terminate)) would try to
        ;; call the return value of (x) as a function
        (reset! fs (doall (map (fn [x] (future-call #(do (x) (terminate)))) fns)))))

    (defn wait-for [n s]
      (fn []
        (print "start...")
        (flush)
        (Thread/sleep n)
        (print s)
        (flush)))

    (first-out (wait-for 1000 "long") (wait-for 500 "short"))

Edit

Just noticed that the previous code does not return the first result, so it is mainly useful for side effects. Here is another version that returns the first result using a promise:

    (defn first-out [& fns]
      (let [fs (atom [])
            ret (promise)
            terminate (fn [x]
                        (println "cancelling..")
                        (doall (map future-cancel @fs))
                        (deliver ret x))]
        (reset! fs (doall (map (fn [x] (future-call #(terminate (x)))) fns)))
        ;; block until the first future delivers its result
        @ret))

    (defn wait-for
      "this time, return the value"
      [n s]
      (fn []
        (print "start...")
        (flush)
        (Thread/sleep n)
        (print s)
        (flush)
        s))

    (first-out (wait-for 1000 "long") (wait-for 500 "short"))

-1

I don't know if there is an idiomatic way to achieve your goal, but Clojure's future looks like a good fit. From its docstring:

Takes a body of expressions and yields a future object that will invoke the body in another thread, and will cache the result and return it on all subsequent calls to deref/@. If the computation has not yet finished, calls to deref/@ will block, unless the variant of deref with timeout is used.
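
As a small illustration of the timeout variant mentioned in that docstring, the sketch below derefs a slow future with a 100 ms limit. The sleep duration and the `:timed-out` fallback value are arbitrary choices for the example, and the var names are hypothetical.

```clojure
;; a future whose body takes about one second to finish
(def slow (future (Thread/sleep 1000) :done))

;; (deref ref timeout-ms timeout-val) gives back timeout-val if the future
;; has not finished within timeout-ms milliseconds
(def early (deref slow 100 :timed-out))

;; a plain deref blocks until the body has finished
(def late @slow)

(println early late) ; prints ":timed-out :done"
```

Note that this only bounds the wait on a single future; it does not by itself tell you which of several futures finishes first.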

4 Comments

How would you avoid waiting for all the futures to finish?
Yes, futures were what I was considering, but I can't see a way of not blocking unnecessarily. Suppose expr1 takes 10s to evaluate, expr2 1s and expr3 100s. How do I know that I should call deref/@ on expr2, given that in general I have no a priori information on the likely evaluation times? Otherwise I could just evaluate expr2 in a thread and be done with it ;)
@SeanHoldsworth By using (realized?) maybe?
@Chiron wouldn't that imply some form of busy waiting?
