Analyze Datasets for Joins

Let us analyze both the January 2008 airtraffic data set and the airport codes data set, which we are going to use for joins.

  • We will use the January 2008 airtraffic data, which has all the relevant flight details such as departure, arrival etc.

  • As part of the analysis, we might also want additional metadata about airports, such as city and state. That information comes from the airport codes data set.

  • Let us read and review both the January 2008 airtraffic data and the airport codes data set.
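To make the purpose of the join concrete, here is a plain-Python sketch (not Spark) of enriching flight records with airport metadata. The airport rows are copied from the airport codes preview in this notebook; the flight rows, the carrier code `XX` and the helper name `enrich_inner` are hypothetical. It behaves like an inner join on `Origin = IATA`, so a flight whose origin has no matching airport is dropped.

```python
# Airport rows (City, State, Country, IATA) copied from the airport codes preview
airports = [
    ("Akron", "OH", "USA", "CAK"),
    ("Asheville", "NC", "USA", "AVL"),
    ("Anchorage", "AK", "USA", "ANC"),
]

# Hypothetical flights: (UniqueCarrier, FlightNum, Origin, Dest)
flights = [
    ("XX", 1, "CAK", "AVL"),
    ("XX", 2, "AVL", "ANC"),
    ("XX", 3, "ZZZ", "CAK"),  # 'ZZZ' has no matching airport
]

# Index airports by IATA code so each flight can look up its origin directly
airports_by_iata = {iata: (city, state) for city, state, _country, iata in airports}


def enrich_inner(flights, lookup):
    """Hypothetical helper: inner join on Origin = IATA, appending City and State."""
    enriched = []
    for carrier, num, origin, dest in flights:
        if origin in lookup:  # unmatched origins are dropped, as in an inner join
            city, state = lookup[origin]
            enriched.append((carrier, num, origin, dest, city, state))
    return enriched


result = enrich_inner(flights, airports_by_iata)
for row in result:
    print(row)
```

A left outer join would instead keep flight 3 with empty City and State values; which one fits depends on whether flights with unknown airports should survive the analysis.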

Let us start the Spark session for this notebook so that we can execute the code provided.

from pyspark.sql import SparkSession

import getpass
username = getpass.getuser()

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Joining Data Sets'). \
    master('yarn'). \
    getOrCreate()

If you are going to use the CLIs, you can launch Spark using one of the following 3 approaches.

Using Spark SQL

spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Scala

spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Pyspark

pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

# Use only 2 shuffle partitions (the default is 200) since this data is small
spark.conf.set("spark.sql.shuffle.partitions", "2")
airtraffic = spark. \
    read. \
    parquet("/public/airtraffic_all/airtraffic-part/flightmonth=200801")
airtraffic.printSchema()
root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- CarrierDelay: string (nullable = true)
 |-- WeatherDelay: string (nullable = true)
 |-- NASDelay: string (nullable = true)
 |-- SecurityDelay: string (nullable = true)
 |-- LateAircraftDelay: string (nullable = true)
 |-- IsArrDelayed: string (nullable = true)
 |-- IsDepDelayed: string (nullable = true)
airtraffic.show()
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+------------+------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|IsArrDelayed|IsDepDelayed|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+------------+------------+
|2008|    1|        16|        3|   1725|      1735|   1959|      2021|           OH|     5367| N716CA|              154|           166|    146|     -22|     -10|   BGR| CVG|     906|     1|      7|        0|            null|       0|          NA|          NA|      NA|           NA|               NA|          NO|          NO|
|2008|    1|        17|        4|   1717|      1701|   1915|      1855|           OH|     4977| N967CA|              118|           114|    101|      20|      16|   SYR| CVG|     527|     2|     15|        0|            null|       0|          16|           0|       4|            0|                0|         YES|         YES|
|2008|    1|        17|        4|   1220|      1225|   1440|      1504|           OH|     5352| N709CA|              140|           159|    117|     -24|      -5|   SAV| BOS|     901|     8|     15|        0|            null|       0|          NA|          NA|      NA|           NA|               NA|          NO|          NO|
|2008|    1|        17|        4|   1530|      1530|   1645|      1637|           OH|     5426| N779CA|               75|            67|     45|       8|       0|   CVG| GRR|     268|     5|     25|        0|            null|       0|          NA|          NA|      NA|           NA|               NA|         YES|          NO|
|2008|    1|        17|        4|   1203|      1205|   1429|      1429|           OH|     5441| N809CA|               86|            84|     58|       0|      -2|   STL| CVG|     307|     3|     25|        0|            null|       0|          NA|          NA|      NA|           NA|               NA|          NO|          NO|
|2008|    1|        18|        5|   1150|      1150|   1457|      1524|           OH|     5220| N436CA|              127|           154|    102|     -27|       0|   STL| JFK|     892|     4|     21|        0|            null|       0|          NA|          NA|      NA|           NA|               NA|          NO|          NO|
|2008|    1|        18|        5|   1215|      1009|   1540|      1251|           OH|     5260| N446CA|              145|           102|    140|     169|     126|   MCI| CVG|     539|     2|      3|        0|            null|       0|         126|           0|      43|            0|                0|         YES|         YES|
|2008|    1|        19|        6|    835|       835|   1145|      1130|           OH|     5276| N523CA|              130|           115|     83|      15|       0|   TUL| CVG|     646|     4|     43|        0|            null|       0|           0|           0|      15|            0|                0|         YES|          NO|
|2008|    1|        20|        7|   1925|      1935|   2148|      2124|           OH|     5215| N729CA|              143|           109|     34|      24|     -10|   JFK| PHL|      94|     5|    104|        0|            null|       0|           0|           0|      24|            0|                0|         YES|          NO|
|2008|    1|        20|        7|    825|       830|   1045|      1007|           OH|     5324| N933CA|              140|            97|     92|      38|      -5|   RDU| CVG|     390|     1|     47|        0|            null|       0|           0|           0|      38|            0|                0|         YES|          NO|
|2008|    1|        21|        1|   1638|      1640|   1741|      1754|           OH|     4984| N719CA|               63|            74|     45|     -13|      -2|   CVG| DTW|     229|    10|      8|        0|            null|       0|          NA|          NA|      NA|           NA|               NA|          NO|          NO|
|2008|    1|        21|        1|   1200|      1204|   1600|      1559|           OH|     5056| N371CA|              180|           175|    147|       1|      -4|   MSY| LGA|    1183|    13|     20|        0|            null|       0|          NA|          NA|      NA|           NA|               NA|         YES|          NO|
|2008|    1|        21|        1|   1950|      1935|   2103|      2124|           OH|     5215| N965CA|               73|           109|     29|     -21|      15|   JFK| PHL|      94|     8|     36|        0|            null|       0|          NA|          NA|      NA|           NA|               NA|          NO|         YES|
|2008|    1|        21|        1|   1950|      1830|   2225|      2007|           OH|     5595| N641CA|              155|            97|    136|     138|      80|   DCA| JFK|     213|    10|      9|        0|            null|       0|           0|          80|      58|            0|                0|         YES|         YES|
|2008|    1|        21|        1|    700|       700|    955|       950|           OH|     5610| N964CA|              115|           110|     87|       5|       0|   HSV| DCA|     613|     8|     20|        0|            null|       0|          NA|          NA|      NA|           NA|               NA|         YES|          NO|
|2008|    1|        22|        2|   2020|      1910|   2223|      2125|           OH|     5032| N538CA|               63|            75|     44|      58|      70|   ORD| CVG|     264|     1|     18|        0|            null|       0|           0|          58|       0|            0|                0|         YES|         YES|
|2008|    1|        22|        2|   1320|      1320|   1600|      1528|           OH|     5331| N805CA|              160|           128|    102|      32|       0|   CVG| JFK|     589|    12|     46|        0|            null|       0|           0|           0|      32|            0|                0|         YES|          NO|
|2008|    1|        23|        3|    908|       908|   1216|      1149|           OH|     5033| N963CA|              188|           161|    124|      27|       0|   LGA| SAV|     722|     6|     58|        0|            null|       0|           0|           0|      27|            0|                0|         YES|          NO|
|2008|    1|        23|        3|   1245|      1252|   1409|      1430|           OH|     5050| N427CA|               84|            98|     73|     -21|      -7|   CLT| CVG|     335|     1|     10|        0|            null|       0|          NA|          NA|      NA|           NA|               NA|          NO|          NO|
|2008|    1|        23|        3|    630|       635|    840|       831|           OH|     5355| N926CA|              130|           116|     85|       9|      -5|   GSP| LGA|     610|    20|     25|        0|            null|       0|          NA|          NA|      NA|           NA|               NA|         YES|          NO|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+------------+------------+
only showing top 20 rows
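The schema reports numeric-looking columns such as ArrDelay, DepDelay and CarrierDelay as string, and the sample rows show the literal value NA in the delay-breakdown columns. Before doing arithmetic on such a column, those values have to be converted. A minimal plain-Python sketch, assuming NA (or a missing value) is the only non-numeric marker; the helper name `parse_optional_int` is hypothetical.

```python
def parse_optional_int(value):
    """Convert a string field such as '-22' or 'NA' to an int, or None."""
    # Assumption: 'NA' and a missing value are the only non-numeric markers
    if value is None or value == "NA":
        return None
    return int(value)


# CarrierDelay values taken from sample rows shown above
carrier_delays = ["NA", "16", "NA", "126", "0"]
parsed = [parse_optional_int(v) for v in carrier_delays]

# Ignore unknown values when aggregating
known = [v for v in parsed if v is not None]
print(parsed, sum(known))
```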
  • We will use another data set to get details about airports, such as State and City for a given airport code.

  • Let us analyze the data set to confirm whether it has a header and how the data is structured.

airportCodesPath = "/public/airtraffic_all/airport-codes"
spark. \
    read. \
    text(airportCodesPath). \
    show(truncate=False)
+-------------------------+
|value                    |
+-------------------------+
|City	State	Country	IATA  |
|Abbotsford	BC	Canada	YXX |
|Aberdeen	SD	USA	ABR      |
|Abilene	TX	USA	ABI       |
|Akron	OH	USA	CAK         |
|Alamosa	CO	USA	ALS       |
|Albany	GA	USA	ABY        |
|Albany	NY	USA	ALB        |
|Albuquerque	NM	USA	ABQ   |
|Alexandria	LA	USA	AEX    |
|Allentown	PA	USA	ABE     |
|Alliance	NE	USA	AIA      |
|Alpena	MI	USA	APN        |
|Altoona	PA	USA	AOO       |
|Amarillo	TX	USA	AMA      |
|Anahim Lake	BC	Canada	YAA|
|Anchorage	AK	USA	ANC     |
|Appleton	WI	USA	ATW      |
|Arviat	NWT	Canada	YEK    |
|Asheville	NC	USA	AVL     |
+-------------------------+
only showing top 20 rows
  • The data is tab separated.

  • There is a header in the data set.

  • The data set has 4 fields: City, State, Country, IATA.
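The two reader options used next, `sep` and `header`, can be illustrated with Python's standard `csv` module on the first lines of the preview above. This is a plain-Python analogy, not what Spark runs internally. Note that "Anahim Lake" contains a space, which is why the tab, not whitespace in general, is the separator.

```python
import csv
import io

# First lines of the airport codes file, as shown in the text() preview above
raw = (
    "City\tState\tCountry\tIATA\n"
    "Abbotsford\tBC\tCanada\tYXX\n"
    "Aberdeen\tSD\tUSA\tABR\n"
    "Anahim Lake\tBC\tCanada\tYAA\n"
)

# delimiter="\t" plays the role of option("sep", "\t");
# DictReader uses the first line as column names, like option("header", True)
reader = csv.DictReader(io.StringIO(raw), delimiter="\t")
rows = list(reader)

print(reader.fieldnames)
print(rows[2])
```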

Create the DataFrame airportCodes, applying an appropriate schema (here inferred using inferSchema).

airportCodesPath = "/public/airtraffic_all/airport-codes"
airportCodes = spark. \
    read. \
    option("sep", "\t"). \
    option("header", True). \
    option("inferSchema", True). \
    csv(airportCodesPath)
  • Preview and understand the data.

airportCodes.show()
+-----------+-----+-------+----+
|       City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford|   BC| Canada| YXX|
|   Aberdeen|   SD|    USA| ABR|
|    Abilene|   TX|    USA| ABI|
|      Akron|   OH|    USA| CAK|
|    Alamosa|   CO|    USA| ALS|
|     Albany|   GA|    USA| ABY|
|     Albany|   NY|    USA| ALB|
|Albuquerque|   NM|    USA| ABQ|
| Alexandria|   LA|    USA| AEX|
|  Allentown|   PA|    USA| ABE|
|   Alliance|   NE|    USA| AIA|
|     Alpena|   MI|    USA| APN|
|    Altoona|   PA|    USA| AOO|
|   Amarillo|   TX|    USA| AMA|
|Anahim Lake|   BC| Canada| YAA|
|  Anchorage|   AK|    USA| ANC|
|   Appleton|   WI|    USA| ATW|
|     Arviat|  NWT| Canada| YEK|
|  Asheville|   NC|    USA| AVL|
|      Aspen|   CO|    USA| ASE|
+-----------+-----+-------+----+
only showing top 20 rows
  • Get the schema of airportCodes.

airportCodes.printSchema()
root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- IATA: string (nullable = true)
  • Get the count of records.

airportCodes.count()
526
  • Get the count of unique IATA codes and check whether it is the same as the total count.

airportCodes. \
    select("IATA"). \
    distinct(). \
    count()
524
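Comparing `count()` with `select("IATA").distinct().count()` only tells us how many surplus rows there are, not which codes cause them. The plain-Python sketch below uses a hypothetical sample of codes: one code repeated three times yields a gap of 2, as with 526 vs 524, but two codes each repeated twice yield the same gap. That ambiguity is why the next step groups by IATA.

```python
# Hypothetical sample: a few IATA codes from the preview plus three rows with 'Big'
iata_codes = ["YXX", "ABR", "ABI", "Big", "Big", "Big"]

total = len(iata_codes)          # analogous to airportCodes.count()
distinct = len(set(iata_codes))  # analogous to select("IATA").distinct().count()
surplus = total - distinct       # extra rows beyond one per code

# A different duplicate pattern producing the same surplus
other_pattern = ["YXX", "YXX", "ABR", "ABR", "ABI"]
other_surplus = len(other_pattern) - len(set(other_pattern))

print(total, distinct, surplus, other_surplus)
```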
  • If they are not equal, analyze the data and identify the IATA codes that appear more than once.

from pyspark.sql.functions import lit, count
duplicateIATACount = airportCodes. \
    groupBy("IATA"). \
    agg(count(lit(1)).alias("iata_count")). \
    filter("iata_count > 1")
duplicateIATACount.show()
+----+----------+
|IATA|iata_count|
+----+----------+
| Big|         3|
+----+----------+
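The `groupBy("IATA").agg(count(lit(1)))` plus `filter("iata_count > 1")` pattern above is a group-count-having query. A plain-Python equivalent with `collections.Counter`, on a hypothetical sample of codes, looks like this:

```python
from collections import Counter

# Hypothetical sample of IATA values
iata_codes = ["YXX", "ABR", "ABI", "Big", "Big", "Big", "ALB"]

# Count rows per IATA code, like groupBy("IATA").agg(count(lit(1)))
iata_count = Counter(iata_codes)

# Keep only codes seen more than once, like filter("iata_count > 1")
duplicates = {code: n for code, n in iata_count.items() if n > 1}
print(duplicates)
```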
  • Keep the most appropriate record among the duplicates and discard the others.

airportCodes. \
    filter("IATA = 'Big'"). \
    show()
+-----------+------+-------+----+
|       City| State|Country|IATA|
+-----------+------+-------+----+
|       Hilo|    HI|    USA| Big|
|Kailua-Kona|Hawaii|    USA| Big|
|    Kamuela|Hawaii|    USA| Big|
+-----------+------+-------+----+
airportCodes. \
    filter("!(State = 'Hawaii' AND IATA = 'Big')"). \
    show()
+-----------+-----+-------+----+
|       City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford|   BC| Canada| YXX|
|   Aberdeen|   SD|    USA| ABR|
|    Abilene|   TX|    USA| ABI|
|      Akron|   OH|    USA| CAK|
|    Alamosa|   CO|    USA| ALS|
|     Albany|   GA|    USA| ABY|
|     Albany|   NY|    USA| ALB|
|Albuquerque|   NM|    USA| ABQ|
| Alexandria|   LA|    USA| AEX|
|  Allentown|   PA|    USA| ABE|
|   Alliance|   NE|    USA| AIA|
|     Alpena|   MI|    USA| APN|
|    Altoona|   PA|    USA| AOO|
|   Amarillo|   TX|    USA| AMA|
|Anahim Lake|   BC| Canada| YAA|
|  Anchorage|   AK|    USA| ANC|
|   Appleton|   WI|    USA| ATW|
|     Arviat|  NWT| Canada| YEK|
|  Asheville|   NC|    USA| AVL|
|      Aspen|   CO|    USA| ASE|
+-----------+-----+-------+----+
only showing top 20 rows
airportCodes. \
    filter("!(State = 'Hawaii' AND IATA = 'Big')"). \
    count()
524
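The predicate `!(State = 'Hawaii' AND IATA = 'Big')` keeps every row except the two `Big` rows whose State is spelled out as Hawaii. The surviving Hilo row is the only one of the three that uses the two-letter state abbreviation (HI) found in the rest of the data set. A plain-Python sketch of the same predicate on the three `Big` rows shown above plus one ordinary row; the helper name `keep` is hypothetical. One caveat: Python's `==` treats a missing State as simply unequal, whereas Spark SQL's three-valued logic would drop a row with a NULL State and IATA `Big`.

```python
# The three 'Big' rows shown above, plus one ordinary row for contrast
rows = [
    {"City": "Hilo", "State": "HI", "Country": "USA", "IATA": "Big"},
    {"City": "Kailua-Kona", "State": "Hawaii", "Country": "USA", "IATA": "Big"},
    {"City": "Kamuela", "State": "Hawaii", "Country": "USA", "IATA": "Big"},
    {"City": "Akron", "State": "OH", "Country": "USA", "IATA": "CAK"},
]


def keep(row):
    # Python version of "!(State = 'Hawaii' AND IATA = 'Big')"
    return not (row["State"] == "Hawaii" and row["IATA"] == "Big")


kept = [r["City"] for r in rows if keep(r)]
print(kept)
```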
  • Get the number of airports (IATA codes) for each state in the US, sorted in descending order by count.

airportCodesPath = "/public/airtraffic_all/airport-codes"
airportCodes = spark. \
    read. \
    option("sep", "\t"). \
    option("header", True). \
    option("inferSchema", True). \
    csv(airportCodesPath). \
    filter("!(State = 'Hawaii' AND IATA = 'Big') AND Country = 'USA'")
airportCodes.count()
443
from pyspark.sql.functions import count, col, lit
airportCountByState = airportCodes. \
    groupBy("Country", "State"). \
    agg(count(lit(1)).alias("IATACount")). \
    orderBy(col("IATACount").desc())
airportCountByState.show(51)
+-------+-----+---------+
|Country|State|IATACount|
+-------+-----+---------+
|    USA|   CA|       29|
|    USA|   TX|       26|
|    USA|   AK|       25|
|    USA|   NY|       18|
|    USA|   MI|       18|
|    USA|   FL|       18|
|    USA|   MT|       14|
|    USA|   PA|       13|
|    USA|   IL|       12|
|    USA|   CO|       12|
|    USA|   WY|       10|
|    USA|   NC|       10|
|    USA|   WI|        9|
|    USA|   NE|        9|
|    USA|   GA|        9|
|    USA|   NM|        9|
|    USA|   HI|        9|
|    USA|   WA|        9|
|    USA|   KS|        9|
|    USA|   ND|        8|
|    USA|   MO|        8|
|    USA|   AR|        8|
|    USA|   MA|        8|
|    USA|   MN|        8|
|    USA|   AZ|        8|
|    USA|   WV|        8|
|    USA|   IA|        8|
|    USA|   SD|        7|
|    USA|   ME|        7|
|    USA|   VA|        7|
|    USA|   LA|        7|
|    USA|   MS|        7|
|    USA|   OR|        7|
|    USA|   TN|        6|
|    USA|   AL|        6|
|    USA|   OH|        6|
|    USA|   IN|        6|
|    USA|   ID|        6|
|    USA|   SC|        6|
|    USA|   OK|        5|
|    USA|   KY|        4|
|    USA|   VT|        3|
|    USA|   NV|        3|
|    USA|   NJ|        3|
|    USA|   NH|        3|
|    USA|   MD|        3|
|    USA| null|        3|
|    USA|   UT|        2|
|    USA|   CT|        2|
|    USA|   DE|        1|
|    USA|   RI|        1|
+-------+-----+---------+
airportCountByState.count()
51
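The 51 groups include one whose State is null, since `groupBy` treats null as its own group. Also, `orderBy(col("IATACount").desc())` alone does not define an order among states with equal counts (for example NY, MI and FL with 18 each). The plain-Python sketch below reproduces the filter, group-count and sort steps on the 20 rows from the `airportCodes.show()` preview, and breaks ties by state name so the order is deterministic. The tie-breaking rule is an assumption of this sketch, not part of the original query.

```python
from collections import Counter

# (City, State, Country, IATA) rows copied from the airportCodes.show() preview
airports = [
    ("Abbotsford", "BC", "Canada", "YXX"), ("Aberdeen", "SD", "USA", "ABR"),
    ("Abilene", "TX", "USA", "ABI"), ("Akron", "OH", "USA", "CAK"),
    ("Alamosa", "CO", "USA", "ALS"), ("Albany", "GA", "USA", "ABY"),
    ("Albany", "NY", "USA", "ALB"), ("Albuquerque", "NM", "USA", "ABQ"),
    ("Alexandria", "LA", "USA", "AEX"), ("Allentown", "PA", "USA", "ABE"),
    ("Alliance", "NE", "USA", "AIA"), ("Alpena", "MI", "USA", "APN"),
    ("Altoona", "PA", "USA", "AOO"), ("Amarillo", "TX", "USA", "AMA"),
    ("Anahim Lake", "BC", "Canada", "YAA"), ("Anchorage", "AK", "USA", "ANC"),
    ("Appleton", "WI", "USA", "ATW"), ("Arviat", "NWT", "Canada", "YEK"),
    ("Asheville", "NC", "USA", "AVL"), ("Aspen", "CO", "USA", "ASE"),
]

# Keep US airports only, like the Country = 'USA' part of the filter
us_airports = [a for a in airports if a[2] == "USA"]

# Count airports per (Country, State), like groupBy("Country", "State").agg(count(lit(1)))
counts = Counter((country, state) for _city, state, country, _iata in us_airports)

# Sort by count descending, then by state name to make ties deterministic
by_state = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0][1]))

for (country, state), n in by_state:
    print(country, state, n)
```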