From 20fab96ed3a62cece0ef421cbc15fc13813a8e6f Mon Sep 17 00:00:00 2001
From: Jim Blomo <jim.blomo+github@gmail.com>
Date: Mon, 28 May 2012 16:49:15 -0700
Subject: [PATCH] CLJ-862 bound number of threads run by pmap

---
 src/clj/clojure/core.clj               |   23 ++++++++++++++---------
 test/clojure/test_clojure/parallel.clj |   11 +++++++++++
 2 files changed, 25 insertions(+), 9 deletions(-)

diff --git a/src/clj/clojure/core.clj b/src/clj/clojure/core.clj
index e05a263..dce049a 100644
--- a/src/clj/clojure/core.clj
+++ b/src/clj/clojure/core.clj
@@ -6215,21 +6215,26 @@
   [^java.util.concurrent.Future f] (.isCancelled f))
 
 (defn pmap
-  "Like map, except f is applied in parallel. Semi-lazy in that the
-  parallel computation stays ahead of the consumption, but doesn't
-  realize the entire result unless required. Only useful for
-  computationally intensive functions where the time of f dominates
-  the coordination overhead."
+  "Like map, except f is applied in parallel. Semi-lazy in that the parallel
+  computation stays ahead of the consumption, but doesn't realize the entire
+  result. If laziness is not required, consider using reducers/map. Only useful
+  for computationally intensive functions where the time of f dominates the
+  coordination overhead."
   {:added "1.0"
    :static true}
   ([f coll]
-   (let [n (+ 2 (.. Runtime getRuntime availableProcessors))
-         rets (map #(future (f %)) coll)
+   (let [f (binding-conveyor-fn f)
+         pool clojure.lang.Agent/pooledExecutor 
+         submit (fn [e]
+                  (let [task #(f e)] ; bind to task for type hint
+                    (.submit pool ^Callable task)))
+         rets (map submit coll)
+         n (.getMaximumPoolSize pool)
          step (fn step [[x & xs :as vs] fs]
                 (lazy-seq
                  (if-let [s (seq fs)]
-                   (cons (deref x) (step xs (rest s)))
-                   (map deref vs))))]
+                   (cons (.get x) (step xs (rest s)))
+                   (map #(.get %) vs))))]
      (step rets (drop n rets))))
   ([f coll & colls]
    (let [step (fn step [cs]
diff --git a/test/clojure/test_clojure/parallel.clj b/test/clojure/test_clojure/parallel.clj
index fb98d60..4ac5aec 100644
--- a/test/clojure/test_clojure/parallel.clj
+++ b/test/clojure/test_clojure/parallel.clj
@@ -27,3 +27,14 @@
   ;; regression fixed in r1218; was OutOfMemoryError
   (is (= '(1) (pmap inc [0]))))
 
+(deftest pmap-latency
+  (let [c (range 10)
+        f #(do (Thread/sleep 50) %)
+        nano-timed (fn [s]
+                     (let [start (System/nanoTime)]
+                       (doall s)
+                       (- (System/nanoTime) start)))]
+    (is (> (nano-timed (map f c)) (nano-timed (pmap f c)))
+        "pmap latency should be lower")))
+
+
-- 
1.7.1

