|
18 | 18 | [clojure.data.json :as json]) |
19 | 19 | (:import [java.net Socket] |
20 | 20 | [java.lang StringBuilder] |
21 | | - [java.io File BufferedReader BufferedWriter |
22 | | - Writer InputStreamReader IOException] |
| 21 | + [java.io File Reader BufferedReader BufferedWriter |
| 22 | + InputStreamReader IOException] |
23 | 23 | [java.lang ProcessBuilder Process] |
24 | 24 | [java.util.concurrent ConcurrentHashMap])) |
25 | 25 |
|
26 | 26 | (def lock (Object.)) |
27 | 27 | (def outs (ConcurrentHashMap.)) |
| 28 | +(def errs (ConcurrentHashMap.)) |
28 | 29 |
|
29 | 30 | (defn create-socket [^String host port] |
30 | 31 | (let [socket (Socket. host (int port)) |
|
89 | 90 | (defn- alive? [proc] |
90 | 91 | (try (.exitValue proc) false (catch IllegalThreadStateException _ true))) |
91 | 92 |
|
92 | | -(defn- pipe [^Process proc in ^Writer out] |
| 93 | +(defn- pipe [^Process proc in stream ios] |
93 | 94 | ;; we really do want system-default encoding here |
94 | | - (with-open [^java.io.Reader in (-> in InputStreamReader. BufferedReader.)] |
95 | | - (loop [buf (char-array 1024)] |
| 95 | + (with-open [^Reader in (-> in InputStreamReader. BufferedReader.)] |
| 96 | + (loop [buf (char-array (* 64 1024))] |
96 | 97 | (when (alive? proc) |
97 | 98 | (try |
98 | 99 | (let [len (.read in buf)] |
99 | 100 | (when-not (neg? len) |
100 | | - (.write out buf 0 len) |
101 | | - (.flush out))) |
| 101 | + (try |
| 102 | + (let [{:strs [repl data]} (json/read-str (String. buf)) |
| 103 | + stream (or (.get ios repl) stream)] |
| 104 | + (.write stream data 0 (.length ^String data)) |
| 105 | + (.flush stream)) |
| 106 | + (catch Throwable _ |
| 107 | + (.write stream buf 0 len) |
| 108 | + (.flush stream))))) |
102 | 109 | (catch IOException e |
103 | 110 | (when (and (alive? proc) (not (.contains (.getMessage e) "Stream closed"))) |
104 | 111 | (.printStackTrace e *err*)))) |
|
119 | 126 | (defn setup |
120 | 127 | ([repl-env] (setup repl-env nil)) |
121 | 128 | ([{:keys [host port socket state] :as repl-env} opts] |
| 129 | + (let [tname (.getName (Thread/currentThread))] |
| 130 | + (.put outs tname *out*) |
| 131 | + (.put errs tname *err*)) |
122 | 132 | (locking lock |
123 | 133 | (when-not @socket |
124 | | - (let [output-dir (io/file (util/output-directory opts)) |
| 134 | + (let [out *out* |
| 135 | + err *err* |
| 136 | + output-dir (io/file (util/output-directory opts)) |
125 | 137 | _ (.mkdirs output-dir) |
126 | 138 | of (io/file output-dir "node_repl.js") |
127 | 139 | _ (spit of |
128 | 140 | (string/replace (slurp (io/resource "cljs/repl/node_repl.js")) |
129 | 141 | "var PORT = 5001;" |
130 | 142 | (str "var PORT = " (:port repl-env) ";"))) |
131 | 143 | proc (.start (build-process opts repl-env of)) |
132 | | - _ (do (.start (Thread. (bound-fn [] (pipe proc (.getInputStream proc) *out*)))) |
133 | | - (.start (Thread. (bound-fn [] (pipe proc (.getErrorStream proc) *err*))))) |
| 144 | + _ (do (.start (Thread. (bound-fn [] (pipe proc (.getInputStream proc) out outs)))) |
| 145 | + (.start (Thread. (bound-fn [] (pipe proc (.getErrorStream proc) err errs))))) |
134 | 146 | env (ana/empty-env) |
135 | 147 | core (io/resource "cljs/core.cljs") |
136 | 148 | ;; represent paths as vectors so we can emit JS arrays, this is to |
|
209 | 221 | (node-eval repl-env |
210 | 222 | (str "goog.global.CLOSURE_UNCOMPILED_DEFINES = " |
211 | 223 | (json/write-str (:closure-defines opts)) ";"))))) |
212 | | - (.put outs (.getName (Thread/currentThread)) *out*) |
213 | 224 | (swap! state update :listeners inc))) |
214 | 225 |
|
215 | 226 | (defrecord NodeEnv [host port path socket proc state] |
|
229 | 240 | (load-javascript this provides url)) |
230 | 241 | (-tear-down [this] |
231 | 242 | (swap! state update :listeners dec) |
| 243 | + (let [tname (Thread/currentThread)] |
| 244 | + (.remove outs tname) |
| 245 | + (.remove errs tname)) |
232 | 246 | (locking lock |
233 | 247 | (when (zero? (:listeners @state)) |
234 | 248 | (let [sock @socket] |
|
0 commit comments