## Solutions - Problem 1
Get the total number of flights, the number of flights delayed in departure, and the number of flights delayed in arrival.
* The output should contain 3 columns: **FlightCount**, **DepDelayedCount**, **ArrDelayedCount**
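
To make the three target columns concrete before touching Spark, here is a minimal plain-Python sketch on a few made-up rows. The sample rows and the helper name `delay_summary` are illustrative assumptions, not part of the dataset or of the solution. The sketch leaves out cancelled flights, as the final cells of this notebook do.

```python
# Hypothetical sample rows; the real data is read from parquet later on.
flights = [
    {"IsDepDelayed": "YES", "IsArrDelayed": "YES", "Cancelled": 0},
    {"IsDepDelayed": "NO", "IsArrDelayed": "YES", "Cancelled": 0},
    {"IsDepDelayed": "NO", "IsArrDelayed": "NO", "Cancelled": 0},
    {"IsDepDelayed": "YES", "IsArrDelayed": "YES", "Cancelled": 1},
]


def delay_summary(rows):
    """Return the three counts for flights that were not cancelled."""
    flown = [r for r in rows if r["Cancelled"] == 0]
    return {
        "FlightCount": len(flown),
        # Summing booleans counts the rows where the condition holds.
        "DepDelayedCount": sum(r["IsDepDelayed"] == "YES" for r in flown),
        "ArrDelayedCount": sum(r["IsArrDelayed"] == "YES" for r in flown),
    }


summary = delay_summary(flights)
print(summary)
```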

Let us start the Spark session for this notebook so that we can execute the code provided. You can sign up for our [10 node state of the art cluster/labs](https://labs.itversity.com/plans) to learn Spark SQL using our unique integrated LMS.

In [1]:
from pyspark.sql import SparkSession

import getpass
username = getpass.getuser()

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Basic Transformations'). \
    master('yarn'). \
    getOrCreate()

If you are going to use CLIs, you can launch Spark SQL using one of the following 3 approaches.

**Using Spark SQL**

```
spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

**Using Scala**

```
spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

**Using PySpark**

```
pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

### Reading airtraffic data

In [2]:
airtraffic_path = "/public/airtraffic_all/airtraffic-part/flightmonth=200801"

In [3]:
airtraffic = spark. \
    read. \
    parquet(airtraffic_path)

In [4]:
airtraffic.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- Car

### Get flights with delayed arrival

In [5]:
# SQL Style
airtraffic.filter("IsArrDelayed = 'YES' AND Cancelled = 0").show()

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+------------+------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|IsArrDelayed|IsDepDelayed|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+------------+------------+
|2008|    1|    

In [6]:
# Data Frame Style
airtraffic.filter((airtraffic["IsArrDelayed"] == 'YES') & (airtraffic["Cancelled"] == 0)).show()

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+------------+------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|IsArrDelayed|IsDepDelayed|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+------------+------------+
|2008|    1|    

In [7]:
airtraffic.filter((airtraffic.IsArrDelayed == 'YES') & (airtraffic.Cancelled == 0)).show()

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+------------+------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|IsArrDelayed|IsDepDelayed|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+------------+------------+
|2008|    1|    

### Get delayed counts

In [8]:
airtraffic. \
    select('IsDepDelayed', 'IsArrDelayed', 'Cancelled'). \
    distinct(). \
    show()

+------------+------------+---------+
|IsDepDelayed|IsArrDelayed|Cancelled|
+------------+------------+---------+
|          NO|          NO|        0|
|         YES|         YES|        1|
|          NO|         YES|        0|
|         YES|          NO|        0|
|         YES|         YES|        0|
+------------+------------+---------+



In [9]:
## Departure Delayed Count
airtraffic. \
    filter(airtraffic.IsDepDelayed == "YES"). \
    count()

265198

In [10]:
## Departure Delayed Count (excluding cancelled flights)
airtraffic. \
    filter((airtraffic.IsDepDelayed == "YES") & (airtraffic.Cancelled == 0)). \
    count()

247905

In [11]:
## Arrival Delayed Count
airtraffic. \
    filter(airtraffic.IsArrDelayed == "YES"). \
    count()

297956

In [12]:
## Arrival Delayed Count (excluding cancelled flights)
airtraffic. \
    filter((airtraffic.IsArrDelayed == "YES") & (airtraffic.Cancelled == 0)). \
    count()

280663
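
The gap between the filtered and unfiltered counts can be checked with simple arithmetic. The sketch below only reuses the four numbers printed above. Both differences come out equal, which is consistent with the distinct output earlier, where the only combination with `Cancelled = 1` is `YES`/`YES`. Reading that difference as the number of cancelled flights is an inference from those outputs, not something the notebook computes directly.

```python
# Counts copied from the cell outputs above.
dep_delayed_all = 265198    # IsDepDelayed == "YES"
dep_delayed_flown = 247905  # ... and Cancelled == 0
arr_delayed_all = 297956    # IsArrDelayed == "YES"
arr_delayed_flown = 280663  # ... and Cancelled == 0

# Rows dropped by the Cancelled == 0 condition in each case.
dep_diff = dep_delayed_all - dep_delayed_flown
arr_diff = arr_delayed_all - arr_delayed_flown

print(dep_diff, arr_diff)
```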

In [13]:
airtraffic. \
    filter("(IsDepDelayed = 'YES' OR IsArrDelayed = 'YES') AND Cancelled = 0"). \
    select('Year', 'Month', 'DayOfMonth', 
           'FlightNum', 'IsDepDelayed', 'IsArrDelayed'
          ). \
    show()

+----+-----+----------+---------+------------+------------+
|Year|Month|DayOfMonth|FlightNum|IsDepDelayed|IsArrDelayed|
+----+-----+----------+---------+------------+------------+
|2008|    1|        17|     4977|         YES|         YES|
|2008|    1|        17|     5426|          NO|         YES|
|2008|    1|        18|     5260|         YES|         YES|
|2008|    1|        19|     5276|          NO|         YES|
|2008|    1|        20|     5215|          NO|         YES|
|2008|    1|        20|     5324|          NO|         YES|
|2008|    1|        21|     5056|          NO|         YES|
|2008|    1|        21|     5215|         YES|          NO|
|2008|    1|        21|     5595|         YES|         YES|
|2008|    1|        21|     5610|          NO|         YES|
|2008|    1|        22|     5032|         YES|         YES|
|2008|    1|        22|     5331|          NO|         YES|
|2008|    1|        23|     5033|          NO|         YES|
|2008|    1|        23|     5355|       

In [14]:
# Note: importing sum from pyspark.sql.functions shadows Python's built-in sum in this notebook
from pyspark.sql.functions import col, lit, count, sum, expr

In [15]:
## Flight count with departure delayed and arrival delayed counts (excluding cancelled flights)
airtraffic. \
    filter('Cancelled = 0'). \
    agg(count(lit(1)).alias("FlightCount"),
        sum(expr("CASE WHEN IsDepDelayed = 'YES' THEN 1 ELSE 0 END")).alias("DepDelayedCount"),
        sum(expr("CASE WHEN IsArrDelayed = 'YES' THEN 1 ELSE 0 END")).alias("ArrDelayedCount")
       ). \
    show()

+-----------+---------------+---------------+
|FlightCount|DepDelayedCount|ArrDelayedCount|
+-----------+---------------+---------------+
|     588366|         247905|         280663|
+-----------+---------------+---------------+
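
The `sum(CASE WHEN ... THEN 1 ELSE 0 END)` pattern used above is plain SQL conditional aggregation: each row contributes 1 or 0 to a running total, so all three counts come from a single pass over the filtered rows. As a rough illustration that does not need a cluster, the sketch below runs the same expressions in SQLite through Python's standard `sqlite3` module. SQLite is only a stand-in here, not Spark, and the five sample rows are made up.

```python
import sqlite3

# In-memory SQLite database as a stand-in for the Spark table (assumption).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE airtraffic "
    "(IsDepDelayed TEXT, IsArrDelayed TEXT, Cancelled INTEGER)"
)

# Hypothetical sample rows, not taken from the real dataset.
conn.executemany(
    "INSERT INTO airtraffic VALUES (?, ?, ?)",
    [
        ("YES", "YES", 0),
        ("NO", "YES", 0),
        ("YES", "NO", 0),
        ("NO", "NO", 0),
        ("YES", "YES", 1),  # cancelled, removed by the WHERE clause
    ],
)

# Same CASE WHEN expressions as in the PySpark cell above.
row = conn.execute(
    """
    SELECT count(1) AS FlightCount,
           sum(CASE WHEN IsDepDelayed = 'YES' THEN 1 ELSE 0 END) AS DepDelayedCount,
           sum(CASE WHEN IsArrDelayed = 'YES' THEN 1 ELSE 0 END) AS ArrDelayedCount
    FROM airtraffic
    WHERE Cancelled = 0
    """
).fetchone()
conn.close()

print(row)
```

The tuple holds `FlightCount`, `DepDelayedCount` and `ArrDelayedCount` in that order, mirroring the column order of the Spark output.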



In [16]:
from pyspark.sql.functions import when

In [18]:
## Same counts as above, using when/otherwise instead of a CASE WHEN expression
airtraffic. \
    filter('Cancelled = 0'). \
    agg(count(lit(1)).alias("FlightCount"),
        sum(when(col('IsDepDelayed') == 'YES', 1).otherwise(lit(0))).alias("DepDelayedCount"),
        sum(when(col('IsArrDelayed') == lit('YES'), 1).otherwise(lit(0))).alias("ArrDelayedCount")
       ). \
    show()

+-----------+---------------+---------------+
|FlightCount|DepDelayedCount|ArrDelayedCount|
+-----------+---------------+---------------+
|     588366|         247905|         280663|
+-----------+---------------+---------------+

