Solutions - Problem 2¶
Get the number of flights delayed in departure and the number of flights delayed in arrival for each day, along with the number of flights departed on each day.
The output should contain 4 columns - FlightDate, FlightCount, DepDelayedCount, ArrDelayedCount.
FlightDate should be in yyyy-MM-dd format.
Data should be sorted in ascending order by FlightDate.
Let us start the Spark context for this Notebook so that we can execute the code provided. You can sign up for our 10 node state of the art cluster/labs to learn Spark SQL using our unique integrated LMS.
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Basic Transformations'). \
    master('yarn'). \
    getOrCreate()
If you are going to use CLIs, you can launch Spark using one of the following 3 approaches.
Using Spark SQL
spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Scala
spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Pyspark
pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Reading airtraffic data¶
airtraffic_path = "/public/airtraffic_all/airtraffic-part/flightmonth=200801"
airtraffic = spark. \
    read. \
    parquet(airtraffic_path)
airtraffic.printSchema()
root
|-- Year: integer (nullable = true)
|-- Month: integer (nullable = true)
|-- DayofMonth: integer (nullable = true)
|-- DayOfWeek: integer (nullable = true)
|-- DepTime: string (nullable = true)
|-- CRSDepTime: integer (nullable = true)
|-- ArrTime: string (nullable = true)
|-- CRSArrTime: integer (nullable = true)
|-- UniqueCarrier: string (nullable = true)
|-- FlightNum: integer (nullable = true)
|-- TailNum: string (nullable = true)
|-- ActualElapsedTime: string (nullable = true)
|-- CRSElapsedTime: integer (nullable = true)
|-- AirTime: string (nullable = true)
|-- ArrDelay: string (nullable = true)
|-- DepDelay: string (nullable = true)
|-- Origin: string (nullable = true)
|-- Dest: string (nullable = true)
|-- Distance: string (nullable = true)
|-- TaxiIn: string (nullable = true)
|-- TaxiOut: string (nullable = true)
|-- Cancelled: integer (nullable = true)
|-- CancellationCode: string (nullable = true)
|-- Diverted: integer (nullable = true)
|-- CarrierDelay: string (nullable = true)
|-- WeatherDelay: string (nullable = true)
|-- NASDelay: string (nullable = true)
|-- SecurityDelay: string (nullable = true)
|-- LateAircraftDelay: string (nullable = true)
|-- IsArrDelayed: string (nullable = true)
|-- IsDepDelayed: string (nullable = true)
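The schema shows that IsDepDelayed and IsArrDelayed are strings (YES/NO flags) and that Cancelled is an integer. As an optional sanity check, not part of the required solution, we can preview the columns used to derive FlightDate along with the delay flags (output not shown here).
# Optional sanity check: peek at the columns we will use to derive
# FlightDate and to count delayed flights.
airtraffic. \
    select("Year", "Month", "DayOfMonth", "IsDepDelayed", "IsArrDelayed", "Cancelled"). \
    show(5)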
Grouping Data by Flight Date¶
from pyspark.sql.functions import lit, concat, lpad
airtraffic. \
    groupBy(concat("Year", lit("-"),
                   lpad("Month", 2, "0"), lit("-"),
                   lpad("DayOfMonth", 2, "0")).
            alias("FlightDate"))
<pyspark.sql.group.GroupedData at 0x7fc369243940>
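Note that groupBy by itself only returns a GroupedData object; nothing is computed until we apply an aggregation such as agg or count. If you prefer, the same yyyy-MM-dd string can also be derived up front with withColumn and then grouped by name. The sketch below is an equivalent alternative, not the approach used in the rest of this notebook.
# Alternative sketch: derive FlightDate as a column first, then group by it.
# Equivalent to grouping by the concat/lpad expression directly.
flight_dates = airtraffic. \
    withColumn("FlightDate", concat("Year", lit("-"),
                                    lpad("Month", 2, "0"), lit("-"),
                                    lpad("DayOfMonth", 2, "0")))

flight_dates.groupBy("FlightDate")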
Getting Counts by FlightDate¶
from pyspark.sql.functions import count
airtraffic. \
    filter('Cancelled = 0'). \
    groupBy(concat("Year", lit("-"),
                   lpad("Month", 2, "0"), lit("-"),
                   lpad("DayOfMonth", 2, "0")).
            alias("FlightDate")). \
    agg(count(lit(1)).alias("FlightCount")). \
    show(31)
+----------+-----------+
|FlightDate|FlightCount|
+----------+-----------+
|2008-01-15| 19204|
|2008-01-21| 19658|
|2008-01-11| 19825|
|2008-01-19| 15373|
|2008-01-02| 20442|
|2008-01-06| 19210|
|2008-01-29| 18596|
|2008-01-30| 19072|
|2008-01-17| 19401|
|2008-01-31| 19179|
|2008-01-01| 18623|
|2008-01-24| 19935|
|2008-01-07| 19762|
|2008-01-09| 19443|
|2008-01-04| 20160|
|2008-01-08| 19140|
|2008-01-05| 17610|
|2008-01-25| 19787|
|2008-01-26| 15860|
|2008-01-12| 16346|
|2008-01-16| 19232|
|2008-01-23| 19239|
|2008-01-18| 20117|
|2008-01-22| 18716|
|2008-01-28| 19493|
|2008-01-13| 18587|
|2008-01-10| 19956|
|2008-01-14| 19267|
|2008-01-20| 18406|
|2008-01-27| 18265|
|2008-01-03| 20462|
+----------+-----------+
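count(lit(1)) counts every row in each group regardless of nulls in any particular column. If you are more comfortable with SQL-style aggregations, the same count can be expressed through expr, as sketched below; both yield identical FlightCount values.
# Same aggregation expressed with a SQL-style expression instead of count(lit(1)).
from pyspark.sql.functions import expr

airtraffic. \
    filter('Cancelled = 0'). \
    groupBy(concat("Year", lit("-"),
                   lpad("Month", 2, "0"), lit("-"),
                   lpad("DayOfMonth", 2, "0")).
            alias("FlightDate")). \
    agg(expr("count(1) AS FlightCount")). \
    show(31)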
# Alternative to get the count without using agg
# We will not be able to provide an alias for the aggregated field
airtraffic. \
    filter('Cancelled = 0'). \
    groupBy(concat("Year", lit("-"),
                   lpad("Month", 2, "0"), lit("-"),
                   lpad("DayOfMonth", 2, "0")).
            alias("FlightDate")). \
    count(). \
    show()
+----------+-----+
|FlightDate|count|
+----------+-----+
|2008-01-15|19204|
|2008-01-21|19658|
|2008-01-11|19825|
|2008-01-19|15373|
|2008-01-02|20442|
|2008-01-06|19210|
|2008-01-29|18596|
|2008-01-30|19072|
|2008-01-17|19401|
|2008-01-31|19179|
|2008-01-01|18623|
|2008-01-24|19935|
|2008-01-07|19762|
|2008-01-09|19443|
|2008-01-04|20160|
|2008-01-08|19140|
|2008-01-05|17610|
|2008-01-25|19787|
|2008-01-26|15860|
|2008-01-12|16346|
+----------+-----+
only showing top 20 rows
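As noted above, count() does not let us alias the aggregated column. However, the resulting column can be renamed afterwards, for example with withColumnRenamed. A minimal sketch of that workaround:
# Workaround: rename the default "count" column after the aggregation.
airtraffic. \
    filter('Cancelled = 0'). \
    groupBy(concat("Year", lit("-"),
                   lpad("Month", 2, "0"), lit("-"),
                   lpad("DayOfMonth", 2, "0")).
            alias("FlightDate")). \
    count(). \
    withColumnRenamed("count", "FlightCount"). \
    show()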
Getting total as well as delayed counts for each day¶
from pyspark.sql.functions import sum, expr
airtraffic. \
    filter('Cancelled = 0'). \
    groupBy(concat("Year", lit("-"),
                   lpad("Month", 2, "0"), lit("-"),
                   lpad("DayOfMonth", 2, "0")).
            alias("FlightDate")). \
    agg(count(lit(1)).alias("FlightCount"),
        sum(expr("CASE WHEN IsDepDelayed = 'YES' THEN 1 ELSE 0 END")).alias("DepDelayedCount"),
        sum(expr("CASE WHEN IsArrDelayed = 'YES' THEN 1 ELSE 0 END")).alias("ArrDelayedCount")
    ). \
    show()
+----------+-----------+---------------+---------------+
|FlightDate|FlightCount|DepDelayedCount|ArrDelayedCount|
+----------+-----------+---------------+---------------+
|2008-01-15| 19204| 5304| 6389|
|2008-01-21| 19658| 10055| 11032|
|2008-01-11| 19825| 7255| 8164|
|2008-01-19| 15373| 6399| 6810|
|2008-01-02| 20442| 13294| 13749|
|2008-01-06| 19210| 10542| 10705|
|2008-01-29| 18596| 6324| 8370|
|2008-01-30| 19072| 6655| 7814|
|2008-01-17| 19401| 9635| 11229|
|2008-01-31| 19179| 9127| 11304|
|2008-01-01| 18623| 10501| 11173|
|2008-01-24| 19935| 8112| 9972|
|2008-01-07| 19762| 8122| 8683|
|2008-01-09| 19443| 5962| 6857|
|2008-01-04| 20160| 9406| 9824|
|2008-01-08| 19140| 7483| 8938|
|2008-01-05| 17610| 9051| 9345|
|2008-01-25| 19787| 8826| 10479|
|2008-01-26| 15860| 5740| 7163|
|2008-01-12| 16346| 3902| 4078|
+----------+-----------+---------------+---------------+
only showing top 20 rows
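The CASE WHEN logic passed to expr can also be written with the when and otherwise functions if you prefer to stay within the DataFrame API. A sketch of that equivalent formulation:
from pyspark.sql.functions import when, col

# Equivalent aggregation using when/otherwise instead of SQL CASE WHEN.
airtraffic. \
    filter('Cancelled = 0'). \
    groupBy(concat("Year", lit("-"),
                   lpad("Month", 2, "0"), lit("-"),
                   lpad("DayOfMonth", 2, "0")).
            alias("FlightDate")). \
    agg(count(lit(1)).alias("FlightCount"),
        sum(when(col("IsDepDelayed") == 'YES', 1).otherwise(0)).alias("DepDelayedCount"),
        sum(when(col("IsArrDelayed") == 'YES', 1).otherwise(0)).alias("ArrDelayedCount")
    ). \
    show()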
Sorting Data By FlightDate¶
help(airtraffic.sort)
Help on method sort in module pyspark.sql.dataframe:
sort(*cols, **kwargs) method of pyspark.sql.dataframe.DataFrame instance
Returns a new :class:`DataFrame` sorted by the specified column(s).
:param cols: list of :class:`Column` or column names to sort by.
:param ascending: boolean or list of boolean (default ``True``).
Sort ascending vs. descending. Specify list for multiple sort orders.
If a list is specified, length of the list must equal length of the `cols`.
>>> df.sort(df.age.desc()).collect()
[Row(age=5, name='Bob'), Row(age=2, name='Alice')]
>>> df.sort("age", ascending=False).collect()
[Row(age=5, name='Bob'), Row(age=2, name='Alice')]
>>> df.orderBy(df.age.desc()).collect()
[Row(age=5, name='Bob'), Row(age=2, name='Alice')]
>>> from pyspark.sql.functions import *
>>> df.sort(asc("age")).collect()
[Row(age=2, name='Alice'), Row(age=5, name='Bob')]
>>> df.orderBy(desc("age"), "name").collect()
[Row(age=5, name='Bob'), Row(age=2, name='Alice')]
>>> df.orderBy(["age", "name"], ascending=[0, 1]).collect()
[Row(age=5, name='Bob'), Row(age=2, name='Alice')]
.. versionadded:: 1.3
help(airtraffic.orderBy)
Help on method sort in module pyspark.sql.dataframe:
sort(*cols, **kwargs) method of pyspark.sql.dataframe.DataFrame instance
Returns a new :class:`DataFrame` sorted by the specified column(s).
:param cols: list of :class:`Column` or column names to sort by.
:param ascending: boolean or list of boolean (default ``True``).
Sort ascending vs. descending. Specify list for multiple sort orders.
If a list is specified, length of the list must equal length of the `cols`.
>>> df.sort(df.age.desc()).collect()
[Row(age=5, name='Bob'), Row(age=2, name='Alice')]
>>> df.sort("age", ascending=False).collect()
[Row(age=5, name='Bob'), Row(age=2, name='Alice')]
>>> df.orderBy(df.age.desc()).collect()
[Row(age=5, name='Bob'), Row(age=2, name='Alice')]
>>> from pyspark.sql.functions import *
>>> df.sort(asc("age")).collect()
[Row(age=2, name='Alice'), Row(age=5, name='Bob')]
>>> df.orderBy(desc("age"), "name").collect()
[Row(age=5, name='Bob'), Row(age=2, name='Alice')]
>>> df.orderBy(["age", "name"], ascending=[0, 1]).collect()
[Row(age=5, name='Bob'), Row(age=2, name='Alice')]
.. versionadded:: 1.3
airtraffic. \
    filter('Cancelled = 0'). \
    groupBy(concat("Year", lit("-"),
                   lpad("Month", 2, "0"), lit("-"),
                   lpad("DayOfMonth", 2, "0")).
            alias("FlightDate")). \
    agg(count(lit(1)).alias("FlightCount"),
        sum(expr("CASE WHEN IsDepDelayed = 'YES' THEN 1 ELSE 0 END")).alias("DepDelayedCount"),
        sum(expr("CASE WHEN IsArrDelayed = 'YES' THEN 1 ELSE 0 END")).alias("ArrDelayedCount")
    ). \
    orderBy("FlightDate"). \
    show(31)
+----------+-----------+---------------+---------------+
|FlightDate|FlightCount|DepDelayedCount|ArrDelayedCount|
+----------+-----------+---------------+---------------+
|2008-01-01| 18623| 10501| 11173|
|2008-01-02| 20442| 13294| 13749|
|2008-01-03| 20462| 11819| 12013|
|2008-01-04| 20160| 9406| 9824|
|2008-01-05| 17610| 9051| 9345|
|2008-01-06| 19210| 10542| 10705|
|2008-01-07| 19762| 8122| 8683|
|2008-01-08| 19140| 7483| 8938|
|2008-01-09| 19443| 5962| 6857|
|2008-01-10| 19956| 7033| 8565|
|2008-01-11| 19825| 7255| 8164|
|2008-01-12| 16346| 3902| 4078|
|2008-01-13| 18587| 6634| 7473|
|2008-01-14| 19267| 5921| 7104|
|2008-01-15| 19204| 5304| 6389|
|2008-01-16| 19232| 5102| 6228|
|2008-01-17| 19401| 9635| 11229|
|2008-01-18| 20117| 10038| 10860|
|2008-01-19| 15373| 6399| 6810|
|2008-01-20| 18406| 6700| 7005|
|2008-01-21| 19658| 10055| 11032|
|2008-01-22| 18716| 9129| 10669|
|2008-01-23| 19239| 7349| 9324|
|2008-01-24| 19935| 8112| 9972|
|2008-01-25| 19787| 8826| 10479|
|2008-01-26| 15860| 5740| 7163|
|2008-01-27| 18265| 8905| 10331|
|2008-01-28| 19493| 7580| 9013|
|2008-01-29| 18596| 6324| 8370|
|2008-01-30| 19072| 6655| 7814|
|2008-01-31| 19179| 9127| 11304|
+----------+-----------+---------------+---------------+
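If you prefer SQL, the same result can be produced by registering the DataFrame as a temporary view and running an equivalent query through spark.sql. The sketch below assumes an arbitrary view name airtraffic_v, which is not used elsewhere in this notebook.
# SQL equivalent of the DataFrame solution above.
airtraffic.createOrReplaceTempView("airtraffic_v")

spark.sql("""
    SELECT concat(Year, '-', lpad(Month, 2, '0'), '-', lpad(DayOfMonth, 2, '0')) AS FlightDate,
           count(1) AS FlightCount,
           sum(CASE WHEN IsDepDelayed = 'YES' THEN 1 ELSE 0 END) AS DepDelayedCount,
           sum(CASE WHEN IsArrDelayed = 'YES' THEN 1 ELSE 0 END) AS ArrDelayedCount
    FROM airtraffic_v
    WHERE Cancelled = 0
    GROUP BY concat(Year, '-', lpad(Month, 2, '0'), '-', lpad(DayOfMonth, 2, '0'))
    ORDER BY FlightDate
""").show(31)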
Sorting Data in descending order by count¶
from pyspark.sql.functions import lit, concat, lpad, sum, expr, col
airtraffic. \
    groupBy(concat("Year", lit("-"),
                   lpad("Month", 2, "0"), lit("-"),
                   lpad("DayOfMonth", 2, "0")).
            alias("FlightDate")). \
    agg(count(lit(1)).alias("FlightCount"),
        sum(expr("CASE WHEN IsDepDelayed = 'YES' THEN 1 ELSE 0 END")).alias("DepDelayedCount"),
        sum(expr("CASE WHEN IsArrDelayed = 'YES' THEN 1 ELSE 0 END")).alias("ArrDelayedCount")
    ). \
    orderBy(col("FlightCount").desc()). \
    show()
+----------+-----------+---------------+---------------+
|FlightDate|FlightCount|DepDelayedCount|ArrDelayedCount|
+----------+-----------+---------------+---------------+
|2008-01-02| 20953| 13805| 14260|
|2008-01-03| 20937| 12294| 12488|
|2008-01-04| 20929| 10175| 10593|
|2008-01-11| 20349| 7779| 8688|
|2008-01-18| 20347| 10268| 11090|
|2008-01-07| 20341| 8701| 9262|
|2008-01-25| 20313| 9352| 11005|
|2008-01-10| 20297| 7374| 8906|
|2008-01-17| 20273| 10507| 12101|
|2008-01-31| 20260| 10208| 12385|
|2008-01-24| 20257| 8434| 10294|
|2008-01-14| 20176| 6830| 8013|
|2008-01-28| 20147| 8234| 9667|
|2008-01-21| 20133| 10530| 11507|
|2008-01-06| 19893| 11225| 11388|
|2008-01-09| 19820| 6339| 7234|
|2008-01-23| 19769| 7879| 9854|
|2008-01-30| 19766| 7349| 8508|
|2008-01-16| 19764| 5634| 6760|
|2008-01-08| 19603| 7946| 9401|
+----------+-----------+---------------+---------------+
only showing top 20 rows
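Note that this last aggregation does not filter out cancelled flights, which is why its counts are higher than in the earlier, filtered outputs. Below is a minimal sketch of the same descending sort with the Cancelled filter applied, this time using the desc function as an equivalent of col("FlightCount").desc() (output not shown):
from pyspark.sql.functions import desc

# Same descending sort restricted to non-cancelled flights,
# using desc("FlightCount") instead of col("FlightCount").desc().
airtraffic. \
    filter('Cancelled = 0'). \
    groupBy(concat("Year", lit("-"),
                   lpad("Month", 2, "0"), lit("-"),
                   lpad("DayOfMonth", 2, "0")).
            alias("FlightDate")). \
    agg(count(lit(1)).alias("FlightCount"),
        sum(expr("CASE WHEN IsDepDelayed = 'YES' THEN 1 ELSE 0 END")).alias("DepDelayedCount"),
        sum(expr("CASE WHEN IsArrDelayed = 'YES' THEN 1 ELSE 0 END")).alias("ArrDelayedCount")
    ). \
    orderBy(desc("FlightCount")). \
    show()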