Skip to content

Commit 9eed066

Browse files
committed
Merge branch 'node-socket-repl-stdios'
2 parents eb7b5dd + 84004ff commit 9eed066

2 files changed

Lines changed: 70 additions & 36 deletions

File tree

src/main/clojure/cljs/repl/node.clj

Lines changed: 46 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,18 @@
1818
[clojure.data.json :as json])
1919
(:import [java.net Socket]
2020
[java.lang StringBuilder]
21-
[java.io File BufferedReader BufferedWriter
22-
Writer InputStreamReader IOException]
21+
[java.io File Reader BufferedReader BufferedWriter
22+
InputStreamReader IOException]
2323
[java.lang ProcessBuilder Process]
24-
[java.util.concurrent ConcurrentHashMap]))
24+
[java.util.concurrent ConcurrentHashMap LinkedBlockingQueue]))
2525

2626
(def lock (Object.))
27+
(def results (ConcurrentHashMap.))
2728
(def outs (ConcurrentHashMap.))
29+
(def errs (ConcurrentHashMap.))
30+
31+
(defn thread-name []
32+
(.getName (Thread/currentThread)))
2833

2934
(defn create-socket [^String host port]
3035
(let [socket (Socket. host (int port))
@@ -42,29 +47,23 @@
4247
(.write out (int 0)) ;; terminator
4348
(.flush out))
4449

45-
(defn read-response [^BufferedReader in]
50+
(defn ^String read-response [^BufferedReader in]
4651
(let [sb (StringBuilder.)]
4752
(loop [sb sb c (.read in)]
48-
(cond
49-
(= c 1) (let [ret (str sb)]
50-
(print ret)
51-
(recur (StringBuilder.) (.read in)))
52-
(= c 0) (str sb)
53-
:else (do
54-
(.append sb (char c))
55-
(recur sb (.read in)))))))
53+
(case c
54+
-1 (throw (IOException. "Stream closed"))
55+
0 (str sb)
56+
(do
57+
(.append sb (char c))
58+
(recur sb (.read in)))))))
5659

5760
(defn node-eval
5861
"Evaluate a JavaScript string in the Node REPL process."
5962
[repl-env js]
60-
(let [{:keys [in out]} @(:socket repl-env)]
61-
;; escape backslash for Node.js under Windows
62-
(write out
63-
(json/write-str
64-
{"repl" (.getName (Thread/currentThread))
65-
"form" js}))
66-
(let [result (json/read-str
67-
(read-response in) :key-fn keyword)]
63+
(let [tname (thread-name)
64+
{:keys [out]} @(:socket repl-env)]
65+
(write out (json/write-str {:type "eval" :repl tname :form js}))
66+
(let [result (.take ^LinkedBlockingQueue (.get results tname))]
6867
(condp = (:status result)
6968
"success"
7069
{:status :success
@@ -89,20 +88,26 @@
8988
(defn- alive? [proc]
9089
(try (.exitValue proc) false (catch IllegalThreadStateException _ true)))
9190

92-
(defn- pipe [^Process proc in ^Writer out]
91+
(defn- event-loop [^Process proc in]
9392
;; we really do want system-default encoding here
94-
(with-open [^java.io.Reader in (-> in InputStreamReader. BufferedReader.)]
95-
(loop [buf (char-array 1024)]
96-
(when (alive? proc)
93+
(while (alive? proc)
94+
(try
95+
(let [res (read-response in)]
9796
(try
98-
(let [len (.read in buf)]
99-
(when-not (neg? len)
100-
(.write out buf 0 len)
101-
(.flush out)))
102-
(catch IOException e
103-
(when (and (alive? proc) (not (.contains (.getMessage e) "Stream closed")))
104-
(.printStackTrace e *err*))))
105-
(recur buf)))))
97+
(let [{:keys [type repl value] :or {repl "main"} :as event}
98+
(json/read-str res :key-fn keyword)]
99+
(case type
100+
"result"
101+
(.offer (.get results repl) event)
102+
(when-let [stream (.get (if (= type "out") outs errs) repl)]
103+
(.write stream value 0 (.length ^String value))
104+
(.flush stream))))
105+
(catch Throwable _
106+
(.write *out* res 0 (.length res))
107+
(.flush *out*))))
108+
(catch IOException e
109+
(when (and (alive? proc) (not (.contains (.getMessage e) "Stream closed")))
110+
(.printStackTrace e *err*))))))
106111

107112
(defn- build-process
108113
[opts repl-env input-src]
@@ -119,6 +124,10 @@
119124
(defn setup
120125
([repl-env] (setup repl-env nil))
121126
([{:keys [host port socket state] :as repl-env} opts]
127+
(let [tname (.getName (Thread/currentThread))]
128+
(.put results tname (LinkedBlockingQueue.))
129+
(.put outs tname *out*)
130+
(.put errs tname *err*))
122131
(locking lock
123132
(when-not @socket
124133
(let [output-dir (io/file (util/output-directory opts))
@@ -129,8 +138,6 @@
129138
"var PORT = 5001;"
130139
(str "var PORT = " (:port repl-env) ";")))
131140
proc (.start (build-process opts repl-env of))
132-
_ (do (.start (Thread. (bound-fn [] (pipe proc (.getInputStream proc) *out*))))
133-
(.start (Thread. (bound-fn [] (pipe proc (.getErrorStream proc) *err*)))))
134141
env (ana/empty-env)
135142
core (io/resource "cljs/core.cljs")
136143
;; represent paths as vectors so we can emit JS arrays, this is to
@@ -150,6 +157,7 @@
150157
(if @socket
151158
(recur (read-response (:in @socket)))
152159
(recur nil))))
160+
(.start (Thread. (bound-fn [] (event-loop proc (:in @socket)))))
153161
;; compile cljs.core & its dependencies, goog/base.js must be available
154162
;; for bootstrap to load, use new closure/compile as it can handle
155163
;; resources in JARs
@@ -209,7 +217,6 @@
209217
(node-eval repl-env
210218
(str "goog.global.CLOSURE_UNCOMPILED_DEFINES = "
211219
(json/write-str (:closure-defines opts)) ";")))))
212-
(.put outs (.getName (Thread/currentThread)) *out*)
213220
(swap! state update :listeners inc)))
214221

215222
(defrecord NodeEnv [host port path socket proc state]
@@ -229,6 +236,10 @@
229236
(load-javascript this provides url))
230237
(-tear-down [this]
231238
(swap! state update :listeners dec)
239+
(let [tname (thread-name)]
240+
(.remove results tname)
241+
(.remove outs tname)
242+
(.remove errs tname))
232243
(locking lock
233244
(when (zero? (:listeners @state))
234245
(let [sock @socket]

src/main/clojure/cljs/repl/node_repl.js

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ var net = require("net");
1313
var vm = require("vm");
1414
var dom = require("domain").create();
1515
var PORT = 5001;
16+
var repl = null;
1617

1718
try {
1819
require("source-map-support").install();
@@ -22,14 +23,30 @@ try {
2223
var server = net.createServer(function (socket) {
2324
var buffer = "",
2425
ret = null,
25-
repl = null,
2626
err = null;
2727

2828
socket.write("ready");
2929
socket.write("\0");
3030

3131
socket.setEncoding("utf8");
3232

33+
process.stdout.write = function(chunk, encoding, fd) {
34+
var args = Array.prototype.slice.call(arguments, 0);
35+
args[0] = JSON.stringify({type: "out", repl: repl, value: chunk});
36+
socket.write.apply(socket, args);
37+
socket.write("\0");
38+
};
39+
40+
process.stderr.write = (function(write) {
41+
return function(chunk, encoding, fd) {
42+
var args = Array.prototype.slice.call(arguments, 0);
43+
args[0] = JSON.stringify({type: "err", repl: repl, value: chunk});
44+
socket.write.apply(socket, args);
45+
socket.write("\0");
46+
};
47+
})(process.stderr.write);
48+
49+
3350
dom.on("error", function(ue) {
3451
console.error(ue.stack);
3552
});
@@ -66,16 +83,22 @@ var server = net.createServer(function (socket) {
6683

6784
if(err) {
6885
socket.write(JSON.stringify({
86+
type: "result",
87+
repl: repl,
6988
status: "exception",
7089
value: err.stack
7190
}));
7291
} else if(ret !== undefined && ret !== null) {
7392
socket.write(JSON.stringify({
93+
type: "result",
94+
repl: repl,
7495
status: "success",
7596
value: ret.toString()
7697
}));
7798
} else {
7899
socket.write(JSON.stringify({
100+
type: "result",
101+
repl: repl,
79102
status: "success",
80103
value: null
81104
}));

0 commit comments

Comments
 (0)