Reorganizing airlines data

Let us reorganize our airlines data into fewer files, with the data compressed and partitioned by month.

  • We have ~1920 files of ~64 MB each.

  • Data ranges from October 1987 to December 2008 (255 months).

  • By default Spark uses ~1920 tasks (one per input file) to process the data, and writing at that parallelism would end up with too many small files. We can avoid that by using repartition and then partitioning by month.

  • Here are the steps we are going to follow to partition by flight month and save the data to /user/[YOUR_USER_NAME]/airlines-part.

    • Read one file first and get the schema.

    • Read the entire data by applying the schema from the previous step.

    • Add an additional column flightmonth using withColumn, combining lpad on the month column with concat. We need to do this because month in our data set is of type integer, and we want to pad single-digit months (January through September) with a leading 0 so the value is formatted as YYYYMM (see the sketch after this list).

    • Repartition the data into 255 partitions (one per month) using flightmonth.

    • Partition the data using partitionBy on flightmonth while writing to the target location.

    • We will use the Parquet file format, which automatically compresses data using the Snappy algorithm by default.
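
As a minimal standalone sketch of the flightmonth derivation (run locally, separate from the cluster session created below; the toy rows are hypothetical):

from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, lpad

# Standalone local session just for this illustration
demo = SparkSession.builder.master('local[1]').appName('flightmonth sketch').getOrCreate()

# Toy rows mimicking the integer year and month columns in the airlines data
df = demo.createDataFrame([(1987, 10), (2008, 1)], ['year', 'month'])

# lpad left-pads month to 2 characters with '0'; concat glues year and padded month into YYYYMM
df.withColumn('flightmonth', concat('year', lpad('month', 2, '0'))).show()
# +----+-----+-----------+
# |year|month|flightmonth|
# +----+-----+-----------+
# |1987|   10|     198710|
# |2008|    1|     200801|
# +----+-----+-----------+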

This process will take time. Once it is done, we will review the target location to confirm that the data is partitioned by month (see the read-back snippet at the end of this section).

Let us start the Spark context for this notebook so that we can execute the code provided. You can sign up for our 10-node state-of-the-art cluster/labs to learn Spark SQL using our unique integrated LMS.

from pyspark.sql import SparkSession

import getpass
username = getpass.getuser()

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Data Processing - Overview'). \
    master('yarn'). \
    getOrCreate()

If you are going to use CLIs, you can launch Spark SQL using one of these three approaches.

Using Spark SQL

spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Scala

spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using PySpark

pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
from pyspark.sql import SparkSession

# spark.stop()
# Disable dynamic allocation and request a fixed 40 executors for this job
spark = SparkSession. \
    builder. \
    config('spark.dynamicAllocation.enabled', 'false'). \
    config('spark.executor.instances', 40). \
    appName('Data Processing - Overview'). \
    master('yarn'). \
    getOrCreate()
spark
from pyspark.sql.functions import concat, lpad

# Infer the schema from a single file rather than scanning all ~1920 files
airlines_schema = spark.read. \
    csv('/public/airlines_all/airlines/part-00000',
        header=True,
        inferSchema=True
       ). \
    schema
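
As an optional sanity check (a small sketch, not part of the original flow), we can peek at the first few inferred fields; year and month come back as integers, which is why the YYYYMM padding step is needed:

# Print a few inferred fields with their data types
for field in airlines_schema.fields[:5]:
    print(field.name, field.dataType)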
# Read the entire data set, applying the schema inferred above (no full inferSchema pass)
airlines = spark.read. \
    schema(airlines_schema). \
    csv('/public/airlines_all/airlines/part*',
        header=True
       )
airlines.printSchema()
airlines.show()
airlines.count()
airlines.distinct().count()
help(airlines.write.parquet)
# Align the number of shuffle partitions with the number of months (255)
spark.conf.set("spark.sql.shuffle.partitions", "255")
# Deduplicate, derive flightmonth (YYYYMM), shuffle into 255 partitions,
# then write Snappy-compressed parquet partitioned by flightmonth
airlines. \
    distinct(). \
    withColumn('flightmonth', concat('year', lpad('month', 2, '0'))). \
    repartition(255, 'flightmonth'). \
    write. \
    mode('overwrite'). \
    partitionBy('flightmonth'). \
    format('parquet'). \
    save(f'/user/{username}/airlines-part')
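
Once the job completes, we can review the target location by reading the data back (a quick sketch; flightmonth=200801 is just an example partition value):

# Read the partitioned output back; flightmonth appears as a partition column
airlines_part = spark.read.parquet(f'/user/{username}/airlines-part')
airlines_part.printSchema()

# Filtering on the partition column prunes the scan to a single month's directory
airlines_part.filter('flightmonth = 200801').count()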