## Solutions - Problem 3
Get all the flights that departed late but arrived early or on time (**IsArrDelayed is NO**).
* Output should contain - **FlightCRSDepTime**, **UniqueCarrier**, **FlightNum**, **Origin**, **Dest**, **DepDelay**, **ArrDelay**
* **FlightCRSDepTime** needs to be computed using **Year**, **Month**, **DayOfMonth** and **CRSDepTime**
* **FlightCRSDepTime** should be displayed in **yyyy-MM-dd HH:mm** format.
* Output should be sorted by **FlightCRSDepTime** and then by the difference between **DepDelay** and **ArrDelay**
* Also get the count of such flights


Let us start the Spark session for this notebook so that we can execute the code provided. You can sign up for our [10 node state of the art cluster/labs](https://labs.itversity.com/plans) to learn Spark SQL using our unique integrated LMS.

```python
from pyspark.sql import SparkSession

import getpass
username = getpass.getuser()

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Basic Transformations'). \
    master('yarn'). \
    getOrCreate()
```

If you prefer to work from the command line, you can launch Spark using one of the following 3 approaches.

**Using Spark SQL**

```
spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

**Using Scala**

```
spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

**Using Pyspark**

```
pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

### Reading airtraffic data

```python
airtraffic_path = "/public/airtraffic_all/airtraffic-part/flightmonth=200801"

airtraffic = spark. \
    read. \
    parquet(airtraffic_path)
```

```python
airtraffic.printSchema()
```

```
root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- Car
```
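Note that the schema reports **DepDelay** and **ArrDelay** as strings rather than integers. In an arithmetic expression such as `col("DepDelay") - col("ArrDelay")`, Spark (with the default non-ANSI settings of Spark 2.x/3.x) implicitly casts them to numbers, but comparing or sorting the raw strings would be lexicographic. The plain-Python sketch below involves no Spark at all; the delay values are made up purely for illustration:

```python
# Hypothetical delay values, stored as strings the way the Parquet schema reports them.
delays = ["15", "2", "-4", "100", "-12"]

# Sorting the raw strings compares them character by character.
lexicographic = sorted(delays)

# Casting to int first gives the numeric order the problem actually needs.
numeric = sorted(delays, key=int)

print(lexicographic)  # ['-12', '-4', '100', '15', '2']
print(numeric)        # ['-12', '-4', '2', '15', '100']
```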

```python
airtraffic.select('Year', 'Month', 'DayOfMonth', 'CRSDepTime').show()
```

```
+----+-----+----------+----------+
|Year|Month|DayOfMonth|CRSDepTime|
+----+-----+----------+----------+
|2008|    1|        16|      1735|
|2008|    1|        17|      1701|
|2008|    1|        17|      1225|
|2008|    1|        17|      1530|
|2008|    1|        17|      1205|
|2008|    1|        18|      1150|
|2008|    1|        18|      1009|
|2008|    1|        19|       835|
|2008|    1|        20|      1935|
|2008|    1|        20|       830|
|2008|    1|        21|      1640|
|2008|    1|        21|      1204|
|2008|    1|        21|      1935|
|2008|    1|        21|      1830|
|2008|    1|        21|       700|
|2008|    1|        22|      1910|
|2008|    1|        22|      1320|
|2008|    1|        23|       908|
|2008|    1|        23|      1252|
|2008|    1|        23|       635|
+----+-----+----------+----------+
only showing top 20 rows
```
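**CRSDepTime** is stored as an integer in `HHMM` form, so `835` means 08:35 and `1735` means 17:35. As an alternative to the string padding and slicing used in this solution, the hour and minute can also be recovered arithmetically. A plain-Python sketch using the standard `datetime` module (the helper name `flight_crs_dep_time` is hypothetical):

```python
from datetime import datetime


def flight_crs_dep_time(year: int, month: int, day: int, crs_dep_time: int) -> str:
    """Build a 'yyyy-MM-dd HH:mm' string from date parts and an HHMM integer."""
    hour, minute = divmod(crs_dep_time, 100)  # e.g. 835 -> (8, 35)
    return datetime(year, month, day, hour, minute).strftime("%Y-%m-%d %H:%M")


print(flight_crs_dep_time(2008, 1, 19, 835))   # 2008-01-19 08:35
print(flight_crs_dep_time(2008, 1, 16, 1735))  # 2008-01-16 17:35
```

One caveat of the arithmetic approach: a value of `2400`, if present in the data, would raise an error because `datetime` only accepts hours 0 to 23, whereas pure string manipulation would simply produce `24:00`.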



```python
l = [(2008, 1, 23, 700),
     (2008, 1, 10, 1855),
    ]

df = spark.createDataFrame(l, "Year INT, Month INT, DayOfMonth INT, DepTime INT")

df.show()
```

```
+----+-----+----------+-------+
|Year|Month|DayOfMonth|DepTime|
+----+-----+----------+-------+
|2008|    1|        23|    700|
|2008|    1|        10|   1855|
+----+-----+----------+-------+
```



```python
from pyspark.sql.functions import col, date_format, lpad

df.select("DepTime", lpad('DepTime', 4, "0").alias('DepTime')).show()
```

```
+-------+-------+
|DepTime|DepTime|
+-------+-------+
|    700|   0700|
|   1855|   1855|
+-------+-------+
```
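Spark's `lpad(col, len, pad)` left-pads a string to `len` characters; per the Spark SQL function reference, a value longer than `len` is shortened to its first `len` characters. The integer **DepTime** is implicitly cast to a string before padding. A small plain-Python stand-in (the name `spark_style_lpad` is hypothetical, not a Spark API) makes that behavior explicit:

```python
def spark_style_lpad(value: str, length: int, pad: str) -> str:
    """Mimic Spark's lpad: left-pad to `length`, or truncate if already longer.

    Assumes a non-empty `pad` string.
    """
    if len(value) >= length:
        return value[:length]
    # Repeat the pad as often as needed, then cut it to the missing width.
    padding = (pad * length)[: length - len(value)]
    return padding + value


print(spark_style_lpad(str(700), 4, "0"))   # 0700
print(spark_style_lpad(str(1855), 4, "0"))  # 1855
print(spark_style_lpad("12345", 4, "0"))    # 1234
```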



```python
from pyspark.sql.functions import substring, concat, lit

df. \
    withColumn(
        'FlightCRSDepTime', 
        concat(
            "Year", lit("-"), 
            lpad("Month", 2, "0"), lit("-"), 
            lpad("DayOfMonth", 2, "0"), lit(" "),
            concat(
                substring(lpad("DepTime", 4, "0"), 1, 2), 
                lit(':'), 
                substring(lpad("DepTime", 4, "0"), 3, 2)
            )
        )
    ). \
    show()
```

```
+----+-----+----------+-------+----------------+
|Year|Month|DayOfMonth|DepTime|FlightCRSDepTime|
+----+-----+----------+-------+----------------+
|2008|    1|        23|    700|2008-01-23 07:00|
|2008|    1|        10|   1855|2008-01-10 18:55|
+----+-----+----------+-------+----------------+
```
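The `concat` expression above pads each part, splits the 4-digit time into hours and minutes, and joins everything with separators. Keep in mind that Spark's `substring(str, pos, len)` is 1-based, while Python slicing is 0-based. The plain-Python sketch below retraces the same steps; the helper names `spark_substring` and `build_flight_crs_dep_time` are hypothetical:

```python
def spark_substring(value: str, pos: int, length: int) -> str:
    """Mimic Spark's 1-based substring(str, pos, len) for a positive pos."""
    start = pos - 1
    return value[start:start + length]


def build_flight_crs_dep_time(year: int, month: int, day: int, dep_time: int) -> str:
    # Pad to 4 digits; rjust matches lpad here because times have at most 4 digits.
    t = str(dep_time).rjust(4, "0")
    date_part = f"{year}-{str(month).rjust(2, '0')}-{str(day).rjust(2, '0')}"
    time_part = f"{spark_substring(t, 1, 2)}:{spark_substring(t, 3, 2)}"
    return f"{date_part} {time_part}"


print(build_flight_crs_dep_time(2008, 1, 23, 700))   # 2008-01-23 07:00
print(build_flight_crs_dep_time(2008, 1, 10, 1855))  # 2008-01-10 18:55
```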



```python
# sum and expr are not needed here; importing sum would also shadow Python's built-in
from pyspark.sql.functions import lit, col, concat, lpad, substring

# Keep the DataFrame in flightsFiltered; calling show() in the chain would assign None
flightsFiltered = airtraffic. \
    filter("IsDepDelayed = 'YES' AND IsArrDelayed = 'NO' AND Cancelled = 0"). \
    select(
        concat(
            "Year", lit("-"), 
            lpad("Month", 2, "0"), lit("-"), 
            lpad("DayOfMonth", 2, "0"), lit(" "),
            concat(
                substring(lpad("CRSDepTime", 4, "0"), 1, 2), lit(':'), 
                substring(lpad("CRSDepTime", 4, "0"), 3, 2)
            )
        ).alias("FlightCRSDepTime"),
        "UniqueCarrier", "FlightNum", "Origin", 
        "Dest", "DepDelay", "ArrDelay"
    ). \
    orderBy(
        "FlightCRSDepTime",
        # DepDelay and ArrDelay are strings in the schema, so cast them explicitly
        col("DepDelay").cast("int") - col("ArrDelay").cast("int")
    )

flightsFiltered.show()
```

```
+----------------+-------------+---------+------+----+--------+--------+
|FlightCRSDepTime|UniqueCarrier|FlightNum|Origin|Dest|DepDelay|ArrDelay|
+----------------+-------------+---------+------+----+--------+--------+
|2008-01-01 00:55|           DL|      560|   LAX| ATL|      15|      -4|
|2008-01-01 01:00|           AA|     2466|   SFO| DFW|       2|      -7|
|2008-01-01 03:00|           CO|      488|   SJU| EWR|       3|     -12|
|2008-01-01 04:00|           B6|      724|   BQN| MCO|       4|     -23|
|2008-01-01 05:15|           XE|     2140|   SLC| IAH|       1|     -10|
|2008-01-01 05:25|           XE|     2429|   MSP| IAH|       1|      -4|
|2008-01-01 05:30|           XE|     2785|   BTR| IAH|       6|       0|
|2008-01-01 05:40|           OO|     5793|   CIC| SFO|       1|      -4|
|2008-01-01 05:40|           UA|      422|   SEA| DEN|       4|     -10|
|2008-01-01 06:00|           DL|      970|   FLL| ATL|       2|       0|
|2008-01-01 06:00|           AA|     1734|   SAT| D
```
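The filter and sort logic can be traced on a handful of records in plain Python. The rows below are hypothetical and only carry the columns the logic needs. Sorting on the formatted **FlightCRSDepTime** string works chronologically because every part is zero-padded, while the secondary key casts the string delays to integers before subtracting:

```python
# Hypothetical records shaped like the relevant airtraffic columns.
# DepDelay/ArrDelay are strings, as in the Parquet schema.
COLUMNS = ("FlightCRSDepTime", "FlightNum", "IsDepDelayed", "IsArrDelayed",
           "Cancelled", "DepDelay", "ArrDelay")
flights = [dict(zip(COLUMNS, row)) for row in [
    ("2008-01-01 06:00", 1, "YES", "NO", 0, "2", "0"),
    ("2008-01-01 06:00", 2, "YES", "NO", 0, "10", "-5"),
    ("2008-01-01 05:40", 3, "YES", "NO", 0, "4", "-10"),
    ("2008-01-01 05:30", 4, "YES", "YES", 0, "30", "25"),  # arrived late: excluded
    ("2008-01-01 05:00", 5, "NO", "NO", 0, "-3", "-8"),    # departed on time: excluded
    ("2008-01-01 04:00", 6, "YES", "NO", 1, "5", "0"),     # cancelled: excluded
]]


def departed_late_arrived_on_time(rows):
    """Same predicate as the Spark filter() call."""
    return [r for r in rows
            if r["IsDepDelayed"] == "YES" and r["IsArrDelayed"] == "NO"
            and r["Cancelled"] == 0]


def sort_key(r):
    # Primary: zero-padded timestamp string; secondary: DepDelay - ArrDelay as ints.
    return (r["FlightCRSDepTime"], int(r["DepDelay"]) - int(r["ArrDelay"]))


result = sorted(departed_late_arrived_on_time(flights), key=sort_key)
print([r["FlightNum"] for r in result])  # [3, 1, 2]
print(len(result))                       # 3
```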

### Getting Count

The projection does not change the number of rows, so the count only needs the filter.

```python
flightsCount = airtraffic. \
    filter("IsDepDelayed = 'YES' AND IsArrDelayed = 'NO' AND Cancelled = 0"). \
    count()
```

For reference, here are the total number of records and the number of non-cancelled flights that did not depart late.

```python
airtraffic.count()
```

```
605659
```

```python
airtraffic.filter("IsDepDelayed = 'NO' AND Cancelled = 0").count()
```

```
340461
```

```python
flightsCount
```

```
54233
```
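To put the result in context, the counts above can be turned into a share: a little under 9% of all January 2008 records (cancelled flights included in the denominator) departed late yet arrived early or on time. A quick calculation using only the numbers reported above:

```python
total_records = 605659   # airtraffic.count()
late_dep_ok_arr = 54233  # flights that departed late but arrived early or on time

share = late_dep_ok_arr / total_records
print(f"{share:.2%}")  # 8.95%
```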