Exploring Spark Catalog
Let us get an overview of Spark Catalog to manage Spark Metastore tables as well as temporary views.
- Let us say spark is of type SparkSession. There is an attribute of spark called catalog, and it is of type pyspark.sql.catalog.Catalog.
- We can access the catalog using spark.catalog.
- We can create tables or views, either permanent or temporary, on top of the data in a Data Frame.
- Metadata such as table names, column names, data types etc. for the permanent tables or views will be stored in the Metastore. We can access that metadata using spark.catalog, which is exposed as part of the SparkSession object.
- spark.catalog also provides details about the temporary views that are created. Metadata of these temporary views will not be stored in the Spark Metastore.
- Permanent tables are typically created under databases in the Spark Metastore. If no database is specified, the tables will be created in the default database.
- There are several methods that are part of spark.catalog. We will explore them in later topics.
- Following are some of the tasks that can be performed using the spark.catalog object.
  - Check the current database and switch to a different database.
  - Create permanent tables in the Metastore.
  - Create or drop temporary views.
  - Register functions.
 
- All the above tasks can also be performed using SQL style commands passed to spark.sql, as shown in the sketch below.
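Here is a minimal sketch of some of these tasks, assuming a SparkSession named spark is already available (one is created below). The database name demo_db and the function name to_upper are purely illustrative and are not part of this environment.

# Check the current database and switch to a different one
spark.catalog.currentDatabase()               # e.g. 'default'
spark.catalog.setCurrentDatabase('demo_db')   # demo_db is a hypothetical database
spark.sql('USE demo_db')                      # SQL style equivalent

# Register a Python function so that it can be used in SQL style commands
spark.udf.register('to_upper', lambda s: s.upper() if s else None)
spark.sql("SELECT to_upper('hello')").show()

# Permanent tables can be created with spark.catalog.createTable and
# temporary views with createTempView - both are covered later in this topic.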
Let us start the Spark context for this Notebook so that we can execute the code provided. You can sign up for our 10 node state-of-the-art cluster/labs to learn Spark SQL using our unique integrated LMS.
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Spark Metastore'). \
    master('yarn'). \
    getOrCreate()
If you are going to use CLIs, you can launch Spark SQL using one of the following three approaches.
Using Spark SQL
spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Scala
spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Pyspark
pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
spark.catalog
<pyspark.sql.catalog.Catalog at 0x7fbe54f639e8>
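As a quick check, we can invoke a couple of catalog methods right away. This is a sketch assuming the session created above; the databases actually listed will vary by environment.

spark.catalog.currentDatabase()   # typically 'default'
spark.catalog.listDatabases()     # list of databases with name, description and locationUri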
help(spark.catalog)
Help on Catalog in module pyspark.sql.catalog object:
class Catalog(builtins.object)
 |  User-facing catalog API, accessible through `SparkSession.catalog`.
 |  
 |  This is a thin wrapper around its Scala implementation org.apache.spark.sql.catalog.Catalog.
 |  
 |  Methods defined here:
 |  
 |  __init__(self, sparkSession)
 |      Create a new Catalog that wraps the underlying JVM object.
 |  
 |  cacheTable(self, tableName)
 |      Caches the specified table in-memory.
 |      
 |      .. versionadded:: 2.0
 |  
 |  clearCache(self)
 |      Removes all cached tables from the in-memory cache.
 |      
 |      .. versionadded:: 2.0
 |  
 |  createExternalTable(self, tableName, path=None, source=None, schema=None, **options)
 |      Creates a table based on the dataset in a data source.
 |      
 |      It returns the DataFrame associated with the external table.
 |      
 |      The data source is specified by the ``source`` and a set of ``options``.
 |      If ``source`` is not specified, the default data source configured by
 |      ``spark.sql.sources.default`` will be used.
 |      
 |      Optionally, a schema can be provided as the schema of the returned :class:`DataFrame` and
 |      created external table.
 |      
 |      :return: :class:`DataFrame`
 |      
 |      .. versionadded:: 2.0
 |  
 |  createTable(self, tableName, path=None, source=None, schema=None, **options)
 |      Creates a table based on the dataset in a data source.
 |      
 |      It returns the DataFrame associated with the table.
 |      
 |      The data source is specified by the ``source`` and a set of ``options``.
 |      If ``source`` is not specified, the default data source configured by
 |      ``spark.sql.sources.default`` will be used. When ``path`` is specified, an external table is
 |      created from the data at the given path. Otherwise a managed table is created.
 |      
 |      Optionally, a schema can be provided as the schema of the returned :class:`DataFrame` and
 |      created table.
 |      
 |      :return: :class:`DataFrame`
 |      
 |      .. versionadded:: 2.2
 |  
 |  currentDatabase(self)
 |      Returns the current default database in this session.
 |      
 |      .. versionadded:: 2.0
 |  
 |  dropGlobalTempView(self, viewName)
 |      Drops the global temporary view with the given view name in the catalog.
 |      If the view has been cached before, then it will also be uncached.
 |      Returns true if this view is dropped successfully, false otherwise.
 |      
 |      >>> spark.createDataFrame([(1, 1)]).createGlobalTempView("my_table")
 |      >>> spark.table("global_temp.my_table").collect()
 |      [Row(_1=1, _2=1)]
 |      >>> spark.catalog.dropGlobalTempView("my_table")
 |      >>> spark.table("global_temp.my_table") # doctest: +IGNORE_EXCEPTION_DETAIL
 |      Traceback (most recent call last):
 |          ...
 |      AnalysisException: ...
 |      
 |      .. versionadded:: 2.1
 |  
 |  dropTempView(self, viewName)
 |      Drops the local temporary view with the given view name in the catalog.
 |      If the view has been cached before, then it will also be uncached.
 |      Returns true if this view is dropped successfully, false otherwise.
 |      
 |      Note that, the return type of this method was None in Spark 2.0, but changed to Boolean
 |      in Spark 2.1.
 |      
 |      >>> spark.createDataFrame([(1, 1)]).createTempView("my_table")
 |      >>> spark.table("my_table").collect()
 |      [Row(_1=1, _2=1)]
 |      >>> spark.catalog.dropTempView("my_table")
 |      >>> spark.table("my_table") # doctest: +IGNORE_EXCEPTION_DETAIL
 |      Traceback (most recent call last):
 |          ...
 |      AnalysisException: ...
 |      
 |      .. versionadded:: 2.0
 |  
 |  isCached(self, tableName)
 |      Returns true if the table is currently cached in-memory.
 |      
 |      .. versionadded:: 2.0
 |  
 |  listColumns(self, tableName, dbName=None)
 |      Returns a list of columns for the given table/view in the specified database.
 |      
 |      If no database is specified, the current database is used.
 |      
 |      Note: the order of arguments here is different from that of its JVM counterpart
 |      because Python does not support method overloading.
 |      
 |      .. versionadded:: 2.0
 |  
 |  listDatabases(self)
 |      Returns a list of databases available across all sessions.
 |      
 |      .. versionadded:: 2.0
 |  
 |  listFunctions(self, dbName=None)
 |      Returns a list of functions registered in the specified database.
 |      
 |      If no database is specified, the current database is used.
 |      This includes all temporary functions.
 |      
 |      .. versionadded:: 2.0
 |  
 |  listTables(self, dbName=None)
 |      Returns a list of tables/views in the specified database.
 |      
 |      If no database is specified, the current database is used.
 |      This includes all temporary views.
 |      
 |      .. versionadded:: 2.0
 |  
 |  recoverPartitions(self, tableName)
 |      Recovers all the partitions of the given table and update the catalog.
 |      
 |      Only works with a partitioned table, and not a view.
 |      
 |      .. versionadded:: 2.1.1
 |  
 |  refreshByPath(self, path)
 |      Invalidates and refreshes all the cached data (and the associated metadata) for any
 |      DataFrame that contains the given data source path.
 |      
 |      .. versionadded:: 2.2.0
 |  
 |  refreshTable(self, tableName)
 |      Invalidates and refreshes all the cached data and metadata of the given table.
 |      
 |      .. versionadded:: 2.0
 |  
 |  registerFunction(self, name, f, returnType=None)
 |      An alias for :func:`spark.udf.register`.
 |      See :meth:`pyspark.sql.UDFRegistration.register`.
 |      
 |      .. note:: Deprecated in 2.3.0. Use :func:`spark.udf.register` instead.
 |      
 |      .. versionadded:: 2.0
 |  
 |  setCurrentDatabase(self, dbName)
 |      Sets the current default database in this session.
 |      
 |      .. versionadded:: 2.0
 |  
 |  uncacheTable(self, tableName)
 |      Removes the specified table from the in-memory cache.
 |      
 |      .. versionadded:: 2.0
 |  
 |  ----------------------------------------------------------------------
 |  Data descriptors defined here:
 |  
 |  __dict__
 |      dictionary for instance variables (if defined)
 |  
 |  __weakref__
 |      list of weak references to the object (if defined)
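To tie the help output back to practice, here is a minimal sketch of the temporary view life cycle using the methods documented above, assuming the spark session created earlier. The view name demo_table is only illustrative.

df = spark.createDataFrame([(1, 1)], ['c1', 'c2'])
df.createTempView('demo_table')                # register a local temporary view
[t.name for t in spark.catalog.listTables()]   # includes 'demo_table' along with tables in the current database
spark.table('demo_table').show()               # read the view back as a Data Frame
spark.catalog.dropTempView('demo_table')       # returns True once the view is dropped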