Solutions - Problem 7
Get the revenue for each date using orders which are either COMPLETE or CLOSED.
Read data from orders and filter for COMPLETE or CLOSED orders.
Read data from order_items.
Join orders and order_items using order_id.
Group the data by order_date and get the revenue for each day.
Sort the data by order_date.
Let us start the Spark context for this Notebook so that we can execute the code provided. You can sign up for our 10-node state-of-the-art cluster/labs to learn Spark SQL using our unique integrated LMS.
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Joining Data Sets'). \
    master('yarn'). \
    getOrCreate()
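If you do not have access to the labs, a minimal local-mode session is enough to follow along. A sketch, assuming the retail_db_json files are available on your machine and no Hive support is needed:
from pyspark.sql import SparkSession

# Minimal local-mode session; adjust appName and master as needed.
spark = SparkSession. \
    builder. \
    appName('Python - Joining Data Sets'). \
    master('local[*]'). \
    getOrCreate()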
If you are going to use CLIs, you can launch Spark SQL using one of the following three approaches.
Using Spark SQL
spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Scala
spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Pyspark
pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
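The datasets in this exercise are small, so we first lower spark.sql.shuffle.partitions from its default of 200. Otherwise the join and aggregation below would shuffle into 200 mostly empty partitions.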
spark.conf.set('spark.sql.shuffle.partitions', '2')
orders = spark.read.json('/public/retail_db_json/orders')
orders.count()
68883
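Since spark.read.json infers the schema from the data, it is worth checking it before joining. A quick sanity check (the comment reflects what JSON inference typically produces):
# Inspect the inferred schema before joining. JSON inference typically
# yields long for the numeric id columns and string for order_date.
orders.printSchema()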
orders.show()
+-----------------+--------------------+--------+---------------+
|order_customer_id| order_date|order_id| order_status|
+-----------------+--------------------+--------+---------------+
| 11599|2013-07-25 00:00:...| 1| CLOSED|
| 256|2013-07-25 00:00:...| 2|PENDING_PAYMENT|
| 12111|2013-07-25 00:00:...| 3| COMPLETE|
| 8827|2013-07-25 00:00:...| 4| CLOSED|
| 11318|2013-07-25 00:00:...| 5| COMPLETE|
| 7130|2013-07-25 00:00:...| 6| COMPLETE|
| 4530|2013-07-25 00:00:...| 7| COMPLETE|
| 2911|2013-07-25 00:00:...| 8| PROCESSING|
| 5657|2013-07-25 00:00:...| 9|PENDING_PAYMENT|
| 5648|2013-07-25 00:00:...| 10|PENDING_PAYMENT|
| 918|2013-07-25 00:00:...| 11| PAYMENT_REVIEW|
| 1837|2013-07-25 00:00:...| 12| CLOSED|
| 9149|2013-07-25 00:00:...| 13|PENDING_PAYMENT|
| 9842|2013-07-25 00:00:...| 14| PROCESSING|
| 2568|2013-07-25 00:00:...| 15| COMPLETE|
| 7276|2013-07-25 00:00:...| 16|PENDING_PAYMENT|
| 2667|2013-07-25 00:00:...| 17| COMPLETE|
| 1205|2013-07-25 00:00:...| 18| CLOSED|
| 9488|2013-07-25 00:00:...| 19|PENDING_PAYMENT|
| 9198|2013-07-25 00:00:...| 20| PROCESSING|
+-----------------+--------------------+--------+---------------+
only showing top 20 rows
orders_filtered = orders. \
    filter("order_status IN ('COMPLETE', 'CLOSED')")
orders_filtered.count()
30455
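The same filter can also be written with the Column API instead of a SQL expression string. A minimal equivalent sketch using isin:
from pyspark.sql.functions import col

# Equivalent to the SQL-style filter above, using Column functions.
orders_filtered = orders. \
    filter(col('order_status').isin('COMPLETE', 'CLOSED'))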
order_items = spark.read.json('/public/retail_db_json/order_items')
order_items.count()
172198
order_items.show()
+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_product_price|order_item_quantity|order_item_subtotal|
+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
| 1| 1| 957| 299.98| 1| 299.98|
| 2| 2| 1073| 199.99| 1| 199.99|
| 3| 2| 502| 50.0| 5| 250.0|
| 4| 2| 403| 129.99| 1| 129.99|
| 5| 4| 897| 24.99| 2| 49.98|
| 6| 4| 365| 59.99| 5| 299.95|
| 7| 4| 502| 50.0| 3| 150.0|
| 8| 4| 1014| 49.98| 4| 199.92|
| 9| 5| 957| 299.98| 1| 299.98|
| 10| 5| 365| 59.99| 5| 299.95|
| 11| 5| 1014| 49.98| 2| 99.96|
| 12| 5| 957| 299.98| 1| 299.98|
| 13| 5| 403| 129.99| 1| 129.99|
| 14| 7| 1073| 199.99| 1| 199.99|
| 15| 7| 957| 299.98| 1| 299.98|
| 16| 7| 926| 15.99| 5| 79.95|
| 17| 8| 365| 59.99| 3| 179.97|
| 18| 8| 365| 59.99| 5| 299.95|
| 19| 8| 1014| 49.98| 4| 199.92|
| 20| 8| 502| 50.0| 1| 50.0|
+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
only showing top 20 rows
orders_join = orders_filtered. \
    join(order_items, orders_filtered.order_id == order_items.order_item_order_id)
orders_join.count()
75408
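join performs an inner join by default, which is what we want here: order items whose order is not COMPLETE or CLOSED are dropped. Spelling the join type out does not change the result:
# Same join with the join type made explicit; 'inner' is the default.
orders_join = orders_filtered. \
    join(order_items,
         orders_filtered.order_id == order_items.order_item_order_id,
         'inner')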
from pyspark.sql.functions import sum, round
revenue_daily = orders_join. \
    groupBy('order_date'). \
    agg(round(sum('order_item_subtotal'), 2).alias('revenue')). \
    orderBy('order_date')
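Note that importing sum and round from pyspark.sql.functions shadows the Python built-ins of the same names in this Notebook. That is intentional here, as the aggregation needs the column versions of these functions.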
revenue_daily.show()
+--------------------+--------+
| order_date| revenue|
+--------------------+--------+
|2013-07-25 00:00:...|31547.23|
|2013-07-26 00:00:...|54713.23|
|2013-07-27 00:00:...|48411.48|
|2013-07-28 00:00:...|35672.03|
|2013-07-29 00:00:...| 54579.7|
|2013-07-30 00:00:...|49329.29|
|2013-07-31 00:00:...|59212.49|
|2013-08-01 00:00:...|49160.08|
|2013-08-02 00:00:...|50688.58|
|2013-08-03 00:00:...|43416.74|
|2013-08-04 00:00:...|35093.01|
|2013-08-05 00:00:...|34025.27|
|2013-08-06 00:00:...|57843.89|
|2013-08-07 00:00:...|45525.59|
|2013-08-08 00:00:...|33549.47|
|2013-08-09 00:00:...|29225.16|
|2013-08-10 00:00:...|46435.04|
|2013-08-11 00:00:...| 31155.5|
|2013-08-12 00:00:...|59014.74|
|2013-08-13 00:00:...|17956.88|
+--------------------+--------+
only showing top 20 rows
revenue_daily.count()
364
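For reference, the same result can be computed with Spark SQL by registering both Data Frames as temporary views. A sketch (the view names are arbitrary):
orders.createOrReplaceTempView('orders')
order_items.createOrReplaceTempView('order_items')

# SQL equivalent of the Data Frame solution above.
spark.sql("""
    SELECT o.order_date,
        round(sum(oi.order_item_subtotal), 2) AS revenue
    FROM orders AS o
        JOIN order_items AS oi
            ON o.order_id = oi.order_item_order_id
    WHERE o.order_status IN ('COMPLETE', 'CLOSED')
    GROUP BY o.order_date
    ORDER BY o.order_date
""").show()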