|
21 | 21 | [java.io File Reader BufferedReader BufferedWriter |
22 | 22 | InputStreamReader IOException] |
23 | 23 | [java.lang ProcessBuilder Process] |
24 | | - [java.util.concurrent ConcurrentHashMap])) |
| 24 | + [java.util.concurrent ConcurrentHashMap LinkedBlockingQueue])) |
25 | 25 |
|
26 | 26 | (def lock (Object.)) |
| 27 | +(def results (ConcurrentHashMap.)) |
27 | 28 | (def outs (ConcurrentHashMap.)) |
28 | 29 | (def errs (ConcurrentHashMap.)) |
29 | 30 |
|
| 31 | +(defn thread-name [] |
| 32 | + (.getName (Thread/currentThread))) |
| 33 | + |
30 | 34 | (defn create-socket [^String host port] |
31 | 35 | (let [socket (Socket. host (int port)) |
32 | 36 | in (io/reader socket) |
|
56 | 60 | (defn node-eval |
57 | 61 | "Evaluate a JavaScript string in the Node REPL process." |
58 | 62 | [repl-env js] |
59 | | - (let [{:keys [in out]} @(:socket repl-env)] |
60 | | - ;; escape backslash for Node.js under Windows |
61 | | - (write out |
62 | | - (json/write-str |
63 | | - {"repl" (.getName (Thread/currentThread)) |
64 | | - "form" js})) |
65 | | - (let [result (json/read-str |
66 | | - (read-response in) :key-fn keyword)] |
| 63 | + (let [tname (thread-name) |
| 64 | + {:keys [out]} @(:socket repl-env)] |
| 65 | + (write out (json/write-str {:type "eval" :repl tname :form js})) |
| 66 | + (let [result (.take ^LinkedBlockingQueue (.get results tname))] |
67 | 67 | (condp = (:status result) |
68 | 68 | "success" |
69 | 69 | {:status :success |
|
88 | 88 | (defn- alive? [proc] |
89 | 89 | (try (.exitValue proc) false (catch IllegalThreadStateException _ true))) |
90 | 90 |
|
91 | | -(defn- pipe [^Process proc in stream ios] |
| 91 | +(defn- event-loop [^Process proc in] |
92 | 92 | ;; we really do want system-default encoding here |
93 | | - (with-open [^Reader in (-> in InputStreamReader. BufferedReader.)] |
94 | | - (while (alive? proc) |
95 | | - (try |
96 | | - (let [res (read-response in)] |
97 | | - (try |
98 | | - (let [{:keys [repl content]} (json/read-str res :key-fn keyword) |
99 | | - stream (or (.get ios repl) stream)] |
100 | | - (.write stream content 0 (.length ^String content)) |
101 | | - (.flush stream)) |
102 | | - (catch Throwable _ |
103 | | - (.write stream res 0 (.length res)) |
104 | | - (.flush stream)))) |
105 | | - (catch IOException e |
106 | | - (when (and (alive? proc) (not (.contains (.getMessage e) "Stream closed"))) |
107 | | - (.printStackTrace e *err*))))))) |
| 93 | + (while (alive? proc) |
| 94 | + (try |
| 95 | + (let [res (read-response in)] |
| 96 | + (try |
| 97 | + (let [{:keys [type repl value] :or {repl "main"} :as event} |
| 98 | + (json/read-str res :key-fn keyword)] |
| 99 | + (case type |
| 100 | + "result" |
| 101 | + (.offer (.get results repl) event) |
| 102 | + (when-let [stream (.get (if (= type "out") outs errs) repl)] |
| 103 | + (.write stream value 0 (.length ^String value)) |
| 104 | + (.flush stream)))) |
| 105 | + (catch Throwable _ |
| 106 | + (.write *out* res 0 (.length res)) |
| 107 | + (.flush *out*)))) |
| 108 | + (catch IOException e |
| 109 | + (when (and (alive? proc) (not (.contains (.getMessage e) "Stream closed"))) |
| 110 | + (.printStackTrace e *err*)))))) |
108 | 111 |
|
109 | 112 | (defn- build-process |
110 | 113 | [opts repl-env input-src] |
|
122 | 125 | ([repl-env] (setup repl-env nil)) |
123 | 126 | ([{:keys [host port socket state] :as repl-env} opts] |
124 | 127 | (let [tname (.getName (Thread/currentThread))] |
| 128 | + (.put results tname (LinkedBlockingQueue.)) |
125 | 129 | (.put outs tname *out*) |
126 | 130 | (.put errs tname *err*)) |
127 | 131 | (locking lock |
128 | 132 | (when-not @socket |
129 | | - (let [out *out* |
130 | | - err *err* |
131 | | - output-dir (io/file (util/output-directory opts)) |
| 133 | + (let [output-dir (io/file (util/output-directory opts)) |
132 | 134 | _ (.mkdirs output-dir) |
133 | 135 | of (io/file output-dir "node_repl.js") |
134 | 136 | _ (spit of |
135 | 137 | (string/replace (slurp (io/resource "cljs/repl/node_repl.js")) |
136 | 138 | "var PORT = 5001;" |
137 | 139 | (str "var PORT = " (:port repl-env) ";"))) |
138 | 140 | proc (.start (build-process opts repl-env of)) |
139 | | - _ (do (.start (Thread. (bound-fn [] (pipe proc (.getInputStream proc) out outs)))) |
140 | | - (.start (Thread. (bound-fn [] (pipe proc (.getErrorStream proc) err errs))))) |
141 | 141 | env (ana/empty-env) |
142 | 142 | core (io/resource "cljs/core.cljs") |
143 | 143 | ;; represent paths as vectors so we can emit JS arrays, this is to |
|
157 | 157 | (if @socket |
158 | 158 | (recur (read-response (:in @socket))) |
159 | 159 | (recur nil)))) |
| 160 | + (.start (Thread. (bound-fn [] (event-loop proc (:in @socket))))) |
160 | 161 | ;; compile cljs.core & its dependencies, goog/base.js must be available |
161 | 162 | ;; for bootstrap to load, use new closure/compile as it can handle |
162 | 163 | ;; resources in JARs |
|
235 | 236 | (load-javascript this provides url)) |
236 | 237 | (-tear-down [this] |
237 | 238 | (swap! state update :listeners dec) |
238 | | - (let [tname (Thread/currentThread)] |
| 239 | + (let [tname (thread-name)] |
| 240 | + (.remove results tname) |
239 | 241 | (.remove outs tname) |
240 | 242 | (.remove errs tname)) |
241 | 243 | (locking lock |
|
0 commit comments