Context
- I am making a system, where different clients issue queries
- A query is resolved by issuing a set of subqueries
- I have an invalidation-worker, which gets notified when subqueries go stale
Goal
- When a subquery goes stale, I want to notify the clients which have made this subquery
Solution
To do this, I am thinking of keeping a mapping. Here's a rough solution you can play with in the REPL:
(ns play (:require [clojure.core.async :as a :refer [go <! go-loop >!]])) (def recordings (atom {})) (defn record-subquery! [client-id query-n subquery-n] (swap! recordings update subquery-n (fn [prev] (let [prev (or prev #{})] (conj prev [client-id query-n]))))) (defn go-subquery [client-id query-n subquery-n] (go (<! (a/timeout (rand-int 2000))) (record-subquery! client-id query-n subquery-n) {:client-id client-id :query-n query-n :subquery-n subquery-n})) (defn go-query [client-id query-n] (go (let [subquery-ns (range query-n (+ query-n 5))] {:client-id client-id :query-n query-n :subqueries (->> subquery-ns (map (partial go-subquery client-id query-n)) a/merge (a/into []) <!)}))) (comment (go (prn (<! (go-query :a 1))))) (def client-chans {:a (a/chan) :b (a/chan)}) (defn client-worker [client-id query-chan] (go-loop [] (when-some [q (<! query-chan)] (prn (format "queried id = %s q = %s" client-id (<! (go-query client-id q)))) (recur)))) (def invalidation-chan (a/chan)) (defn invalidation-broadcaster [] (go (loop [] (<! (a/timeout 1500)) (when (>! invalidation-chan (rand-int 10)) (recur))))) (defn invalidation-worker [chan] (go-loop [] (when-some [sq-id (<! chan)] (let [subs (->> sq-id (@recordings))] (prn (format "invalidating sq-id = %s subs = %s" sq-id subs)) (doseq [[client-id query-n] subs] (>! (client-id client-chans) query-n)) (recur))))) (comment (do (client-worker :a (:a client-chans)) (client-worker :b (:b client-chans)) (invalidation-worker invalidation-chan) (invalidation-broadcaster)) (a/close! invalidation-chan) (go (>! (:a client-chans) 1))) Problem with the solution
I am a bit sad that record-subquery! is nested under go-subquery. This makes go-query stateful. I do it though to avoid the following race condition:
T0: go-query starts T1: subquery-1 completes T2: subquery-1 is invalidated T3: subquery-2 completes T4: go-query completes In this scenario, we would miss the T2 update.
Would you do this differently?