Overview of Spark Write APIs

Let us understand how we can write Data Frames to different file formats.

  • All the batch write APIs are grouped under write, which is exposed on Data Frame objects.

  • The following APIs are exposed under write.

    • text - to write single-column data to text files.

    • csv - to write to delimited text files. The default delimiter is a comma, but we can use other delimiters as well.

    • json - to write data to JSON files.

    • orc - to write data to ORC files.

    • parquet - to write data to Parquet files.

  • We can also write data to other file formats by plugging in external packages and using write.format, for example avro (see the sketch after this list).

  • We can use options based on the file format to which we are writing the Data Frame.

    • compression - compression codec (gzip, snappy, etc.)

    • sep - to specify the delimiter while writing to delimited text files using csv

  • We can overwrite existing directories or append to them using mode.

  • Create a copy of the orders data in Parquet file format with no compression. If the folder already exists, overwrite it. Target Location: /user/[YOUR_USER_NAME]/retail_db/orders

  • When you pass options, typos in option names are silently ignored rather than causing a failure. Be careful and make sure that the output is validated.

  • By default, the number of files in the output directory is equal to the number of tasks used to process the data in the last stage. However, we might want to control the number of files so that we do not run into the small files issue.

  • We can control the number of files using coalesce. It has to be invoked on the Data Frame before invoking write.
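
Since avro is not demonstrated in the cells below, here is a minimal sketch of the generic write.format path. It assumes the orders Data Frame and the username variable created later in this notebook, as well as the spark-avro package being available on the cluster; the orders_avro target path is hypothetical.

# Hypothetical sketch - plug in an external file format via write.format.
# Assumes spark-avro is on the classpath; adjust the target path as needed.
orders. \
    write. \
    mode('overwrite'). \
    format('avro'). \
    save(f'/user/{username}/retail_db/orders_avro')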

Let us start the Spark context for this Notebook so that we can execute the code provided. You can sign up for our 10-node state-of-the-art cluster/labs to learn Spark SQL using our unique integrated LMS.

from pyspark.sql import SparkSession

import getpass
username = getpass.getuser()

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Data Processing - Overview'). \
    master('yarn'). \
    getOrCreate()

If you are going to use CLIs, you can use Spark SQL using one of the following three approaches.

Using Spark SQL

spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Scala

spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Pyspark

pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
%%sh

hdfs dfs -rm -R -skipTrash /user/${USER}/retail_db
orders = spark. \
    read. \
    csv('/public/retail_db/orders',
        schema='''
            order_id INT, 
            order_date STRING, 
            order_customer_id INT, 
            order_status STRING
        '''
       )
orders.printSchema()
orders.show()
orders.count()
orders. \
    write. \
    parquet(f'/user/{username}/retail_db/orders', 
            mode='overwrite', 
            compression='none'
           )
%%sh

hdfs dfs -ls /user/${USER}/retail_db/orders

# File names should not contain a compression suffix such as snappy, since we wrote uncompressed files.
# Alternative approach - using option
orders. \
    write. \
    mode('overwrite'). \
    option('compression', 'none'). \
    parquet(f'/user/{username}/retail_db/orders')
%%sh

hdfs dfs -ls /user/${USER}/retail_db/orders

# File names should not contain a compression suffix such as snappy, since we wrote uncompressed files.
# Alternative approach - using format
orders. \
    write. \
    mode('overwrite'). \
    option('compression', 'none'). \
    format('parquet'). \
    save(f'/user/{username}/retail_db/orders')
%%sh

hdfs dfs -ls /user/${USER}/retail_db/orders

# File names should not contain a compression suffix such as snappy, since we wrote uncompressed files.
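
Beyond listing the files, a quick sanity check (a minimal sketch, assuming the write above succeeded; the orders_parquet variable name is just for illustration) is to read the Parquet output back and confirm that the schema and record count match the source.

# Read the Parquet output back to validate the earlier write.
# orders_parquet is an illustrative name, not used elsewhere in this notebook.
orders_parquet = spark. \
    read. \
    parquet(f'/user/{username}/retail_db/orders')
orders_parquet.printSchema()
orders_parquet.count()
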
  • Read the order_items data from /public/retail_db_json/order_items and write it to pipe-delimited files with gzip compression. Target Location: /user/[YOUR_USER_NAME]/retail_db/order_items. Make sure to validate.

  • Skip the write (rather than failing) if the target location already exists. Also make sure to write to only one file; we can use coalesce for that.

coalesce will be covered in detail at a later point in time

order_items = spark. \
    read. \
    json('/public/retail_db_json/order_items')
order_items.show()
order_items.printSchema()
order_items.count()
# Using format
order_items. \
    coalesce(1). \
    write. \
    mode('ignore'). \
    option('compression', 'gzip'). \
    option('sep', '|'). \
    format('csv'). \
    save(f'/user/{username}/retail_db/order_items')
%%sh

hdfs dfs -ls /user/${USER}/retail_db/order_items
# Alternative approach - using keyword arguments
order_items. \
    coalesce(1). \
    write. \
    csv(f'/user/{username}/retail_db/order_items',
        sep='|',
        mode='overwrite',
        compression='gzip'
       )
%%sh

hdfs dfs -ls /user/${USER}/retail_db/order_items
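
As with the Parquet output, we can validate by reading the pipe-delimited files back (a minimal sketch; Spark decompresses the gzip files automatically on read, the columns default to _c0, _c1, ... since no schema is supplied, and the order_items_check variable name is just for illustration).

# Read the pipe-delimited, gzip-compressed output back to validate the write.
# order_items_check is an illustrative name, not used elsewhere in this notebook.
order_items_check = spark. \
    read. \
    csv(f'/user/{username}/retail_db/order_items', sep='|')
order_items_check.show()
order_items_check.count()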