## Solutions - Problem 8

Get the number of orders and the revenue for each year, month and date, using only orders whose status is COMPLETE or CLOSED. Make sure that the daily metrics are also rolled up to the month and year levels.
* Read data from orders and filter for COMPLETE or CLOSED.
* Read data from order_items.
* Join orders and order_items by matching order_id with order_item_order_id.
* Group data by year, month and order_date using rollup, and get the number of orders as well as the revenue at each level.
* Sort the data by year, month and order_date.
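
Before running this on Spark, it may help to see what a rollup over year, month and date produces. The following is a minimal plain-Python sketch, not the Spark implementation: the sample rows and the `rollup` helper are hypothetical, and the year is kept as a string for simplicity. Each joined row contributes to its own date, to its month subtotal, to its year subtotal and to the grand total, with the dropped keys set to `None` (shown as `null` in Spark). Orders are counted distinctly because the join repeats an order once per order item.

```python
from collections import defaultdict

# Hypothetical joined rows: (order_id, order_date, order_item_subtotal).
# An order appears once per order item, as it would after the join.
joined_rows = [
    (1, "2013-07-25", 100.0),
    (1, "2013-07-25", 50.0),
    (2, "2013-07-25", 25.5),
    (3, "2013-07-26", 10.0),
    (4, "2013-08-01", 40.0),
]


def rollup(rows):
    """Aggregate at (year, month, date), (year, month), (year,) and grand-total levels."""
    order_ids = defaultdict(set)
    revenue = defaultdict(float)
    for order_id, order_date, subtotal in rows:
        year = order_date[:4]
        month = order_date[:7].replace("-", "")
        full_key = (year, month, order_date)
        # Each row contributes to every prefix of its grouping key;
        # the dropped columns are filled with None, like NULL in Spark.
        for level in range(len(full_key) + 1):
            key = full_key[:level] + (None,) * (len(full_key) - level)
            order_ids[key].add(order_id)
            revenue[key] += subtotal
    # Distinct order count and rounded revenue per grouping key.
    return {key: (len(order_ids[key]), round(revenue[key], 2)) for key in order_ids}


result = rollup(joined_rows)
for key, metrics in result.items():
    print(key, metrics)
```

With these five sample rows the result has 7 keys: 3 dates, 2 months, 1 year and 1 grand total.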

Let us start the Spark session for this notebook so that we can execute the code provided. You can sign up for our [10 node state of the art cluster/labs](https://labs.itversity.com/plans) to learn Spark SQL using our unique integrated LMS.

In [1]:
from pyspark.sql import SparkSession

import getpass
username = getpass.getuser()

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Joining Data Sets'). \
    master('yarn'). \
    getOrCreate()

If you prefer to work from a CLI, you can launch Spark using one of the following 3 approaches.

**Using Spark SQL**

```
spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

**Using Scala**

```
spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

**Using Pyspark**

```
pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

In [2]:
spark.conf.set('spark.sql.shuffle.partitions', '2')

In [3]:
orders = spark.read.json('/public/retail_db_json/orders')

In [4]:
orders.count()

68883

In [5]:
orders_filtered = orders. \
    filter("order_status IN ('COMPLETE', 'CLOSED')")

In [6]:
orders_filtered.count()

30455

In [7]:
order_items = spark.read.json('/public/retail_db_json/order_items')

In [8]:
orders_join = orders_filtered. \
    join(order_items, orders_filtered.order_id == order_items.order_item_order_id)

In [9]:
# Note: this import shadows Python's built-in sum and round for the rest of the notebook.
from pyspark.sql.functions import col, countDistinct, sum, round, date_format, year

In [15]:
# rollup adds subtotal rows per month and per year, plus a grand total row.
revenue = orders_join. \
    rollup(
        year('order_date').alias('order_year'),
        date_format(col('order_date'), 'yyyyMM').alias('order_month'),
        'order_date'
    ). \
    agg(
        countDistinct('order_id').alias('order_count'),
        round(sum('order_item_subtotal'), 2).alias('revenue')
    ). \
    orderBy('order_year', 'order_month', 'order_date')

In [16]:
revenue.show(50, truncate=False)

+----------+-----------+---------------------+-----------+-------------+
|order_year|order_month|order_date           |order_count|revenue      |
+----------+-----------+---------------------+-----------+-------------+
|null      |null       |null                 |25266      |1.501298248E7|
|2013      |null       |null                 |11266      |6686892.0    |
|2013      |201307     |null                 |564        |333465.45    |
|2013      |201307     |2013-07-25 00:00:00.0|51         |31547.23     |
|2013      |201307     |2013-07-26 00:00:00.0|99         |54713.23     |
|2013      |201307     |2013-07-27 00:00:00.0|80         |48411.48     |
|2013      |201307     |2013-07-28 00:00:00.0|67         |35672.03     |
|2013      |201307     |2013-07-29 00:00:00.0|90         |54579.7      |
|2013      |201307     |2013-07-30 00:00:00.0|90         |49329.29     |
|2013      |201307     |2013-07-31 00:00:00.0|87         |59212.49     |
|2013      |201308     |null                 |2073 
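
The grand total in the first row is printed in scientific notation (`1.501298248E7`) because the rounded sum is still a double. As a small plain-Python sketch, the same text can be parsed and rendered with two decimal places. On the Spark side, casting the column to a decimal type such as `decimal(12,2)` is one option, but that is a suggestion and not part of the original solution.

```python
# Value copied from the grand-total row of the output above.
raw_revenue = "1.501298248E7"

# Parse the scientific-notation text and render it with two decimal places.
formatted_revenue = f"{float(raw_revenue):,.2f}"
print(formatted_revenue)
```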

In [17]:
revenue = orders_join. \
    rollup(
        year('order_date').alias('order_year'),
        date_format(col('order_date'), 'yyyyMM').alias('order_month'),
        'order_date'
    ). \
    agg(
        countDistinct('order_id').alias('order_count'),
        round(sum('order_item_subtotal'), 2).alias('revenue')
    ). \
    orderBy(
        # Nulls last places each subtotal row after the rows it summarizes.
        col('order_year').asc_nulls_last(),
        col('order_month').asc_nulls_last(),
        col('order_date').asc_nulls_last()
    )
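
The difference between the two sort orders can be sketched in plain Python. Spark's default ascending sort places nulls first, which is why the grand total and the subtotals lead the first output, while `asc_nulls_last()` moves them after the detail rows. The rows and the key functions `nulls_first` and `nulls_last` below are hypothetical and only mimic that behaviour, with `None` standing in for `null`.

```python
# Hypothetical rollup keys: (order_year, order_month, order_date).
rows = [
    (2013, "201307", "2013-07-25"),
    (2013, "201307", None),
    (2013, None, None),
    (None, None, None),
    (2013, "201308", "2013-08-01"),
    (2013, "201308", None),
]


def nulls_first(row):
    # False sorts before True, so None values come before real values.
    return tuple((value is not None, value) for value in row)


def nulls_last(row):
    # Here None values get True, so they come after real values.
    return tuple((value is None, value) for value in row)


default_order = sorted(rows, key=nulls_first)
subtotals_last = sorted(rows, key=nulls_last)

for row in subtotals_last:
    print(row)
```

In `subtotals_last`, each month's daily rows are followed by the month subtotal, then the year subtotal, and finally the grand total.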

In [18]:
revenue.show(50, truncate=False)

+----------+-----------+---------------------+-----------+---------+
|order_year|order_month|order_date           |order_count|revenue  |
+----------+-----------+---------------------+-----------+---------+
|2013      |201307     |2013-07-25 00:00:00.0|51         |31547.23 |
|2013      |201307     |2013-07-26 00:00:00.0|99         |54713.23 |
|2013      |201307     |2013-07-27 00:00:00.0|80         |48411.48 |
|2013      |201307     |2013-07-28 00:00:00.0|67         |35672.03 |
|2013      |201307     |2013-07-29 00:00:00.0|90         |54579.7  |
|2013      |201307     |2013-07-30 00:00:00.0|90         |49329.29 |
|2013      |201307     |2013-07-31 00:00:00.0|87         |59212.49 |
|2013      |201307     |null                 |564        |333465.45|
|2013      |201308     |2013-08-01 00:00:00.0|82         |49160.08 |
|2013      |201308     |2013-08-02 00:00:00.0|90         |50688.58 |
|2013      |201308     |2013-08-03 00:00:00.0|72         |43416.74 |
|2013      |201308     |2013-08-04

In [13]:
revenue.count()

380
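
A rollup over year, month and date returns one row per distinct date, one per distinct month, one per distinct year, and one grand total. The count of 380 should decompose this way, although checking it needs the distinct dates in the joined data, which are not shown here. The sketch below uses a hypothetical helper, `rollup_row_count`, and made-up dates to illustrate the arithmetic.

```python
def rollup_row_count(dates):
    """Expected row count of a (year, month, date) rollup over ISO date strings."""
    distinct_dates = set(dates)
    distinct_months = {d[:7] for d in distinct_dates}
    distinct_years = {d[:4] for d in distinct_dates}
    # One row per date, per month, per year, plus one grand total row.
    return len(distinct_dates) + len(distinct_months) + len(distinct_years) + 1


# Hypothetical dates: 4 dates in 3 months across 2 years.
sample_dates = ["2013-07-25", "2013-07-26", "2013-08-01", "2014-01-01"]
expected_rows = rollup_row_count(sample_dates)
print(expected_rows)
```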