Using Spark SQL

Let us understand how we can use Spark SQL to process data in Metastore Tables and Temporary Views.

  • Once tables in metastore or temporary views are created, we can run queries against the tables or temporary views to perform all standard transformations.

  • We will create metastore tables for the orders and order_items data sets. We will also create a temporary view for the products data set.

  • We will create metastore tables using spark.sql by passing CREATE TABLE statements as strings.

  • Using Spark SQL, we will join the metastore tables and the temporary view in the same query.

Let us start the Spark session for this notebook so that we can execute the code provided.

from pyspark.sql import SparkSession

import getpass
username = getpass.getuser()

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Spark Metastore'). \
    master('yarn'). \
    getOrCreate()

If you are going to use the CLIs instead, you can launch Spark SQL using one of the following 3 approaches.

Using Spark SQL

spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Scala

spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Pyspark

pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

# Use only 2 shuffle partitions, as the data sets used here are small
spark.conf.set('spark.sql.shuffle.partitions', '2')
import getpass
username = getpass.getuser()
  • Drop the retail database if it exists, using CASCADE so that its tables are dropped as well, and then recreate the database.

spark.sql(f'DROP DATABASE IF EXISTS {username}_retail CASCADE')
spark.sql(f'CREATE DATABASE {username}_retail')
spark.sql(f'USE {username}_retail')
spark.sql('SELECT current_database()').show()
+------------------+
|current_database()|
+------------------+
|  itversity_retail|
+------------------+
spark.sql('SHOW tables').show()
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+
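
The drop and recreate step above can be wrapped in a small helper. This is only a minimal sketch: the function name recreate_database and its identifier check are hypothetical additions for illustration, not part of Spark. It relies only on spark.sql, as used above, and assumes spark is an existing SparkSession.

```python
import re


def recreate_database(spark, db_name):
    """Drop db_name (and its tables) if present, recreate it and switch to it.

    `spark` is assumed to be an existing SparkSession.
    """
    # Hypothetical safety check: the name is interpolated into SQL text,
    # so accept only letters, digits and underscores.
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", db_name):
        raise ValueError(f"Unsafe database name: {db_name!r}")

    statements = [
        f"DROP DATABASE IF EXISTS {db_name} CASCADE",
        f"CREATE DATABASE {db_name}",
        f"USE {db_name}",
    ]
    for statement in statements:
        spark.sql(statement)
    # Return the statements so that callers can log what was executed.
    return statements
```

It could then be called as recreate_database(spark, f'{username}_retail').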
  • Create a table for orders and load data into it.

spark.sql("""
    CREATE TABLE orders (
        order_id INT,
        order_date STRING,
        order_customer_id INT,
        order_status STRING
    ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
""")
spark.sql("""
    LOAD DATA LOCAL INPATH '/data/retail_db/orders' INTO TABLE orders
""")
spark.sql("SELECT * FROM orders").show()
+--------+--------------------+-----------------+---------------+
|order_id|          order_date|order_customer_id|   order_status|
+--------+--------------------+-----------------+---------------+
|       1|2013-07-25 00:00:...|            11599|         CLOSED|
|       2|2013-07-25 00:00:...|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|            12111|       COMPLETE|
|       4|2013-07-25 00:00:...|             8827|         CLOSED|
|       5|2013-07-25 00:00:...|            11318|       COMPLETE|
|       6|2013-07-25 00:00:...|             7130|       COMPLETE|
|       7|2013-07-25 00:00:...|             4530|       COMPLETE|
|       8|2013-07-25 00:00:...|             2911|     PROCESSING|
|       9|2013-07-25 00:00:...|             5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:...|             5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:...|              918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:...|             1837|         CLOSED|
|      13|2013-07-25 00:00:...|             9149|PENDING_PAYMENT|
|      14|2013-07-25 00:00:...|             9842|     PROCESSING|
|      15|2013-07-25 00:00:...|             2568|       COMPLETE|
|      16|2013-07-25 00:00:...|             7276|PENDING_PAYMENT|
|      17|2013-07-25 00:00:...|             2667|       COMPLETE|
|      18|2013-07-25 00:00:...|             1205|         CLOSED|
|      19|2013-07-25 00:00:...|             9488|PENDING_PAYMENT|
|      20|2013-07-25 00:00:...|             9198|     PROCESSING|
+--------+--------------------+-----------------+---------------+
only showing top 20 rows
  • Create a table for order_items and load data into it.

spark.sql("""
    CREATE TABLE order_items (
        order_item_id INT,
        order_item_order_id INT,
        order_item_product_id INT,
        order_item_quantity INT,
        order_item_subtotal FLOAT,
        order_item_product_price FLOAT
    ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
""")
spark.sql("""
    LOAD DATA LOCAL INPATH '/data/retail_db/order_items' INTO TABLE order_items
""")
spark.sql("SELECT * FROM order_items").show()
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|order_item_product_price|
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|            1|                  1|                  957|                  1|             299.98|                  299.98|
|            2|                  2|                 1073|                  1|             199.99|                  199.99|
|            3|                  2|                  502|                  5|              250.0|                    50.0|
|            4|                  2|                  403|                  1|             129.99|                  129.99|
|            5|                  4|                  897|                  2|              49.98|                   24.99|
|            6|                  4|                  365|                  5|             299.95|                   59.99|
|            7|                  4|                  502|                  3|              150.0|                    50.0|
|            8|                  4|                 1014|                  4|             199.92|                   49.98|
|            9|                  5|                  957|                  1|             299.98|                  299.98|
|           10|                  5|                  365|                  5|             299.95|                   59.99|
|           11|                  5|                 1014|                  2|              99.96|                   49.98|
|           12|                  5|                  957|                  1|             299.98|                  299.98|
|           13|                  5|                  403|                  1|             129.99|                  129.99|
|           14|                  7|                 1073|                  1|             199.99|                  199.99|
|           15|                  7|                  957|                  1|             299.98|                  299.98|
|           16|                  7|                  926|                  5|              79.95|                   15.99|
|           17|                  8|                  365|                  3|             179.97|                   59.99|
|           18|                  8|                  365|                  5|             299.95|                   59.99|
|           19|                  8|                 1014|                  4|             199.92|                   49.98|
|           20|                  8|                  502|                  1|               50.0|                    50.0|
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
only showing top 20 rows
spark.sql("SHOW tables").show()
+----------------+-----------+-----------+
|        database|  tableName|isTemporary|
+----------------+-----------+-----------+
|itversity_retail|order_items|      false|
|itversity_retail|     orders|      false|
+----------------+-----------+-----------+
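
Since the CREATE TABLE statements are passed to spark.sql as plain strings, they can also be generated from a list of columns. The following is a minimal sketch under that assumption; build_create_table is a hypothetical helper introduced here, not a Spark API, and it only covers delimited text tables like the two created above.

```python
def build_create_table(table_name, columns, delimiter=","):
    """Return a CREATE TABLE statement for a delimited text table.

    `columns` is a list of (column_name, data_type) pairs.
    """
    column_lines = ",\n".join(
        f"    {name} {data_type}" for name, data_type in columns
    )
    return (
        f"CREATE TABLE {table_name} (\n"
        f"{column_lines}\n"
        f") ROW FORMAT DELIMITED FIELDS TERMINATED BY '{delimiter}'"
    )


# Same columns as the orders table created above
orders_ddl = build_create_table(
    "orders",
    [
        ("order_id", "INT"),
        ("order_date", "STRING"),
        ("order_customer_id", "INT"),
        ("order_status", "STRING"),
    ],
)
print(orders_ddl)
```

The resulting string can be passed as is to spark.sql(orders_ddl).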
  • Create a DataFrame for the products data using the JSON files under /public/retail_db_json/products

products = spark. \
    read. \
    json('/public/retail_db_json/products')
products.printSchema()
root
 |-- product_category_id: long (nullable = true)
 |-- product_description: string (nullable = true)
 |-- product_id: long (nullable = true)
 |-- product_image: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_price: double (nullable = true)
products.show()
+-------------------+-------------------+----------+--------------------+--------------------+-------------+
|product_category_id|product_description|product_id|       product_image|        product_name|product_price|
+-------------------+-------------------+----------+--------------------+--------------------+-------------+
|                  2|                   |         1|http://images.acm...|Quest Q64 10 FT. ...|        59.98|
|                  2|                   |         2|http://images.acm...|Under Armour Men'...|       129.99|
|                  2|                   |         3|http://images.acm...|Under Armour Men'...|        89.99|
|                  2|                   |         4|http://images.acm...|Under Armour Men'...|        89.99|
|                  2|                   |         5|http://images.acm...|Riddell Youth Rev...|       199.99|
|                  2|                   |         6|http://images.acm...|Jordan Men's VI R...|       134.99|
|                  2|                   |         7|http://images.acm...|Schutt Youth Recr...|        99.99|
|                  2|                   |         8|http://images.acm...|Nike Men's Vapor ...|       129.99|
|                  2|                   |         9|http://images.acm...|Nike Adult Vapor ...|         50.0|
|                  2|                   |        10|http://images.acm...|Under Armour Men'...|       129.99|
|                  2|                   |        11|http://images.acm...|Fitness Gear 300 ...|       209.99|
|                  2|                   |        12|http://images.acm...|Under Armour Men'...|       139.99|
|                  2|                   |        13|http://images.acm...|Under Armour Men'...|        89.99|
|                  2|                   |        14|http://images.acm...|Quik Shade Summit...|       199.99|
|                  2|                   |        15|http://images.acm...|Under Armour Kids...|        59.99|
|                  2|                   |        16|http://images.acm...|Riddell Youth 360...|       299.99|
|                  2|                   |        17|http://images.acm...|Under Armour Men'...|       129.99|
|                  2|                   |        18|http://images.acm...|Reebok Men's Full...|        29.97|
|                  2|                   |        19|http://images.acm...|Nike Men's Finger...|       124.99|
|                  2|                   |        20|http://images.acm...|Under Armour Men'...|       129.99|
+-------------------+-------------------+----------+--------------------+--------------------+-------------+
only showing top 20 rows
products.createOrReplaceTempView('products_v')
spark.sql("SHOW tables").show()
+----------------+-----------+-----------+
|        database|  tableName|isTemporary|
+----------------+-----------+-----------+
|itversity_retail|order_items|      false|
|itversity_retail|     orders|      false|
|                | products_v|       true|
+----------------+-----------+-----------+
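
The temporary view is scoped to the current Spark session and is not stored in the metastore, which is why its database column is empty and isTemporary is true in the output above. The same distinction can be made programmatically with spark.catalog.listTables. This is a minimal sketch; split_tables_and_views is a hypothetical helper name, and spark is assumed to be the session created earlier.

```python
def split_tables_and_views(spark):
    """Return (metastore_table_names, temporary_view_names) for the current database."""
    permanent, temporary = [], []
    # listTables() returns the tables of the current database together with
    # the temporary views of the session; each entry has an isTemporary flag.
    for table in spark.catalog.listTables():
        if table.isTemporary:
            temporary.append(table.name)
        else:
            permanent.append(table.name)
    return sorted(permanent), sorted(temporary)
```

With the objects created so far, split_tables_and_views(spark) should return order_items and orders as metastore tables and products_v as the only temporary view.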
  • Join the 2 metastore tables and the temporary view to get daily product revenue using orders.order_date, products_v.product_id, products_v.product_name and order_items.order_item_subtotal. Consider only orders whose status is either COMPLETE or CLOSED.

spark.sql("""
    SELECT o.order_date,
        p.product_id,
        p.product_name,
        round(sum(oi.order_item_subtotal), 2) AS revenue
    FROM orders AS o JOIN order_items AS oi
        ON o.order_id = oi.order_item_order_id
    JOIN products_v AS p
        ON p.product_id = oi.order_item_product_id
    WHERE o.order_status IN ('COMPLETE', 'CLOSED')
    GROUP BY o.order_date,
        p.product_id,
        p.product_name
    ORDER BY o.order_date, revenue DESC
"""). \
    show()
+--------------------+----------+--------------------+-------+
|          order_date|product_id|        product_name|revenue|
+--------------------+----------+--------------------+-------+
|2013-07-25 00:00:...|      1004|Field & Stream Sp...|5599.72|
|2013-07-25 00:00:...|       191|Nike Men's Free 5...|5099.49|
|2013-07-25 00:00:...|       957|Diamondback Women...| 4499.7|
|2013-07-25 00:00:...|       365|Perfect Fitness P...|3359.44|
|2013-07-25 00:00:...|      1073|Pelican Sunstream...|2999.85|
|2013-07-25 00:00:...|      1014|O'Brien Men's Neo...|2798.88|
|2013-07-25 00:00:...|       403|Nike Men's CJ Eli...|1949.85|
|2013-07-25 00:00:...|       502|Nike Men's Dri-FI...| 1650.0|
|2013-07-25 00:00:...|       627|Under Armour Girl...|1079.73|
|2013-07-25 00:00:...|       226|Bowflex SelectTec...| 599.99|
|2013-07-25 00:00:...|        24|Elevation Trainin...| 319.96|
|2013-07-25 00:00:...|       821|Titleist Pro V1 H...| 207.96|
|2013-07-25 00:00:...|       625|Nike Men's Kobe I...| 199.99|
|2013-07-25 00:00:...|       705|Cleveland Golf Wo...| 119.99|
|2013-07-25 00:00:...|       572|TYR Boys' Team Di...| 119.97|
|2013-07-25 00:00:...|       666|Merrell Men's All...| 109.99|
|2013-07-25 00:00:...|       725|LIJA Women's Butt...|  108.0|
|2013-07-25 00:00:...|       134|Nike Women's Lege...|  100.0|
|2013-07-25 00:00:...|       906|Team Golf Tenness...|  99.96|
|2013-07-25 00:00:...|       828|Bridgestone e6 St...|  95.97|
+--------------------+----------+--------------------+-------+
only showing top 20 rows
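
To make the logic of the query above concrete, here is a plain Python sketch (not Spark) that applies the same filter, joins, aggregation and sorting to a handful of rows copied from the earlier outputs. The dates are shortened to the day and the product names are left truncated exactly as displayed, so the numbers only illustrate the logic and are not actual query results.

```python
from collections import defaultdict

# (order_id, order_date, order_status)
orders = [
    (1, "2013-07-25", "CLOSED"),
    (2, "2013-07-25", "PENDING_PAYMENT"),
    (5, "2013-07-25", "COMPLETE"),
]
# (order_item_order_id, order_item_product_id, order_item_subtotal)
order_items = [
    (1, 957, 299.98),
    (2, 1073, 199.99),
    (2, 502, 250.0),
    (2, 403, 129.99),
    (5, 957, 299.98),
    (5, 365, 299.95),
    (5, 1014, 99.96),
    (5, 957, 299.98),
    (5, 403, 129.99),
]
# product_id -> product_name (truncated as in the output above)
products = {
    957: "Diamondback Women...",
    1073: "Pelican Sunstream...",
    502: "Nike Men's Dri-FI...",
    403: "Nike Men's CJ Eli...",
    365: "Perfect Fitness P...",
    1014: "O'Brien Men's Neo...",
}

# WHERE o.order_status IN ('COMPLETE', 'CLOSED')
order_dates = {
    order_id: order_date
    for order_id, order_date, status in orders
    if status in ("COMPLETE", "CLOSED")
}

# Both inner joins, then GROUP BY order_date, product_id, product_name
revenue = defaultdict(float)
for order_id, product_id, subtotal in order_items:
    if order_id in order_dates and product_id in products:
        key = (order_dates[order_id], product_id, products[product_id])
        revenue[key] += subtotal

# round(sum(...), 2) and ORDER BY order_date, revenue DESC
daily_product_revenue = sorted(
    ((*key, round(total, 2)) for key, total in revenue.items()),
    key=lambda row: (row[0], -row[3]),
)
for row in daily_product_revenue:
    print(row)
```

The items of order 2 do not contribute to any row because its status is PENDING_PAYMENT, which mirrors the effect of the WHERE clause in the Spark SQL query.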