|
43 | 43 | :clojure.core.async.flow/ins (reduce-kv reducer-fn {} (:clojure.core.async.flow/ins proc)) |
44 | 44 | :clojure.core.async.flow/outs (reduce-kv reducer-fn {} (:clojure.core.async.flow/outs proc))))) |
45 | 45 |
|
| 46 | +(defn default-flow-state-filter [proc-state] |
| 47 | + (into {} (filter (fn [[k _]] (not= "clojure.core.async.flow" (namespace k))) proc-state))) |
| 48 | + |
| 49 | +(defn filter-state [proc filters] |
| 50 | + (let [pid (:clojure.core.async.flow/pid proc) |
| 51 | + user-filter (cond |
| 52 | + (contains? filters pid) (pid filters) |
| 53 | + (contains? filters :default) (:default filters) |
| 54 | + :else identity) |
| 55 | + filtered-state (->> (:clojure.core.async.flow/state proc) |
| 56 | + (default-flow-state-filter) |
| 57 | + (filter user-filter) |
| 58 | + (into {}))] |
| 59 | + (assoc proc :clojure.core.async.flow/state filtered-state))) |
| 60 | + |
46 | 61 | (defn send-message [state message] |
47 | 62 | (doall (for [channel (:channels @state)] |
48 | 63 | (httpkit/send! channel (transit-str-writer message (:handlers @state)))))) |
|
53 | 68 | (if (:loop-ping? @s) |
54 | 69 | (do (send-message state {:action :ping :data (d/datafy (reduce-kv |
55 | 70 | (fn [res k v] |
56 | | - (assoc res k (mainline-chan-meta v))) |
| 71 | + (-> res |
| 72 | + (assoc k (mainline-chan-meta v)) |
| 73 | + (assoc k (filter-state v (:filters @state))))) |
57 | 74 | {} |
58 | 75 | (flow/ping (:flow @state))))}) |
59 | 76 | (Thread/sleep 1000) |
|
124 | 141 |
|
125 | 142 | Returns: |
126 | 143 | An atom containing the server's state, and prints a local url where the frontend can be reached" |
127 | | - [{:keys [flow port handlers] :or {port 9998}}] |
| 144 | + [{:keys [flow port handlers filters] :or {port 9998}}] |
128 | 145 | (let [state (atom default-state) |
129 | 146 | error-chan (:clojure.datafy/obj (meta (:error (:chans (d/datafy flow))))) |
130 | 147 | report-chan (:clojure.datafy/obj (meta (:report (:chans (d/datafy flow)))))] |
131 | | - (swap! state assoc :flow flow :handlers handlers) |
| 148 | + (swap! state assoc :flow flow :handlers handlers :filters filters) |
132 | 149 | (report-monitoring state report-chan error-chan) |
133 | 150 | (let [server (httpkit/run-server (app state) {:port port |
134 | 151 | :max-body 100000000 |
|
0 commit comments