Examples
Some examples. The GitHub repository also contains the code of Spark: The Definitive Guide, written in louna.
(For each example, the Scala code is presented first, followed by the louna code.)
Read some Data
;; Load the 2015 flight-summary CSV into a DataFrame.
;; The reader is configured with the "header" and "inferSchema" options
;; before the file under the configured base path is read.
(def df
  (let [reader (-> (get-session)
                   .read
                   (.option "header" "true")
                   (.option "inferSchema" "true"))]
    (.csv reader
          (str (settings/get-base-path)
               "/data/flight-data/csv/2015-summary.csv"))))
Select, filter, show
;; Positional select, then filter, then show.
;; In the pattern (df ?invNo - ?desc _):
;;   ?invNo  selects the 1st column and renames it to "invNo"
;;   -       ignores the 2nd column
;;   ?desc   selects the 3rd column and renames it to "desc"
;;   _       selects the 4th column under its original name
;; The second form is the filter; the last form shows the results.
;; NOTE(review): these columns look like an invoice/retail dataset rather than
;; the flight-data CSV bound to df earlier — confirm which df is intended.
(q (df ?invNo - ?desc _)
((=_ ?invNo 536365))
(.show 5 false))
Filter, bind, select (normal SQL-style select) the 2 columns
;; Filter, bind a new column, then select two columns by name.
;;   1st form: filters — ?StockCode contains "DOT", and
;;             (?UnitPrice > 600 OR ?Description includes "POSTAGE").
;;             NOTE(review): the two filters in this form are presumably ANDed — confirm.
;;   2nd form: binds the literal true to a new column ?expensive.
;;   vector:   SQL-style select of the listed columns.
;; NOTE(review): the filter uses ?UnitPrice but the select uses ?unitPrice;
;; confirm that column lookup is case-insensitive here.
(q df
((contains_? ?StockCode "DOT") (.or (>_ ?UnitPrice 600)
(includes? ?Description "POSTAGE")))
((lit true) ?expensive)
[?expensive ?unitPrice]
(.show 5))
Function query, with arguments
;; fq: the filter/bind/select query wrapped in a function so that its
;; constants become arguments.
;;   stock-code   substring that ?StockCode must contain
;;   unit-price   threshold compared against ?UnitPrice with >_
;;   true-var     literal value bound to the ?expensive column
;;   show-number  number of rows passed to .show
;; The "POSTAGE" description test stays hard-coded.
;; NOTE(review): the filter uses ?UnitPrice but the select uses ?unitPrice;
;; confirm that column lookup is case-insensitive here.
(defn fq [stock-code unit-price true-var show-number]
(q df
((contains_? ?StockCode stock-code) (.or (>_ ?UnitPrice unit-price)
(includes? ?Description "POSTAGE")))
((lit true-var) ?expensive)
[?expensive ?unitPrice]
(.show show-number)))
;; Example call using the same constants as the inline query.
(fq "DOT" 600 true 5)
Sorting, collecting 2, printing
;; Sort df by the ?count column, collect the first 2 rows with .take,
;; and print them with sql/print-rows.
(q df
(order-by ?count)
(.take 2)
(sql/print-rows))
Grouping
;; Group by destination and total the counts.
;;   (df ?dest - ?count)  select/rename the 1st column to "dest", ignore the 2nd,
;;                        select/rename the 3rd to "count"
;;   groupBy / g/sum      group on ?dest and aggregate the sum of ?count
;;   rename               rename the aggregate column "sum(count)" to destTotal
;;   order-by (desc ...)  sort by the total, descending
;;   .limit 5 / .show     keep 5 rows and show them
(q (df ?dest - ?count)
(groupBy ?dest)
(g/sum ?count)
(rename "sum(count)" ?destTotal)
(order-by (desc ?destTotal))
(.limit 5)
(.show))
Local data collection to dataframe
;; Build the person DataFrame from local data with sql/seq->df.
;; First argument: the rows (id, name, graduate_program, spark_status),
;; where spark_status is a long array.
;; Second argument: one [column-name type] entry per column.
;; NOTE(review): ["name"] gives no type — presumably it defaults to string; confirm.
(def person
(sql/seq->df [[0 "Bill Chambers" 0 (long-array [100])]
[1 "Matei Zaharia" 1 (long-array [500 250 100])]
[2 "Michael Armbrust" 1 (long-array [250 100])]]
[["id" :long]
["name"]
["graduate_program" :long]
["spark_status" (t/array-type DataTypes/LongType)]]))
;; Build the graduateProgram DataFrame from local data with sql/seq->df.
;; Rows are (id, degree, department, school); only "id" is given an explicit
;; type (:long).
;; NOTE(review): the untyped columns presumably default to string; confirm.
(def graduateProgram
(sql/seq->df [[0 "Masters" "School of Information" "UC Berkeley"]
[2 "Masters" "EECS" "UC Berkeley"]
[1 "Ph.D." "EECS" "UC Berkeley"]]
[["id" :long]
["degree"]
["department"]
["school"]]))
Join
;; Simple inner join.
(q (person _ _ ?gid)
(graduateProgram ?gid __)
.show)
;; (person _ _ ?gid): select the first 2 columns with their original names,
;; select the 3rd as gid, and ignore the rest, if any.
;; (graduateProgram ?gid __): select the first column and name it gid;
;; __ means "select all the other columns with their original names".
;; Because both patterns use ?gid, the result is a join query on gid.
;;
;; Selecting by name instead of by position: if person had 20 or more columns
;; and we wanted the 15th one, named lastname, we could write
;;   (person ?lastname:ln __)
;; which means "select lastname as ln". This lets us jump straight to a
;; column when the header is big.
Outer Join
;; Same join on ?gid as the inner-join example, but the :outer keyword at the
;; end of the graduateProgram pattern requests an outer join.
(q (person _ _ ?gid)
(graduateProgram ?gid __ :outer)
.show)
UDF
;; UDF b1, declared with return type :long: maps 1 to 10 and returns any
;; other value unchanged.
(defudf b1 :long [x]
(if (= x 1) 10 x))
;; UDF f1, declared with return type :boolean: true when its two arguments
;; are equal. Used as a filter in the query example.
(defudf f1 :boolean [x y]
(= x y))
;; Plain Clojure function that doubles its argument; wrapped as a UDF by b2.
(defn f2 [x]
  (* 2 x))
;; UDF b2 built from the existing Clojure function f2, with return type :long.
;; NOTE(review): the literal 1 is presumably the number of arguments — confirm
;; against defudf.
(defudf b2 f2 1 :long)
;; or, with an inline anonymous function:
;; (defudf b2 (fn [x] (* x 2)) 1 :long)
;; UDFs can be used in binds and in filters. There are two ways to get one:
;; declare the function with defudf (b1, f1), or wrap an existing Clojure
;; function (b2 uses f2).
;;   ((b1 ?pid) ?pid2)  bind the result of b1 to ?pid2
;;   ((b2 ?pid) ?pid3)  bind the result of b2 to ?pid3
;;   graduateProgram    second relation; it shares ?pid with person
;;   ((f1 ?pid ?id))    filter using the boolean UDF f1
(q (person ?id ?name ?pid ?status)
((b1 ?pid) ?pid2)
((b2 ?pid) ?pid3)
(graduateProgram ?pid ?degree ?dep ?school)
((f1 ?pid ?id))
.show)
Datasets
;; Define the Java bean class sparkdefinitive.ch11Datasets.Flight with two
;; String fields and one long field; it is used below to build the bean
;; encoder for the typed Dataset.
(clj-bean.core/defbean sparkdefinitive.ch11Datasets.Flight
[[String DEST_COUNTRY_NAME]
[String ORIGIN_COUNTRY_NAME]
[long count]])
;; Read the 2010 flight-summary parquet data into an untyped DataFrame.
(def flightsDF
  (.parquet (.read (get-session))
            (str (settings/get-base-path)
                 "/data/flight-data/parquet/2010-summary.parquet/")))
;; Convert flightsDF into a Dataset of Flight beans via a bean encoder.
(def flightsDS
  (let [flight-encoder (Encoders/bean sparkdefinitive.ch11Datasets.Flight)]
    (.as flightsDF flight-encoder)))
;; Define the Java bean, read the data, and build the Dataset.
Filter the dataset using a Clojure function
;; Keep only the flights whose origin country differs from the destination
;; country, using a plain Clojure predicate over the Flight bean getters,
;; then collect the result.
(def flightONotsameD
(q flightsDS
(filter_ (fn [flight] (not= (.getORIGIN_COUNTRY_NAME flight) (.getDEST_COUNTRY_NAME flight))))
.collect))
WordCount
;; Word count over the text files under <base-path>/data/wordcount:
;;   flatMap      split each line on single spaces into words
;;   mapToPair    turn each word into a [word 1] pair
;;   reduceByKey  add up the 1s per word
;; NOTE(review): flatMap / mapToPair / reduceByKey are called without a
;; namespace here — presumably louna wrappers over the Spark RDD methods; confirm.
(def counts
(-> (louna.state.settings/get-sc)
(.textFile (str (louna.state.settings/get-base-path) "/data/wordcount"))
(flatMap (fn [x] (clojure.string/split x #" ")))
(mapToPair (fn [x] [x 1]))
(reduceByKey (fn [v1 v2] (+ v1 v2)))))
Streaming
;; Streaming source over the JSON activity data under the configured base path.
;; The schema is supplied explicitly (dataSchema) and the
;; "maxFilesPerTrigger" option is set to 1.
(def readStream
  (let [stream-reader (-> (get-session)
                          .readStream
                          (.schema dataSchema)
                          (.option "maxFilesPerTrigger" 1))
        activity-dir  (str (settings/get-base-path) "/data/activity-data/")]
    (.json stream-reader activity-dir)))
;; Query the stream with the same syntax as a static DataFrame, then write it out.
;;   1st form: bind — ?stairs is whether ?gt includes "stairs"
;;   2nd form: filters — ?stairs is true and ?gt is not nil
;;   vector:   select the listed columns
;;   rest:     write the stream to the "memory" format under the query name
;;             "simple_transform", in "append" output mode, and start it
(q readStream
((includes? ?gt "stairs") ?stairs)
((true_? ?stairs) (not-nil_? ?gt))
[?gt ?model ?arrival_time ?creation_time]
(.writeStream)
(.queryName "simple_transform")
(.format "memory")
(.outputMode "append")
(.start))
;; Handle on the table named "simple_transform", i.e. the streaming query's output.
(def tempTable
  (-> (get-session)
      (.table "simple_transform")))
;; Show the intermediate streaming results 5 times, pausing 2 seconds between
;; snapshots. The loop index is unused, so it is bound to the conventional
;; `_`; binding it to `-` would shadow clojure.core/- inside the loop body.
(dotimes [_ 5]
  (q tempTable .show)
  (Thread/sleep 2000))
;; Streaming: read, query, write the results, and start the stream. The syntax
;; is the same as for static DataFrames; after the start we see some
;; temporary results.