From 5bce6e99687fb733614696cd5978e8a7a5539aca Mon Sep 17 00:00:00 2001 From: Colin Jones Date: Fri, 8 Mar 2013 15:14:54 -0600 Subject: [PATCH] Use a queue for stdin, to avoid PipedReader/Writer Fixes NREPL-39 --- .../clojure/tools/nrepl/middleware/session.clj | 87 +++++++++++++------- src/test/clojure/clojure/tools/nrepl_test.clj | 33 +++++--- 2 files changed, 78 insertions(+), 42 deletions(-) diff --git a/src/main/clojure/clojure/tools/nrepl/middleware/session.clj b/src/main/clojure/clojure/tools/nrepl/middleware/session.clj index e95dc34..7e81df8 100644 --- a/src/main/clojure/clojure/tools/nrepl/middleware/session.clj +++ b/src/main/clojure/clojure/tools/nrepl/middleware/session.clj @@ -9,7 +9,8 @@ [clojure.tools.nrepl.transport :as t]) (:import clojure.tools.nrepl.transport.Transport (java.io PipedReader PipedWriter Reader Writer PrintWriter StringReader) - clojure.lang.LineNumberingPushbackReader)) + clojure.lang.LineNumberingPushbackReader + java.util.concurrent.LinkedBlockingQueue)) (def ^{:private true} sessions (atom {})) @@ -20,6 +21,7 @@ ;; how best to make it configurable though... (def ^{:dynamic true :private true} *out-limit* 1024) +(def ^{:dynamic true :private true} *skipping-eol* false) (defn- session-out "Returns a PrintWriter suitable for binding as *out* or *err*. All of @@ -56,28 +58,52 @@ {:status :need-input} message on the provided transport so the client/user can provide content to be read." [session-id transport] - (let [request-input (fn [^PipedReader r] - (when-not (.ready r) - (t/send transport - (response-for *msg* :session session-id - :status :need-input)))) - writer (PipedWriter.) + (let [input-queue (LinkedBlockingQueue.) + request-input (fn [] + (cond (> (.size input-queue) 0) + (.take input-queue) + *skipping-eol* + nil + :else + (do + (t/send transport + (response-for *msg* :session session-id + :status :need-input)) + (.take input-queue)))) + do-read (fn [buf off len] + (locking input-queue + (loop [i off] + (cond + (>= i (+ off len)) + (+ off len) + (.peek input-queue) + (do (aset-char buf i (char (.take input-queue))) + (recur (inc i))) + :else + i)))) reader (LineNumberingPushbackReader. - (proxy [PipedReader] [writer] - (close []) + (proxy [Reader] [] + (close [] (.clear input-queue)) (read - ([] (request-input this) - (let [^Reader this this] (proxy-super read))) - ([x] (request-input this) - (let [^Reader this this] - (if (instance? java.nio.CharBuffer x) - (proxy-super read ^java.nio.CharBuffer x) - (proxy-super read ^chars x)))) - ([buf off len] - (let [^Reader this this] - (request-input this) - (proxy-super read buf off len))))))] - [reader writer])) + ([] + (let [^Reader this this] (proxy-super read))) + ([x] + (let [^Reader this this] + (if (instance? java.nio.CharBuffer x) + (proxy-super read ^java.nio.CharBuffer x) + (proxy-super read ^chars x)))) + ([^chars buf off len] + (if (zero? len) + -1 + (let [first-character (request-input)] + (if (or (nil? first-character) (= first-character -1)) + -1 + (do + (aset-char buf off (char first-character)) + (- (do-read buf (inc off) (dec len)) + off)))))))))] + {:input-queue input-queue + :stdin-reader reader})) (defn- create-session "Returns a new atom containing a map of bindings as per @@ -90,10 +116,10 @@ (clojure.main/with-bindings (let [id (uuid) out (session-out :out id transport) - [in in-writer] (session-in id transport)] + {:keys [input-queue stdin-reader]} (session-in id transport)] (binding [*out* out *err* (session-out :err id transport) - *in* in + *in* stdin-reader *ns* (create-ns 'user) *out-limit* (or (baseline-bindings #'*out-limit*) 1024) ; clojure.test captures *out* at load-time, so we need to make sure @@ -105,8 +131,8 @@ ; don't capture that *agent* binding for userland REPL sessions (atom (merge baseline-bindings (dissoc (get-thread-bindings) #'*agent*)) :meta {:id id - :stdin-reader in - :stdin-writer in-writer})))))) + :stdin-reader stdin-reader + :input-queue input-queue})))))) (defn- register-session "Registers a new session containing the baseline bindings contained in the @@ -194,13 +220,14 @@ (fn [{:keys [op stdin session transport] :as msg}] (cond (= op "eval") - (let [s (-> session meta ^LineNumberingPushbackReader (:stdin-reader))] - (when (.ready s) - (clojure.main/skip-if-eol s)) + (let [in (-> (meta session) ^LineNumberingPushbackReader (:stdin-reader))] + (binding [*skipping-eol* true] + (clojure.main/skip-if-eol in)) (h msg)) (= op "stdin") - (do - (-> session meta ^Writer (:stdin-writer) (.write ^String stdin)) + (let [q (-> (meta session) ^Writer (:input-queue))] + (locking q + (doseq [c stdin] (.put q c))) (t/send transport (response-for msg :status :done))) :else (h msg)))) diff --git a/src/test/clojure/clojure/tools/nrepl_test.clj b/src/test/clojure/clojure/tools/nrepl_test.clj index 1487f8c..054901e 100644 --- a/src/test/clojure/clojure/tools/nrepl_test.clj +++ b/src/test/clojure/clojure/tools/nrepl_test.clj @@ -163,7 +163,7 @@ (is (= history (repl-values sc-session "[*3 *2 *1]"))) (is (= history (repl-values sc-session "*1")))))) - + (testing "without a session id, REPL-bound vars like *1 have default values" (is (= [nil] (repl-values client "*1"))))) @@ -301,7 +301,7 @@ (let [server (server/start-server) transport (connect :port (:port server))] (transport/send transport {"op" "eval" "code" "(+ 1 1)"}) - + (let [reader (future (while true (transport/recv transport)))] (Thread/sleep 1000) (.close server) @@ -312,7 +312,7 @@ (is false "A reader started prior to the server closing should throw an error...") (catch Throwable e (is (disconnection-exception? e))))) - + (is (thrown? SocketException (transport/recv transport))) ;; TODO no idea yet why two sends are *sometimes* required to get a failure (try @@ -334,7 +334,7 @@ (is false "reads after the server is closed should fail") (catch Throwable t (is (disconnection-exception? t))))) - + ;; TODO as noted in transports-fail-on-disconnects, *sometimes* two sends are needed ;; to trigger an exception on send to an unavailable server (try (repl-eval session "(+ 1 1)") (catch Throwable t)) @@ -353,20 +353,29 @@ (def-repl-test request-multiple-read-newline-*in* (is (= '(:ohai) (response-values (for [resp (repl-eval session "(read)")] - (do - (when (-> resp :status set (contains? "need-input")) - (session {:op :stdin :stdin ":ohai\n"})) - resp))))) + (do + (when (-> resp :status set (contains? "need-input")) + (session {:op :stdin :stdin ":ohai\n"})) + resp))))) (session {:op :stdin :stdin "a\n"}) (is (= ["a"] (repl-values session "(read-line)")))) +(def-repl-test request-multiple-read-with-buffered-newline-*in* + (is (= '(:ohai) (response-values (for [resp (repl-eval session "(read)")] + (do + (when (-> resp :status set (contains? "need-input")) + (session {:op :stdin :stdin ":ohai\na\n"})) + resp))))) + + (is (= ["a"] (repl-values session "(read-line)")))) + (def-repl-test request-multiple-read-objects-*in* (is (= '(:ohai) (response-values (for [resp (repl-eval session "(read)")] - (do - (when (-> resp :status set (contains? "need-input")) - (session {:op :stdin :stdin ":ohai :kthxbai\n"})) - resp))))) + (do + (when (-> resp :status set (contains? "need-input")) + (session {:op :stdin :stdin ":ohai :kthxbai\n"})) + resp))))) (is (= [" :kthxbai"] (repl-values session "(read-line)")))) -- 1.7.7.5 (Apple Git-26)