Creating Metastore Tables using catalog¶
Data Frames can be written into Metastore tables using APIs such as saveAsTable and insertInto, available as part of write on top of Data Frame objects.
We can create a new table from a Data Frame using saveAsTable. We can also create an empty table using spark.catalog.createTable or spark.catalog.createExternalTable.
We can prefix the table name with a database name to write data into tables belonging to a particular database. If no database is specified, the session is attached to the default database.
We can also attach or connect the current session to a specific database using spark.catalog.setCurrentDatabase.
Databases can be created using spark.sql("CREATE DATABASE database_name"). We can list databases using spark.sql or spark.catalog.listDatabases().
We can use modes such as append, overwrite and error with saveAsTable. The default is error.
We can use modes such as append and overwrite with insertInto. The default is append.
When we use saveAsTable, the following happens: Spark first checks whether the table already exists; by default saveAsTable throws an exception if it does. If the table does not exist, it is created and the data from the Data Frame is copied into it. We can alter this behavior using mode, either overwriting the existing table or appending to it.
We can list the tables using spark.catalog.listTables after switching to the appropriate database using spark.catalog.setCurrentDatabase. We can also switch the database and list tables using spark.sql.
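As a quick illustration, here is a minimal sketch (not from the original notebook) tying these APIs together. It assumes a SparkSession named spark with Hive support enabled, such as the one created in the next cell; the database name sketch_db and table name orders are hypothetical placeholders.
# Sketch only: write a Data Frame into a database-qualified Metastore table,
# then list the tables with both the catalog API and SQL.
spark.sql("CREATE DATABASE IF NOT EXISTS sketch_db")

orders = spark.createDataFrame(
    [(1, "2013-07-25")],
    schema="order_id INT, order_date STRING"
)

# Database-qualified table name; mode='overwrite' replaces the table if it exists.
orders.write.saveAsTable("sketch_db.orders", mode="overwrite")

# Alternatively, attach the session to the database first and list its tables.
spark.catalog.setCurrentDatabase("sketch_db")
spark.catalog.listTables()

# The same listing using SQL.
spark.sql("SHOW tables").show()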
Let us start the Spark context for this Notebook so that we can execute the code provided. You can sign up for our 10-node state-of-the-art cluster/labs to learn Spark SQL using our unique integrated LMS.
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Spark Metastore'). \
    master('yarn'). \
    getOrCreate()
If you are going to use CLIs, you can launch Spark SQL using one of the following three approaches.
Using Spark SQL
spark2-sql \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Scala
spark2-shell \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Pyspark
pyspark2 \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
spark.conf.set('spark.sql.shuffle.partitions', '2')
spark.catalog?
Type: property
String form: <property object at 0x7fe1e92d4728>
Docstring:
Interface through which the user may create, drop, alter or query underlying
databases, tables, functions, etc.
:return: :class:`Catalog`
.. versionadded:: 2.0
help(spark.catalog)
Tasks¶
Let us perform a few tasks to understand how to write a Data Frame into Metastore tables and also list them.
Create a database named demo_db in the metastore. We need to use spark.sql as there is no function to create a database under spark.catalog.
import getpass
username = getpass.getuser()
username
'itversity'
spark.sql(f"DROP DATABASE IF EXISTS {username}_demo_db CASCADE")
spark.sql(f"CREATE DATABASE {username}_demo_db")
spark.catalog.setCurrentDatabase(f'{username}_demo_db')
List the databases using both the API as well as the SQL approach. As we have too many databases in our environment, it might take quite some time to return the results.
spark.catalog.listDatabases()
spark.catalog.currentDatabase()
'itversity_demo_db'
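For the SQL half of the task, a minimal sketch (assuming the same session) is to run SHOW databases through spark.sql; a LIKE pattern keeps the listing small.
# SQL approach (sketch): list only databases matching a pattern so the call
# returns quickly even in an environment with many databases.
spark.sql(f"SHOW databases LIKE '{username}_demo_db'").show(truncate=False)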
Create a Data Frame which contains one column named dummy and one row with value X.
l = [("X", )]
df = spark.createDataFrame(l, schema="dummy STRING")
spark.catalog.listTables()
[]
df.show()
+-----+
|dummy|
+-----+
| X|
+-----+
Create a table named dual for the above Data Frame in the database we created.
df.write.saveAsTable?
Signature:
df.write.saveAsTable(
name,
format=None,
mode=None,
partitionBy=None,
**options,
)
Docstring:
Saves the content of the :class:`DataFrame` as the specified table.
In the case the table already exists, behavior of this function depends on the
save mode, specified by the `mode` function (default to throwing an exception).
When `mode` is `Overwrite`, the schema of the :class:`DataFrame` does not need to be
the same as that of the existing table.
* `append`: Append contents of this :class:`DataFrame` to existing data.
* `overwrite`: Overwrite existing data.
* `error` or `errorifexists`: Throw an exception if data already exists.
* `ignore`: Silently ignore this operation if data already exists.
:param name: the table name
:param format: the format used to save
:param mode: one of `append`, `overwrite`, `error`, `errorifexists`, `ignore` (default: error)
:param partitionBy: names of partitioning columns
:param options: all other string options
.. versionadded:: 1.4
File: /opt/spark-3.0.1-bin-hadoop3.2/python/pyspark/sql/readwriter.py
Type: method
df.write.saveAsTable("dual", mode='overwrite')
spark.catalog.listTables()
[Table(name='dual', database='itversity_demo_db', description=None, tableType='MANAGED', isTemporary=False)]
spark.read.table("dual").show()
+-----+
|dummy|
+-----+
| X|
+-----+
spark.sql('SELECT * FROM dual').show()
+-----+
|dummy|
+-----+
| X|
+-----+
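Before moving on, note how the save modes mentioned earlier would behave here. The following is a sketch only and is not executed as part of this walkthrough: append writes the Data Frame's rows on top of the existing data, while the default error mode raises an exception for an existing table.
# Sketch only (not executed here): append adds the Data Frame's single row to
# the existing table, so `dual` would then contain two rows with value X.
df.write.saveAsTable("dual", mode="append")

# With the default mode ('error' / 'errorifexists'), calling saveAsTable on an
# existing table raises an AnalysisException instead of writing anything.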
Create an empty table and insert data into it.
spark.sql("DROP TABLE dual")
spark.catalog.listTables()
[]
df.show()
+-----+
|dummy|
+-----+
| X|
+-----+
schema = df.schema
schema
StructType(List(StructField(dummy,StringType,true)))
spark.catalog.createTable?
Signature:
spark.catalog.createTable(
tableName,
path=None,
source=None,
schema=None,
**options,
)
Docstring:
Creates a table based on the dataset in a data source.
It returns the DataFrame associated with the table.
The data source is specified by the ``source`` and a set of ``options``.
If ``source`` is not specified, the default data source configured by
``spark.sql.sources.default`` will be used. When ``path`` is specified, an external table is
created from the data at the given path. Otherwise a managed table is created.
Optionally, a schema can be provided as the schema of the returned :class:`DataFrame` and
created table.
:return: :class:`DataFrame`
.. versionadded:: 2.2
File: /opt/spark-3.0.1-bin-hadoop3.2/python/pyspark/sql/catalog.py
Type: method
spark.catalog.createTable('dual', schema=schema)
+-----+
|dummy|
+-----+
+-----+
spark.catalog.listTables()
[Table(name='dual', database='itversity_demo_db', description=None, tableType='MANAGED', isTemporary=False)]
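As the docstring above notes, passing path to spark.catalog.createTable creates an external table from data at that location. A minimal sketch, using a hypothetical HDFS path and table name that are not part of the original notebook, could look like this.
# Sketch only: write the Data Frame out as parquet to a hypothetical location,
# then register an external table on top of that path.
df.write.parquet(f"/user/{username}/external/dual_data", mode="overwrite")

spark.catalog.createTable(
    "dual_ext",                                   # hypothetical table name
    path=f"/user/{username}/external/dual_data",  # pre-existing data location
    source="parquet"
)
# spark.catalog.listTables() would then show dual_ext with tableType='EXTERNAL'.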
df.write.insertInto?
Signature: df.write.insertInto(tableName, overwrite=None)
Docstring:
Inserts the content of the :class:`DataFrame` to the specified table.
It requires that the schema of the :class:`DataFrame` is the same as the
schema of the table.
Optionally overwriting any existing data.
.. versionadded:: 1.4
File: /opt/spark-3.0.1-bin-hadoop3.2/python/pyspark/sql/readwriter.py
Type: method
df.write.insertInto('dual')
spark.read.table("dual").show()
+-----+
|dummy|
+-----+
| X|
+-----+
spark.sql('SELECT * FROM dual').show()
+-----+
|dummy|
+-----+
| X|
+-----+
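insertInto appends by default; as a sketch that is not executed here, passing overwrite=True would replace the table's existing rows with the Data Frame's contents instead.
# Sketch only: overwrite=True truncates the existing data before inserting,
# rather than appending to it (the default behaviour of insertInto).
df.write.insertInto("dual", overwrite=True)
spark.read.table("dual").show()  # would still show a single row with value X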
Let us drop the table dual and then the database. We need to use spark.sql as spark.catalog does not have an API to drop tables or databases.
spark.sql("DROP TABLE dual")
spark.sql(f"DROP DATABASE {username}_demo_db")
# We can use CASCADE to drop database along with tables.
spark.sql(f"DROP DATABASE IF EXISTS {username}_demo_db CASCADE")