Clojure

Consider integration with java.util.function interfaces

Details

  • Type: Enhancement Enhancement
  • Status: Open Open
  • Priority: Major Major
  • Resolution: Unresolved
  • Affects Version/s: Release 1.9
  • Fix Version/s: Release 1.10
  • Component/s: None
  • Labels:
  • Approval:
    Vetted

Description

Moving to Java 8 as a baseline allows us to consider built-in ties to critical java.util.Function interfaces like Function, Predicate, Supplier, etc. Needs assessment about what's possible and what automatic integration it would open for users.

https://docs.oracle.com/javase/8/docs/api/java/util/function/package-summary.html

Activity

Hide
Jason Whitlark added a comment - - edited
;; I dug this out of some scratch code experimenting with kafka streams.  All the reify's were filled with java8 lambdas in the original.

;; I'll dig up another example that shows examples using stuff from java.utils.funstion.* 

;;Some of this was lifted from a franzy example or something?

;; Note that, for example, 
;; https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/Predicate.html
;; is different from
;; https://docs.oracle.com/javase/8/docs/api/java/util/function/Predicate.html

(ns utils
  (:import (org.apache.kafka.streams.kstream Reducer KeyValueMapper ValueMapper Predicate))

(defmacro reducer [kv & body]
  `(reify Reducer
     (apply [_# ~(first kv) ~(second kv)]
       ~@body)))

;; public interface KeyValueMapper<K,V,R>
;; apply(K key, V value)
(defmacro kv-mapper [kv & body]
  `(reify KeyValueMapper
     (apply [_# ~(first kv) ~(second kv)]
       ~@body)))

;; public interface ValueMapper<V1,V2>
;; apply(V1 value)
(defmacro v-mapper [v & body]
  `(reify ValueMapper
     (apply [_# ~v]
       ~@body)))

(defmacro pred [kv & body]
  `(reify Predicate
     (test [_# ~(first kv) ~(second kv)]
       ~@body)))

;; I used it something like this:

(ns our-service.kafka-streams
  (:require
   [our-service.util :as k]
   [clojure.string :as str]
  (:import 
           (org.apache.kafka.streams StreamsConfig KafkaStreams KeyValue)
           (org.apache.kafka.streams.kstream KStreamBuilder ValueMapper)))

(defn create-word-count-topology []
  (let [builder (KStreamBuilder.)
        init-stream (.stream builder (into-array ["streams-str-input"]))
        wc (-> init-stream
            (.flatMapValues (k/v-mapper [& value]
                                        (str/split (apply str value) #"\s")))
            (.map (k/kv-mapper [k v]
                               (KeyValue/pair v v)))
            (.filter (k/pred [k v]
                             (println v)
                             (not= v "the")))
            (.groupByKey)
            (.count "CountStore")
            show-item
            ;; this needs to be mapValues
            (.mapValues (reify ValueMapper
                          (apply [_ v]
                            (println v)
                            (str v))))
            (.toStream)
            (.to "wordcount-output"))]
    [builder wc]))
Show
Jason Whitlark added a comment - - edited
;; I dug this out of some scratch code experimenting with kafka streams.  All the reify's were filled with java8 lambdas in the original.

;; I'll dig up another example that shows examples using stuff from java.utils.funstion.* 

;;Some of this was lifted from a franzy example or something?

;; Note that, for example, 
;; https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/Predicate.html
;; is different from
;; https://docs.oracle.com/javase/8/docs/api/java/util/function/Predicate.html

(ns utils
  (:import (org.apache.kafka.streams.kstream Reducer KeyValueMapper ValueMapper Predicate))

(defmacro reducer [kv & body]
  `(reify Reducer
     (apply [_# ~(first kv) ~(second kv)]
       ~@body)))

;; public interface KeyValueMapper<K,V,R>
;; apply(K key, V value)
(defmacro kv-mapper [kv & body]
  `(reify KeyValueMapper
     (apply [_# ~(first kv) ~(second kv)]
       ~@body)))

;; public interface ValueMapper<V1,V2>
;; apply(V1 value)
(defmacro v-mapper [v & body]
  `(reify ValueMapper
     (apply [_# ~v]
       ~@body)))

(defmacro pred [kv & body]
  `(reify Predicate
     (test [_# ~(first kv) ~(second kv)]
       ~@body)))

;; I used it something like this:

(ns our-service.kafka-streams
  (:require
   [our-service.util :as k]
   [clojure.string :as str]
  (:import 
           (org.apache.kafka.streams StreamsConfig KafkaStreams KeyValue)
           (org.apache.kafka.streams.kstream KStreamBuilder ValueMapper)))

(defn create-word-count-topology []
  (let [builder (KStreamBuilder.)
        init-stream (.stream builder (into-array ["streams-str-input"]))
        wc (-> init-stream
            (.flatMapValues (k/v-mapper [& value]
                                        (str/split (apply str value) #"\s")))
            (.map (k/kv-mapper [k v]
                               (KeyValue/pair v v)))
            (.filter (k/pred [k v]
                             (println v)
                             (not= v "the")))
            (.groupByKey)
            (.count "CountStore")
            show-item
            ;; this needs to be mapValues
            (.mapValues (reify ValueMapper
                          (apply [_ v]
                            (println v)
                            (str v))))
            (.toStream)
            (.to "wordcount-output"))]
    [builder wc]))
Hide
Ghadi Shayban added a comment -

The JLS infers lambda types by hunting for matching functional interfaces AKA "Single Abstract Method" classes [1] (whether they're interfaces or abstract classes.) We could have a reify-like helper that detects these classes [2]. You would have to hint the target class. We don't really need things that are both `IFn` and `j.u.f.Predicate`, etc.

(import '[java.util.function Predicate Consumer])

(let [orig [1 2 3]
      st (atom [])]
  (.forEach orig (jfn Consumer [x] (swap! st conj x)))
  (= @st orig))

[1] https://docs.oracle.com/javase/specs/jls/se8/html/jls-9.html#jls-9.8
[2] spike https://gist.github.com/ghadishayban/0ac41e81d4df02ff176c22d16ee8b972

Show
Ghadi Shayban added a comment - The JLS infers lambda types by hunting for matching functional interfaces AKA "Single Abstract Method" classes [1] (whether they're interfaces or abstract classes.) We could have a reify-like helper that detects these classes [2]. You would have to hint the target class. We don't really need things that are both `IFn` and `j.u.f.Predicate`, etc.
(import '[java.util.function Predicate Consumer])

(let [orig [1 2 3]
      st (atom [])]
  (.forEach orig (jfn Consumer [x] (swap! st conj x)))
  (= @st orig))
[1] https://docs.oracle.com/javase/specs/jls/se8/html/jls-9.html#jls-9.8 [2] spike https://gist.github.com/ghadishayban/0ac41e81d4df02ff176c22d16ee8b972
Hide
Jason Whitlark added a comment -

Well, that would be an improvement. The practical problem I run into is that I'm frequently deep in a fluent interface, and don't necessarily know the exact class. That said, it's usually only in a few places. Would it make sense to have a registry? Perhaps something like:

(auto-infer-lambda [java.util.function, org.apache.kafka.streams.kstream])

Show
Jason Whitlark added a comment - Well, that would be an improvement. The practical problem I run into is that I'm frequently deep in a fluent interface, and don't necessarily know the exact class. That said, it's usually only in a few places. Would it make sense to have a registry? Perhaps something like: (auto-infer-lambda [java.util.function, org.apache.kafka.streams.kstream])
Hide
Ghadi Shayban added a comment -

Do you ever use SAM classes that are abstract classes and not interfaces?

Show
Ghadi Shayban added a comment - Do you ever use SAM classes that are abstract classes and not interfaces?
Hide
Andrew Oberstar added a comment -

Here's an alternative approach in my ike.cljj library. This uses MethodHandles (i.e. java.lang.invoke package) instead of regular reflection. I'm not sure if I tested this on abstract classes yet.

The usage looks similar to what Ghadi posted:

(defsam my-sam
  java.util.function.Predicate
  [x]
  (= x "it matched"))

(-> (Stream/of "not a match" "it matched")
    (.filter my-sam)
    (.collect Collectors/toList)

(-> (IntStream/range 0 10)
    (.filter (sam* java.util.function.IntPredicate odd?))
    (.collect Collectors/toList)

It uses MethodHandleProxies.asInterfaceInstance to create a proxy instance of the interface that calls a method handle calling a Clojure function. It doesn't try to validate parameter counts, it just treats it as varargs delegating to IFn.applyTo(ISeq). Not sure if it's the most efficient but it was effective for my needs.

I think the LambdaMetaFactory may be the preferred way to meet this type of use case. It was harder for me to follow exactly how to use that though so I didn't end up looking to deep into it.

The main functional issue I have with my approach (and Ghadi's) is that you have to explcitly provide the interface you want to proxy. Java lambdas and Groovy Closures can be used against methods that expect a SAM and it just coerces based on what the method expects. Ideally this would be supported by Clojure too.

Instead of having to do:

(-> (IntStream/range 0 10)
    (.filter (sam* java.util.function.IntPredicate odd?))
    (.collect Collectors/toList)

I'd like to do this:

(-> (IntStream/range 0 10)
    (.filter odd?)
    (.collect Collectors/toList)
Show
Andrew Oberstar added a comment - Here's an alternative approach in my ike.cljj library. This uses MethodHandles (i.e. java.lang.invoke package) instead of regular reflection. I'm not sure if I tested this on abstract classes yet. The usage looks similar to what Ghadi posted:
(defsam my-sam
  java.util.function.Predicate
  [x]
  (= x "it matched"))

(-> (Stream/of "not a match" "it matched")
    (.filter my-sam)
    (.collect Collectors/toList)

(-> (IntStream/range 0 10)
    (.filter (sam* java.util.function.IntPredicate odd?))
    (.collect Collectors/toList)
It uses MethodHandleProxies.asInterfaceInstance to create a proxy instance of the interface that calls a method handle calling a Clojure function. It doesn't try to validate parameter counts, it just treats it as varargs delegating to IFn.applyTo(ISeq). Not sure if it's the most efficient but it was effective for my needs. I think the LambdaMetaFactory may be the preferred way to meet this type of use case. It was harder for me to follow exactly how to use that though so I didn't end up looking to deep into it. The main functional issue I have with my approach (and Ghadi's) is that you have to explcitly provide the interface you want to proxy. Java lambdas and Groovy Closures can be used against methods that expect a SAM and it just coerces based on what the method expects. Ideally this would be supported by Clojure too. Instead of having to do:
(-> (IntStream/range 0 10)
    (.filter (sam* java.util.function.IntPredicate odd?))
    (.collect Collectors/toList)
I'd like to do this:
(-> (IntStream/range 0 10)
    (.filter odd?)
    (.collect Collectors/toList)
Hide
Ghadi Shayban added a comment - - edited

Another possible avenue is to extend java.util.function.Supplier to Clojure functions with an explicit 0-arity. That interface is becoming increasingly common in practice; it might be valuable to special-case it. (We shouldn't & couldn't do a similar thing for defrecords, as they already have a method called get, which clashes with Supplier's only method.)

Show
Ghadi Shayban added a comment - - edited Another possible avenue is to extend java.util.function.Supplier to Clojure functions with an explicit 0-arity. That interface is becoming increasingly common in practice; it might be valuable to special-case it. (We shouldn't & couldn't do a similar thing for defrecords, as they already have a method called get, which clashes with Supplier's only method.)

People

Vote (9)
Watch (5)

Dates

  • Created:
    Updated: