Scala vs. λouna

Side-by-side examples: Spark code in Scala and the equivalent λouna (Clojure) query.
Sequence to DataFrame

Scala:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}

val myManualSchema = new StructType(Array(
  new StructField("some", StringType, true),
  new StructField("col", StringType, true),
  new StructField("names", LongType, false)))
val myRows = Seq(Row("Hello", null, 1L))
val myRDD = spark.sparkContext.parallelize(myRows)
val myDf = spark.createDataFrame(myRDD, myManualSchema)


λouna:
(def myManualSchema [["some"] ["col"] ["names" :long false]])
(def mydf (sql/seq->df [["Hello" nil 1]] myManualSchema))
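
If an explicit schema is not needed, the Scala side can also let Spark infer one; a minimal sketch, assuming the session's implicits are imported (column names match the example above):

Scala:
import spark.implicits._

// Spark infers the column types from the tuple; the null needs a type ascription.
val inferredDf = Seq(("Hello", null: String, 1L)).toDF("some", "col", "names")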
Select
df.select("DEST_COUNTRY_NAME").show(2)

λouna:
(q df [?DEST_COUNTRY_NAME] (.show 2))
Select and rename
Scala:
df.select(col("DEST_COUNTRY_NAME").alias("dest")).show(2)

λouna:
(q (df - ?dest)   ;; second place in header
   (.show 2))

or

(q (df ?DEST_COUNTRY_NAME:dest)   ;; in any place in header
   (.show 2))
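
Plain Spark Scala can also rename without a select, via the standard DataFrame API:

Scala:
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").show(2)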
Filter
Scala:
df.where(col("count") < 2)
  .where(col("ORIGIN_COUNTRY_NAME") === "Croatia")
  .show(2)

λouna:
(q df
   ((<_ ?count 2)
    (=_ ?ORIGIN_COUNTRY_NAME "Croatia"))
   (.show 2))
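
The same filter can be written as one SQL-style predicate string, which Spark parses itself; a sketch:

Scala:
df.where("count < 2 AND ORIGIN_COUNTRY_NAME = 'Croatia'").show(2)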
Bind Filter Select
Scala:
val DOTCodeFilter = col("StockCode") === "DOT"
val priceFilter = col("UnitPrice") > 600
val descripFilter = col("Description").contains("POSTAGE")
df.withColumn("isExpensive", DOTCodeFilter.and(priceFilter.or(descripFilter)))
  .where("isExpensive")
  .select("unitPrice", "isExpensive")
  .show(5)

λouna:
(q df
   ((.and (=_ ?StockCode "DOT")
          (.or (>_ ?UnitPrice 600)
               (includes? ?Description "POSTAGE"))) ?expensive)
   ((?expensive))
   [?expensive ?unitPrice]
   (.show 5))
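
Because the predicate is ordinary column logic, it can also be pushed into a single SQL expression; a sketch using Spark's expr and the SQL instr function:

Scala:
import org.apache.spark.sql.functions.expr

df.withColumn("isExpensive",
    expr("StockCode = 'DOT' AND (UnitPrice > 600 OR instr(Description, 'POSTAGE') >= 1)"))
  .where("isExpensive")
  .select("unitPrice", "isExpensive")
  .show(5)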
Complex Types
Scala:
df.select(split(col("Description"), " ").alias("array_col"))
  .selectExpr("array_col[0]").show(2, false)

λouna:
(q df
   ((split ?Description " ") ?myarray)
   [(.getItem ?myarray 0)]
   (.show 2 false))
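
A split array can also be flattened to one row per element with Spark's explode; a sketch on the Scala side:

Scala:
import org.apache.spark.sql.functions.{col, explode, split}

df.withColumn("splitted", split(col("Description"), " "))
  .withColumn("exploded", explode(col("splitted")))
  .select("Description", "exploded")
  .show(2, false)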
User Defined Functions (UDFs)
Scala:
def mul3(value: Double): Double = value * 3
val mul3udf = udf(mul3(_: Double): Double)
df.select(mul3udf(col("num"))).show()

λouna:
(defudf mul3udf (fn [x] (* x 3)) 1 :double)

or

(defn mul3 [x] (* x 3))
(defudf mul3udf mul3 1 :double)

or

(defudf mul3udf :double [x] (* x 3))

(q df [(mul3udf ?num)] .show)
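
To call the UDF from SQL strings (selectExpr, where, ...) it must also be registered with the session; a sketch, reusing the num column from the example above:

Scala:
spark.udf.register("mul3", mul3(_: Double): Double)
df.selectExpr("mul3(num)").show()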
Join (inner by default, and outer)

Scala:
val joinExpression = person.col("graduate_program") === graduateProgram.col("id")
person.join(graduateProgram, joinExpression).show()
person.join(graduateProgram, joinExpression, "outer").show()


λouna:
(q (person _ _ ?gid)
   (graduateProgram ?gid __)
   .show)

(q (person _ _ ?gid)
   (graduateProgram ?gid __ :outer)
   .show)
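
Both snippets assume person and graduateProgram already exist; a minimal illustrative setup (column layout matching the joins above, data purely hypothetical):

Scala:
import spark.implicits._

val person = Seq(
  (0, "Alice", 0),   // id, name, graduate_program
  (1, "Bob", 2)).toDF("id", "name", "graduate_program")
val graduateProgram = Seq(
  (0, "Masters"),    // id, degree
  (1, "Ph.D.")).toDF("id", "degree")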
Aggregate on DataFrame (agg and select)
Scala:
df.select(
    count("Quantity").alias("ttransactions"),
    sum("Quantity").alias("tpurchases"),
    avg("Quantity").alias("avg_purchases"),
    expr("mean(Quantity)").alias("mean_purchases"))
  .selectExpr(
    "tpurchases/ttransactions",
    "avg_purchases",
    "mean_purchases").show()

λouna:
(q df
   [((count_ ?Quantity) ?tTransactions)
    ((sum ?Quantity) ?tPurchases)
    ((avg ?Quantity) ?avgPurchases)
    ((mean ?Quantity) ?meanPurchases)]
   [(div ?tPurchases ?tTransactions) ?avgPurchases ?meanPurchases]
   .show)
Aggregate on groups
df.groupBy("InvoiceNo", "CustomerId").count().show() df.groupBy("InvoiceNo").agg( count("Quantity").alias("quan"), sum("Quantity")).show()

λouna:
(q df (groupBy ?InvoiceNo ?CustomerId) .count (.show))

(q df
   (groupBy ?InvoiceNo)
   (agg ((count_ ?Quantity) ?quan)
        (sum ?Quantity))
   (.show))
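
Spark's agg also accepts column-name/aggregation-name pairs for quick summaries; a sketch:

Scala:
df.groupBy("InvoiceNo").agg("Quantity" -> "avg", "Quantity" -> "stddev_pop").show()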
Window Spec
Scala:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"), "MM/d/yyyy H:mm"))
dfWithDate.createOrReplaceTempView("dfWithDate")

val windowSpec = Window
  .partitionBy("CustomerId", "date")
  .orderBy(col("Quantity").desc)
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

val maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)
val purchaseDenseRank = dense_rank().over(windowSpec)
val purchaseRank = rank().over(windowSpec)

dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")
  .select(
    col("CustomerId"), col("date"), col("Quantity"),
    purchaseRank.alias("rank"),
    purchaseDenseRank.alias("denseRank"),
    maxPurchaseQuantity.alias("maxq")).show()

λouna:
(let [myWindowSpec (q (Window/partitionBy (ca ?CustomerId ?date))
                      (order-by (desc ?Quantity))
                      (.rowsBetween (Window/unboundedPreceding) (Window/currentRow)))]
  (q df
     ((functions/to_date ?InvoiceDate "MM/d/yyyy H:mm") ?date)
     ((not-nil_? ?CustomerId))
     (order-by ?CustomerId)
     [?CustomerId ?date ?Quantity
      ((.over (functions/rank) myWindowSpec) ?rank)
      ((.over (functions/dense_rank) myWindowSpec) ?denseRank)
      ((.over (max_ ?Quantity) myWindowSpec) ?maxq)]
     .show))
Datasets: create and filter

Scala:
import spark.implicits._

case class Flight(DEST_COUNTRY_NAME: String,
                  ORIGIN_COUNTRY_NAME: String,
                  count: BigInt)

val flightsDF = spark.read
  .parquet("/data/flight-data/parquet/2010-summary.parquet/")
val flights = flightsDF.as[Flight]

def originIsDestination(flight_row: Flight): Boolean = {
  return flight_row.ORIGIN_COUNTRY_NAME == flight_row.DEST_COUNTRY_NAME
}

flights.filter(flight_row => originIsDestination(flight_row))


λouna:
(clj-bean.core/defbean sparkdefinitive.ch11Datasets.Flight
  [[String DEST_COUNTRY_NAME]
   [String ORIGIN_COUNTRY_NAME]
   [long count]])

(def flightsDS
  (-> spark
      .read
      (.parquet (str (settings/get-base-path)
                     "/data/flight-data/parquet/2010-summary.parquet/"))
      (.as (Encoders/bean sparkdefinitive.ch11Datasets.Flight))))

(def flightOsameD
  (q flightsDS
     (filter_ (fn [flight]
                (= (.getORIGIN_COUNTRY_NAME flight)
                   (.getDEST_COUNTRY_NAME flight))))))
WordCount
Scala:
val counts = textFile
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

λouna:
(def counts
  (-> textfile
      (flatMap (fn [line] (s/split line #" ")))
      (mapToPair (fn [word] [word 1]))
      (reduceByKey (fn [v1 v2] (+ v1 v2)))))
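
Both versions assume an existing RDD of lines; on the Scala side it would typically come from the SparkContext (path hypothetical):

Scala:
val textFile = spark.sparkContext.textFile("/data/some-text-file.txt")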
Streaming
Scala:
val readStream = spark.readStream.schema(dataSchema)
  .option("maxFilesPerTrigger", 1)
  .json("/data/activity-data")

readStream.withColumn("stairs", expr("gt like '%stairs%'"))
  .where("stairs")
  .where("gt is not null")
  .select("gt", "model", "arrival_time", "creation_time")
  .writeStream
  .queryName("simple_transform")
  .format("memory")
  .outputMode("append")
  .start()

λouna:
(def readStream
  (-> (get-session)
      (.readStream)
      (.schema dataSchema)
      (.option "maxFilesPerTrigger" 1)
      (.json "/data/activity-data/")))

(q readStream
   ((includes? ?gt "stairs") ?stairs)
   ((true_? ?stairs) (not-nil_? ?gt))
   [?gt ?model ?arrival_time ?creation_time]
   (.writeStream)
   (.queryName "simple_transform")
   (.format "memory")
   (.outputMode "append")
   (.start))
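
With the memory sink, the running query materializes as an in-session table named after queryName and can be inspected with plain SQL; a sketch:

Scala:
spark.sql("SELECT * FROM simple_transform").show()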