Aggregate Functions

Let us see how to compute aggregations within each group while still projecting the individual rows that are used to compute each aggregate.

  • Aggregate functions such as sum, avg, min and max can be used over a window to aggregate the data.

  • We need to create a WindowSpec object using Window.partitionBy to get aggregations within each group.

  • Typically we don't need to sort the data to perform aggregations within a group. However, if we want cumulative aggregations using rowsBetween, we have to sort the data within each partition (orderBy) on the column(s) that define the cumulative order.

  • Let us get the total, minimum, maximum and average departure delay for each day for each airport. We will ignore all flights that departed early or on time.
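The bullets above can be illustrated without a cluster. The following is a plain-Python sketch, not Spark code: it mimics what min, max, sum and avg over a partition-only WindowSpec return, and what a cumulative sum with orderBy plus rowsBetween(Window.unboundedPreceding, Window.currentRow) returns. The helper names window_aggregates and cumulative_sum are hypothetical, and the sample delays are the ABE and ABI rows from the output shown later in this section. Run it in a fresh Python session, because later in this notebook min, max and sum are imported from pyspark.sql.functions and shadow the built-ins.

```python
from collections import defaultdict

# Sample (FlightDate, Origin, DepDelay) rows copied from the ABE and ABI
# flights on 20080101 in the output shown in this section.
rows = [
    ("20080101", "ABE", 1), ("20080101", "ABE", 14), ("20080101", "ABE", 22),
    ("20080101", "ABE", 34), ("20080101", "ABE", 34), ("20080101", "ABE", 70),
    ("20080101", "ABE", 137), ("20080101", "ABE", 175),
    ("20080101", "ABI", 3),
]


def window_aggregates(rows):
    """Hypothetical helper mimicking min/max/sum/avg over
    Window.partitionBy("FlightDate", "Origin").

    Unlike groupBy, every input row is kept and each row carries the
    aggregates of its own partition.
    """
    partitions = defaultdict(list)
    for date, origin, delay in rows:
        partitions[(date, origin)].append(delay)
    result = []
    for date, origin, delay in rows:
        group = partitions[(date, origin)]
        result.append((date, origin, delay,
                       min(group), max(group), sum(group),
                       sum(group) / len(group)))
    return result


def cumulative_sum(rows):
    """Hypothetical helper mimicking sum over
    Window.partitionBy("FlightDate", "Origin").orderBy("DepDelay")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow).
    """
    running = defaultdict(int)
    result = []
    # Sorting by (FlightDate, Origin, DepDelay) orders rows inside each
    # partition; a running total only makes sense once that order is fixed.
    for date, origin, delay in sorted(rows):
        running[(date, origin)] += delay
        result.append((date, origin, delay, running[(date, origin)]))
    return result


for row in window_aggregates(rows):
    print(row)
for row in cumulative_sum(rows):
    print(row)
```

With a rows frame each row is its own frame boundary, so the two 34-minute ABE delays get different running totals (71 and 105). If only orderBy is given, Spark's default frame is a range frame from unbounded preceding to the current row, under which tied values share the same total (105 for both); that is one reason to spell out rowsBetween explicitly.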

Let us start the Spark session for this Notebook so that we can execute the code provided.

from pyspark.sql import SparkSession

import getpass
username = getpass.getuser()

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Windowing Functions'). \
    master('yarn'). \
    getOrCreate()

If you are going to use the CLIs, you can run Spark SQL in one of three ways.

Using Spark SQL

spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Scala

spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using PySpark

pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

spark.conf.set('spark.sql.shuffle.partitions', '2')
airtraffic_path = "/public/airtraffic_all/airtraffic-part/flightmonth=200801"
airtraffic = spark. \
  read. \
  parquet(airtraffic_path)
airtraffic.printSchema()
root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- CarrierDelay: string (nullable = true)
 |-- WeatherDelay: string (nullable = true)
 |-- NASDelay: string (nullable = true)
 |-- SecurityDelay: string (nullable = true)
 |-- LateAircraftDelay: string (nullable = true)
 |-- IsArrDelayed: string (nullable = true)
 |-- IsDepDelayed: string (nullable = true)
from pyspark.sql.functions import col, lit, lpad, concat
# Note: this import shadows Python's built-in min, max and sum in this session
from pyspark.sql.functions import min, max, sum, avg
from pyspark.sql.window import Window
help(Window.partitionBy)
Help on function partitionBy in module pyspark.sql.window:

partitionBy(*cols)
    Creates a :class:`WindowSpec` with the partitioning defined.
    
    .. versionadded:: 1.4
spec = Window. \
    partitionBy("FlightDate", "Origin")
type(spec)
pyspark.sql.window.WindowSpec
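# Keep delayed, non-cancelled flights and build FlightDate as yyyyMMdd.
# DepDelay is stored as a string, so it is cast to int before aggregating.
# The min, max, sum and avg for each (FlightDate, Origin) are attached to every row.
airtraffic. \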
    filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
    select(concat("Year", 
                  lpad("Month", 2, "0"), 
                  lpad("DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           "Origin",
           "UniqueCarrier",
           "FlightNum",
           "CRSDepTime",
           "IsDepDelayed",
           col("DepDelay").cast("int").alias("DepDelay")
          ). \
    withColumn("DepDelayMin", min("DepDelay").over(spec)). \
    withColumn("DepDelayMax", max("DepDelay").over(spec)). \
    withColumn("DepDelaySum", sum("DepDelay").over(spec)). \
    withColumn("DepDelayAvg", avg("DepDelay").over(spec)). \
    orderBy("FlightDate", "Origin", "DepDelay"). \
    show()
+----------+------+-------------+---------+----------+------------+--------+-----------+-----------+-----------+------------------+
|FlightDate|Origin|UniqueCarrier|FlightNum|CRSDepTime|IsDepDelayed|DepDelay|DepDelayMin|DepDelayMax|DepDelaySum|       DepDelayAvg|
+----------+------+-------------+---------+----------+------------+--------+-----------+-----------+-----------+------------------+
|  20080101|   ABE|           OO|     5873|       720|         YES|       1|          1|        175|        487|            60.875|
|  20080101|   ABE|           OH|     5457|      1720|         YES|      14|          1|        175|        487|            60.875|
|  20080101|   ABE|           XE|     2578|      1410|         YES|      22|          1|        175|        487|            60.875|
|  20080101|   ABE|           9E|     2936|      1615|         YES|      34|          1|        175|        487|            60.875|
|  20080101|   ABE|           XE|     2594|      1740|         YES|      34|          1|        175|        487|            60.875|
|  20080101|   ABE|           9E|     2940|      1215|         YES|      70|          1|        175|        487|            60.875|
|  20080101|   ABE|           YV|     7263|      1230|         YES|     137|          1|        175|        487|            60.875|
|  20080101|   ABE|           YV|     7138|      1741|         YES|     175|          1|        175|        487|            60.875|
|  20080101|   ABI|           MQ|     3214|      1735|         YES|       3|          3|          3|          3|               3.0|
|  20080101|   ABQ|           WN|     2976|      1040|         YES|       1|          1|        218|       1580|32.916666666666664|
|  20080101|   ABQ|           WN|      972|      1810|         YES|       1|          1|        218|       1580|32.916666666666664|
|  20080101|   ABQ|           WN|       61|      1320|         YES|       1|          1|        218|       1580|32.916666666666664|
|  20080101|   ABQ|           WN|     3425|      1440|         YES|       2|          1|        218|       1580|32.916666666666664|
|  20080101|   ABQ|           WN|       88|       755|         YES|       2|          1|        218|       1580|32.916666666666664|
|  20080101|   ABQ|           WN|     2284|      1520|         YES|       3|          1|        218|       1580|32.916666666666664|
|  20080101|   ABQ|           XE|     2771|      1430|         YES|       3|          1|        218|       1580|32.916666666666664|
|  20080101|   ABQ|           WN|     1493|      2020|         YES|       4|          1|        218|       1580|32.916666666666664|
|  20080101|   ABQ|           WN|      360|      1800|         YES|       5|          1|        218|       1580|32.916666666666664|
|  20080101|   ABQ|           WN|      644|      1725|         YES|       5|          1|        218|       1580|32.916666666666664|
|  20080101|   ABQ|           WN|      729|      1530|         YES|       7|          1|        218|       1580|32.916666666666664|
+----------+------+-------------+---------+----------+------------+--------+-----------+-----------+-----------+------------------+
only showing top 20 rows
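As a quick sanity check on the output, the ABE aggregates can be recomputed by hand from the eight DepDelay values shown for 20080101. The ABQ group is truncated by show(), but its row count can still be inferred as sum divided by average. This is plain Python meant for a fresh session (in this notebook, min, max and sum refer to the pyspark.sql.functions versions).

```python
# DepDelay values for FlightDate 20080101, Origin ABE, copied from the output above
abe_delays = [1, 14, 22, 34, 34, 70, 137, 175]

dep_delay_min = min(abe_delays)
dep_delay_max = max(abe_delays)
dep_delay_sum = sum(abe_delays)
dep_delay_avg = dep_delay_sum / len(abe_delays)  # avg = sum / count
print(dep_delay_min, dep_delay_max, dep_delay_sum, dep_delay_avg)
# 1 175 487 60.875

# Only 11 ABQ rows are displayed, but count = sum / avg recovers the group size
abq_count = round(1580 / 32.916666666666664)
print(abq_count)
# 48
```

The recomputed values match DepDelayMin, DepDelayMax, DepDelaySum and DepDelayAvg on every ABE row, which is exactly what a partition-only window should produce.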