Exploring Spark Catalog

Let us get an overview of Spark Catalog to manage Spark Metastore tables as well as temporary views.

  • Let us say spark is of type SparkSession. It has an attribute called catalog, which is of type pyspark.sql.catalog.Catalog.

  • We can access the catalog using spark.catalog.

  • We can create permanent tables or temporary views on top of the data in a Data Frame.

  • Metadata such as table names, column names, data types, etc. for permanent tables or views is stored in the Metastore. We can access this metadata using spark.catalog, which is exposed as part of the SparkSession object.

  • spark.catalog also provides details about the temporary views that are created. Metadata of these temporary views is not stored in the Spark Metastore.

  • Permanent tables are typically created within databases in the Spark Metastore. If no database is specified, the tables are created in the default database.

  • There are several methods that are part of spark.catalog. We will explore them in later topics.

  • Following are some of the tasks that can be performed using the spark.catalog object (sketched below, after spark.catalog is accessed).

    • Check current database and switch to different databases.

    • Create permanent table in metastore.

    • Create or drop temporary views.

    • Register functions.

  • All of the above tasks can also be performed using SQL-style commands passed to spark.sql, as sketched below.
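Here is a minimal sketch of those SQL-style equivalents. It assumes the SparkSession named spark that is created in the next cell; itv_demo and orders are placeholder database and table names used only for illustration.

# SQL-style equivalents of the catalog tasks listed above.
# Assumes `spark` is the SparkSession created in the next cell;
# `itv_demo` and `orders` are placeholder names.
spark.sql('SELECT current_database()').show()       # check the current database
spark.sql('CREATE DATABASE IF NOT EXISTS itv_demo')
spark.sql('USE itv_demo')                           # switch to a different database

spark.sql("""
    CREATE TABLE IF NOT EXISTS orders (
        order_id INT,
        order_status STRING
    ) USING parquet
""")                                                # permanent table in the metastore

spark.sql('SHOW tables').show()
spark.sql('DROP TABLE IF EXISTS orders')
spark.sql('USE default')
spark.sql('DROP DATABASE IF EXISTS itv_demo')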

Let us start the Spark context for this Notebook so that we can execute the code provided. You can sign up for our 10-node state-of-the-art cluster/labs to learn Spark SQL using our unique integrated LMS.

from pyspark.sql import SparkSession

import getpass
username = getpass.getuser()

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Spark Metastore'). \
    master('yarn'). \
    getOrCreate()

If you are going to use CLIs, you can use Spark SQL in one of the following three approaches.

Using Spark SQL

spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Scala

spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using PySpark

pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
spark.catalog
<pyspark.sql.catalog.Catalog at 0x7fbe54f639e8>
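Before going through the full help output, here is a minimal sketch of the tasks listed at the beginning of this notebook, using the catalog API directly; orders_v and to_upper are placeholder names.

# A sketch of common spark.catalog operations; names below are placeholders.
spark.catalog.currentDatabase()                 # check the current database
spark.catalog.setCurrentDatabase('default')     # switch to a different database

# create a temporary view on top of a Data Frame, inspect it, then drop it
df = spark.createDataFrame([(1, 'COMPLETE'), (2, 'CLOSED')], ['order_id', 'order_status'])
df.createOrReplaceTempView('orders_v')
spark.catalog.listTables()                      # includes temporary views
spark.catalog.dropTempView('orders_v')

# register a Python function so that it can be used from SQL
spark.udf.register('to_upper', lambda s: s.upper() if s else None)
spark.sql("SELECT to_upper('closed')").show()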
help(spark.catalog)
Help on Catalog in module pyspark.sql.catalog object:

class Catalog(builtins.object)
 |  User-facing catalog API, accessible through `SparkSession.catalog`.
 |  
 |  This is a thin wrapper around its Scala implementation org.apache.spark.sql.catalog.Catalog.
 |  
 |  Methods defined here:
 |  
 |  __init__(self, sparkSession)
 |      Create a new Catalog that wraps the underlying JVM object.
 |  
 |  cacheTable(self, tableName)
 |      Caches the specified table in-memory.
 |      
 |      .. versionadded:: 2.0
 |  
 |  clearCache(self)
 |      Removes all cached tables from the in-memory cache.
 |      
 |      .. versionadded:: 2.0
 |  
 |  createExternalTable(self, tableName, path=None, source=None, schema=None, **options)
 |      Creates a table based on the dataset in a data source.
 |      
 |      It returns the DataFrame associated with the external table.
 |      
 |      The data source is specified by the ``source`` and a set of ``options``.
 |      If ``source`` is not specified, the default data source configured by
 |      ``spark.sql.sources.default`` will be used.
 |      
 |      Optionally, a schema can be provided as the schema of the returned :class:`DataFrame` and
 |      created external table.
 |      
 |      :return: :class:`DataFrame`
 |      
 |      .. versionadded:: 2.0
 |  
 |  createTable(self, tableName, path=None, source=None, schema=None, **options)
 |      Creates a table based on the dataset in a data source.
 |      
 |      It returns the DataFrame associated with the table.
 |      
 |      The data source is specified by the ``source`` and a set of ``options``.
 |      If ``source`` is not specified, the default data source configured by
 |      ``spark.sql.sources.default`` will be used. When ``path`` is specified, an external table is
 |      created from the data at the given path. Otherwise a managed table is created.
 |      
 |      Optionally, a schema can be provided as the schema of the returned :class:`DataFrame` and
 |      created table.
 |      
 |      :return: :class:`DataFrame`
 |      
 |      .. versionadded:: 2.2
 |  
 |  currentDatabase(self)
 |      Returns the current default database in this session.
 |      
 |      .. versionadded:: 2.0
 |  
 |  dropGlobalTempView(self, viewName)
 |      Drops the global temporary view with the given view name in the catalog.
 |      If the view has been cached before, then it will also be uncached.
 |      Returns true if this view is dropped successfully, false otherwise.
 |      
 |      >>> spark.createDataFrame([(1, 1)]).createGlobalTempView("my_table")
 |      >>> spark.table("global_temp.my_table").collect()
 |      [Row(_1=1, _2=1)]
 |      >>> spark.catalog.dropGlobalTempView("my_table")
 |      >>> spark.table("global_temp.my_table") # doctest: +IGNORE_EXCEPTION_DETAIL
 |      Traceback (most recent call last):
 |          ...
 |      AnalysisException: ...
 |      
 |      .. versionadded:: 2.1
 |  
 |  dropTempView(self, viewName)
 |      Drops the local temporary view with the given view name in the catalog.
 |      If the view has been cached before, then it will also be uncached.
 |      Returns true if this view is dropped successfully, false otherwise.
 |      
 |      Note that, the return type of this method was None in Spark 2.0, but changed to Boolean
 |      in Spark 2.1.
 |      
 |      >>> spark.createDataFrame([(1, 1)]).createTempView("my_table")
 |      >>> spark.table("my_table").collect()
 |      [Row(_1=1, _2=1)]
 |      >>> spark.catalog.dropTempView("my_table")
 |      >>> spark.table("my_table") # doctest: +IGNORE_EXCEPTION_DETAIL
 |      Traceback (most recent call last):
 |          ...
 |      AnalysisException: ...
 |      
 |      .. versionadded:: 2.0
 |  
 |  isCached(self, tableName)
 |      Returns true if the table is currently cached in-memory.
 |      
 |      .. versionadded:: 2.0
 |  
 |  listColumns(self, tableName, dbName=None)
 |      Returns a list of columns for the given table/view in the specified database.
 |      
 |      If no database is specified, the current database is used.
 |      
 |      Note: the order of arguments here is different from that of its JVM counterpart
 |      because Python does not support method overloading.
 |      
 |      .. versionadded:: 2.0
 |  
 |  listDatabases(self)
 |      Returns a list of databases available across all sessions.
 |      
 |      .. versionadded:: 2.0
 |  
 |  listFunctions(self, dbName=None)
 |      Returns a list of functions registered in the specified database.
 |      
 |      If no database is specified, the current database is used.
 |      This includes all temporary functions.
 |      
 |      .. versionadded:: 2.0
 |  
 |  listTables(self, dbName=None)
 |      Returns a list of tables/views in the specified database.
 |      
 |      If no database is specified, the current database is used.
 |      This includes all temporary views.
 |      
 |      .. versionadded:: 2.0
 |  
 |  recoverPartitions(self, tableName)
 |      Recovers all the partitions of the given table and update the catalog.
 |      
 |      Only works with a partitioned table, and not a view.
 |      
 |      .. versionadded:: 2.1.1
 |  
 |  refreshByPath(self, path)
 |      Invalidates and refreshes all the cached data (and the associated metadata) for any
 |      DataFrame that contains the given data source path.
 |      
 |      .. versionadded:: 2.2.0
 |  
 |  refreshTable(self, tableName)
 |      Invalidates and refreshes all the cached data and metadata of the given table.
 |      
 |      .. versionadded:: 2.0
 |  
 |  registerFunction(self, name, f, returnType=None)
 |      An alias for :func:`spark.udf.register`.
 |      See :meth:`pyspark.sql.UDFRegistration.register`.
 |      
 |      .. note:: Deprecated in 2.3.0. Use :func:`spark.udf.register` instead.
 |      
 |      .. versionadded:: 2.0
 |  
 |  setCurrentDatabase(self, dbName)
 |      Sets the current default database in this session.
 |      
 |      .. versionadded:: 2.0
 |  
 |  uncacheTable(self, tableName)
 |      Removes the specified table from the in-memory cache.
 |      
 |      .. versionadded:: 2.0
 |  
 |  ----------------------------------------------------------------------
 |  Data descriptors defined here:
 |  
 |  __dict__
 |      dictionary for instance variables (if defined)
 |  
 |  __weakref__
 |      list of weak references to the object (if defined)
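To tie the help output back to concrete calls, here is a minimal sketch exercising a few of the documented methods: createTable creates a managed table when no path is given (and an external one when a path is supplied), listColumns reads column metadata from the Metastore, and cacheTable, isCached, and uncacheTable manage the in-memory cache. The table names and path below are placeholders for this environment.

# Hedged sketch of a few Catalog methods documented above; names and paths are placeholders.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField('order_id', IntegerType()),
    StructField('order_status', StringType())
])

# managed table: no path given, so data lives under spark.sql.warehouse.dir
spark.catalog.createTable('orders_managed', schema=schema, source='parquet')

# external table: path points at pre-existing data (placeholder location)
# spark.catalog.createExternalTable('orders_ext', path=f'/user/{username}/retail_db/orders', source='parquet')

spark.catalog.listColumns('orders_managed')     # column names and data types from the Metastore

spark.catalog.cacheTable('orders_managed')      # cache the table in memory
spark.catalog.isCached('orders_managed')        # True once cached
spark.catalog.uncacheTable('orders_managed')

spark.sql('DROP TABLE IF EXISTS orders_managed')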