Creating Metastore Tables using catalog

Data Frames can be written into Metastore tables using APIs such as saveAsTable and insertInto, which are available through the write interface of a Data Frame.

  • We can create a new table from a Data Frame using saveAsTable. We can also create an empty table by using spark.catalog.createTable or spark.catalog.createExternalTable.

  • We can also prefix the database name to write data into tables belonging to a particular database. If the database is not specified, the table is created in the database the session is currently attached to (default, unless we switch).

  • We can also attach or connect the current session to a specific database using spark.catalog.setCurrentDatabase.

  • Databases can be created using spark.sql("CREATE DATABASE database_name"). We can list databases using spark.sql or spark.catalog.listDatabases().

  • We can use modes such as append, overwrite and error with saveAsTable. The default is error (see the sketch after this list).

  • We can use modes such as append and overwrite with insertInto. The default is append.

  • When we use saveAsTable, the following happens:

    • Spark checks whether the table already exists. By default saveAsTable throws an exception if it does.

    • If the table does not exist, it will be created.

    • Data from the Data Frame will be copied into the table.

    • We can alter this behavior using mode: we can overwrite the existing table or append into it.

  • We can list the tables using spark.catalog.listTables after switching to the appropriate database using spark.catalog.setCurrentDatabase.

  • We can also switch the database and list tables using spark.sql.
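
To make these points concrete, here is a minimal sketch (the database name demo_db and the Data Frame df are placeholders; both are set up in the tasks below):

# Write df into table dual in database demo_db, replacing the table if it exists.
df.write.saveAsTable("demo_db.dual", mode="overwrite")

# Append the contents of df into the existing table.
df.write.insertInto("demo_db.dual")

# Alternatively, attach the session to the database and use the bare table name.
spark.catalog.setCurrentDatabase("demo_db")
df.write.saveAsTable("dual", mode="append")
spark.catalog.listTables()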

Let us start the Spark context for this notebook so that we can execute the code provided. You can sign up for our 10 node state-of-the-art cluster/labs to learn Spark SQL using our unique integrated LMS.

from pyspark.sql import SparkSession

import getpass
username = getpass.getuser()

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Spark Metastore'). \
    master('yarn'). \
    getOrCreate()

If you are going to use CLIs, you can use Spark SQL via one of the following three approaches.

Using Spark SQL

spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Scala

spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Pyspark

pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

# Use a small number of shuffle partitions, as the datasets in this notebook are tiny.
spark.conf.set('spark.sql.shuffle.partitions', '2')
spark.catalog?
Type:        property
String form: <property object at 0x7fe1e92d4728>
Docstring:  
Interface through which the user may create, drop, alter or query underlying
databases, tables, functions, etc.

:return: :class:`Catalog`

.. versionadded:: 2.0
help(spark.catalog)

Tasks

Let us perform a few tasks to understand how to write a Data Frame into Metastore tables and also how to list them.

  • Create a database named demo_db in the metastore. We need to use spark.sql as there is no function to create a database under spark.catalog.

import getpass
username = getpass.getuser()
username
'itversity'
spark.sql(f"DROP DATABASE IF EXISTS {username}_demo_db CASCADE")
spark.sql(f"CREATE DATABASE {username}_demo_db")
spark.catalog.setCurrentDatabase(f'{username}_demo_db')
  • List the databases using both the API and the SQL approach. As we have many databases in our environment, it might take a while to return the results (a pattern-based listing, sketched below, can narrow this down).

spark.catalog.listDatabases()
spark.catalog.currentDatabase()
'itversity_demo_db'
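
The listing above can be narrowed down with a pattern instead of scanning every database. A minimal sketch using SHOW DATABASES with a wildcard (the username prefix is an assumption based on the naming convention used in this notebook):

# List only databases whose names start with the current username.
spark.sql(f"SHOW DATABASES LIKE '{username}*'").show(truncate=False)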
  • Create a Data Frame which contains one column named dummy and one row with value X.

l = [("X", )]
df = spark.createDataFrame(l, schema="dummy STRING")
spark.catalog.listTables()
[]
df.show()
+-----+
|dummy|
+-----+
|    X|
+-----+
  • Create a table named dual from the above Data Frame in the database we created.

df.write.saveAsTable?
Signature:
df.write.saveAsTable(
    name,
    format=None,
    mode=None,
    partitionBy=None,
    **options,
)
Docstring:
Saves the content of the :class:`DataFrame` as the specified table.

In the case the table already exists, behavior of this function depends on the
save mode, specified by the `mode` function (default to throwing an exception).
When `mode` is `Overwrite`, the schema of the :class:`DataFrame` does not need to be
the same as that of the existing table.

* `append`: Append contents of this :class:`DataFrame` to existing data.
* `overwrite`: Overwrite existing data.
* `error` or `errorifexists`: Throw an exception if data already exists.
* `ignore`: Silently ignore this operation if data already exists.

:param name: the table name
:param format: the format used to save
:param mode: one of `append`, `overwrite`, `error`, `errorifexists`, `ignore` (default: error)
:param partitionBy: names of partitioning columns
:param options: all other string options

.. versionadded:: 1.4
File:      /opt/spark-3.0.1-bin-hadoop3.2/python/pyspark/sql/readwriter.py
Type:      method
df.write.saveAsTable("dual", mode='overwrite')
spark.catalog.listTables()
[Table(name='dual', database='itversity_demo_db', description=None, tableType='MANAGED', isTemporary=False)]
spark.read.table("dual").show()
+-----+
|dummy|
+-----+
|    X|
+-----+
spark.sql('SELECT * FROM dual').show()
+-----+
|dummy|
+-----+
|    X|
+-----+
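
We can also inspect the metadata of the table we just created (location, provider, table type and so on). A quick sketch using DESCRIBE FORMATTED:

# Show detailed metadata for the managed table dual.
spark.sql("DESCRIBE FORMATTED dual").show(truncate=False)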
  • Create an empty table and insert data into it.

spark.sql("DROP TABLE dual")
spark.catalog.listTables()
[]
df.show()
+-----+
|dummy|
+-----+
|    X|
+-----+
schema = df.schema
schema
StructType(List(StructField(dummy,StringType,true)))
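
Instead of reusing df.schema, the same schema could also be built explicitly. A minimal sketch that is equivalent for our single-column Data Frame:

from pyspark.sql.types import StructType, StructField, StringType

# Equivalent to df.schema: one nullable string column named dummy.
schema = StructType([StructField("dummy", StringType(), True)])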
spark.catalog.createTable?
Signature:
spark.catalog.createTable(
    tableName,
    path=None,
    source=None,
    schema=None,
    **options,
)
Docstring:
Creates a table based on the dataset in a data source.

It returns the DataFrame associated with the table.

The data source is specified by the ``source`` and a set of ``options``.
If ``source`` is not specified, the default data source configured by
``spark.sql.sources.default`` will be used. When ``path`` is specified, an external table is
created from the data at the given path. Otherwise a managed table is created.

Optionally, a schema can be provided as the schema of the returned :class:`DataFrame` and
created table.

:return: :class:`DataFrame`

.. versionadded:: 2.2
File:      /opt/spark-3.0.1-bin-hadoop3.2/python/pyspark/sql/catalog.py
Type:      method
spark.catalog.createTable('dual', schema=schema)
+-----+
|dummy|
+-----+
+-----+
spark.catalog.listTables()
[Table(name='dual', database='itversity_demo_db', description=None, tableType='MANAGED', isTemporary=False)]
df.write.insertInto?
Signature: df.write.insertInto(tableName, overwrite=None)
Docstring:
Inserts the content of the :class:`DataFrame` to the specified table.

It requires that the schema of the :class:`DataFrame` is the same as the
schema of the table.

Optionally overwriting any existing data.

.. versionadded:: 1.4
File:      /opt/spark-3.0.1-bin-hadoop3.2/python/pyspark/sql/readwriter.py
Type:      method
df.write.insertInto('dual')
spark.read.table("dual").show()
+-----+
|dummy|
+-----+
|    X|
+-----+
spark.sql('SELECT * FROM dual').show()
+-----+
|dummy|
+-----+
|    X|
+-----+
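
As mentioned at the beginning, we can also create external tables, either with spark.catalog.createExternalTable or with spark.catalog.createTable by passing a path. A minimal sketch, assuming Parquet files already exist under a hypothetical location /user/{username}/external/dual:

# When a path is specified, an external table is created from the data
# already present at that location (here assumed to be in Parquet format).
spark.catalog.createTable(
    'dual_ext',
    path=f'/user/{username}/external/dual',
    source='parquet'
)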
  • Let us drop the table dual and then the database demo_db. We need to use spark.sql as spark.catalog does not have an API to drop tables or databases.

spark.sql("DROP TABLE dual")
spark.sql(f"DROP DATABASE {username}_demo_db")
# We can use CASCADE to drop the database along with its tables.
spark.sql(f"DROP DATABASE IF EXISTS {username}_demo_db CASCADE")