{ "cells": [
 { "cell_type": "markdown", "metadata": {}, "source": [ "## Read and Process Data from Metastore Tables\n", "\n", "Let us see how we can read tables using functions such as `spark.read.table` and process data using Data Frame APIs.\n", "\n", "* We can read a table into a Data Frame using `spark.read.table(\"table_name\")`.\n", "* We can also prefix the database name to read tables belonging to a particular database.\n", "* When we read a table, the result is a Data Frame.\n", "* Once the Data Frame is created, we can use functions such as `filter` or `where`, `groupBy`, and `sort` or `orderBy` to process the data, as sketched below." ] },
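 { "cell_type": "markdown", "metadata": {}, "source": [ "The overall pattern looks as follows. This is a minimal sketch rather than an executable cell; it assumes the database `itversity_airtraffic` and the table `airport_codes` already exist (both are created later in this notebook).\n", "\n", "```\n", "# Read a metastore table into a Data Frame, prefixing the database name\n", "airport_codes = spark.read.table(\"itversity_airtraffic.airport_codes\")\n", "\n", "# Process the Data Frame using APIs such as where, groupBy and orderBy\n", "airport_codes. \\\n", "    where(\"IATA IS NOT NULL\"). \\\n", "    groupBy(\"State\"). \\\n", "    count(). \\\n", "    orderBy(\"count\", ascending=False). \\\n", "    show()\n", "```" ] },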
 { "cell_type": "markdown", "metadata": {}, "source": [ "Let us start the Spark context for this notebook so that we can execute the code provided. You can sign up for our [10-node state-of-the-art cluster/labs](https://labs.itversity.com/plans) to learn Spark SQL using our unique integrated LMS." ] },
 { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import SparkSession\n", "\n", "import getpass\n", "username = getpass.getuser()\n", "\n", "spark = SparkSession. \\\n", "    builder. \\\n", "    config('spark.ui.port', '0'). \\\n", "    config(\"spark.sql.warehouse.dir\", f\"/user/{username}/warehouse\"). \\\n", "    enableHiveSupport(). \\\n", "    appName(f'{username} | Python - Spark Metastore'). \\\n", "    master('yarn'). \\\n", "    getOrCreate()" ] },
 { "cell_type": "markdown", "metadata": {}, "source": [ "If you are going to use CLIs, you can launch Spark SQL using one of the following three approaches.\n", "\n", "**Using Spark SQL**\n", "\n", "```\n", "spark2-sql \\\n", "    --master yarn \\\n", "    --conf spark.ui.port=0 \\\n", "    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse\n", "```\n", "\n", "**Using Scala**\n", "\n", "```\n", "spark2-shell \\\n", "    --master yarn \\\n", "    --conf spark.ui.port=0 \\\n", "    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse\n", "```\n", "\n", "**Using PySpark**\n", "\n", "```\n", "pyspark2 \\\n", "    --master yarn \\\n", "    --conf spark.ui.port=0 \\\n", "    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse\n", "```" ] },
 { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "spark.conf.set('spark.sql.shuffle.partitions', '2')" ] },
 { "cell_type": "markdown", "metadata": {}, "source": [ "### Tasks\n", "Let us see how we can create a table using data in a file and then read it into a Data Frame.\n", "\n", "* Create a database for **airtraffic** data." ] },
 { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "import getpass\n", "username = getpass.getuser()" ] },
 { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "++\n", "||\n", "++\n", "++" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark.sql(f\"CREATE DATABASE IF NOT EXISTS {username}_airtraffic\")" ] },
 { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "spark.catalog.setCurrentDatabase(f\"{username}_airtraffic\")" ] },
 { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "'itversity_airtraffic'" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark.catalog.currentDatabase()" ] },
 { "cell_type": "markdown", "metadata": {}, "source": [ "* Create a table named **airport_codes** for the file **airport-codes.txt**. The file contains a header, and the fields in each row are delimited by a tab character." ] },
 { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "airport_codes_path = f\"/user/{username}/airtraffic_all/airport-codes\"" ] },
 { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "++\n", "||\n", "++\n", "++" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark.sql(f'DROP TABLE IF EXISTS {username}_airtraffic.airport_codes')" ] },
 { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "airport_codes_df = spark. \\\n", "    read. \\\n", "    csv(airport_codes_path,\n", "        sep=\"\\\t\",\n", "        header=True,\n", "        inferSchema=True\n", "       )" ] },
 { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "airport_codes_df.write.saveAsTable(f\"{username}_airtraffic.airport_codes\")" ] },
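 { "cell_type": "markdown", "metadata": {}, "source": [ "As mentioned at the top, we can also prefix the database name while reading a table. The following cell is a quick sketch of that; it is not executed here and assumes the cells above have been run so that `spark` and `username` are defined." ] },
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Read the table using its database-qualified name\n", "airport_codes_prefixed = spark.read.table(f\"{username}_airtraffic.airport_codes\")\n", "\n", "# Trigger a simple action to confirm the read works\n", "airport_codes_prefixed.count()" ] },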
 { "cell_type": "markdown", "metadata": {}, "source": [ "* Read data from the table and get the number of airports by state." ] },
 { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "airport_codes = spark.read.table(\"airport_codes\")" ] },
 { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "pyspark.sql.dataframe.DataFrame" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "type(airport_codes)" ] },
 { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------------------------+------------------------------------------------------------------------------------------+-------+\n", "|col_name |data_type |comment|\n", "+----------------------------+------------------------------------------------------------------------------------------+-------+\n", "|City |string |null |\n", "|State |string |null |\n", "|Country |string |null |\n", "|IATA |string |null |\n", "| | | |\n", "|# Detailed Table Information| | |\n", "|Database |itversity_airtraffic | |\n", "|Table |airport_codes | |\n", "|Owner |itversity | |\n", "|Created Time |Sat Mar 13 09:25:02 EST 2021 | |\n", "|Last Access |Wed Dec 31 19:00:00 EST 1969 | |\n", "|Created By |Spark 2.4.7 | |\n", "|Type |MANAGED | |\n", "|Provider |parquet | |\n", "|Table Properties |[transient_lastDdlTime=1615645502] | |\n", "|Statistics |9048 bytes | |\n", "|Location |hdfs://m01.itversity.com:9000/user/itversity/warehouse/itversity_airtraffic.db/airport_codes| |\n", "|Serde Library |org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe | |\n", "|InputFormat |org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat | |\n", "|OutputFormat |org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat | |\n", "|Storage Properties |[serialization.format=1] | |\n", "+----------------------------+------------------------------------------------------------------------------------------+-------+\n", "\n" ] } ], "source": [ "spark.sql('DESCRIBE FORMATTED airport_codes').show(100, False)" ] },
 { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- City: string (nullable = true)\n", " |-- State: string (nullable = true)\n", " |-- Country: string (nullable = true)\n", " |-- IATA: string (nullable = true)\n", "\n" ] } ], "source": [ "airport_codes.printSchema()" ] },
 { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+-----+\n", "|state|count|\n", "+-----+-----+\n", "|   BC|   22|\n", "|   SD|    7|\n", "|   NY|   18|\n", "|   NM|    9|\n", "|   NE|    9|\n", "|   MI|   18|\n", "|  NWT|    4|\n", "|   NC|   10|\n", "|   NJ|    3|\n", "|   MD|    3|\n", "|   WV|    8|\n", "|   MN|    8|\n", "|   IL|   12|\n", "|   ID|    6|\n", "|   IA|    8|\n", "|   MO|    8|\n", "|   SC|    6|\n", "|   VA|    7|\n", "|  PEI|    1|\n", "|   TN|    6|\n", "+-----+-----+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "airport_codes. \\\n", "    groupBy(\"state\"). \\\n", "    count(). \\\n", "    show()" ] },
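 { "cell_type": "markdown", "metadata": {}, "source": [ "The bullet points at the top also mention `filter` or `where`. The following cell is a minimal sketch of filtering before aggregating; it is not executed here and assumes US airports are stored with the literal value `'USA'` in the `Country` column." ] },
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql.functions import col\n", "\n", "# Keep only US airports before aggregating\n", "# Assumption: Country is stored as the literal string 'USA' in this data set\n", "airport_codes. \\\n", "    where(col('Country') == 'USA'). \\\n", "    groupBy('State'). \\\n", "    count(). \\\n", "    show()" ] },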
 { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql.functions import count, lit, col" ] },
 { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+-------------+\n", "|state|airport_count|\n", "+-----+-------------+\n", "|   CA|           29|\n", "|   TX|           26|\n", "|   AK|           25|\n", "|   BC|           22|\n", "|   NY|           18|\n", "|   ON|           18|\n", "|   MI|           18|\n", "|   FL|           18|\n", "|   MT|           14|\n", "|   PA|           13|\n", "|   PQ|           13|\n", "|   IL|           12|\n", "|   CO|           12|\n", "|   NC|           10|\n", "|   WY|           10|\n", "|   NE|            9|\n", "|   WI|            9|\n", "|   WA|            9|\n", "|   GA|            9|\n", "|   NM|            9|\n", "+-----+-------------+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "airport_codes. \\\n", "    groupBy(\"state\"). \\\n", "    agg(count(lit(1)).alias('airport_count')). \\\n", "    orderBy(col('airport_count').desc()). \\\n", "    show()" ] },
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }
 ], "metadata": { "kernelspec": { "display_name": "Pyspark 2", "language": "python", "name": "pyspark2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.12" } }, "nbformat": 4, "nbformat_minor": 4 }