## Creating Partitioned Tables

We can also create partitioned tables as part of Spark Metastore Tables.

* Creating partitioned tables directly with `spark.catalog.createTable` comes with some challenges.
* However, if the target directory already holds data laid out in partition directories (for example `order_month=201307`), we can create a partitioned table on top of it.
* Let us create a partitioned table for `orders`, partitioned by `order_month`.
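
To make "directories similar to partitioned tables" concrete, here is a minimal plain-Python sketch of the `column=value` directory naming that the listings later in this notebook show. The helper names `partition_dir` and `parse_partition_dir` are hypothetical, and the base path is only an example.

```python
import posixpath


def partition_dir(base_path, column, value):
    """Build the directory that holds one partition value (column=value layout)."""
    return posixpath.join(base_path, f'{column}={value}')


def parse_partition_dir(path):
    """Recover (column, value) from the last component of a partition directory."""
    column, _, value = posixpath.basename(path.rstrip('/')).partition('=')
    return column, value


base_path = '/user/itversity/retail_db/orders_part'  # example location (assumption)
path = partition_dir(base_path, 'order_month', '201307')
print(path)
print(parse_partition_dir(path))
```

Spark writes one such directory per distinct value of the partition column, which is why the table location ends up with one subdirectory per month.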

Let us start the Spark session for this notebook so that we can execute the code provided. You can sign up for our [10 node state of the art cluster/labs](https://labs.itversity.com/plans) to learn Spark SQL using our unique integrated LMS.

In [1]:
from pyspark.sql import SparkSession

import getpass
username = getpass.getuser()

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Spark Metastore'). \
    master('yarn'). \
    getOrCreate()

If you are going to use CLIs, you can launch Spark SQL using one of the following 3 approaches.

**Using Spark SQL**

```
spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

**Using Scala**

```
spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

**Using PySpark**

```
pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

In [2]:
spark.conf.set('spark.sql.shuffle.partitions', '2')

### Tasks

Let us perform tasks related to partitioned tables.

* Read the data from files into a data frame.
* Add an additional column that will be used to partition the data.
* Write the data to the target location on which we are going to create the table.
* Create the partitioned table using the location to which we have written the data, and validate it.
* We can recover partitions by running `MSCK REPAIR TABLE` using `spark.sql` or by invoking `spark.catalog.recoverPartitions`.
* When we use `createTable` to create a partitioned table, we have to recover the partitions so that they become visible.
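
The two recovery routes mentioned in the list can be wrapped in one small helper. This is only a sketch: the function name `recover_partitions` and its `use_sql` flag are hypothetical, and it assumes an active `spark` session is passed in.

```python
def recover_partitions(spark, table_name, use_sql=False):
    """Register partition directories found under the table location in the metastore."""
    if use_sql:
        # SQL route: run MSCK REPAIR TABLE through spark.sql
        spark.sql(f'MSCK REPAIR TABLE {table_name}')
    else:
        # Catalog API route
        spark.catalog.recoverPartitions(table_name)


# Usage, once the table exists (requires a running Spark session):
# recover_partitions(spark, 'orders_part')
# recover_partitions(spark, 'orders_part', use_sql=True)
```

Either route should leave the table in the same state, so one call is enough.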

In [3]:
import getpass
username = getpass.getuser()

In [4]:
spark.sql(f'CREATE DATABASE IF NOT EXISTS {username}_retail')

In [5]:
spark.catalog.setCurrentDatabase(f'{username}_retail')

In [6]:
spark.catalog.currentDatabase()

'itversity_retail'

In [7]:
orders_path = '/public/retail_db/orders'

In [8]:
%%sh

hdfs dfs -ls /public/retail_db/orders

Found 1 items
-rw-r--r--   2 hdfs supergroup    2999944 2021-01-28 09:27 /public/retail_db/orders/part-00000


In [9]:
spark.sql('DROP TABLE IF EXISTS orders_part')

In [10]:
%%sh

hdfs dfs -ls /user/`whoami`/retail_db/orders_part

Found 14 items
-rw-r--r--   3 itversity itversity          0 2021-03-13 15:25 /user/itversity/retail_db/orders_part/_SUCCESS
drwxr-xr-x   - itversity itversity          0 2021-03-13 15:25 /user/itversity/retail_db/orders_part/order_month=201307
drwxr-xr-x   - itversity itversity          0 2021-03-13 15:25 /user/itversity/retail_db/orders_part/order_month=201308
drwxr-xr-x   - itversity itversity          0 2021-03-13 15:25 /user/itversity/retail_db/orders_part/order_month=201309
drwxr-xr-x   - itversity itversity          0 2021-03-13 15:25 /user/itversity/retail_db/orders_part/order_month=201310
drwxr-xr-x   - itversity itversity          0 2021-03-13 15:25 /user/itversity/retail_db/orders_part/order_month=201311
drwxr-xr-x   - itversity itversity          0 2021-03-13 15:25 /user/itversity/retail_db/orders_part/order_month=201312
drwxr-xr-x   - itversity itversity          0 2021-03-13 15:25 /user/itversity/retail_db/orders_part/order_month=201401
drwxr-xr-x   - itversity itversity 

In [11]:
%%sh

hdfs dfs -rm -R -skipTrash /user/`whoami`/retail_db/orders_part

2021-03-13 15:31:04,403 INFO fs.TrashPolicyDefault: Moved: 'hdfs://m01.itversity.com:9000/user/itversity/retail_db/orders_part' to trash at: hdfs://m01.itversity.com:9000/user/itversity/.Trash/Current/user/itversity/retail_db/orders_part1615667464378


In [12]:
from pyspark.sql.functions import date_format

In [13]:
spark. \
    read. \
    csv(orders_path,
        schema='''order_id INT, order_date DATE,
                  order_customer_id INT, order_status STRING
               '''
       ). \
    withColumn('order_month', date_format('order_date', 'yyyyMM')). \
    write. \
    partitionBy('order_month'). \
    parquet(f'/user/{username}/retail_db/orders_part')
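
To see what values the derived `order_month` column takes, the `yyyyMM` pattern used above can be mirrored in plain Python. This is an illustrative sketch only; the helper `order_month` is hypothetical and uses `strftime` rather than Spark.

```python
from datetime import date


def order_month(order_date):
    """Mirror date_format(order_date, 'yyyyMM'): four-digit year followed by two-digit month."""
    return order_date.strftime('%Y%m')


print(order_month(date(2013, 8, 1)))   # 201308
print(order_month(date(2014, 1, 15)))  # 201401
```

Every order from the same calendar month gets the same value, so `partitionBy('order_month')` writes one directory per month.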

In [14]:
%%sh

hdfs dfs -ls /user/`whoami`/retail_db/orders_part

Found 14 items
-rw-r--r--   3 itversity itversity          0 2021-03-13 15:31 /user/itversity/retail_db/orders_part/_SUCCESS
drwxr-xr-x   - itversity itversity          0 2021-03-13 15:31 /user/itversity/retail_db/orders_part/order_month=201307
drwxr-xr-x   - itversity itversity          0 2021-03-13 15:31 /user/itversity/retail_db/orders_part/order_month=201308
drwxr-xr-x   - itversity itversity          0 2021-03-13 15:31 /user/itversity/retail_db/orders_part/order_month=201309
drwxr-xr-x   - itversity itversity          0 2021-03-13 15:31 /user/itversity/retail_db/orders_part/order_month=201310
drwxr-xr-x   - itversity itversity          0 2021-03-13 15:31 /user/itversity/retail_db/orders_part/order_month=201311
drwxr-xr-x   - itversity itversity          0 2021-03-13 15:31 /user/itversity/retail_db/orders_part/order_month=201312
drwxr-xr-x   - itversity itversity          0 2021-03-13 15:31 /user/itversity/retail_db/orders_part/order_month=201401
drwxr-xr-x   - itversity itversity 

In [15]:
%%sh

hdfs dfs -ls -R /user/`whoami`/retail_db/orders_part

-rw-r--r--   3 itversity itversity          0 2021-03-13 15:31 /user/itversity/retail_db/orders_part/_SUCCESS
drwxr-xr-x   - itversity itversity          0 2021-03-13 15:31 /user/itversity/retail_db/orders_part/order_month=201307
-rw-r--r--   3 itversity itversity      14435 2021-03-13 15:31 /user/itversity/retail_db/orders_part/order_month=201307/part-00000-ccc05948-3707-47fe-9855-f041cd350b7e.c000.snappy.parquet
drwxr-xr-x   - itversity itversity          0 2021-03-13 15:31 /user/itversity/retail_db/orders_part/order_month=201308
-rw-r--r--   3 itversity itversity      49997 2021-03-13 15:31 /user/itversity/retail_db/orders_part/order_month=201308/part-00000-ccc05948-3707-47fe-9855-f041cd350b7e.c000.snappy.parquet
drwxr-xr-x   - itversity itversity          0 2021-03-13 15:31 /user/itversity/retail_db/orders_part/order_month=201309
-rw-r--r--   3 itversity itversity      51358 2021-03-13 15:31 /user/itversity/retail_db/orders_part/order_month=201309/part-00000-ccc05948-3707-47fe-9855

In [16]:
spark. \
    read. \
    parquet(f'/user/{username}/retail_db/orders_part/order_month=201308'). \
    show()

+--------+----------+-----------------+---------------+
|order_id|order_date|order_customer_id|   order_status|
+--------+----------+-----------------+---------------+
|    1297|2013-08-01|            11607|       COMPLETE|
|    1298|2013-08-01|             5105|         CLOSED|
|    1299|2013-08-01|             7802|       COMPLETE|
|    1300|2013-08-01|              553|PENDING_PAYMENT|
|    1301|2013-08-01|             1604|PENDING_PAYMENT|
|    1302|2013-08-01|             1695|       COMPLETE|
|    1303|2013-08-01|             7018|     PROCESSING|
|    1304|2013-08-01|             2059|       COMPLETE|
|    1305|2013-08-01|             3844|       COMPLETE|
|    1306|2013-08-01|            11672|PENDING_PAYMENT|
|    1307|2013-08-01|             4474|       COMPLETE|
|    1308|2013-08-01|            11645|        PENDING|
|    1309|2013-08-01|             2367|         CLOSED|
|    1310|2013-08-01|             5602|        PENDING|
|    1311|2013-08-01|             5396|PENDING_P

In [17]:
spark. \
    read. \
    parquet(f'/user/{username}/retail_db/orders_part'). \
    show()

+--------+----------+-----------------+---------------+-----------+
|order_id|order_date|order_customer_id|   order_status|order_month|
+--------+----------+-----------------+---------------+-----------+
|   15488|2013-11-01|             8987|PENDING_PAYMENT|     201311|
|   15489|2013-11-01|             5359|PENDING_PAYMENT|     201311|
|   15490|2013-11-01|            10149|       COMPLETE|     201311|
|   15491|2013-11-01|            10635|        ON_HOLD|     201311|
|   15492|2013-11-01|             7784|PENDING_PAYMENT|     201311|
|   15493|2013-11-01|             1104|        ON_HOLD|     201311|
|   15494|2013-11-01|             7313|     PROCESSING|     201311|
|   15495|2013-11-01|             7067|         CLOSED|     201311|
|   15496|2013-11-01|            12153|PENDING_PAYMENT|     201311|
|   15497|2013-11-01|            11115|PENDING_PAYMENT|     201311|
|   15498|2013-11-01|            11195|       COMPLETE|     201311|
|   15499|2013-11-01|             7113|         
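
Reading the single directory `order_month=201308` returns no `order_month` column, while reading the base directory derives it from the directory names. If the column is needed when reading only one partition, Spark's `basePath` data source option tells partition discovery where the table root is. A sketch under that assumption; the helper name `read_single_partition` is hypothetical.

```python
def read_single_partition(spark, base_path, column, value):
    """Read one partition directory while keeping the partition column."""
    return (
        spark.read
        .option('basePath', base_path)  # start partition discovery at the table root
        .parquet(f'{base_path}/{column}={value}')
    )


# Usage (requires a running Spark session and the data written earlier):
# read_single_partition(
#     spark, f'/user/{username}/retail_db/orders_part', 'order_month', '201308'
# ).show()
```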

In [18]:
spark. \
    catalog. \
    createTable('orders_part',
                path=f'/user/{username}/retail_db/orders_part',
                source='parquet'
               )

order_id,order_date,order_customer_id,order_status,order_month
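
One way to check that the metastore treats `order_month` as a partition column is to inspect `spark.catalog.listColumns`, whose entries carry an `isPartition` flag. A minimal sketch; the helper name `partition_columns` is hypothetical, and what it returns depends on how the table was registered.

```python
def partition_columns(spark, table_name):
    """Return the names of columns the metastore flags as partition columns."""
    return [
        column.name
        for column in spark.catalog.listColumns(table_name)
        if column.isPartition
    ]


# Usage (requires a running Spark session and the table created above):
# partition_columns(spark, 'orders_part')
```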


In [21]:
spark.read.table('orders_part').show()

+--------+----------+-----------------+------------+-----------+
|order_id|order_date|order_customer_id|order_status|order_month|
+--------+----------+-----------------+------------+-----------+
+--------+----------+-----------------+------------+-----------+



In [22]:
spark.sql('SHOW PARTITIONS orders_part').show()

+---------+
|partition|
+---------+
+---------+



In [23]:
spark.catalog.recoverPartitions('orders_part')

In [24]:
spark.sql('SHOW PARTITIONS orders_part').show()

+------------------+
|         partition|
+------------------+
|order_month=201307|
|order_month=201308|
|order_month=201309|
|order_month=201310|
|order_month=201311|
|order_month=201312|
|order_month=201401|
|order_month=201402|
|order_month=201403|
|order_month=201404|
|order_month=201405|
|order_month=201406|
|order_month=201407|
+------------------+



In [25]:
spark.read.table('orders_part').show()

+--------+----------+-----------------+---------------+-----------+
|order_id|order_date|order_customer_id|   order_status|order_month|
+--------+----------+-----------------+---------------+-----------+
|   15488|2013-11-01|             8987|PENDING_PAYMENT|     201311|
|   15489|2013-11-01|             5359|PENDING_PAYMENT|     201311|
|   15490|2013-11-01|            10149|       COMPLETE|     201311|
|   15491|2013-11-01|            10635|        ON_HOLD|     201311|
|   15492|2013-11-01|             7784|PENDING_PAYMENT|     201311|
|   15493|2013-11-01|             1104|        ON_HOLD|     201311|
|   15494|2013-11-01|             7313|     PROCESSING|     201311|
|   15495|2013-11-01|             7067|         CLOSED|     201311|
|   15496|2013-11-01|            12153|PENDING_PAYMENT|     201311|
|   15497|2013-11-01|            11115|PENDING_PAYMENT|     201311|
|   15498|2013-11-01|            11195|       COMPLETE|     201311|
|   15499|2013-11-01|             7113|         

In [26]:
spark.sql('SELECT order_month, count(1) FROM orders_part GROUP BY order_month').show()

+-----------+--------+
|order_month|count(1)|
+-----------+--------+
|     201405|    5467|
|     201311|    6381|
|     201401|    5908|
|     201309|    5841|
|     201308|    5680|
|     201404|    5657|
|     201402|    5635|
|     201310|    5335|
|     201406|    5308|
|     201407|    4468|
|     201307|    1533|
|     201312|    5892|
|     201403|    5778|
+-----------+--------+



In [27]:
spark.read.table('orders_part'). \
    groupBy('order_month'). \
    count(). \
    show()

+-----------+-----+
|order_month|count|
+-----------+-----+
|     201311| 6381|
|     201401| 5908|
|     201309| 5841|
|     201308| 5680|
|     201404| 5657|
|     201405| 5467|
|     201312| 5892|
|     201403| 5778|
|     201402| 5635|
|     201310| 5335|
|     201406| 5308|
|     201407| 4468|
|     201307| 1533|
+-----------+-----+
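
As a final sanity check, the per-month counts can be added up and compared with the row count of the source data. The sketch below only does the arithmetic on the numbers copied from the output above; comparing the total against something like `spark.read.csv(orders_path).count()` is a suggested validation step, not one the notebook runs.

```python
# Counts copied from the groupBy output above, keyed by order_month
month_counts = {
    '201307': 1533, '201308': 5680, '201309': 5841, '201310': 5335,
    '201311': 6381, '201312': 5892, '201401': 5908, '201402': 5635,
    '201403': 5778, '201404': 5657, '201405': 5467, '201406': 5308,
    '201407': 4468,
}

partition_count = len(month_counts)
total = sum(month_counts.values())

print(partition_count)  # 13
print(total)            # 68883
```

The 13 keys match the 13 partitions listed by `SHOW PARTITIONS orders_part`.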

