Skip to content

Commit 72204d5

Browse files
committed
Support providing custom transit write handlers
1 parent 41d1d38 commit 72204d5

4 files changed

Lines changed: 68 additions & 16 deletions

File tree

README.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,38 @@ Add the following dependency to your project:
4040
;; http://localhost:9876/index.html#/?port=9876
4141
```
4242

43+
## Custom Transit Handlers
44+
45+
You can provide custom Transit write handlers to properly serialize types that aren't natively supported by Transit (used for visualizing state). These handlers should follow the format expected by cognitect.transit/writer :handlers. If not provided, the default handler will be used, which converts objects to strings.
46+
47+
```clojure
48+
(:require
49+
[clojure.core.async.flow-monitor :as monitor]
50+
[clojure.core.async.flow :as flow]
51+
[cognitect.transit :as transit]
52+
[java.time :as time])
53+
(:import
54+
[java.time Instant ZoneId]
55+
[java.time.format DateTimeFormatter])
56+
57+
(def my-flow (flow/create-flow ...))
58+
59+
(defn format-instant [instant]
60+
(.format
61+
(.withZone
62+
(DateTimeFormatter/ofPattern "MMMM d, yyyy")
63+
(ZoneId/systemDefault))
64+
instant))
65+
66+
(def instant-write-handler
67+
(transit/write-handler "instant-long"
68+
(fn [instant] (format-instant instant))))
69+
70+
(def server-state (monitor/start-server {:flow my-flow
71+
:port 9876
72+
:handlers {Instant instant-write-handler}}))
73+
```
74+
4375
### Stopping the Server
4476

4577
```clojure

resources/public/assets/js/compiled/main.js

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/clojure/core/async/flow_monitor.clj

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,16 @@
2020
:loop-ping? false
2121
:flow nil})
2222

23-
(defn clj->transit-str [arg]
23+
(def default-write-handler (transit/write-handler "default" (fn [obj] (str obj))))
24+
25+
(defn transit-str-writer [data user-handlers]
2426
(let [out (ByteArrayOutputStream. 4096)
25-
writer (transit/writer out :json)]
26-
(transit/write writer arg)
27+
writer (transit/writer out :json {:handlers user-handlers
28+
:default-handler default-write-handler})]
29+
(transit/write writer data)
2730
(.toString out)))
2831

29-
(defn transit-str->clj [arg]
32+
(defn transit-str-reader [arg]
3033
(let [arg-json (json/read-str arg)
3134
arg-bytes (.getBytes arg-json "UTF-8")
3235
in (ByteArrayInputStream. arg-bytes)
@@ -42,17 +45,17 @@
4245

4346
(defn send-message [state message]
4447
(doall (for [channel (:channels @state)]
45-
(httpkit/send! channel (clj->transit-str message)))))
48+
(httpkit/send! channel (transit-str-writer message (:handlers @state))))))
4649

4750
(defn loop-ping [state]
4851
(async/thread
4952
(loop [s state]
5053
(if (:loop-ping? @s)
51-
(do (send-message state {:action :ping :data (reduce-kv
52-
(fn [res k v]
53-
(assoc res k (mainline-chan-meta v)))
54-
{}
55-
(flow/ping (:flow @state)))})
54+
(do (send-message state {:action :ping :data (d/datafy (reduce-kv
55+
(fn [res k v]
56+
(assoc res k (mainline-chan-meta v)))
57+
{}
58+
(flow/ping (:flow @state))))})
5659
(Thread/sleep 1000)
5760
(recur s))
5861
(println "Ping loop stopped")))))
@@ -67,7 +70,7 @@
6770
(swap! state assoc-in [:loop-ping?] true)
6871
(loop-ping state))
6972
:on-receive (fn [ch data]
70-
(let [clj-data (transit-str->clj data)
73+
(let [clj-data (transit-str-reader data)
7174
action (:action clj-data)]
7275
(case action
7376
:inject (flow/inject (:flow @state) (:target clj-data) (edn/read-string (:data clj-data)))
@@ -115,14 +118,17 @@
115118
- opts: A map with the following keys:
116119
- :flow (required) - The return value from clojure.core.async.flow/create-flow
117120
- :port (optional) - The port to run the server on (default: 9998)
121+
- :handlers (optional) - A map of custom Transit write handlers to use when serializing state
122+
data to send to the frontend. These handlers should follow the format
123+
expected by cognitect.transit/writer :handlers
118124
119125
Returns:
120126
An atom containing the server's state, and prints a local url where the frontend can be reached"
121-
[{:keys [flow port] :or {port 9998}}]
127+
[{:keys [flow port handlers] :or {port 9998}}]
122128
(let [state (atom default-state)
123129
error-chan (:clojure.datafy/obj (meta (:error (:chans (d/datafy flow)))))
124130
report-chan (:clojure.datafy/obj (meta (:report (:chans (d/datafy flow)))))]
125-
(swap! state assoc :flow flow)
131+
(swap! state assoc :flow flow :handlers handlers)
126132
(report-monitoring state report-chan error-chan)
127133
(let [server (httpkit/run-server (app state) {:port port
128134
:max-body 100000000

src/clojurescript/flow_monitor_ui/global.cljs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,21 @@
116116
(defn send-socket-data [data]
117117
(.send (:ws-chan @global-state) (.stringify js/JSON (t/write writer data))))
118118

119+
(defn tagged-value? [^js v]
120+
(try
121+
(and (.-tag v) (.-rep v))
122+
(catch :default _ false)))
123+
124+
(defn transform-tagged-values [data]
125+
(cond
126+
(tagged-value? data) [(.-tag ^js data) (transform-tagged-values (.-rep ^js data))]
127+
(map? data) (reduce-kv (fn [m k v] (assoc m k (transform-tagged-values v))) {} data)
128+
(vector? data) (mapv transform-tagged-values data)
129+
(set? data) (into #{} (map transform-tagged-values data))
130+
:else data))
131+
119132
(defn process-ws-message [msg]
120-
(let [data (->> msg .-data (t/read reader))
133+
(let [data (->> msg .-data (t/read reader) transform-tagged-values)
121134
action (:action data)]
122135
(case action
123136
:datafy (do

0 commit comments

Comments
 (0)