{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Using Spark SQL\n", "\n", "Let us understand how we can use Spark SQL to process data in Metastore Tables and Temporary Views.\n", "\n", "* Once tables in metastore or temporary views are created, we can run queries against the tables or temporary views to perform all standard transformations.\n", "* We will create metastore tables for orders and order_items data sets. We will also create temporary view for products data set.\n", "* We will create metastore tables using `spark.sql` by passing `CREATE TABLE` statements as strings.\n", "* Using Spark SQL, we will join metastore tables and temporary view in the same query." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let us start spark context for this Notebook so that we can execute the code provided. You can sign up for our [10 node state of the art cluster/labs](https://labs.itversity.com/plans) to learn Spark SQL using our unique integrated LMS." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import SparkSession\n", "\n", "import getpass\n", "username = getpass.getuser()\n", "\n", "spark = SparkSession. \\\n", " builder. \\\n", " config('spark.ui.port', '0'). \\\n", " config(\"spark.sql.warehouse.dir\", f\"/user/{username}/warehouse\"). \\\n", " enableHiveSupport(). \\\n", " appName(f'{username} | Python - Spark Metastore'). \\\n", " master('yarn'). \\\n", " getOrCreate()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If you are going to use CLIs, you can use Spark SQL using one of the 3 approaches.\n", "\n", "**Using Spark SQL**\n", "\n", "```\n", "spark2-sql \\\n", " --master yarn \\\n", " --conf spark.ui.port=0 \\\n", " --conf spark.sql.warehouse.dir=/user/${USER}/warehouse\n", "```\n", "\n", "**Using Scala**\n", "\n", "```\n", "spark2-shell \\\n", " --master yarn \\\n", " --conf spark.ui.port=0 \\\n", " --conf spark.sql.warehouse.dir=/user/${USER}/warehouse\n", "```\n", "\n", "**Using Pyspark**\n", "\n", "```\n", "pyspark2 \\\n", " --master yarn \\\n", " --conf spark.ui.port=0 \\\n", " --conf spark.sql.warehouse.dir=/user/${USER}/warehouse\n", "```" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "spark.conf.set('spark.sql.shuffle.partitions', '2')" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "import getpass\n", "username = getpass.getuser()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Drop retail database if it exists using cascade and recreate the database." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "
\n" ], "text/plain": [ "++\n", "||\n", "++\n", "++" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark.sql(f'DROP DATABASE IF EXISTS {username}_retail CASCADE')" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "
\n" ], "text/plain": [ "++\n", "||\n", "++\n", "++" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark.sql(f'CREATE DATABASE {username}_retail')" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "
\n" ], "text/plain": [ "++\n", "||\n", "++\n", "++" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark.sql(f'USE {username}_retail')" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+------------------+\n", "|current_database()|\n", "+------------------+\n", "| itversity_retail|\n", "+------------------+\n", "\n" ] } ], "source": [ "spark.sql('SELECT current_database()').show()" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+---------+-----------+\n", "|database|tableName|isTemporary|\n", "+--------+---------+-----------+\n", "+--------+---------+-----------+\n", "\n" ] } ], "source": [ "spark.sql('SHOW tables').show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Create table for orders and load data into the table." ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "
\n" ], "text/plain": [ "++\n", "||\n", "++\n", "++" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark.sql(\"\"\"\n", " CREATE TABLE orders (\n", " order_id INT,\n", " order_date STRING,\n", " order_customer_id INT,\n", " order_status STRING\n", " ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','\n", "\"\"\")" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "
\n" ], "text/plain": [ "++\n", "||\n", "++\n", "++" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark.sql(\"\"\"\n", " LOAD DATA LOCAL INPATH '/data/retail_db/orders' INTO TABLE orders\n", "\"\"\")" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+--------------------+-----------------+---------------+\n", "|order_id| order_date|order_customer_id| order_status|\n", "+--------+--------------------+-----------------+---------------+\n", "| 1|2013-07-25 00:00:...| 11599| CLOSED|\n", "| 2|2013-07-25 00:00:...| 256|PENDING_PAYMENT|\n", "| 3|2013-07-25 00:00:...| 12111| COMPLETE|\n", "| 4|2013-07-25 00:00:...| 8827| CLOSED|\n", "| 5|2013-07-25 00:00:...| 11318| COMPLETE|\n", "| 6|2013-07-25 00:00:...| 7130| COMPLETE|\n", "| 7|2013-07-25 00:00:...| 4530| COMPLETE|\n", "| 8|2013-07-25 00:00:...| 2911| PROCESSING|\n", "| 9|2013-07-25 00:00:...| 5657|PENDING_PAYMENT|\n", "| 10|2013-07-25 00:00:...| 5648|PENDING_PAYMENT|\n", "| 11|2013-07-25 00:00:...| 918| PAYMENT_REVIEW|\n", "| 12|2013-07-25 00:00:...| 1837| CLOSED|\n", "| 13|2013-07-25 00:00:...| 9149|PENDING_PAYMENT|\n", "| 14|2013-07-25 00:00:...| 9842| PROCESSING|\n", "| 15|2013-07-25 00:00:...| 2568| COMPLETE|\n", "| 16|2013-07-25 00:00:...| 7276|PENDING_PAYMENT|\n", "| 17|2013-07-25 00:00:...| 2667| COMPLETE|\n", "| 18|2013-07-25 00:00:...| 1205| CLOSED|\n", "| 19|2013-07-25 00:00:...| 9488|PENDING_PAYMENT|\n", "| 20|2013-07-25 00:00:...| 9198| PROCESSING|\n", "+--------+--------------------+-----------------+---------------+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "spark.sql(\"SELECT * FROM orders\").show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Create table for order_items and load data into the table." ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "
\n" ], "text/plain": [ "++\n", "||\n", "++\n", "++" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark.sql(\"\"\"\n", " CREATE TABLE order_items (\n", " order_item_id INT,\n", " order_item_order_id INT,\n", " order_item_product_id INT,\n", " order_item_quantity INT,\n", " order_item_subtotal FLOAT,\n", " order_item_product_price FLOAT\n", " ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','\n", "\"\"\")" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "
\n" ], "text/plain": [ "++\n", "||\n", "++\n", "++" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark.sql(\"\"\"\n", " LOAD DATA LOCAL INPATH '/data/retail_db/order_items' INTO TABLE order_items\n", "\"\"\")" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+\n", "|order_item_id|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|order_item_product_price|\n", "+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+\n", "| 1| 1| 957| 1| 299.98| 299.98|\n", "| 2| 2| 1073| 1| 199.99| 199.99|\n", "| 3| 2| 502| 5| 250.0| 50.0|\n", "| 4| 2| 403| 1| 129.99| 129.99|\n", "| 5| 4| 897| 2| 49.98| 24.99|\n", "| 6| 4| 365| 5| 299.95| 59.99|\n", "| 7| 4| 502| 3| 150.0| 50.0|\n", "| 8| 4| 1014| 4| 199.92| 49.98|\n", "| 9| 5| 957| 1| 299.98| 299.98|\n", "| 10| 5| 365| 5| 299.95| 59.99|\n", "| 11| 5| 1014| 2| 99.96| 49.98|\n", "| 12| 5| 957| 1| 299.98| 299.98|\n", "| 13| 5| 403| 1| 129.99| 129.99|\n", "| 14| 7| 1073| 1| 199.99| 199.99|\n", "| 15| 7| 957| 1| 299.98| 299.98|\n", "| 16| 7| 926| 5| 79.95| 15.99|\n", "| 17| 8| 365| 3| 179.97| 59.99|\n", "| 18| 8| 365| 5| 299.95| 59.99|\n", "| 19| 8| 1014| 4| 199.92| 49.98|\n", "| 20| 8| 502| 1| 50.0| 50.0|\n", "+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "spark.sql(\"SELECT * FROM order_items\").show()" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------------+-----------+-----------+\n", "| database| tableName|isTemporary|\n", "+----------------+-----------+-----------+\n", "|itversity_retail|order_items| false|\n", "|itversity_retail| orders| false|\n", "+----------------+-----------+-----------+\n", "\n" ] } ], "source": [ "spark.sql(\"SHOW tables\").show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Create Dataframe for products data using json files under **/public/retail_db_json/products**" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [], "source": [ "products = spark. \\\n", " read. \\\n", " json('/public/retail_db_json/products')" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- product_category_id: long (nullable = true)\n", " |-- product_description: string (nullable = true)\n", " |-- product_id: long (nullable = true)\n", " |-- product_image: string (nullable = true)\n", " |-- product_name: string (nullable = true)\n", " |-- product_price: double (nullable = true)\n", "\n" ] } ], "source": [ "products.printSchema()" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------------------+-------------------+----------+--------------------+--------------------+-------------+\n", "|product_category_id|product_description|product_id| product_image| product_name|product_price|\n", "+-------------------+-------------------+----------+--------------------+--------------------+-------------+\n", "| 2| | 1|http://images.acm...|Quest Q64 10 FT. ...| 59.98|\n", "| 2| | 2|http://images.acm...|Under Armour Men'...| 129.99|\n", "| 2| | 3|http://images.acm...|Under Armour Men'...| 89.99|\n", "| 2| | 4|http://images.acm...|Under Armour Men'...| 89.99|\n", "| 2| | 5|http://images.acm...|Riddell Youth Rev...| 199.99|\n", "| 2| | 6|http://images.acm...|Jordan Men's VI R...| 134.99|\n", "| 2| | 7|http://images.acm...|Schutt Youth Recr...| 99.99|\n", "| 2| | 8|http://images.acm...|Nike Men's Vapor ...| 129.99|\n", "| 2| | 9|http://images.acm...|Nike Adult Vapor ...| 50.0|\n", "| 2| | 10|http://images.acm...|Under Armour Men'...| 129.99|\n", "| 2| | 11|http://images.acm...|Fitness Gear 300 ...| 209.99|\n", "| 2| | 12|http://images.acm...|Under Armour Men'...| 139.99|\n", "| 2| | 13|http://images.acm...|Under Armour Men'...| 89.99|\n", "| 2| | 14|http://images.acm...|Quik Shade Summit...| 199.99|\n", "| 2| | 15|http://images.acm...|Under Armour Kids...| 59.99|\n", "| 2| | 16|http://images.acm...|Riddell Youth 360...| 299.99|\n", "| 2| | 17|http://images.acm...|Under Armour Men'...| 129.99|\n", "| 2| | 18|http://images.acm...|Reebok Men's Full...| 29.97|\n", "| 2| | 19|http://images.acm...|Nike Men's Finger...| 124.99|\n", "| 2| | 20|http://images.acm...|Under Armour Men'...| 129.99|\n", "+-------------------+-------------------+----------+--------------------+--------------------+-------------+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "products.show()" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [], "source": [ "products.createOrReplaceTempView('products_v')" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------------+-----------+-----------+\n", "| database| tableName|isTemporary|\n", "+----------------+-----------+-----------+\n", "|itversity_retail|order_items| false|\n", "|itversity_retail| orders| false|\n", "| | products_v| true|\n", "+----------------+-----------+-----------+\n", "\n" ] } ], "source": [ "spark.sql(\"SHOW tables\").show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Join all the 3 tables to get daily product revenue using orders.order_date, products.product_id, products.product_name and order_items.order_item_subtotal. Also, consider only orders whose status is either COMPLETE or CLOSED." ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------------+----------+--------------------+-------+\n", "| order_date|product_id| product_name|revenue|\n", "+--------------------+----------+--------------------+-------+\n", "|2013-07-25 00:00:...| 1004|Field & Stream Sp...|5599.72|\n", "|2013-07-25 00:00:...| 191|Nike Men's Free 5...|5099.49|\n", "|2013-07-25 00:00:...| 957|Diamondback Women...| 4499.7|\n", "|2013-07-25 00:00:...| 365|Perfect Fitness P...|3359.44|\n", "|2013-07-25 00:00:...| 1073|Pelican Sunstream...|2999.85|\n", "|2013-07-25 00:00:...| 1014|O'Brien Men's Neo...|2798.88|\n", "|2013-07-25 00:00:...| 403|Nike Men's CJ Eli...|1949.85|\n", "|2013-07-25 00:00:...| 502|Nike Men's Dri-FI...| 1650.0|\n", "|2013-07-25 00:00:...| 627|Under Armour Girl...|1079.73|\n", "|2013-07-25 00:00:...| 226|Bowflex SelectTec...| 599.99|\n", "|2013-07-25 00:00:...| 24|Elevation Trainin...| 319.96|\n", "|2013-07-25 00:00:...| 821|Titleist Pro V1 H...| 207.96|\n", "|2013-07-25 00:00:...| 625|Nike Men's Kobe I...| 199.99|\n", "|2013-07-25 00:00:...| 705|Cleveland Golf Wo...| 119.99|\n", "|2013-07-25 00:00:...| 572|TYR Boys' Team Di...| 119.97|\n", "|2013-07-25 00:00:...| 666|Merrell Men's All...| 109.99|\n", "|2013-07-25 00:00:...| 725|LIJA Women's Butt...| 108.0|\n", "|2013-07-25 00:00:...| 134|Nike Women's Lege...| 100.0|\n", "|2013-07-25 00:00:...| 906|Team Golf Tenness...| 99.96|\n", "|2013-07-25 00:00:...| 828|Bridgestone e6 St...| 95.97|\n", "+--------------------+----------+--------------------+-------+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "spark.sql(\"\"\"\n", " SELECT o.order_date,\n", " p.product_id,\n", " p.product_name,\n", " round(sum(oi.order_item_subtotal), 2) AS revenue\n", " FROM orders AS o JOIN order_items AS oi\n", " ON o.order_id = oi.order_item_order_id\n", " JOIN products_v AS p\n", " ON p.product_id = oi.order_item_product_id\n", " WHERE o.order_status IN ('COMPLETE', 'CLOSED')\n", " GROUP BY o.order_date,\n", " p.product_id,\n", " p.product_name\n", " ORDER BY o.order_date, revenue DESC\n", "\"\"\"). \\\n", " show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Pyspark 2", "language": "python", "name": "pyspark2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.12" } }, "nbformat": 4, "nbformat_minor": 4 }