I am trying to rewrite the Spark Structured Streaming example in Clojure.

The original example is written in Scala and can be found in the programming guide:

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

(ns flambo-example.streaming-example
  (:require [clojure.string])
  (:import [org.apache.spark.sql Encoders SparkSession Dataset Row functions]))

(def spark
  (-> (SparkSession/builder)
      (.appName "sample")
      (.master "local[*]")
      .getOrCreate))

(def lines
  (-> spark
      .readStream
      (.format "socket")
      (.option "host" "localhost")
      (.option "port" 9999)
      .load))

(def words
  (-> lines
      (.as (Encoders/STRING))
      (.flatMap #(clojure.string/split % #" "))))

The above code causes the following exception.

;; Caused by java.lang.IllegalArgumentException
;; No matching method found: flatMap for class
;; org.apache.spark.sql.Dataset

How can I avoid this error?

1 Answer

You have to follow the method signatures. The Dataset API provides two overloads of Dataset.flatMap. The first takes a scala.Function1:

def flatMap[U](func: (T) ⇒ TraversableOnce[U])(implicit arg0: Encoder[U]): Dataset[U] 

and the second takes Spark's own o.a.s.api.java.function.FlatMapFunction:

def flatMap[U](f: FlatMapFunction[T, U], encoder: Encoder[U]): Dataset[U] 

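The former is of little use from Clojure, because a Clojure function does not implement scala.Function1, but you should be able to use the latter. Note that, seen from the JVM side, both overloads take two arguments (the implicit encoder is an ordinary second parameter), so a one-argument call such as (.flatMap f) matches neither of them, which is why you get "No matching method found". For the RDD API, flambo uses macros to create Spark-friendly adapters, available through flambo.api/fn. I am not sure whether these work directly with Datasets, but you should be able to adapt them if needed.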

Since you cannot rely on implicit Encoders from Clojure, you also have to pass an explicit encoder that matches the return type.
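The interface mismatch can be checked without Spark on the classpath. The sketch below is only an illustration: java.util.function.Function is used as a stand-in for FlatMapFunction, and the namespace and var names (example.interop-sketch, split-fn, split-adapter) are made up for this example.

```clojure
(ns example.interop-sketch
  (:require [clojure.string :as str])
  (:import [java.util.function Function]))

;; A plain Clojure fn implements clojure.lang.IFn, not arbitrary Java or
;; Scala function interfaces. Function is only a stand-in for FlatMapFunction.
(def split-fn #(str/split % #" "))

(instance? clojure.lang.IFn split-fn) ; true
(instance? Function split-fn)         ; false

;; reify produces an object that really does implement the target interface.
(def split-adapter
  (reify Function
    (apply [_ s] (str/split s #" "))))

(instance? Function split-adapter)       ; true
(.apply ^Function split-adapter "a b c") ; ["a" "b" "c"]
```

The same idea applies to FlatMapFunction: the argument has to be an object of the interface type that the chosen overload declares.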

Overall you'll need something along these lines:

(def words
  (-> lines
      (.as (Encoders/STRING))
      (.flatMap f e)))

where f implements FlatMapFunction and e is an Encoder. One example implementation:

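;; Requires [org.apache.spark.api.java.function FlatMapFunction] in the ns :import list.
(def words
  (-> lines
      (.as (Encoders/STRING))
      (.flatMap
        (proxy [FlatMapFunction] []
          ;; call must return a java.util.Iterator over the output elements
          (call [s] (.iterator ^Iterable (clojure.string/split s #" "))))
        (Encoders/STRING))))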

though a cleaner one can probably be found.
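One possible alternative is reify, which avoids proxy's method-map indirection. This is a rough sketch, not something tested against a cluster. It assumes Spark 2.x or later (where FlatMapFunction.call returns an Iterator), and the names example.flatmap-sketch, split-words and words-of are invented for illustration. Spark still has to serialize the function object and load its class on the executors, which in practice may require AOT compilation.

```clojure
(ns example.flatmap-sketch
  (:require [clojure.string :as str])
  (:import [org.apache.spark.api.java.function FlatMapFunction]
           [org.apache.spark.sql Dataset Encoders]))

;; An object implementing Spark's FlatMapFunction: one input line in,
;; an Iterator over its space-separated words out.
(def split-words
  (reify FlatMapFunction
    (call [_ s]
      (.iterator ^Iterable (str/split s #" ")))))

;; Hypothetical helper: takes the streaming DataFrame from the question
;; and returns a Dataset of words, passing the encoder explicitly.
(defn words-of [^Dataset lines]
  (-> lines
      (.as (Encoders/STRING))
      (.flatMap ^FlatMapFunction split-words (Encoders/STRING))))
```

With the lines var from the question, (words-of lines) would take the place of the words definition above.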

In practice I'd avoid typed Datasets altogether and work with DataFrame (Dataset[Row]) instead.
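As a rough sketch of that untyped route, the word splitting can be written with the built-in column functions split and explode, so no FlatMapFunction or Encoder is involved. The names example.dataframe-sketch, word-col and words-df are made up here, and "value" is assumed to be the name of the single string column that the socket source produces.

```clojure
(ns example.dataframe-sketch
  (:import [org.apache.spark.sql Column Dataset functions]))

;; Column expression: split "value" on spaces, then emit one row per word.
(def word-col
  (-> (functions/col "value")
      (functions/split " ")
      functions/explode
      (.alias "word")))

;; Hypothetical helper: select expects a Column array when called from
;; Clojure, because it is a varargs method.
(defn words-df [^Dataset lines]
  (.select lines (into-array Column [word-col])))
```

Calling (words-df lines) should then yield a DataFrame with a single word column, which can be grouped and counted as in the guide.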