## Creating Metastore Tables using catalog

Data Frames can be written into Metastore tables using APIs such as `saveAsTable` and `insertInto`. Both are available through `write` on any Data Frame object (`df.write` returns a `DataFrameWriter`).

* We can create a new table from a Data Frame using `saveAsTable`. We can also create an empty table using `spark.catalog.createTable` or `spark.catalog.createExternalTable` (the latter is deprecated since Spark 2.2 in favor of `createTable`).
* We can prefix the table name with a database name (for example `demo_db.dual`) to write data into a table in that database. If no database is specified, the table is created in the session's current database, which is `default` unless it has been changed.
* We can also switch the current database of the session using `spark.catalog.setCurrentDatabase`.
* Databases can be created using `spark.sql("CREATE DATABASE database_name")`. We can list databases using `spark.sql("SHOW DATABASES")` or `spark.catalog.listDatabases()`.
* With `saveAsTable` we can use the modes `append`, `overwrite`, `error` (also spelled `errorifexists`) and `ignore`. The default is `error`.
* With `insertInto` we can either append or overwrite. The default is append.
* When we use `saveAsTable`, the following happens:
  * Spark checks whether the table already exists. If it does, `saveAsTable` throws an exception by default.
  * If the table does not exist, it is created.
  * Data from the Data Frame is copied into the table.
  * We can change this behavior using the mode: we can overwrite the existing table or append to it.
* We can list the tables using `spark.catalog.listTables` after switching to the appropriate database using `spark.catalog.setCurrentDatabase`.
* We can also switch the database and list tables using `spark.sql` (`USE database_name` followed by `SHOW TABLES`).

Let us start the Spark session for this notebook so that we can execute the code provided. You can sign up for our [10 node state of the art cluster/labs](https://labs.itversity.com/plans) to learn Spark SQL using our unique integrated LMS.

```python
from pyspark.sql import SparkSession

import getpass
username = getpass.getuser()

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Spark Metastore'). \
    master('yarn'). \
    getOrCreate()
```

If you prefer to use the CLIs, you can run Spark SQL using one of the following 3 approaches.

**Using Spark SQL**

```
spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

**Using Scala**

```
spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

**Using Pyspark**

```
pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

```python
spark.conf.set('spark.sql.shuffle.partitions', '2')
```

```python
spark.catalog?
```

```
Type:        property
String form: <property object at 0x7fe1e92d4728>
Docstring:
Interface through which the user may create, drop, alter or query underlying
databases, tables, functions, etc.

:return: :class:`Catalog`

.. versionadded:: 2.0
```


```python
help(spark.catalog)
```

### Tasks
Let us perform a few tasks to understand how to write a Data Frame into Metastore tables and list them.
* Create a database named **demo_db**, prefixed with the username (for example `itversity_demo_db`), in the metastore. We need to use `spark.sql`, as there is no function under `spark.catalog` to create a database.

```python
import getpass
username = getpass.getuser()
```

```python
username
```

```
'itversity'
```

```python
spark.sql(f"DROP DATABASE IF EXISTS {username}_demo_db CASCADE")
```

```python
spark.sql(f"CREATE DATABASE {username}_demo_db")
```

```python
spark.catalog.setCurrentDatabase(f'{username}_demo_db')
```

* List the databases using both the API and the SQL approach. As we have many databases in our environment, it might take a long time to return the results.

```python
spark.catalog.listDatabases()
```

```python
spark.catalog.currentDatabase()
```

```
'itversity_demo_db'
```

* Create a Data Frame that contains one column named **dummy** and one row with the value **X**.

```python
l = [("X", )]
df = spark.createDataFrame(l, schema="dummy STRING")
```

```python
spark.catalog.listTables()
```

```
[]
```

```python
df.show()
```

```
+-----+
|dummy|
+-----+
|    X|
+-----+
```



* Create a table named **dual** from the above Data Frame in the database we created.

```python
df.write.saveAsTable?
```

```
Signature:
df.write.saveAsTable(
    name,
    format=None,
    mode=None,
    partitionBy=None,
    **options,
)
Docstring:
Saves the content of the :class:`DataFrame` as the specified table.

In the case the table already exists, behavior of this function depends on the
save mode, specified by the `mode` function (default to throwing an exception).
When `mode` is `Overwrite`, the schema of the :class:`DataFrame` does not need to be
the same as that of the existing table.

* `append`: Append contents of this :class:`DataFrame` to existing data.
* `overwrite`: Overwrite exi...
```

```python
df.write.saveAsTable("dual", mode='overwrite')
```

```python
spark.catalog.listTables()
```

```
[Table(name='dual', database='itversity_demo_db', description=None, tableType='MANAGED', isTemporary=False)]
```

```python
spark.read.table("dual").show()
```

```
+-----+
|dummy|
+-----+
|    X|
+-----+
```



```python
spark.sql('SELECT * FROM dual').show()
```

```
+-----+
|dummy|
+-----+
|    X|
+-----+
```



* Create an empty table and insert data into it.

```python
spark.sql("DROP TABLE dual")
```

```python
spark.catalog.listTables()
```

```
[]
```

```python
df.show()
```

```
+-----+
|dummy|
+-----+
|    X|
+-----+
```



```python
schema = df.schema
```

```python
schema
```

```
StructType(List(StructField(dummy,StringType,true)))
```

```python
spark.catalog.createTable?
```

```
Signature:
spark.catalog.createTable(
    tableName,
    path=None,
    source=None,
    schema=None,
    **options,
)
Docstring:
Creates a table based on the dataset in a data source.

It returns the DataFrame associated with the table.

The data source is specified by the ``source`` and a set of ``options``.
If ``source`` is not specified, the default data source configured by
``spark.sql.sources.default`` will be used. When ``path`` is specified, an external table is
created from the data at the given path. Otherwise a managed table is created.

Optionally, a schema can ...
```

```python
spark.catalog.createTable('dual', schema=schema)
```

```
dummy
```


```python
spark.catalog.listTables()
```

```
[Table(name='dual', database='itversity_demo_db', description=None, tableType='MANAGED', isTemporary=False)]
```

```python
df.write.insertInto?
```

```
Signature: df.write.insertInto(tableName, overwrite=None)
Docstring:
Inserts the content of the :class:`DataFrame` to the specified table.

It requires that the schema of the :class:`DataFrame` is the same as the
schema of the table.

Optionally overwriting any existing data.

.. versionadded:: 1.4
File:      /opt/spark-3.0.1-bin-hadoop3.2/python/pyspark/sql/readwriter.py
Type:      method
```


```python
df.write.insertInto('dual')
```

```python
spark.read.table("dual").show()
```

```
+-----+
|dummy|
+-----+
|    X|
+-----+
```



```python
spark.sql('SELECT * FROM dual').show()
```

```
+-----+
|dummy|
+-----+
|    X|
+-----+
```



* Let us drop the table **dual** and then the database **demo_db** (`itversity_demo_db` here). We need to use `spark.sql`, as `spark.catalog` does not have an API to drop tables or databases.

```python
spark.sql("DROP TABLE dual")
```

```python
spark.sql(f"DROP DATABASE {username}_demo_db")
```

```python
# We can use CASCADE to drop the database along with its tables.
spark.sql(f"DROP DATABASE IF EXISTS {username}_demo_db CASCADE")
```