Solutions - Problem 2

Get the number of flights delayed in departure and the number of flights delayed in arrival for each day, along with the number of flights that departed each day.

  • Output should contain 4 columns: FlightDate, FlightCount, DepDelayedCount, ArrDelayedCount.

  • FlightDate should be in yyyy-MM-dd format.

  • Data should be sorted in ascending order by FlightDate.
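To make the requirements concrete, here is a plain-Python sketch (not Spark) of the expected computation on a few hypothetical records. It assumes, as the Spark solution below does, that a flight departed when Cancelled is 0 and that IsDepDelayed/IsArrDelayed hold 'YES' for delayed flights. The records and the function name daily_delay_counts are made up for illustration.

```python
from collections import defaultdict

# Hypothetical toy records (not from the real dataset); field names mirror airtraffic columns.
flights = [
    {"Year": 2008, "Month": 1, "DayofMonth": 2, "Cancelled": 0, "IsDepDelayed": "YES", "IsArrDelayed": "YES"},
    {"Year": 2008, "Month": 1, "DayofMonth": 2, "Cancelled": 0, "IsDepDelayed": "NO", "IsArrDelayed": "YES"},
    {"Year": 2008, "Month": 1, "DayofMonth": 1, "Cancelled": 0, "IsDepDelayed": "NO", "IsArrDelayed": "NO"},
    {"Year": 2008, "Month": 1, "DayofMonth": 1, "Cancelled": 1, "IsDepDelayed": "YES", "IsArrDelayed": "YES"},
]


def daily_delay_counts(records):
    """Return (FlightDate, FlightCount, DepDelayedCount, ArrDelayedCount) tuples sorted by FlightDate."""
    totals = defaultdict(lambda: [0, 0, 0])
    for r in records:
        if r["Cancelled"] != 0:  # keep only flights that actually departed
            continue
        # Zero-padded yyyy-MM-dd key
        flight_date = f"{r['Year']:04d}-{r['Month']:02d}-{r['DayofMonth']:02d}"
        counts = totals[flight_date]
        counts[0] += 1                              # FlightCount
        counts[1] += r["IsDepDelayed"] == "YES"     # DepDelayedCount (True adds 1)
        counts[2] += r["IsArrDelayed"] == "YES"     # ArrDelayedCount
    # Sorting the yyyy-MM-dd keys as strings gives ascending date order
    return [(d, *c) for d, c in sorted(totals.items())]


for row in daily_delay_counts(flights):
    print(row)
```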

Let us start the Spark session for this notebook so that we can execute the code provided.

from pyspark.sql import SparkSession

import getpass
username = getpass.getuser()

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Basic Transformations'). \
    master('yarn'). \
    getOrCreate()

If you are going to use the CLIs, you can launch Spark with one of the following 3 approaches.

Using Spark SQL

spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Scala

spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using PySpark

pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Reading airtraffic data

airtraffic_path = "/public/airtraffic_all/airtraffic-part/flightmonth=200801"
airtraffic = spark. \
    read. \
    parquet(airtraffic_path)
airtraffic.printSchema()
root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- CarrierDelay: string (nullable = true)
 |-- WeatherDelay: string (nullable = true)
 |-- NASDelay: string (nullable = true)
 |-- SecurityDelay: string (nullable = true)
 |-- LateAircraftDelay: string (nullable = true)
 |-- IsArrDelayed: string (nullable = true)
 |-- IsDepDelayed: string (nullable = true)

Grouping Data by Flight Date

from pyspark.sql.functions import lit, concat, lpad
airtraffic. \
  groupBy(concat("Year", lit("-"), 
                 lpad("Month", 2, "0"), lit("-"), 
                 lpad("DayOfMonth", 2, "0")).
          alias("FlightDate"))
<pyspark.sql.group.GroupedData at 0x7fc369243940>
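groupBy on its own returns a GroupedData object, as the output shows; an aggregation such as count() or agg() is needed to get a DataFrame back. The FlightDate expression pads month and day to two digits with lpad, so every value has the fixed yyyy-MM-dd width. The plain-Python sketch below shows why that matters: with fixed-width strings, string order equals chronological order, which the later orderBy("FlightDate") relies on. Python's str.rjust stands in for lpad here (unlike lpad it never truncates, which does not matter for month and day values), and the helper flight_date is hypothetical.

```python
def flight_date(year: int, month: int, day: int) -> str:
    """Mimic concat(Year, '-', lpad(Month, 2, '0'), '-', lpad(DayOfMonth, 2, '0'))."""
    return f"{year}-{str(month).rjust(2, '0')}-{str(day).rjust(2, '0')}"


padded = sorted(flight_date(2008, 1, d) for d in (10, 2, 1))
unpadded = sorted(f"2008-1-{d}" for d in (10, 2, 1))

print(padded)    # ['2008-01-01', '2008-01-02', '2008-01-10']  (chronological)
print(unpadded)  # ['2008-1-1', '2008-1-10', '2008-1-2']       (day 10 sorts before day 2)
```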

Getting Counts by FlightDate

from pyspark.sql.functions import count
airtraffic. \
    filter('Cancelled = 0'). \
    groupBy(concat("Year", lit("-"), 
                   lpad("Month", 2, "0"), lit("-"), 
                   lpad("DayOfMonth", 2, "0")).
            alias("FlightDate")). \
    agg(count(lit(1)).alias("FlightCount")). \
    show(31)
+----------+-----------+
|FlightDate|FlightCount|
+----------+-----------+
|2008-01-15|      19204|
|2008-01-21|      19658|
|2008-01-11|      19825|
|2008-01-19|      15373|
|2008-01-02|      20442|
|2008-01-06|      19210|
|2008-01-29|      18596|
|2008-01-30|      19072|
|2008-01-17|      19401|
|2008-01-31|      19179|
|2008-01-01|      18623|
|2008-01-24|      19935|
|2008-01-07|      19762|
|2008-01-09|      19443|
|2008-01-04|      20160|
|2008-01-08|      19140|
|2008-01-05|      17610|
|2008-01-25|      19787|
|2008-01-26|      15860|
|2008-01-12|      16346|
|2008-01-16|      19232|
|2008-01-23|      19239|
|2008-01-18|      20117|
|2008-01-22|      18716|
|2008-01-28|      19493|
|2008-01-13|      18587|
|2008-01-10|      19956|
|2008-01-14|      19267|
|2008-01-20|      18406|
|2008-01-27|      18265|
|2008-01-03|      20462|
+----------+-----------+
# Alternative: get the count without using agg
# With this approach we cannot alias the aggregated column; it is always named count
airtraffic. \
    filter('Cancelled = 0'). \
    groupBy(concat("Year", lit("-"), 
                   lpad("Month", 2, "0"), lit("-"), 
                   lpad("DayOfMonth", 2, "0")).
            alias("FlightDate")). \
    count(). \
    show()
+----------+-----+
|FlightDate|count|
+----------+-----+
|2008-01-15|19204|
|2008-01-21|19658|
|2008-01-11|19825|
|2008-01-19|15373|
|2008-01-02|20442|
|2008-01-06|19210|
|2008-01-29|18596|
|2008-01-30|19072|
|2008-01-17|19401|
|2008-01-31|19179|
|2008-01-01|18623|
|2008-01-24|19935|
|2008-01-07|19762|
|2008-01-09|19443|
|2008-01-04|20160|
|2008-01-08|19140|
|2008-01-05|17610|
|2008-01-25|19787|
|2008-01-26|15860|
|2008-01-12|16346|
+----------+-----+
only showing top 20 rows
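Both approaches produce the same numbers; they differ only in the name of the count column. If you use groupBy(...).count() but still want the FlightCount name, DataFrame.withColumnRenamed("count", "FlightCount") can rename it afterwards. Below is the same filter-then-count logic in plain Python, using collections.Counter on hypothetical (FlightDate, Cancelled) pairs rather than the real data.

```python
from collections import Counter

# Hypothetical (FlightDate, Cancelled) pairs standing in for airtraffic rows.
rows = [("2008-01-01", 0), ("2008-01-01", 1), ("2008-01-02", 0), ("2008-01-02", 0)]

# Equivalent of filter('Cancelled = 0') followed by a per-FlightDate row count.
flight_count = Counter(date for date, cancelled in rows if cancelled == 0)

# groupBy(...).count() would call this column "count"; here we name it FlightCount,
# as agg(count(lit(1)).alias("FlightCount")) does.
result = [{"FlightDate": d, "FlightCount": n} for d, n in sorted(flight_count.items())]
print(result)
```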

Getting total as well as delayed counts for each day

from pyspark.sql.functions import sum, expr
airtraffic. \
    filter('Cancelled = 0'). \
    groupBy(concat("Year", lit("-"), 
                   lpad("Month", 2, "0"), lit("-"), 
                   lpad("DayOfMonth", 2, "0")).
            alias("FlightDate")). \
    agg(count(lit(1)).alias("FlightCount"),
        sum(expr("CASE WHEN IsDepDelayed = 'YES' THEN 1 ELSE 0 END")).alias("DepDelayedCount"),
        sum(expr("CASE WHEN IsArrDelayed = 'YES' THEN 1 ELSE 0 END")).alias("ArrDelayedCount")
       ). \
    show()
+----------+-----------+---------------+---------------+
|FlightDate|FlightCount|DepDelayedCount|ArrDelayedCount|
+----------+-----------+---------------+---------------+
|2008-01-15|      19204|           5304|           6389|
|2008-01-21|      19658|          10055|          11032|
|2008-01-11|      19825|           7255|           8164|
|2008-01-19|      15373|           6399|           6810|
|2008-01-02|      20442|          13294|          13749|
|2008-01-06|      19210|          10542|          10705|
|2008-01-29|      18596|           6324|           8370|
|2008-01-30|      19072|           6655|           7814|
|2008-01-17|      19401|           9635|          11229|
|2008-01-31|      19179|           9127|          11304|
|2008-01-01|      18623|          10501|          11173|
|2008-01-24|      19935|           8112|           9972|
|2008-01-07|      19762|           8122|           8683|
|2008-01-09|      19443|           5962|           6857|
|2008-01-04|      20160|           9406|           9824|
|2008-01-08|      19140|           7483|           8938|
|2008-01-05|      17610|           9051|           9345|
|2008-01-25|      19787|           8826|          10479|
|2008-01-26|      15860|           5740|           7163|
|2008-01-12|      16346|           3902|           4078|
+----------+-----------+---------------+---------------+
only showing top 20 rows
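The sum(CASE WHEN ... THEN 1 ELSE 0 END) pattern turns each row into 1 or 0 and adds them up, which counts the rows that match the condition. A row whose flag is NULL falls through to ELSE 0, because NULL = 'YES' is not true. Note also that from pyspark.sql.functions import sum shadows Python's built-in sum for the rest of the notebook; importing the module as F (import pyspark.sql.functions as F) avoids that. Here is a plain-Python sketch of the same counting rule on hypothetical values, with None standing in for NULL.

```python
# Hypothetical IsDepDelayed values for one day; None stands in for a SQL NULL.
is_dep_delayed = ["YES", "NO", "YES", None, "NO"]

# sum(CASE WHEN IsDepDelayed = 'YES' THEN 1 ELSE 0 END):
# anything that is not exactly 'YES' (including NULL) contributes 0.
dep_delayed_count = sum(1 if v == "YES" else 0 for v in is_dep_delayed)
print(dep_delayed_count)  # 2
```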

Sorting Data by FlightDate

help(airtraffic.sort)
Help on method sort in module pyspark.sql.dataframe:

sort(*cols, **kwargs) method of pyspark.sql.dataframe.DataFrame instance
    Returns a new :class:`DataFrame` sorted by the specified column(s).
    
    :param cols: list of :class:`Column` or column names to sort by.
    :param ascending: boolean or list of boolean (default ``True``).
        Sort ascending vs. descending. Specify list for multiple sort orders.
        If a list is specified, length of the list must equal length of the `cols`.
    
    >>> df.sort(df.age.desc()).collect()
    [Row(age=5, name='Bob'), Row(age=2, name='Alice')]
    >>> df.sort("age", ascending=False).collect()
    [Row(age=5, name='Bob'), Row(age=2, name='Alice')]
    >>> df.orderBy(df.age.desc()).collect()
    [Row(age=5, name='Bob'), Row(age=2, name='Alice')]
    >>> from pyspark.sql.functions import *
    >>> df.sort(asc("age")).collect()
    [Row(age=2, name='Alice'), Row(age=5, name='Bob')]
    >>> df.orderBy(desc("age"), "name").collect()
    [Row(age=5, name='Bob'), Row(age=2, name='Alice')]
    >>> df.orderBy(["age", "name"], ascending=[0, 1]).collect()
    [Row(age=5, name='Bob'), Row(age=2, name='Alice')]
    
    .. versionadded:: 1.3
help(airtraffic.orderBy)
Help on method sort in module pyspark.sql.dataframe:

(Output identical to help(airtraffic.sort) above, because orderBy is an alias of sort.)
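The docstring's last example sorts by age descending and then by name ascending. The same mixed-order sort can be written in plain Python by negating the numeric key to reverse it (this trick only works for numeric keys). The rows mirror the docstring's Alice and Bob, plus a hypothetical Carol row with the same age as Bob, to show the second key breaking the tie.

```python
# Rows shaped like the docstring examples; "Carol" is a hypothetical extra row.
people = [
    {"age": 2, "name": "Alice"},
    {"age": 5, "name": "Carol"},
    {"age": 5, "name": "Bob"},
]

# Like df.orderBy(["age", "name"], ascending=[0, 1]): age descending, then name ascending.
ordered = sorted(people, key=lambda r: (-r["age"], r["name"]))
print([r["name"] for r in ordered])  # ['Bob', 'Carol', 'Alice']
```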
airtraffic. \
    filter('Cancelled = 0'). \
    groupBy(concat("Year", lit("-"), 
                   lpad("Month", 2, "0"), lit("-"), 
                   lpad("DayOfMonth", 2, "0")).
            alias("FlightDate")). \
    agg(count(lit(1)).alias("FlightCount"),
        sum(expr("CASE WHEN IsDepDelayed = 'YES' THEN 1 ELSE 0 END")).alias("DepDelayedCount"),
        sum(expr("CASE WHEN IsArrDelayed = 'YES' THEN 1 ELSE 0 END")).alias("ArrDelayedCount")
       ). \
    orderBy("FlightDate"). \
    show(31)
+----------+-----------+---------------+---------------+
|FlightDate|FlightCount|DepDelayedCount|ArrDelayedCount|
+----------+-----------+---------------+---------------+
|2008-01-01|      18623|          10501|          11173|
|2008-01-02|      20442|          13294|          13749|
|2008-01-03|      20462|          11819|          12013|
|2008-01-04|      20160|           9406|           9824|
|2008-01-05|      17610|           9051|           9345|
|2008-01-06|      19210|          10542|          10705|
|2008-01-07|      19762|           8122|           8683|
|2008-01-08|      19140|           7483|           8938|
|2008-01-09|      19443|           5962|           6857|
|2008-01-10|      19956|           7033|           8565|
|2008-01-11|      19825|           7255|           8164|
|2008-01-12|      16346|           3902|           4078|
|2008-01-13|      18587|           6634|           7473|
|2008-01-14|      19267|           5921|           7104|
|2008-01-15|      19204|           5304|           6389|
|2008-01-16|      19232|           5102|           6228|
|2008-01-17|      19401|           9635|          11229|
|2008-01-18|      20117|          10038|          10860|
|2008-01-19|      15373|           6399|           6810|
|2008-01-20|      18406|           6700|           7005|
|2008-01-21|      19658|          10055|          11032|
|2008-01-22|      18716|           9129|          10669|
|2008-01-23|      19239|           7349|           9324|
|2008-01-24|      19935|           8112|           9972|
|2008-01-25|      19787|           8826|          10479|
|2008-01-26|      15860|           5740|           7163|
|2008-01-27|      18265|           8905|          10331|
|2008-01-28|      19493|           7580|           9013|
|2008-01-29|      18596|           6324|           8370|
|2008-01-30|      19072|           6655|           7814|
|2008-01-31|      19179|           9127|          11304|
+----------+-----------+---------------+---------------+
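Here is a quick plain-Python check that the result meets the formatting and ordering requirements. It runs on the first few FlightDate values copied from the output above. The regular expression checks the yyyy-MM-dd shape, and datetime.strptime raises ValueError for impossible dates such as 2008-02-30.

```python
import re
from datetime import datetime

# FlightDate values copied from the first rows of the sorted output above.
flight_dates = ["2008-01-01", "2008-01-02", "2008-01-03", "2008-01-04", "2008-01-05"]

# Requirement: yyyy-MM-dd format that is also a valid calendar date.
well_formed = all(
    re.fullmatch(r"\d{4}-\d{2}-\d{2}", d) and datetime.strptime(d, "%Y-%m-%d")
    for d in flight_dates
)

# Requirement: ascending order by FlightDate.
ascending = flight_dates == sorted(flight_dates)

print(well_formed, ascending)  # True True
```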

Sorting Data in descending order by FlightCount

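from pyspark.sql.functions import lit, concat, lpad, count, sum, expr, col
# Note: unlike the previous queries, this one does not filter out cancelled flights,
# so the counts include cancelled flights and are higher than in the earlier results.
airtraffic. \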
    groupBy(concat("Year", lit("-"), 
                   lpad("Month", 2, "0"), lit("-"), 
                   lpad("DayOfMonth", 2, "0")).
            alias("FlightDate")). \
    agg(count(lit(1)).alias("FlightCount"),
        sum(expr("CASE WHEN IsDepDelayed = 'YES' THEN 1 ELSE 0 END")).alias("DepDelayedCount"),
        sum(expr("CASE WHEN IsArrDelayed = 'YES' THEN 1 ELSE 0 END")).alias("ArrDelayedCount")
       ). \
    orderBy(col("FlightCount").desc()). \
    show()
+----------+-----------+---------------+---------------+
|FlightDate|FlightCount|DepDelayedCount|ArrDelayedCount|
+----------+-----------+---------------+---------------+
|2008-01-02|      20953|          13805|          14260|
|2008-01-03|      20937|          12294|          12488|
|2008-01-04|      20929|          10175|          10593|
|2008-01-11|      20349|           7779|           8688|
|2008-01-18|      20347|          10268|          11090|
|2008-01-07|      20341|           8701|           9262|
|2008-01-25|      20313|           9352|          11005|
|2008-01-10|      20297|           7374|           8906|
|2008-01-17|      20273|          10507|          12101|
|2008-01-31|      20260|          10208|          12385|
|2008-01-24|      20257|           8434|          10294|
|2008-01-14|      20176|           6830|           8013|
|2008-01-28|      20147|           8234|           9667|
|2008-01-21|      20133|          10530|          11507|
|2008-01-06|      19893|          11225|          11388|
|2008-01-09|      19820|           6339|           7234|
|2008-01-23|      19769|           7879|           9854|
|2008-01-30|      19766|           7349|           8508|
|2008-01-16|      19764|           5634|           6760|
|2008-01-08|      19603|           7946|           9401|
+----------+-----------+---------------+---------------+
only showing top 20 rows
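col("FlightCount").desc() sorts from the largest count down; desc("FlightCount") from pyspark.sql.functions is equivalent. In plain Python, the same ordering is sorted(..., reverse=True). The sketch uses the first five (FlightDate, FlightCount) pairs from the output above, deliberately shuffled, and recovers the order Spark printed.

```python
# (FlightDate, FlightCount) pairs copied from the first five rows of the output above, shuffled.
daily_counts = [
    ("2008-01-11", 20349),
    ("2008-01-02", 20953),
    ("2008-01-18", 20347),
    ("2008-01-04", 20929),
    ("2008-01-03", 20937),
]

# Like orderBy(col("FlightCount").desc()): largest FlightCount first.
by_count_desc = sorted(daily_counts, key=lambda row: row[1], reverse=True)
print([date for date, _ in by_count_desc])
# ['2008-01-02', '2008-01-03', '2008-01-04', '2008-01-11', '2008-01-18']
```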