Analyze Datasets for Joins
Let us analyze the January 2008 airtraffic data set and the airport codes data set, both of which are going to be used for joins.
The January 2008 airtraffic data has all the relevant flight details, such as departure, arrival, etc.
As part of the analysis, we might also want additional metadata for airports, such as city and state. That information is available in the airport codes data set.
Let us read and review both the January 2008 airtraffic data and the airport codes data set.
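To show what these joins are for, here is a plain-Python stand-in for a lookup join. It is not the Spark code used in this notebook, and it assumes that the Origin and Dest columns of the airtraffic data hold the same IATA codes as the IATA column of the airport codes data set. The airport rows are copied from the airport codes preview later in this section. The flight pairs and the code XXX are hypothetical.

```python
# Plain-Python sketch of a left join from flights to airport metadata.
# Assumption: airtraffic Origin/Dest use the same IATA codes as airport codes.

# Rows copied from the airport codes preview: (City, State, Country, IATA).
airport_codes = [
    ("Albany", "NY", "USA", "ALB"),
    ("Albuquerque", "NM", "USA", "ABQ"),
    ("Anchorage", "AK", "USA", "ANC"),
]

# Hypothetical (Origin, Dest) pairs; "XXX" has no match on purpose.
flights = [
    ("ALB", "ABQ"),
    ("ANC", "XXX"),
]


def enrich_flights(flights, airport_codes):
    """Attach (City, State) to each flight's Origin and Dest."""
    # Index the airport metadata by IATA code, which acts as the join key.
    by_iata = {iata: (city, state) for city, state, _country, iata in airport_codes}
    enriched = []
    for origin, dest in flights:
        # dict.get returns None when no airport matches (left outer join behavior).
        enriched.append((origin, by_iata.get(origin), dest, by_iata.get(dest)))
    return enriched


for row in enrich_flights(flights, airport_codes):
    print(row)
```

If an IATA code appeared more than once, this dict would silently keep only the last row, while a real join would emit one output row per match and duplicate flights. This is one reason to check the airport codes for repeated IATA values before joining.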
Let us start the Spark session for this notebook so that we can execute the code provided.
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
builder. \
config('spark.ui.port', '0'). \
config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
enableHiveSupport(). \
appName(f'{username} | Python - Joining Data Sets'). \
master('yarn'). \
getOrCreate()
If you are going to use CLIs, you can use Spark SQL with one of the following 3 approaches.
Using Spark SQL
spark2-sql \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Scala
spark2-shell \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using PySpark
pyspark2 \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
# Use 2 shuffle partitions instead of the default 200, which suits small lab data sets
spark.conf.set("spark.sql.shuffle.partitions", "2")
# Read only the January 2008 partition of the airtraffic data
airtraffic = spark. \
read. \
parquet("/public/airtraffic_all/airtraffic-part/flightmonth=200801")
airtraffic.printSchema()
root
|-- Year: integer (nullable = true)
|-- Month: integer (nullable = true)
|-- DayofMonth: integer (nullable = true)
|-- DayOfWeek: integer (nullable = true)
|-- DepTime: string (nullable = true)
|-- CRSDepTime: integer (nullable = true)
|-- ArrTime: string (nullable = true)
|-- CRSArrTime: integer (nullable = true)
|-- UniqueCarrier: string (nullable = true)
|-- FlightNum: integer (nullable = true)
|-- TailNum: string (nullable = true)
|-- ActualElapsedTime: string (nullable = true)
|-- CRSElapsedTime: integer (nullable = true)
|-- AirTime: string (nullable = true)
|-- ArrDelay: string (nullable = true)
|-- DepDelay: string (nullable = true)
|-- Origin: string (nullable = true)
|-- Dest: string (nullable = true)
|-- Distance: string (nullable = true)
|-- TaxiIn: string (nullable = true)
|-- TaxiOut: string (nullable = true)
|-- Cancelled: integer (nullable = true)
|-- CancellationCode: string (nullable = true)
|-- Diverted: integer (nullable = true)
|-- CarrierDelay: string (nullable = true)
|-- WeatherDelay: string (nullable = true)
|-- NASDelay: string (nullable = true)
|-- SecurityDelay: string (nullable = true)
|-- LateAircraftDelay: string (nullable = true)
|-- IsArrDelayed: string (nullable = true)
|-- IsDepDelayed: string (nullable = true)
airtraffic.show()
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+------------+------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|IsArrDelayed|IsDepDelayed|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+------------+------------+
|2008| 1| 16| 3| 1725| 1735| 1959| 2021| OH| 5367| N716CA| 154| 166| 146| -22| -10| BGR| CVG| 906| 1| 7| 0| null| 0| NA| NA| NA| NA| NA| NO| NO|
|2008| 1| 17| 4| 1717| 1701| 1915| 1855| OH| 4977| N967CA| 118| 114| 101| 20| 16| SYR| CVG| 527| 2| 15| 0| null| 0| 16| 0| 4| 0| 0| YES| YES|
|2008| 1| 17| 4| 1220| 1225| 1440| 1504| OH| 5352| N709CA| 140| 159| 117| -24| -5| SAV| BOS| 901| 8| 15| 0| null| 0| NA| NA| NA| NA| NA| NO| NO|
|2008| 1| 17| 4| 1530| 1530| 1645| 1637| OH| 5426| N779CA| 75| 67| 45| 8| 0| CVG| GRR| 268| 5| 25| 0| null| 0| NA| NA| NA| NA| NA| YES| NO|
|2008| 1| 17| 4| 1203| 1205| 1429| 1429| OH| 5441| N809CA| 86| 84| 58| 0| -2| STL| CVG| 307| 3| 25| 0| null| 0| NA| NA| NA| NA| NA| NO| NO|
|2008| 1| 18| 5| 1150| 1150| 1457| 1524| OH| 5220| N436CA| 127| 154| 102| -27| 0| STL| JFK| 892| 4| 21| 0| null| 0| NA| NA| NA| NA| NA| NO| NO|
|2008| 1| 18| 5| 1215| 1009| 1540| 1251| OH| 5260| N446CA| 145| 102| 140| 169| 126| MCI| CVG| 539| 2| 3| 0| null| 0| 126| 0| 43| 0| 0| YES| YES|
|2008| 1| 19| 6| 835| 835| 1145| 1130| OH| 5276| N523CA| 130| 115| 83| 15| 0| TUL| CVG| 646| 4| 43| 0| null| 0| 0| 0| 15| 0| 0| YES| NO|
|2008| 1| 20| 7| 1925| 1935| 2148| 2124| OH| 5215| N729CA| 143| 109| 34| 24| -10| JFK| PHL| 94| 5| 104| 0| null| 0| 0| 0| 24| 0| 0| YES| NO|
|2008| 1| 20| 7| 825| 830| 1045| 1007| OH| 5324| N933CA| 140| 97| 92| 38| -5| RDU| CVG| 390| 1| 47| 0| null| 0| 0| 0| 38| 0| 0| YES| NO|
|2008| 1| 21| 1| 1638| 1640| 1741| 1754| OH| 4984| N719CA| 63| 74| 45| -13| -2| CVG| DTW| 229| 10| 8| 0| null| 0| NA| NA| NA| NA| NA| NO| NO|
|2008| 1| 21| 1| 1200| 1204| 1600| 1559| OH| 5056| N371CA| 180| 175| 147| 1| -4| MSY| LGA| 1183| 13| 20| 0| null| 0| NA| NA| NA| NA| NA| YES| NO|
|2008| 1| 21| 1| 1950| 1935| 2103| 2124| OH| 5215| N965CA| 73| 109| 29| -21| 15| JFK| PHL| 94| 8| 36| 0| null| 0| NA| NA| NA| NA| NA| NO| YES|
|2008| 1| 21| 1| 1950| 1830| 2225| 2007| OH| 5595| N641CA| 155| 97| 136| 138| 80| DCA| JFK| 213| 10| 9| 0| null| 0| 0| 80| 58| 0| 0| YES| YES|
|2008| 1| 21| 1| 700| 700| 955| 950| OH| 5610| N964CA| 115| 110| 87| 5| 0| HSV| DCA| 613| 8| 20| 0| null| 0| NA| NA| NA| NA| NA| YES| NO|
|2008| 1| 22| 2| 2020| 1910| 2223| 2125| OH| 5032| N538CA| 63| 75| 44| 58| 70| ORD| CVG| 264| 1| 18| 0| null| 0| 0| 58| 0| 0| 0| YES| YES|
|2008| 1| 22| 2| 1320| 1320| 1600| 1528| OH| 5331| N805CA| 160| 128| 102| 32| 0| CVG| JFK| 589| 12| 46| 0| null| 0| 0| 0| 32| 0| 0| YES| NO|
|2008| 1| 23| 3| 908| 908| 1216| 1149| OH| 5033| N963CA| 188| 161| 124| 27| 0| LGA| SAV| 722| 6| 58| 0| null| 0| 0| 0| 27| 0| 0| YES| NO|
|2008| 1| 23| 3| 1245| 1252| 1409| 1430| OH| 5050| N427CA| 84| 98| 73| -21| -7| CLT| CVG| 335| 1| 10| 0| null| 0| NA| NA| NA| NA| NA| NO| NO|
|2008| 1| 23| 3| 630| 635| 840| 831| OH| 5355| N926CA| 130| 116| 85| 9| -5| GSP| LGA| 610| 20| 25| 0| null| 0| NA| NA| NA| NA| NA| YES| NO|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+------------+------------+
only showing top 20 rows
We will use another data set to get details about airports, such as the state and city for a given airport code.
Let us analyze the data set to confirm whether it has a header and how the data is structured.
airportCodesPath = "/public/airtraffic_all/airport-codes"
spark. \
read. \
text(airportCodesPath). \
show(truncate=False)
+-------------------------+
|value |
+-------------------------+
|City State Country IATA |
|Abbotsford BC Canada YXX |
|Aberdeen SD USA ABR |
|Abilene TX USA ABI |
|Akron OH USA CAK |
|Alamosa CO USA ALS |
|Albany GA USA ABY |
|Albany NY USA ALB |
|Albuquerque NM USA ABQ |
|Alexandria LA USA AEX |
|Allentown PA USA ABE |
|Alliance NE USA AIA |
|Alpena MI USA APN |
|Altoona PA USA AOO |
|Amarillo TX USA AMA |
|Anahim Lake BC Canada YAA|
|Anchorage AK USA ANC |
|Appleton WI USA ATW |
|Arviat NWT Canada YEK |
|Asheville NC USA AVL |
+-------------------------+
only showing top 20 rows
The data is tab separated.
The data set has a header.
The data set has 4 fields: City, State, Country and IATA.
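The tab separator matters because some values contain spaces. A minimal stand-in using Python's csv module (not Spark) illustrates this: the lines are copied from the raw preview above, with the assumption that the real separator between fields is a tab character.

```python
import csv
import io

# Lines copied from the raw preview, joined with tabs (assumed separator).
raw = "\n".join([
    "City\tState\tCountry\tIATA",
    "Abbotsford\tBC\tCanada\tYXX",
    "Aberdeen\tSD\tUSA\tABR",
    "Anahim Lake\tBC\tCanada\tYAA",
])

# DictReader treats the first line as the header, much like option("header", True).
reader = csv.DictReader(io.StringIO(raw), delimiter="\t")
rows = list(reader)

print(reader.fieldnames)                 # ['City', 'State', 'Country', 'IATA']
print(rows[2]["City"], rows[2]["IATA"])  # Anahim Lake YAA

# Splitting on arbitrary whitespace would wrongly yield 5 fields for this row.
print(len(raw.splitlines()[3].split()))  # 5
```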
Create the DataFrame airportCodes, letting Spark infer an appropriate schema.
airportCodesPath = "/public/airtraffic_all/airport-codes"
airportCodes = spark. \
read. \
option("sep", "\t"). \
option("header", True). \
option("inferSchema", True). \
csv(airportCodesPath)
Preview and understand the data.
airportCodes.show()
+-----------+-----+-------+----+
| City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford| BC| Canada| YXX|
| Aberdeen| SD| USA| ABR|
| Abilene| TX| USA| ABI|
| Akron| OH| USA| CAK|
| Alamosa| CO| USA| ALS|
| Albany| GA| USA| ABY|
| Albany| NY| USA| ALB|
|Albuquerque| NM| USA| ABQ|
| Alexandria| LA| USA| AEX|
| Allentown| PA| USA| ABE|
| Alliance| NE| USA| AIA|
| Alpena| MI| USA| APN|
| Altoona| PA| USA| AOO|
| Amarillo| TX| USA| AMA|
|Anahim Lake| BC| Canada| YAA|
| Anchorage| AK| USA| ANC|
| Appleton| WI| USA| ATW|
| Arviat| NWT| Canada| YEK|
| Asheville| NC| USA| AVL|
| Aspen| CO| USA| ASE|
+-----------+-----+-------+----+
only showing top 20 rows
Get the schema of airportCodes.
airportCodes.printSchema()
root
|-- City: string (nullable = true)
|-- State: string (nullable = true)
|-- Country: string (nullable = true)
|-- IATA: string (nullable = true)
Get the count of records.
airportCodes.count()
526
Get the count of unique IATA codes and check whether it is the same as the total count.
airportCodes. \
select("IATA"). \
distinct(). \
count()
524
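Comparing the two counts tests whether IATA can serve as a unique join key. The same check is shown below in plain Python, as an illustration rather than the notebook's Spark code. The hypothetical helper is_unique_key is applied to rows copied from the preview above: City is not unique (two Albany rows), but IATA is.

```python
def is_unique_key(rows, key_index):
    """Return True when no two rows share a value at key_index (hypothetical helper)."""
    keys = [row[key_index] for row in rows]
    # Same idea as comparing count() with select(...).distinct().count().
    return len(keys) == len(set(keys))


# Rows copied from the airport codes preview: (City, State, Country, IATA).
sample = [
    ("Albany", "GA", "USA", "ABY"),
    ("Albany", "NY", "USA", "ALB"),
    ("Albuquerque", "NM", "USA", "ABQ"),
]

print(is_unique_key(sample, 0))  # False: "Albany" appears twice
print(is_unique_key(sample, 3))  # True: every IATA code is different
```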
If they are not equal, analyze the data and identify the IATA codes that are repeated more than once.
from pyspark.sql.functions import lit, count
duplicateIATACount = airportCodes. \
groupBy("IATA"). \
agg(count(lit(1)).alias("iata_count")). \
filter("iata_count > 1")
duplicateIATACount.show()
+----+----------+
|IATA|iata_count|
+----+----------+
| Big| 3|
+----+----------+
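This result explains the gap between the two counts. A code that appears n times adds n - 1 extra rows, so the 3 copies of "Big" add 2 rows, matching 526 - 524. The following plain-Python sketch (not Spark) mirrors the groupBy/count/filter step with collections.Counter. It uses the three "Big" rows shown in this section plus two other rows from the preview.

```python
from collections import Counter

# Rows copied from this section's output: (City, State, Country, IATA).
rows = [
    ("Albany", "GA", "USA", "ABY"),
    ("Albany", "NY", "USA", "ALB"),
    ("Hilo", "HI", "USA", "Big"),
    ("Kailua-Kona", "Hawaii", "USA", "Big"),
    ("Kamuela", "Hawaii", "USA", "Big"),
]

# Counterpart of groupBy("IATA").agg(count(lit(1))).filter("iata_count > 1").
iata_count = Counter(row[3] for row in rows)
duplicates = {iata: n for iata, n in iata_count.items() if n > 1}
print(duplicates)  # {'Big': 3}

# Each duplicated code contributes (n - 1) surplus rows, so
# total rows - surplus == number of distinct codes.
surplus = sum(n - 1 for n in duplicates.values())
print(len(rows) - surplus == len(iata_count))  # True
```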
Filter out the duplicates, keeping the most appropriate record and discarding the others.
airportCodes. \
filter("IATA = 'Big'"). \
show()
+-----------+------+-------+----+
| City| State|Country|IATA|
+-----------+------+-------+----+
| Hilo| HI| USA| Big|
|Kailua-Kona|Hawaii| USA| Big|
| Kamuela|Hawaii| USA| Big|
+-----------+------+-------+----+
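The filter applied next keeps only the Hilo record. One plausible reason is that it uses the two-letter state code (HI) like the other US rows, while the other two spell out "Hawaii". The predicate can be checked in plain Python against the three rows above. The helper keep is a hypothetical name used only for this sketch.

```python
# The three "Big" rows shown above: (City, State, Country, IATA).
big_rows = [
    ("Hilo", "HI", "USA", "Big"),
    ("Kailua-Kona", "Hawaii", "USA", "Big"),
    ("Kamuela", "Hawaii", "USA", "Big"),
]


def keep(row):
    """Python version of the predicate !(State = 'Hawaii' AND IATA = 'Big')."""
    _city, state, _country, iata = row
    return not (state == "Hawaii" and iata == "Big")


kept = [row for row in big_rows if keep(row)]
print(kept)  # [('Hilo', 'HI', 'USA', 'Big')]
```

One difference from Spark: if a "Big" row had a NULL State, the Spark SQL predicate would evaluate to NULL and filter would drop that row, whereas this Python version (with None) would keep it.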
airportCodes. \
filter("!(State = 'Hawaii' AND IATA = 'Big')"). \
show()
+-----------+-----+-------+----+
| City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford| BC| Canada| YXX|
| Aberdeen| SD| USA| ABR|
| Abilene| TX| USA| ABI|
| Akron| OH| USA| CAK|
| Alamosa| CO| USA| ALS|
| Albany| GA| USA| ABY|
| Albany| NY| USA| ALB|
|Albuquerque| NM| USA| ABQ|
| Alexandria| LA| USA| AEX|
| Allentown| PA| USA| ABE|
| Alliance| NE| USA| AIA|
| Alpena| MI| USA| APN|
| Altoona| PA| USA| AOO|
| Amarillo| TX| USA| AMA|
|Anahim Lake| BC| Canada| YAA|
| Anchorage| AK| USA| ANC|
| Appleton| WI| USA| ATW|
| Arviat| NWT| Canada| YEK|
| Asheville| NC| USA| AVL|
| Aspen| CO| USA| ASE|
+-----------+-----+-------+----+
only showing top 20 rows
airportCodes. \
filter("!(State = 'Hawaii' AND IATA = 'Big')"). \
count()
524
Get the number of airports (IATA codes) for each state in the US, sorted in descending order by count.
airportCodesPath = "/public/airtraffic_all/airport-codes"
airportCodes = spark. \
read. \
option("sep", "\t"). \
option("header", True). \
option("inferSchema", True). \
csv(airportCodesPath). \
filter("!(State = 'Hawaii' AND IATA = 'Big') AND Country = 'USA'")
airportCodes.count()
443
from pyspark.sql.functions import count, col, lit
airportCountByState = airportCodes. \
groupBy("Country", "State"). \
agg(count(lit(1)).alias("IATACount")). \
orderBy(col("IATACount").desc())
airportCountByState.show(51)
+-------+-----+---------+
|Country|State|IATACount|
+-------+-----+---------+
| USA| CA| 29|
| USA| TX| 26|
| USA| AK| 25|
| USA| NY| 18|
| USA| MI| 18|
| USA| FL| 18|
| USA| MT| 14|
| USA| PA| 13|
| USA| IL| 12|
| USA| CO| 12|
| USA| WY| 10|
| USA| NC| 10|
| USA| WI| 9|
| USA| NE| 9|
| USA| GA| 9|
| USA| NM| 9|
| USA| HI| 9|
| USA| WA| 9|
| USA| KS| 9|
| USA| ND| 8|
| USA| MO| 8|
| USA| AR| 8|
| USA| MA| 8|
| USA| MN| 8|
| USA| AZ| 8|
| USA| WV| 8|
| USA| IA| 8|
| USA| SD| 7|
| USA| ME| 7|
| USA| VA| 7|
| USA| LA| 7|
| USA| MS| 7|
| USA| OR| 7|
| USA| TN| 6|
| USA| AL| 6|
| USA| OH| 6|
| USA| IN| 6|
| USA| ID| 6|
| USA| SC| 6|
| USA| OK| 5|
| USA| KY| 4|
| USA| VT| 3|
| USA| NV| 3|
| USA| NJ| 3|
| USA| NH| 3|
| USA| MD| 3|
| USA| null| 3|
| USA| UT| 2|
| USA| CT| 2|
| USA| DE| 1|
| USA| RI| 1|
+-------+-----+---------+
airportCountByState.count()
51
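The per-state aggregation can be sketched without Spark as a filter, a Counter keyed by (Country, State), and a descending sort. This is an illustration only, run on a handful of rows copied from the airport codes preview rather than the full data set.

```python
from collections import Counter

# Sample rows copied from the airport codes preview: (City, State, Country, IATA).
rows = [
    ("Abbotsford", "BC", "Canada", "YXX"),
    ("Albany", "GA", "USA", "ABY"),
    ("Albany", "NY", "USA", "ALB"),
    ("Alamosa", "CO", "USA", "ALS"),
    ("Aspen", "CO", "USA", "ASE"),
    ("Altoona", "PA", "USA", "AOO"),
    ("Allentown", "PA", "USA", "ABE"),
]

# Counterpart of filter("Country = 'USA'").
usa_rows = [r for r in rows if r[2] == "USA"]

# Counterpart of groupBy("Country", "State").agg(count(lit(1)).alias("IATACount")).
counts = Counter((country, state) for _city, state, country, _iata in usa_rows)

# Counterpart of orderBy(col("IATACount").desc()); Python's sort is stable,
# so ties keep their first-seen order.
by_count = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
for (country, state), n in by_count:
    print(country, state, n)
```

Like Spark's groupBy, Counter treats a missing state (None) as its own group, which corresponds to the null row in the output above. In Spark, orderBy on IATACount alone does not guarantee the order of states with equal counts. Adding State as a secondary sort column would make the output deterministic.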