Solutions - Problem 3

Get all the flights that departed late (IsDepDelayed is YES) but arrived early or on time (IsArrDelayed is NO).

  • Output should contain the columns FlightCRSDepTime, UniqueCarrier, FlightNum, Origin, Dest, DepDelay, ArrDelay

  • FlightCRSDepTime needs to be computed using Year, Month, DayOfMonth and CRSDepTime

  • FlightCRSDepTime should be displayed using yyyy-MM-dd HH:mm format.

  • Output should be sorted by FlightCRSDepTime and then by the difference DepDelay - ArrDelay

  • Also get the count of such flights

import getpass

from pyspark.sql import SparkSession

# Per-user names keep the warehouse directory and app name distinct on a
# shared cluster.
username = getpass.getuser()

# Build (or reuse) the Spark session for this notebook.
# spark.ui.port = 0 asks Spark to bind the UI to any free port instead of a
# fixed one, so several users on the same gateway do not collide.
spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Basic Transformations'). \
    master('yarn'). \
    getOrCreate()

Reading airtraffic data

airtraffic_path = "/public/airtraffic_all/airtraffic-part/flightmonth=200801"

# Load one month (2008-01) of the air traffic data.
# NOTE(review): the reader call was lost in this export; this partitioned
# directory is assumed to be Parquet -- confirm against the data set.
airtraffic = spark. \
    read. \
    parquet(airtraffic_path)

airtraffic.printSchema()
# root
#  |-- Year: integer (nullable = true)
#  |-- Month: integer (nullable = true)
#  |-- DayofMonth: integer (nullable = true)
#  |-- DayOfWeek: integer (nullable = true)
#  |-- DepTime: string (nullable = true)
#  |-- CRSDepTime: integer (nullable = true)
#  |-- ArrTime: string (nullable = true)
#  |-- CRSArrTime: integer (nullable = true)
#  |-- UniqueCarrier: string (nullable = true)
#  |-- FlightNum: integer (nullable = true)
#  |-- TailNum: string (nullable = true)
#  |-- ActualElapsedTime: string (nullable = true)
#  |-- CRSElapsedTime: integer (nullable = true)
#  |-- AirTime: string (nullable = true)
#  |-- ArrDelay: string (nullable = true)
#  |-- DepDelay: string (nullable = true)
#  |-- Origin: string (nullable = true)
#  |-- Dest: string (nullable = true)
#  |-- Distance: string (nullable = true)
#  |-- TaxiIn: string (nullable = true)
#  |-- TaxiOut: string (nullable = true)
#  |-- Cancelled: integer (nullable = true)
#  |-- CancellationCode: string (nullable = true)
#  |-- Diverted: integer (nullable = true)
#  |-- CarrierDelay: string (nullable = true)
#  |-- WeatherDelay: string (nullable = true)
#  |-- NASDelay: string (nullable = true)
#  |-- SecurityDelay: string (nullable = true)
#  |-- LateAircraftDelay: string (nullable = true)
#  |-- IsArrDelayed: string (nullable = true)
#  |-- IsDepDelayed: string (nullable = true)
# Note: ArrDelay and DepDelay are strings, and CRSDepTime is an integer such
# as 700 or 1855 (HHmm without leading zero).

# Preview the columns needed to build FlightCRSDepTime.
airtraffic.select('Year', 'Month', 'DayOfMonth', 'CRSDepTime').show()
|2008|    1|        16|      1735|
|2008|    1|        17|      1701|
|2008|    1|        17|      1225|
|2008|    1|        17|      1530|
|2008|    1|        17|      1205|
|2008|    1|        18|      1150|
|2008|    1|        18|      1009|
|2008|    1|        19|       835|
|2008|    1|        20|      1935|
|2008|    1|        20|       830|
|2008|    1|        21|      1640|
|2008|    1|        21|      1204|
|2008|    1|        21|      1935|
|2008|    1|        21|      1830|
|2008|    1|        21|       700|
|2008|    1|        22|      1910|
|2008|    1|        22|      1320|
|2008|    1|        23|       908|
|2008|    1|        23|      1252|
|2008|    1|        23|       635|
only showing top 20 rows
# Small sample frame to prototype the timestamp formatting: DepTime is an
# integer in HHmm form without a leading zero (700 means 07:00).
l = [(2008, 1, 23, 700),
     (2008, 1, 10, 1855),
    ]
df = spark.createDataFrame(l, "Year INT, Month INT, DayOfMonth INT, DepTime INT")
df.show()
|2008|    1|        23|    700|
|2008|    1|        10|   1855|
from pyspark.sql.functions import col, date_format, lpad

# Left-pad DepTime to 4 characters so 700 becomes "0700" and the hour and
# minute can later be taken as fixed-position substrings.
df.select("DepTime", lpad('DepTime', 4, "0").alias('DepTime')).show()
|    700|   0700|
|   1855|   1855|
from pyspark.sql.functions import substring, concat, lit, lpad

# Build a "yyyy-MM-dd HH:mm" string from the integer date parts and the
# HHmm-style DepTime (e.g. 2008, 1, 23, 700 -> "2008-01-23 07:00").
df. \
    withColumn(
        'FlightDepTime',
        concat(
            "Year", lit("-"),
            lpad("Month", 2, "0"), lit("-"),
            lpad("DayOfMonth", 2, "0"), lit(" "),
            # Pad to 4 digits first so the hour is always characters 1-2.
            substring(lpad("DepTime", 4, "0"), 1, 2), lit(":"),
            substring(lpad("DepTime", 4, "0"), 3, 2)
        )
    ). \
    show()
|2008|    1|        23|    700|2008-01-23 07:00|
|2008|    1|        10|   1855|2008-01-10 18:55|
# NOTE: importing `sum` shadows Python's builtin sum for the rest of the
# notebook; `sum` and `expr` are not used in this cell.
from pyspark.sql.functions import lit, col, concat, lpad, sum, expr, substring

# Flights that departed late (IsDepDelayed = 'YES') but arrived early or on
# time (IsArrDelayed = 'NO'); cancelled flights are excluded.
flightsFiltered = airtraffic. \
    filter("IsDepDelayed = 'YES' AND IsArrDelayed = 'NO' AND Cancelled = 0"). \
    select(
        concat(
            "Year", lit("-"),
            lpad("Month", 2, "0"), lit("-"),
            lpad("DayOfMonth", 2, "0"), lit(" "),
            # CRSDepTime is an int like 700 or 1855: pad to HHmm, then split.
            substring(lpad("CRSDepTime", 4, "0"), 1, 2), lit(':'),
            substring(lpad("CRSDepTime", 4, "0"), 3, 2)
        ).alias("FlightCRSDepTime"),
        "UniqueCarrier", "FlightNum", "Origin",
        "Dest", "DepDelay", "ArrDelay"
    ). \
    orderBy(
        "FlightCRSDepTime",
        # DepDelay/ArrDelay are string columns; cast explicitly so the
        # secondary sort key is numeric.
        col("DepDelay").cast("int") - col("ArrDelay").cast("int")
    )

# Display separately: DataFrame.show() returns None, so it must not be part
# of the chain assigned to flightsFiltered.
flightsFiltered.show()
|2008-01-01 00:55|           DL|      560|   LAX| ATL|      15|      -4|
|2008-01-01 01:00|           AA|     2466|   SFO| DFW|       2|      -7|
|2008-01-01 03:00|           CO|      488|   SJU| EWR|       3|     -12|
|2008-01-01 04:00|           B6|      724|   BQN| MCO|       4|     -23|
|2008-01-01 05:15|           XE|     2140|   SLC| IAH|       1|     -10|
|2008-01-01 05:25|           XE|     2429|   MSP| IAH|       1|      -4|
|2008-01-01 05:30|           XE|     2785|   BTR| IAH|       6|       0|
|2008-01-01 05:40|           OO|     5793|   CIC| SFO|       1|      -4|
|2008-01-01 05:40|           UA|      422|   SEA| DEN|       4|     -10|
|2008-01-01 06:00|           DL|      970|   FLL| ATL|       2|       0|
|2008-01-01 06:00|           AA|     1734|   SAT| DFW|       2|      -1|
|2008-01-01 06:00|           MQ|     3191|   LAX| SJC|       2|      -3|
|2008-01-01 06:00|           UA|     1294|   PDX| DEN|       2|      -3|
|2008-01-01 06:00|           UA|      265|   ORD| DEN|       2|      -5|
|2008-01-01 06:00|           AA|     1157|   AUS| DFW|       1|      -6|
|2008-01-01 06:00|           AQ|      441|   LAS| OAK|       7|       0|
|2008-01-01 06:00|           AA|      802|   LAX| DFW|       1|      -7|
|2008-01-01 06:00|           EV|     4484|   DSM| ATL|       3|      -7|
|2008-01-01 06:00|           AA|     1150|   LAS| DFW|       3|      -7|
|2008-01-01 06:00|           OO|     1985|   MKE| ATL|       2|     -10|
only showing top 20 rows

Getting Count

from pyspark.sql.functions import concat, lit, lpad, substring

# Flights that departed late but arrived early or on time, excluding
# cancelled ones. No sort is applied because ordering does not change the
# count.
flightsFiltered = airtraffic. \
    filter("IsDepDelayed = 'YES' AND IsArrDelayed = 'NO' AND Cancelled = 0"). \
    select(
        concat(
            "Year", lit("-"),
            lpad("Month", 2, "0"), lit("-"),
            lpad("DayOfMonth", 2, "0"), lit(" "),
            substring(lpad("CRSDepTime", 4, "0"), 1, 2), lit(':'),
            substring(lpad("CRSDepTime", 4, "0"), 3, 2)
        ).alias("FlightCRSDepTime"),
        "UniqueCarrier", "FlightNum", "Origin",
        "Dest", "DepDelay", "ArrDelay"
    )

# Count the filtered flights themselves. The previous version counted
# airtraffic rows with IsDepDelayed = 'NO' (on-time departures), which is
# not the requested set.
flightsFiltered.count()