{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Using Spark SQL\n", "\n", "Let us understand how we can use Spark SQL to process data in Metastore Tables and Temporary Views.\n", "\n", "* Once tables in metastore or temporary views are created, we can run queries against the tables or temporary views to perform all standard transformations.\n", "* We will create metastore tables for orders and order_items data sets. We will also create temporary view for products data set.\n", "* We will create metastore tables using `spark.sql` by passing `CREATE TABLE` statements as strings.\n", "* Using Spark SQL, we will join metastore tables and temporary view in the same query." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let us start spark context for this Notebook so that we can execute the code provided. You can sign up for our [10 node state of the art cluster/labs](https://labs.itversity.com/plans) to learn Spark SQL using our unique integrated LMS." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import SparkSession\n", "\n", "import getpass\n", "username = getpass.getuser()\n", "\n", "spark = SparkSession. \\\n", " builder. \\\n", " config('spark.ui.port', '0'). \\\n", " config(\"spark.sql.warehouse.dir\", f\"/user/{username}/warehouse\"). \\\n", " enableHiveSupport(). \\\n", " appName(f'{username} | Python - Spark Metastore'). \\\n", " master('yarn'). \\\n", " getOrCreate()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If you are going to use CLIs, you can use Spark SQL using one of the 3 approaches.\n", "\n", "**Using Spark SQL**\n", "\n", "```\n", "spark2-sql \\\n", " --master yarn \\\n", " --conf spark.ui.port=0 \\\n", " --conf spark.sql.warehouse.dir=/user/${USER}/warehouse\n", "```\n", "\n", "**Using Scala**\n", "\n", "```\n", "spark2-shell \\\n", " --master yarn \\\n", " --conf spark.ui.port=0 \\\n", " --conf spark.sql.warehouse.dir=/user/${USER}/warehouse\n", "```\n", "\n", "**Using Pyspark**\n", "\n", "```\n", "pyspark2 \\\n", " --master yarn \\\n", " --conf spark.ui.port=0 \\\n", " --conf spark.sql.warehouse.dir=/user/${USER}/warehouse\n", "```" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "spark.conf.set('spark.sql.shuffle.partitions', '2')" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "import getpass\n", "username = getpass.getuser()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Drop retail database if it exists using cascade and recreate the database." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/html": [ "