Sequence to DataFrame
val myManualSchema = new StructType(Array(
new StructField("some", StringType, true),
new StructField("col", StringType, true),
new StructField("names", LongType, false)))
val myRows = Seq(Row("Hello", null, 1L))
val myRDD = spark.sparkContext.parallelize(myRows)
val myDf = spark.createDataFrame(myRDD, myManualSchema)
|
(def myManualSchema [["some"] ["col"] ["names" :long false]])
(def mydf (sql/seq->df [["Hello" nil 1]] myManualSchema))
|
Select
df.select("DEST_COUNTRY_NAME").show(2)
|
(q df
[?DEST_COUNTRY_NAME]
(.show 2))
|
Select and rename
df.select(col("DEST_COUNTRY_NAME").alias("dest")).show(2)
|
(q (df - ?dest) ;;second place in header
(.show 2))
or
(q (df ?DEST_COUNTRY_NAME:dest) ;;in any place in header
(.show 2))
|
Filter
df.where(col("count") < 2)
.where(col("ORIGIN_COUNTRY_NAME") === "Croatia")
.show(2)
|
(q df
((<_ ?count 2) (=_ ?ORIGIN_COUNTRY_NAME "Croatia"))
(.show 2))
|
Bind Filter Select
val DOTCodeFilter = col("StockCode") === "DOT"
val priceFilter = col("UnitPrice") > 600
val descripFilter = col("Description").contains("POSTAGE")
df.withColumn("isExpensive",
DOTCodeFilter.and(priceFilter.or(descripFilter)))
.where("isExpensive")
.select("unitPrice", "isExpensive")
.show(5)
|
(q df
((.and (=_ ?StockCode "DOT")
(.or (>_ ?UnitPrice 600)
(includes? ?Description "POSTAGE"))) ?expensive)
((?expensive))
[?expensive ?unitPrice]
(.show 5))
|
Complex Types
df.select(split(col("Description"), " ").alias("array_col"))
.selectExpr("array_col[0]").show(2, false)
|
(q df
((split ?Description " ") ?myarray)
[(.getItem ?myarray 0)]
(.show 2 false))
|
User Defined Functions (UDFs)
def mul3(double_value: Double): Double =
double_value * 3
val mul3udf = udf(mul3(_:Double):Double)
df.select(mul3udf(col("num"))).show()
|
(defudf mul3udf (fn [x] (* x 3)) 1 :double)
or
(defn mul3 [x] (* x 3))
(defudf mul3udf mul3 1 :double)
or
(defudf mul3udf :double [x]
(* x 3))
(q df [(mul3udf ?num)] .show)
|
Join (inner default, and outer)
val joinExpression = person.col("graduate_program")
=== graduateProgram.col("id")
person.join(graduateProgram, joinExpression).show()
val joinExpression = person.col("graduate_program")
=== graduateProgram.col("id")
person.join(graduateProgram, joinExpression, "outer").show()
|
(q (person _ _ ?gid)
(graduateProgram ?gid __)
.show)
(q (person _ _ ?gid)
(graduateProgram ?gid __ :outer)
.show)
|
Aggregate on Dataframe (agg and select)
df.select(
count("Quantity").alias("ttransactions"),
sum("Quantity").alias("tpurchases"),
avg("Quantity").alias("avg_purchases"),
expr("mean(Quantity)").alias("mean_purchases"))
.selectExpr(
"tpurchases/ttransactions",
"avg_purchases",
"mean_purchases").show()
|
(q df
[((count_ ?Quantity) ?tTransactions)
((sum ?Quantity) ?tPurchases)
((avg ?Quantity) ?avgPurchases)
((mean ?Quantity) ?meanPurchases)]
[(div ?tPurchases ?tTransactions) ?avgPurchases ?meanPurchases]
.show)
|
Aggregate on groups
df.groupBy("InvoiceNo", "CustomerId").count().show()
df.groupBy("InvoiceNo").agg(
count("Quantity").alias("quan"),
sum("Quantity")).show()
|
(q df
(groupBy ?InvoiceNo ?CustomerId)
.count
(.show))
(q df
(groupBy ?InvoiceNo)
(agg ((count_ ?Quantity) ?quan) (sum ?Quantity))
(.show))
|
Window Spec
val dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"),
"MM/d/yyyy H:mm"))
dfWithDate.createOrReplaceTempView("dfWithDate")
val windowSpec = Window
.partitionBy("CustomerId", "date")
.orderBy(col("Quantity").desc)
.rowsBetween(Window.unboundedPreceding,
Window.currentRow)
val maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)
val purchaseDenseRank = dense_rank().over(windowSpec)
val purchaseRank = rank().over(windowSpec)
dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")
.select(
col("CustomerId"),col("date"),
col("Quantity"),
purchaseRank.alias("rank"),
purchaseDenseRank.alias("denseRank"),
maxPurchaseQuantity.alias("maxq")).show()
|
(let [myWindowSpec
(q (Window/partitionBy (ca ?CustomerId ?date))
(order-by (desc ?Quantity))
(.rowsBetween (Window/unboundedPreceding)
(Window/currentRow)))]
(q df
((functions/to_date ?InvoiceDate "MM/d/yyyy H:mm") ?date)
((not-nil_? ?CustomerId))
(order-by ?CustomerId)
[?CustomerId ?date ?Quantity
((.over (functions/rank) myWindowSpec) ?rank)
((.over (functions/dense_rank) myWindowSpec) ?denseRank)
((.over (max_ ?Quantity) myWindowSpec) ?maxq)]
.show))
|
Datasets create and Filter
case class Flight(DEST_COUNTRY_NAME: String,
ORIGIN_COUNTRY_NAME: String,
count: BigInt)
val flightsDF = spark.read
.parquet("/data/flight-data/parquet/2010-summary.parquet/")
val flights = flightsDF.as[Flight]
def originIsDestination(flight_row: Flight): Boolean = {
return flight_row.ORIGIN_COUNTRY_NAME
==
flight_row.DEST_COUNTRY_NAME
}
flights.filter(flight_row => originIsDestination(flight_row))
|
(clj-bean.core/defbean sparkdefinitive.ch11Datasets.Flight
[[String DEST_COUNTRY_NAME]
[String ORIGIN_COUNTRY_NAME]
[long count]])
(def flightsDS
(-> spark
.read
(.parquet (str (settings/get-base-path)
"/data/flight-data/parquet/2010-summary.parquet/"))
(.as (Encoders/bean sparkdefinitive.ch11Datasets.Flight))))
(def flightOsameD
(q flightsDS
(filter_ (fn [flight] (= (.getORIGIN_COUNTRY_NAME flight)
(.getDEST_COUNTRY_NAME flight))))))
|
WordCount
val counts = textFile
.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
|
(def counts
(-> textfile
(flatMap (fn [line] (s/split line #" ")))
(mapToPair (fn [word] [word 1]))
(reduceByKey (fn [v1 v2] (+ v1 v2)))))
|
Streaming
val readStream = spark.readStream.schema(dataSchema)
.option("maxFilesPerTrigger", 1)
.json("/data/activity-data")
streaming.withColumn("stairs", expr("gt like '%stairs%'"))
.where("stairs")
.where("gt is not null")
.select("gt", "model", "arrival_time", "creation_time")
.writeStream
.queryName("simple_transform")
.format("memory")
.outputMode("append")
.start()
|
(def readStream
(-> (get-session)
(.readStream)
(.schema dataSchema)
(.option "maxFilesPerTrigger" 1)
(.json "/data/activity-data/")))
(q readStream
((includes? ?gt "stairs") ?stairs)
((true_? ?stairs) (not-nil_? ?gt))
[?gt ?model ?arrival_time ?creation_time]
(.writeStream)
(.queryName "simple_transform")
(.format "memory")
(.outputMode "append")
(.start))
|