Syntax reference
q
The query macro. For example:
(q df
((not_ (<=_ ?UnitPrice 250)) ?isExpensive)
((true_? ?isExpensive))
[?Description ?UnitPrice]
(.show 5))
Its arguments are called qforms.
q is eventually translated to ->, but first it transforms the qforms.
q produces valid Clojure code that uses the Java API.
qforms are not valid Clojure code, which is why a macro is used.
The macro does the following:
1) In the end, q itself is replaced with ->
2) Replaces ?qvar with (org.apache.spark.sql.functions/col "qvar")
and ?:qvar with "qvar"
(qvars are written with ? in front so we know what is a Column and what is a
String; inside the schema of the dataframe, ?qvar is saved under the column name "qvar")
3) Transforms the qforms. For example,
the louna select qform
[?expensive ?unitPrice]
is translated to
(louna.q.run/sql-select [(org.apache.spark.sql.functions/col "expensive")
(org.apache.spark.sql.functions/col "unitPrice")])
4) q produces valid Clojure code to run at runtime.
For many forms little or no transformation is needed; for
example (.show) needs no transformation at all.
For others q produces Clojure functions that are simple wrappers over the Java API,
or produces Java API code directly.
Some forms, like the joins, need more transformation.
In the end all code is translated to Java API calls: many forms are already Java API calls
once q has expanded, and the others call the Java API at runtime.
Even if a Clojure wrapper is used, Spark finds the underlying Spark code to build the
query plan etc., so there is no problem using wrappers.
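Step 2 can be pictured with a small sketch in plain Clojure. This is not louna's actual implementation; rewrite-qvar and rewrite-qvars are hypothetical names, and the sketch only builds the replacement forms as data, so it runs without Spark.

```clojure
(require '[clojure.string :as str]
         '[clojure.walk :as walk])

;; Hypothetical helper, not part of louna.
;; ?:x becomes the string "x", ?x becomes a (col "x") form,
;; everything else is returned unchanged.
(defn rewrite-qvar [x]
  (if-not (symbol? x)
    x
    (let [s (name x)]
      (cond
        (and (str/starts-with? s "?:") (> (count s) 2))
        (subs s 2)

        (and (str/starts-with? s "?") (> (count s) 1))
        (list 'org.apache.spark.sql.functions/col (subs s 1))

        :else x))))

;; Walks a whole qform and rewrites every qvar found in it.
(defn rewrite-qvars [qform]
  (walk/postwalk rewrite-qvar qform))

(rewrite-qvars '[?expensive ?unitPrice])
;; => [(org.apache.spark.sql.functions/col "expensive")
;;     (org.apache.spark.sql.functions/col "unitPrice")]
```

Symbols such as true_? are left alone, because only a leading ? marks a qvar.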
UDF/UDAF
The Spark Java API has many functions that run on Columns; in louna they are used through macros
or simple wrapper functions. Catalyst finds the underlying Spark Column function,
so it makes no difference in performance whether a macro or a function is used.
In case a udf is needed, it is very simple to make one.
1) Declare a "Spark function" (like a normal Clojure function, just add the return type, here :long)
(defudf b1 :long [x]
(if (= x 1) 10 x))
2) Use an existing Clojure function to create the b2 udf
(defn f2 [x] (* x 2))
(defudf b2 f2 1 :long)
or
(defudf b2 (fn [x] (* x 2)) 1 :long)
*They can be used anywhere, for example in filters, binds etc.
Filters
(() () ...) means (and filter1 filter2 ...)
For example
((contains_? ?StockCode "DOT") (>_ ?UnitPrice 600))
For simplicity
((?qvar)) means ((true_? ?qvar))
((!?qvar)) means ((false_? ?qvar))
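The shorthand can be sketched as a small rewrite over the filter list. This is an illustration only; expand-filter and expand-filters are hypothetical names, not louna functions.

```clojure
(require '[clojure.string :as str])

;; Hypothetical helper, not part of louna.
;; (?x)  => (true_? ?x)
;; (!?x) => (false_? ?x)
;; any other filter form is returned unchanged.
(defn expand-filter [filter-form]
  (let [[head & more] filter-form
        s (when (symbol? head) (name head))]
    (cond
      (or (nil? s) (seq more))  filter-form
      (str/starts-with? s "!?") (list 'false_? (symbol (subs s 1)))
      (str/starts-with? s "?")  (list 'true_? head)
      :else                     filter-form)))

;; Expands every filter of a filters qform.
(defn expand-filters [filters]
  (map expand-filter filters))

(expand-filters '((?isExpensive) (>_ ?UnitPrice 600)))
;; => ((true_? ?isExpensive) (>_ ?UnitPrice 600))
```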
Binds
((*_ ?price 0.18) ?tax)
It is translated to withColumn and adds the "tax" column.
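A rough sketch of that translation, written as a plain function over the qform. bind->with-column is a hypothetical name, and the real macro may produce different code; the sketch only shows the shape of the resulting withColumn call.

```clojure
;; Hypothetical helper, not part of louna.
;; Turns a bind qform ((expr) ?name) into a (.withColumn "name" expr) form.
(defn bind->with-column [[expr qvar]]
  (list '.withColumn (subs (name qvar) 1) expr))

(bind->with-column '((*_ ?price 0.18) ?tax))
;; => (.withColumn "tax" (*_ ?price 0.18))

;; Threaded with ->, the dataframe becomes the first argument:
(macroexpand-1 (list '-> 'df (bind->with-column '((*_ ?price 0.18) ?tax))))
;; => (.withColumn df "tax" (*_ ?price 0.18))
```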
Select
SQL-style select, for example
[?qvar (+_ 5 ?qvar) ((*_ 10 ?qvar) ?newqvar)]
?qvar plain select
(+_ 5 ?qvar) anonymous bind, new column name = name of the function
((*_ 10 ?qvar) ?newqvar) named bind, using alias internally
It is the same as
(select (col "qvar") (+ 5 (col "qvar")) (.alias (* 10 (col "qvar")) "newqvar"))
To also select all the other columns
[?qvar (+_ 5 ?qvar) ((*_ 10 ?qvar) ?newqvar) *]
or
[?qvar (+_ 5 ?qvar) ((*_ 10 ?qvar) ?newqvar) __]
It is a simple select, without the word select, and with the simple bind syntax instead of
the more verbose .alias etc.
Like a normal select, aggregate functions can be used as well.
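The three kinds of select items can be told apart with a simple shape check. The sketch below is an assumption about how such a check could look, not louna's code; select-item->form is a hypothetical name, and a named bind is recognised only as a two-element list whose first element is itself a list.

```clojure
;; Hypothetical helper, not part of louna.
;; A named bind ((expr) ?name) becomes (.alias expr "name");
;; plain qvars and anonymous binds are passed through unchanged.
(defn select-item->form [item]
  (if (and (seq? item)
           (= 2 (count item))
           (seq? (first item)))
    (let [[expr qvar] item]
      (list '.alias expr (subs (name qvar) 1)))
    item))

(mapv select-item->form '[?qvar (+_ 5 ?qvar) ((*_ 10 ?qvar) ?newqvar)])
;; => [?qvar (+_ 5 ?qvar) (.alias (*_ 10 ?qvar) "newqvar")]
```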
Logic-select/Logic-select-join
For example
(q (person ?id _ - ?gid )
(graduateProgram ?gid __)
.show)
This is a select/rename/join qform.
Variables with the same name => join.
The first is a logic-select (nothing is coming from above to join with).
It means: select from person the columns
1 "id"
2 "initial_name" (_ means select the column with its initial name)
3 "gid" (- means ignore the column at the original 3rd position)
and ignore all the others that come next.
The second is a logic-select-join on gid.
It means: select the first column as "gid" and the others with their original names (.. __),
and join on the same-named columns, here "gid". We can make louna join only on variables
that were named manually, to avoid unintended joins (or at least warn).
Logic-select/Logic-select-join qvars syntax/order
The header can be very big, or a column name may not be a valid Clojure symbol;
in cases like that we use the string notation and the out-of-order select.
In Louna
"?qvar"==?qvar==(col "qvar")
"?:qvar"==?:qvar=="qvar"
The ?:qvar notation is a simple way to tell a String apart from a Column name,
for functions that need the column name as a String.
In order (always first)
?p - _
- "a" 5 ?x _ ;;constants also
"?avg(x)" ;;"?qvar"==?qvar, the string is used only for names that can't be Clojure
;;symbols, like avg(x)
Out of order (always after the in-order qvars)
?price:p (the price column in the original header, renamed to p)
?8:p (the 8th column in the original header, renamed to p)
?price:_ (the price column in the original header, not renamed)
Select all (always at the end, after the ordered and unordered qvars)
(... __) also select the ones I didn't mention, with their original names
(... --) don't select the ones I didn't mention (it's the default, so we don't need it)
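The out-of-order notation can be read as "source:target". A minimal sketch of parsing it follows; parse-out-of-order and the keys :column, :position and :rename are made up for this illustration and are not louna's internal representation.

```clojure
(require '[clojure.string :as str])

;; Hypothetical helper, not part of louna.
;; Accepts a symbol or a string, since "?qvar"==?qvar.
;; The source is either a column name or a position in the original header;
;; a target of _ means "keep the original name" (:rename is nil).
(defn parse-out-of-order [qvar]
  (let [[source target] (str/split (subs (name qvar) 1) #":" 2)
        position (when (re-matches #"\d+" source)
                   (Long/parseLong source))]
    (cond-> {:rename (when (not= target "_") target)}
      position       (assoc :position position)
      (not position) (assoc :column source))))

(parse-out-of-order '?price:p)   ;; => {:rename "p", :column "price"}
(parse-out-of-order '?8:p)       ;; => {:rename "p", :position 8}
(parse-out-of-order "?price:_")  ;; => {:rename nil, :column "price"}
```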
Join safety
Louna joins on same-named columns by default.
If you want to control which qvars are joined, it is possible to warn when a join happens on a
variable that was not selected by name (for example ?gid is selected manually, so no warning),
but a join on a variable selected with _ => warning.
For more safety an exception could be thrown on an unintended join.
If that is still a problem, a ??qvar notation may be used to say exactly what is supposed to be joined.
In case the headers are very big and many names overlap, louna is not restrictive: we can
use the more verbose join method of Spark. Still, headers with same-named columns are confusing,
so in louna it is preferred to rename the columns, which is easy, and avoid these problems.
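The warning policy described above could look roughly like this. It is only a sketch of the idea, not existing louna behaviour: join-check and the :named / :implicit keys are hypothetical, and the sketch assumes a warning is wanted whenever either side picked the shared column implicitly (with _ or __).

```clojure
(require '[clojure.set :as set])

;; Hypothetical helper, not part of louna.
;; Each side is described by the column names the user wrote by hand (:named)
;; and the ones that came in through _ or __ (:implicit).
(defn join-check [left right]
  (let [cols     (fn [side] (set/union (:named side) (:implicit side)))
        shared   (set/intersection (cols left) (cols right))
        explicit (set/intersection (:named left) (:named right))]
    {:join-on shared                             ;; columns the join would use
     :warn-on (set/difference shared explicit)})) ;; shared, but not named on both sides

(join-check {:named #{"id" "gid"} :implicit #{"name"}}
            {:named #{"gid"}      :implicit #{"name" "degree"}})
;; => {:join-on #{"gid" "name"}, :warn-on #{"name"}}
```

Here "gid" is joined silently because both sides named it, while "name" would trigger the warning.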
Aggregations
The syntax is like Spark's, but binds can also be used.
Dataframe aggregations (the dataframe = 1 group) (no groupBy first)
Using select and aggregation functions; the bind syntax also works as alias
[(agrfun1 ...) ((agrfun2 ...) ?myQvar) ...]
The same can be done using the .agg method of Dataset, so that it is obvious that it is an aggregation,
but for now binds don't work there (perhaps later: one agg function, and one extra group-agg for the 2.2 case)
GroupBy first (=> RelationalGroupedDataset object)
1) One aggregate function on one or more columns
=> use the methods of the RelationalGroupedDataset object (if the one needed exists)
Example
(q df
(groupBy ?InvoiceNo ?CustomerId)
.count ;;method of RelationalGroupedDataset
(.show))
2) One or more aggregate functions => (agg + functions of sql.functions)
Example
(q df
(groupBy ?InvoiceNo)
(agg (sum ?Quantity) ((count_ ?Quantity) ?quan)) ;;bind syntax works also
(.show))
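To make the meaning of the groupBy + agg example concrete, here is the same computation on plain Clojure maps, with no Spark involved. It is only an analogy: sum-and-count-by is a hypothetical name, rows are ordinary maps, and nil handling is approximate (Spark's sum gives null for a group with no values, this sketch gives 0).

```clojure
;; Hypothetical helper, not part of louna and not using Spark.
;; Groups rows by group-key, then computes the sum and the count
;; of the non-nil values found under value-key in each group.
(defn sum-and-count-by [rows group-key value-key]
  (into {}
        (for [[k group] (group-by group-key rows)]
          (let [values (keep value-key group)]
            [k {:sum  (reduce + values)
                :quan (count values)}]))))

(sum-and-count-by [{:InvoiceNo 1 :Quantity 2}
                   {:InvoiceNo 1 :Quantity 3}
                   {:InvoiceNo 2 :Quantity 5}]
                  :InvoiceNo
                  :Quantity)
;; => {1 {:sum 5, :quan 2}, 2 {:sum 5, :quan 1}}
```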
*udaf (user-defined aggregation functions, if something that is not provided is needed)
See udafAvg.clj and udaf.clj
It is a simple class definition that extends org.apache.spark.sql.expressions.UserDefinedAggregateFunction
and implements the necessary methods; that example calculates the average.
Other methods for structured API
Louna provides some other wrappers over the Spark Java API, inside louna.q.run.
There are also others inside the Datasets package.
For example, in Louna columns can be renamed like this
(q person
(rename ?id ?id2) ;;rename 1 column
(rename {?name ?name1
?age ?new-age}) ;;map to rename many
.show)
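One way to picture the map form of rename is as one withColumnRenamed call per entry (Dataset.withColumnRenamed takes the existing name and the new name). Whether louna implements it this way is an assumption; rename-forms is a hypothetical name used only in this sketch.

```clojure
;; Hypothetical helper, not part of louna.
;; Builds one (.withColumnRenamed "old" "new") form per map entry,
;; ready to be threaded with ->.
(defn rename-forms [renames]
  (for [[from to] renames]
    (list '.withColumnRenamed
          (subs (name from) 1)
          (subs (name to) 1))))

(rename-forms '{?id ?id2})
;; => ((.withColumnRenamed "id" "id2"))
```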
Datasets
They work like in Java Spark using normal Clojure functions.
A library is used to easily create the beans.
(clj-bean.core/defbean sparkdefinitive.ch11Datasets.Flight
[[String DEST_COUNTRY_NAME]
[String ORIGIN_COUNTRY_NAME]
[long count]])
(def flightsDF
(-> (get-session)
.read
(.parquet (str (settings/get-base-path)
"/data/flight-data/parquet/2010-summary.parquet/"))))
(def flightsDS (.as flightsDF (Encoders/bean sparkdefinitive.ch11Datasets.Flight)))
;;Define the Java Bean,read data and make the Dataset
(def flightONotsameD
(q flightsDS
(filter_ (fn [flight] (not= (.getORIGIN_COUNTRY_NAME flight) (.getDEST_COUNTRY_NAME flight))))
.collect))
;;Filter the dataset using normal clojure function
RDDs
Inside the RDDs there are wrappers for some basic transformations; they are simple to use with Clojure
functions.
(def counts
(-> (louna.state.settings/get-sc)
(.textFile (str (louna.state.settings/get-base-path) "/data/wordcount")) ;;[line1 line2 ...]
(flatMap (fn [x] (clojure.string/split x #" "))) ;;[word1 word2] and flat
(mapToPair (fn [x] [x 1])) ;;[[word1 1] [word2 1]]
(reduceByKey (fn [v1 v2] (+ v1 v2))))) ;;if same key add the second => [word sum]
;;WordCount
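For comparison, the same three steps on an ordinary in-memory collection, without Spark. This is only meant to show what each step yields; word-count is a hypothetical name and nothing here uses the louna wrappers.

```clojure
(require '[clojure.string :as str])

;; Plain Clojure analogue of flatMap -> mapToPair -> reduceByKey.
(defn word-count [lines]
  (->> lines
       (mapcat #(str/split % #" "))       ;; [word1 word2 ...]
       (map (fn [word] [word 1]))         ;; [[word1 1] [word2 1] ...]
       (reduce (fn [acc [word n]]         ;; same key => add the counts
                 (update acc word (fnil + 0) n))
               {})))

(word-count ["a b a" "b c"])
;; => {"a" 2, "b" 2, "c" 1}
```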
Louna is a work in progress. The Spark Java API can be used when something is missing, so it is usable
now as well.