{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Creating Partitioned Tables\n", "\n", "We can also create partitioned tables as part of Spark Metastore Tables.\n", "\n", "* There are some challenges in creating partitioned tables directly using `spark.catalog.createTable`.\n", "* But if the directories are similar to partitioned tables with data, we should be able to create partitioned tables.\n", "* Let us create partitioned table for `orders` by `order_month`." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let us start spark context for this Notebook so that we can execute the code provided. You can sign up for our [10 node state of the art cluster/labs](https://labs.itversity.com/plans) to learn Spark SQL using our unique integrated LMS." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import SparkSession\n", "\n", "import getpass\n", "username = getpass.getuser()\n", "\n", "spark = SparkSession. \\\n", " builder. \\\n", " config('spark.ui.port', '0'). \\\n", " config(\"spark.sql.warehouse.dir\", f\"/user/{username}/warehouse\"). \\\n", " enableHiveSupport(). \\\n", " appName(f'{username} | Python - Spark Metastore'). \\\n", " master('yarn'). \\\n", " getOrCreate()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If you are going to use CLIs, you can use Spark SQL using one of the 3 approaches.\n", "\n", "**Using Spark SQL**\n", "\n", "```\n", "spark2-sql \\\n", " --master yarn \\\n", " --conf spark.ui.port=0 \\\n", " --conf spark.sql.warehouse.dir=/user/${USER}/warehouse\n", "```\n", "\n", "**Using Scala**\n", "\n", "```\n", "spark2-shell \\\n", " --master yarn \\\n", " --conf spark.ui.port=0 \\\n", " --conf spark.sql.warehouse.dir=/user/${USER}/warehouse\n", "```\n", "\n", "**Using Pyspark**\n", "\n", "```\n", "pyspark2 \\\n", " --master yarn \\\n", " --conf spark.ui.port=0 \\\n", " --conf spark.sql.warehouse.dir=/user/${USER}/warehouse\n", "```" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "spark.conf.set('spark.sql.shuffle.partitions', '2')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Tasks\n", "\n", "Let us perform tasks related to partitioned tables.\n", "* Read data from file into data frame.\n", "* Add additional column which will be used to partition the data.\n", "* Write the data into the target location on which we are going to create the table.\n", "* Create partitioned table using the location to which we have copied the data and validate.\n", "* We can recover partitions by running `MSCK REPAIR TABLE` using `spark.sql` or by invoking `spark.catalog.recoverPartitions`.\n", "* When we use `createTable` to create partitioned table, we have to recover partitions so that partitions are visible." ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "import getpass\n", "username = getpass.getuser()" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
order_id | order_date | order_customer_id | order_status | order_month |
---|
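The remaining cells of the original notebook walk through these steps against the `orders` data set. As a quick reference, below is a minimal, self-contained sketch of the whole workflow, assuming the `spark` session created above. The input path `/public/retail_db/orders`, the target location `/user/{username}/retail_db/orders_part`, the table name `orders_part`, and the CSV schema are illustrative assumptions and may differ from the actual lab environment; the sketch also creates the table with a `spark.sql` `CREATE TABLE ... USING PARQUET ... LOCATION` statement rather than `spark.catalog.createTable`.

```python
# Minimal sketch, assuming the `spark` session created above and illustrative
# paths/table name (/public/retail_db/orders, orders_part) that may differ in your environment.
import getpass

from pyspark.sql.functions import date_format

username = getpass.getuser()

# 1. Read data from files into a data frame (schema assumed for the retail orders CSV).
orders = spark.read.csv(
    '/public/retail_db/orders',
    schema='order_id INT, order_date STRING, order_customer_id INT, order_status STRING'
)

# 2. Add the column on which the data will be partitioned.
orders = orders.withColumn('order_month', date_format('order_date', 'yyyyMM'))

# 3. Write the data, partitioned by order_month, to the target location.
orders_path = f'/user/{username}/retail_db/orders_part'
orders.write.mode('overwrite').partitionBy('order_month').parquet(orders_path)

# 4. Create a partitioned table on top of that location.
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS orders_part (
        order_id INT,
        order_date STRING,
        order_customer_id INT,
        order_status STRING,
        order_month STRING
    )
    USING PARQUET
    PARTITIONED BY (order_month)
    LOCATION '{orders_path}'
""")

# 5. Recover partitions so that they are visible to the metastore, then validate.
spark.catalog.recoverPartitions('orders_part')
# Alternatively: spark.sql('MSCK REPAIR TABLE orders_part')

spark.sql('SHOW PARTITIONS orders_part').show()
spark.sql('SELECT order_month, count(1) AS order_count FROM orders_part GROUP BY order_month').show()
```

Creating the table with an explicit `LOCATION` and then calling `recoverPartitions` (or running `MSCK REPAIR TABLE`) registers the existing partition directories with the metastore; without that step, queries against the table would return no rows even though the files are present.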