From 966cb451f5c86ac37d921a12c5a7581b857683a4 Mon Sep 17 00:00:00 2001
From: Alan Malloy <alan@malloys.org>
Date: Thu, 10 May 2012 19:39:36 -0700
Subject: [PATCH] CLJ-993 - implement range and iterate as reducers.

I also generalized foldvec to work for anything you can split in half,
and used that implementation for Range.

In order to create an object which is both a lazy sequence and a
reducible source, I needed to add a macro named defseq to core_deftype.
It is basically a reimplementation of clojure.lang.LazySeq as a clojure
macro, so that I can "mix in" lazy-sequence functions into a new class
with whatever methods are needed for reducing and folding.
---
 src/clj/clojure/core.clj               |   87 ++++++++++++++++--------
 src/clj/clojure/core/reducers.clj      |  116 ++++++++++++++++++++++++++------
 src/clj/clojure/core_deftype.clj       |   94 ++++++++++++++++++++++++++
 test/clojure/test_clojure/reducers.clj |   38 ++++++++++
 4 files changed, 285 insertions(+), 50 deletions(-)

diff --git a/src/clj/clojure/core.clj b/src/clj/clojure/core.clj
index afeb3b1..d0865b6 100644
--- a/src/clj/clojure/core.clj
+++ b/src/clj/clojure/core.clj
@@ -2591,34 +2591,8 @@
    :deprecated "1.3"}
   [n x] (take n (repeat x)))
 
-(defn iterate
-  "Returns a lazy sequence of x, (f x), (f (f x)) etc. f must be free of side-effects"
-  {:added "1.0"
-   :static true}
-  [f x] (cons x (lazy-seq (iterate f (f x)))))
-
-(defn range 
-  "Returns a lazy seq of nums from start (inclusive) to end
-  (exclusive), by step, where start defaults to 0, step to 1, and end
-  to infinity."
-  {:added "1.0"
-   :static true}
-  ([] (range 0 Double/POSITIVE_INFINITY 1))
-  ([end] (range 0 end 1))
-  ([start end] (range start end 1))
-  ([start end step]
-   (lazy-seq
-    (let [b (chunk-buffer 32)
-          comp (if (pos? step) < >)]
-      (loop [i start]
-        (if (and (< (count b) 32)
-                 (comp i end))
-          (do
-            (chunk-append b i)
-            (recur (+ i step)))
-          (chunk-cons (chunk b) 
-                      (when (comp i end) 
-                        (range i end step)))))))))
+;; needs to be a deftype so that it can be extended to reduce/fold
+(def range)
 
 (defn merge
   "Returns a map that consists of the rest of the maps conj-ed onto
@@ -6329,7 +6303,64 @@
    :static true}
   [promise val] (promise val))
 
+(defseq Iteration [f seed ^:unsynchronized-mutable cached-seq]
+  clojure.lang.Seqable
+  (seq [this]
+    (locking this
+      (first (or cached-seq
+                 (set! cached-seq
+                       [(cons seed
+                              (Iteration. f (f seed) nil))])))))
+  clojure.lang.IPending
+  (isRealized [this]
+    (locking this
+      (not (nil? cached-seq)))))
 
+(defn iterate
+  "Returns a lazy sequence of x, (f x), (f (f x)) etc. f must be free of side-effects"
+  {:added "1.0"
+   :static true}
+  [f x]
+  (Iteration. f x nil))
+
+(defseq Range [start end step ^:unsynchronized-mutable cached-seq]
+  clojure.lang.Counted
+  (count [this]
+    (int (Math/ceil (/ (- end start) step))))
+
+  clojure.lang.Seqable
+  (seq [this]
+    (locking this
+      (first
+       (or cached-seq
+           (set! cached-seq
+                 [(let [cmp (if (pos? step) < >)]
+                    (when (cmp start end)
+                      (let [b (chunk-buffer 32)]
+                        (loop [i start]
+                          (if (and (< (count b) 32)
+                                   (cmp i end))
+                            (do
+                              (chunk-append b i)
+                              (recur (+ i step)))
+                            (chunk-cons (chunk b)
+                                        (Range. i end step nil)))))))])))))
+
+  clojure.lang.IPending
+  (isRealized [this]
+    (locking this
+      (not (nil? cached-seq)))))
+
+(defn range
+ "Returns a lazy seq of nums from start (inclusive) to end
+  (exclusive), by step, where start defaults to 0, step to 1, and end
+  to infinity."
+ {:added "1.0"
+  :static true}
+ ([] (range 0 Double/POSITIVE_INFINITY 1))
+ ([end] (range 0 end 1))
+ ([start end] (range start end 1))
+ ([start end step] (Range. start end step nil)))
 
 (defn flatten
   "Takes any nested combination of sequential things (lists, vectors,
diff --git a/src/clj/clojure/core/reducers.clj b/src/clj/clojure/core/reducers.clj
index efe6118..77d51d1 100644
--- a/src/clj/clojure/core/reducers.clj
+++ b/src/clj/clojure/core/reducers.clj
@@ -13,8 +13,9 @@
       dependency info."
       :author "Rich Hickey"}
   clojure.core.reducers
-  (:refer-clojure :exclude [reduce map mapcat filter remove take take-while drop flatten])
-  (:require [clojure.walk :as walk]))
+  (:refer-clojure :exclude [reduce map mapcat filter remove take take-while drop drop-while flatten])
+  (:require [clojure.walk :as walk])
+  (:import (clojure.core Iteration Range)))
 
 (alias 'core 'clojure.core)
 (set! *warn-on-reflection* true)
@@ -129,6 +130,38 @@
       (coll-fold [_ n combinef reducef]
                  (coll-fold coll n combinef (xf reducef))))))
 
+(defn- reduce-impl
+  "Creates an implementation of CollReduce using the given reducer.
+  The two-argument implementation of reduce will call f1 with no args
+  to get an init value, and then forward on to your three-argument version."
+  [reducer]
+  {:coll-reduce (fn
+                  ([coll f1] (reducer coll f1 (f1)))
+                  ([coll f1 init] (reducer coll f1 init)))})
+
+(defn- fold-by-halves
+  "Creates an implementation of CollFold which works by halving the collection
+  until it is smaller than the requested size, and folding each subsection.
+  halving-fn will be passed as input a collection and its size (so you need not
+  recompute the size); it should return the left and right halves of the
+  collection as a pair. Those halves will normally be of the same type as the
+  parent collection, but anything foldable is sufficient."
+  [halving-fn]
+  {:coll-fold
+   (fn [coll n combinef reducef]
+     (let [size (count coll)]
+       (cond
+         (zero? size) (combinef)
+         (<= size n) (reduce reducef (combinef) coll)
+         :else
+         (let [[left right] (halving-fn coll size)
+               fc (fn [child] #(coll-fold child n combinef reducef))]
+           (fjinvoke
+            #(let [f1 (fc left)
+                   t2 (fjtask (fc right))]
+               (fjfork t2)
+               (combinef (f1) (fjjoin t2))))))))})
+
 (defn- do-curried
   [name doc meta args body]
   (let [cargs (vec (butlast args))]
@@ -254,6 +287,20 @@
               (f1 ret k v)
               ret)))))))
 
+(defcurried drop-while
+  "Skips values from the reduction of coll while (pred val) returns logical true."
+  {:added "1.5"}
+  [pred coll]
+  (reducer coll
+    (fn [f1]
+      (let [keeping? (atom false)]
+        (rfn [f1 k]
+          ([ret k v]
+             (if (or @keeping?
+                     (reset! keeping? (not (pred k v))))
+               (f1 ret k v)
+               ret)))))))
+
 ;;do not construct this directly, use cat
 (deftype Cat [cnt left right]
   clojure.lang.Counted
@@ -300,6 +347,31 @@
       :else
       (Cat. (+ (count left) (count right)) left right))))
 
+(extend Range
+  clojure.core.protocols/CollReduce
+  (reduce-impl (fn [^Range coll f1 init]
+                 (let [step (.-step coll)
+                       end (.-end coll)
+                       cmp (if (pos? step) < >)
+                       done? (fn [x] (cmp x end))]
+                   (loop [ret init, i (.-start coll)]
+                     (if (reduced? ret)
+                       @ret
+                       (if (done? i)
+                         (recur (f1 ret i) (+ i step))
+                         ret))))))
+
+  CollFold
+  (fold-by-halves (fn [^Range r size]
+                    (let [start (.-start r)
+                          step (.-step r)
+                          end (.-end r)
+                          split (-> (quot size 2)
+                                    (* step)
+                                    (+ start))]
+                      [(Range. start split step nil)
+                       (Range. split end step nil)]))))
+
 (defn append!
   ".adds x to acc and returns acc"
   {:added "1.5"}
@@ -322,22 +394,27 @@
     ([] (ctor))
     ([a b] (op a b))))
 
+
+;;;;;;;;; fold/reduce impls for lazy sequence deftypes in clojure.core ;;;;;;;;;
+(extend-type clojure.core.Iteration
+  clojure.core.protocols/CollReduce
+  (coll-reduce [coll f1] (clojure.core.protocols/coll-reduce coll f1 (f1)))
+  (coll-reduce [coll f1 init]
+    (let [f (.-f coll)]
+      (loop [seed (.-seed coll), ret (f1 init seed)]
+        (if (reduced? ret)
+          @ret
+          (let [next (f seed)]
+            (recur next (f1 ret next))))))))
+
 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; fold impls ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
-(defn- foldvec
-  [v n combinef reducef]
-  (cond
-   (empty? v) (combinef)
-   (<= (count v) n) (reduce reducef (combinef) v)
-   :else
-   (let [split (quot (count v) 2)
-         v1 (subvec v 0 split)
-         v2 (subvec v split (count v))
-         fc (fn [child] #(foldvec child n combinef reducef))]
-     (fjinvoke
-      #(let [f1 (fc v1)
-             t2 (fjtask (fc v2))]
-         (fjfork t2)
-         (combinef (f1) (fjjoin t2)))))))
+
+(extend clojure.lang.IPersistentVector
+  CollFold
+  (fold-by-halves (fn [v size]
+                    (let [split (quot size 2)]
+                      [(subvec v 0 split)
+                       (subvec v split size)]))))
 
 (extend-protocol CollFold
  Object
@@ -346,11 +423,6 @@
   ;;can't fold, single reduce
   (reduce reducef (combinef) coll))
 
- clojure.lang.IPersistentVector
- (coll-fold
-  [v n combinef reducef]
-  (foldvec v n combinef reducef))
-
  clojure.lang.PersistentHashMap
  (coll-fold
   [m n combinef reducef]
diff --git a/src/clj/clojure/core_deftype.clj b/src/clj/clojure/core_deftype.clj
index cca4501..71a2f19 100644
--- a/src/clj/clojure/core_deftype.clj
+++ b/src/clj/clojure/core_deftype.clj
@@ -447,6 +447,100 @@
        ~(build-positional-factory gname classname fields)
        ~classname)))
 
+(defmacro defseq
+  "Emit a deftype for a type which should be viewed as an ISeq, by delegating to
+  its implementation of Seqable.
+
+  The Seqable implementation should, in order to behave as expected for a lazy
+  sequence, cache its return value in a thread-safe way. An implementation of
+  IPending must also be provided, which should determine whether a call to seq
+  would cause computation to be done."
+  {:added "1.5"}
+  [name [& fields] & opts+specs]
+  `(deftype ~name [~@fields]
+     java.io.Serializable
+     clojure.lang.IHashEq
+     (hasheq [this#]
+       (if-let [s# (.seq this#)]
+         (clojure.lang.Util/hasheq s#)
+         1))
+
+     clojure.lang.ISeq
+     (first [this#] (.first (.seq this#)))
+     (next [this#] (.next (.seq this#)))
+     (more [this#] (.more (.seq this#)))
+     (cons [this# x#] (.cons (.seq this#) x#))
+
+     clojure.lang.IPersistentCollection
+     (empty [this#] ())
+     (equiv [this# other#]
+       (if-let [s# (.seq this#)]
+         (.equiv s# other#)
+         (and (or (sequential? other#)
+                  (instance? other# java.util.List))
+              (nil? (seq other#)))))
+
+     Object
+     (equals [this# other#]
+       (.equiv this# other#))
+     (hashCode [this#]
+       (if-let [s# (.seq this#)]
+         (clojure.lang.Util/hash s#)
+         1))
+
+     java.util.List
+     (size [this#]
+       (count this#))
+     (isEmpty [this#]
+       (nil? (.seq this#)))
+     (toArray [this#]
+       (clojure.lang.RT/seqToArray (.seq this#)))
+     (toArray [this# ary#]
+       (clojure.lang.RT/seqToPassedArray (.seq this#) ary#))
+     (iterator [this#]
+       (clojure.lang.SeqIterator. (.seq this#)))
+     (subList [this# from# to#]
+       (-> (java.util.ArrayList. this#)
+           (.subList from# to#)))
+     (indexOf [this# o#]
+       (loop [i# 0, s# (.seq this#)]
+         (cond (nil? s#) -1
+               (= (first s#) o#) i#
+               :else (recur (inc i#) (next s#)))))
+     (lastIndexOf [this# o#]
+       (-> (java.util.ArrayList. this#)
+           (.lastIndexOf [this# o#])))
+     (contains [this# o#]
+       (some #(= o# %) (.seq this#)))
+     (containsAll [this# coll#]
+       (every? #(.contains this# %) coll#))
+     (listIterator [this#]
+       (.listIterator (java.util.ArrayList. this#)))
+     (listIterator [this# idx#]
+       (.listIterator (java.util.ArrayList. this#) idx#))
+     (get [this# idx#] (nth this# idx#))
+
+     ~@(for [[name n-args] '{set 2, add 2, addAll 1,
+                             removeAll 1, retainAll 1, clear 0}]
+         `(~name [~@(repeatedly (inc n-args) gensym)]
+                 (throw (UnsupportedOperationException.))))
+     (^boolean remove [this# ^Object x#]
+       (throw (UnsupportedOperationException.)))
+     (remove [this# ^int idx#]
+       (throw (UnsupportedOperationException.)))
+
+     ;; this implementation will be ignored if the caller provides an
+     ;; implementation of Counted, because theirs gets merged on top
+     ;; of ours. so this will really just be a fallback for if they
+     ;; don't implement a fast count.
+     clojure.lang.Counted
+     (count [this#]
+       (loop [i# 0, s# (.seq this#)]
+         (cond (nil? s#) i#
+               (counted? s#) (+ i# (count s#))
+               :else (recur (inc i#) (.next s#)))))
+     ~@opts+specs))
+
 ;;;;;;;;;;;;;;;;;;;;;;; protocols ;;;;;;;;;;;;;;;;;;;;;;;;
 
 (defn- expand-method-impl-cache [^clojure.lang.MethodImplCache cache c f]
diff --git a/test/clojure/test_clojure/reducers.clj b/test/clojure/test_clojure/reducers.clj
index 1123c36..9943f88 100644
--- a/test/clojure/test_clojure/reducers.clj
+++ b/test/clojure/test_clojure/reducers.clj
@@ -39,3 +39,41 @@
 (defequivtest test-filter
   [filter r/filter #(into [] %)]
   [even? odd? #(< 200 %) identity])
+
+(defequivtest test-drop-while
+  [drop-while r/drop-while #(into [] %)]
+  [neg? pos? #(< % 200) #(> % 200) #{-100}])
+
+(deftest test-iterate
+  (testing "reducible"
+   (is (= [100000]
+          (->> (iterate inc 0)
+               (r/drop 1e5)
+               (r/take 1)
+               (into [])))))
+  (testing "lazy-seq"
+   (is (= 5 (first
+             (drop 5 (iterate inc 0)))))))
+
+(deftest test-range
+  (is (= (take 10000 (range))
+         (->> (range)
+              (r/take 10000)
+              (into []))))
+  (doseq [argvec [[7000]
+                  [0 5736]
+                  [10000 21 -2]
+                  [0 3710 2/3]
+                  [1 -8642 -2]]]
+    (let [reduce-version (apply range argvec)
+          seq-version (seq reduce-version)
+          reduced-vector (into [] reduce-version)]
+      (is (counted? reduce-version))
+      (is (chunked-seq? seq-version))
+      (is (= seq-version reduced-vector))
+      (is (= (count reduce-version)
+             (count reduced-vector)))
+      (let [seq-sum (reduce + seq-version)
+            folded-sum (r/fold + reduce-version)
+            vector-fold-sum (r/fold + reduced-vector)]
+        (is (= seq-sum folded-sum vector-fold-sum))))))
-- 
1.7.4.1

