Overview of Spark Write APIs
Let us understand how we can write Data Frames to different file formats.
All the batch write APIs are grouped under write, which is exposed on Data Frame objects (for example, df.write).
text - to write single column data to text files.
csv - to write to text files with delimiters. Default is a comma, but we can use other delimiters as well.
json - to write data to JSON files.
orc - to write data to ORC files.
parquet - to write data to Parquet files.
We can also write data to other file formats by plugging in external libraries and using write.format, for example avro (see the sketch below).
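A minimal sketch of the idea, assuming the external spark-avro package is available on the cluster and that orders and username are defined as in the cells further below; the orders_avro path is purely illustrative.
# Plug in an external format by name via write.format
orders. \
    write. \
    mode('overwrite'). \
    format('avro'). \
    save(f'/user/{username}/retail_db/orders_avro')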
We can pass options depending on the file format to which we are writing the Data Frame.
compression - the compression codec (gzip, snappy, etc.).
sep - to specify the delimiter while writing into delimited files using csv.
We can overwrite the directories or append to existing directories using mode. Other supported modes are ignore (skip the write if the target already exists) and error or errorifexists, which is the default and fails if the target already exists.
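Since append is not demonstrated later in this Notebook, here is a minimal sketch of it, assuming orders and username as defined in the cells further below; the orders_append path is illustrative.
# Adds new part files to the existing directory instead of replacing it
orders. \
    write. \
    mode('append'). \
    parquet(f'/user/{username}/retail_db/orders_append')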
Create a copy of the orders data in Parquet file format with no compression. If the folder already exists, overwrite it. Target Location: /user/[YOUR_USER_NAME]/retail_db/orders
When you pass options, if there are typos, then the options will be silently ignored rather than failing. Be careful and make sure that the output is validated.
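A hedged sketch of this pitfall, with orders and username as defined in the cells below; the misspelled key compresion and the orders_typo_demo path are made up for illustration.
# The misspelled option key is silently ignored, so Spark falls back to the default codec (snappy)
orders. \
    write. \
    mode('overwrite'). \
    option('compresion', 'none'). \
    parquet(f'/user/{username}/retail_db/orders_typo_demo')

# Validate by inspecting the file names embedded in the output
from pyspark.sql.functions import input_file_name
spark.read. \
    parquet(f'/user/{username}/retail_db/orders_typo_demo'). \
    select(input_file_name()). \
    distinct(). \
    show(truncate=False)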
By default the number of files in the output directory is equal to the number of tasks that are used to process the data in the last stage. However, we might want to control the number of files so that we don't run into the small files issue.
We can control the number of files by using coalesce. It has to be invoked on the Data Frame before invoking write.
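A minimal sketch of the effect, with orders and username as defined in the cells below; the orders_single_file path is illustrative.
# The number of partitions determines how many files Spark writes by default
orders.rdd.getNumPartitions()

# Coalesce to a single partition so that exactly one file is written
orders. \
    coalesce(1). \
    write. \
    mode('overwrite'). \
    parquet(f'/user/{username}/retail_db/orders_single_file')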
Let us start the Spark context for this Notebook so that we can execute the code provided. You can sign up for our 10 node state of the art cluster/labs to learn Spark SQL using our unique integrated LMS.
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
builder. \
config('spark.ui.port', '0'). \
config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
enableHiveSupport(). \
appName(f'{username} | Python - Data Processing - Overview'). \
master('yarn'). \
getOrCreate()
If you are going to use CLIs, you can launch Spark using one of the following three approaches.
Using Spark SQL
spark2-sql \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Scala
spark2-shell \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Pyspark
pyspark2 \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
%%sh
hdfs dfs -rm -R -skipTrash /user/${USER}/retail_db
orders = spark. \
read. \
csv('/public/retail_db/orders',
schema='''
order_id INT,
order_date STRING,
order_customer_id INT,
order_status STRING
'''
)
orders.printSchema()
orders.show()
orders.count()
orders. \
write. \
parquet(f'/user/{username}/retail_db/orders',
mode='overwrite',
compression='none'
)
%%sh
hdfs dfs -ls /user/${USER}/retail_db/orders
# The file extension should not contain a compression codec such as snappy.
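Beyond listing the files, a quick hedged sanity check is to read the output back and compare the record count with the source Data Frame; orders_written is just an illustrative local name.
# Read the freshly written files back and confirm no records were lost
orders_written = spark.read.parquet(f'/user/{username}/retail_db/orders')
orders_written.count() == orders.count()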
# Alternative approach - using option
orders. \
write. \
mode('overwrite'). \
option('compression', 'none'). \
parquet(f'/user/{username}/retail_db/orders')
%%sh
hdfs dfs -ls /user/${USER}/retail_db/orders
# The file extension should not contain a compression codec such as snappy.
# Alternative approach - using format
orders. \
write. \
mode('overwrite'). \
option('compression', 'none'). \
format('parquet'). \
save(f'/user/{username}/retail_db/orders')
%%sh
hdfs dfs -ls /user/${USER}/retail_db/orders
# The file extension should not contain a compression codec such as snappy.
Read order_items data from /public/retail_db_json/order_items and write it to pipe-delimited files with gzip compression. Target Location: /user/[YOUR_USER_NAME]/retail_db/order_items. Make sure to validate.
Ignore the error if the target location already exists. Also make sure to write into only one file. We can use coalesce for it. coalesce will be covered in detail at a later point in time.
order_items = spark. \
read. \
json('/public/retail_db_json/order_items')
order_items.show()
order_items.printSchema()
order_items.count()
# Using format
order_items. \
coalesce(1). \
write. \
mode('ignore'). \
option('compression', 'gzip'). \
option('sep', '|'). \
format('csv'). \
save(f'/user/{username}/retail_db/order_items')
%%sh
hdfs dfs -ls /user/${USER}/retail_db/order_items
# Alternative approach - using keyword arguments (note: mode='overwrite' here, so the output is regenerated even if the target location already exists)
order_items. \
coalesce(1). \
write. \
csv(f'/user/{username}/retail_db/order_items',
sep='|',
mode='overwrite',
compression='gzip'
)
%%sh
hdfs dfs -ls /user/${USER}/retail_db/order_items
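To validate, a hedged sketch that reads the pipe-delimited, gzip-compressed output back and compares counts; since no schema is supplied, the columns come back with default names (_c0, _c1, ...) and string types, and order_items_written is an illustrative name.
# Spark decompresses the gzip part files transparently while reading
order_items_written = spark. \
    read. \
    csv(f'/user/{username}/retail_db/order_items', sep='|')

order_items_written.printSchema()
order_items_written.count() == order_items.count()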