Define Schema for Tables using StructType

When we want to create a table using spark.catalog.createTable or using spark.catalog.createExternalTable, we need to specify Schema.

  • Schema can be inferred or we can pass schema using StructType object while creating the table..

  • StructType takes list of objects of type StructField.

  • StructField is built using column name and data type. All the data types are available under pyspark.sql.types.

  • We need to pass table name and schema for spark.catalog.createTable.

  • We have to pass path along with name and schema for spark.catalog.createExternalTable.

Let us start spark context for this Notebook so that we can execute the code provided. You can sign up for our 10 node state of the art cluster/labs to learn Spark SQL using our unique integrated LMS.

from pyspark.sql import SparkSession

import getpass
username = getpass.getuser()

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Spark Metastore'). \
    master('yarn'). \
    getOrCreate()

If you are going to use CLIs, you can use Spark SQL using one of the 3 approaches.

Using Spark SQL

spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Scala

spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Pyspark

pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
spark.conf.set('spark.sql.shuffle.partitions', '2')

Tasks

Let us perform tasks to create empty table using spark.catalog.createTable or using spark.catalog.createExternalTable.

  • Create database {username}_hr_db and table employees with following fields. Let us create Database first and then we will see how to create table.

    • employee_id of type Integer

    • first_name of type String

    • last_name of type String

    • salary of type Float

    • nationality of type String

import getpass
username = getpass.getuser()
spark.sql(f"CREATE DATABASE IF NOT EXISTS {username}_hr_db")
spark.catalog.setCurrentDatabase(f"{username}_hr_db")
spark.catalog.currentDatabase()
'itversity_hr_db'
spark.catalog.createTable?
Signature:
spark.catalog.createTable(
    tableName,
    path=None,
    source=None,
    schema=None,
    **options,
)
Docstring:
Creates a table based on the dataset in a data source.

It returns the DataFrame associated with the table.

The data source is specified by the ``source`` and a set of ``options``.
If ``source`` is not specified, the default data source configured by
``spark.sql.sources.default`` will be used. When ``path`` is specified, an external table is
created from the data at the given path. Otherwise a managed table is created.

Optionally, a schema can be provided as the schema of the returned :class:`DataFrame` and
created table.

:return: :class:`DataFrame`

.. versionadded:: 2.2
File:      /opt/spark-2.4.7-bin-hadoop2.7/python/pyspark/sql/catalog.py
Type:      method
  • Build StructType object using StructField list.

from pyspark.sql.types import StructField, StructType, \
    IntegerType, StringType, FloatType
employeesSchema = StructType([
    StructField("employee_id", IntegerType()),
    StructField("first_name", StringType()),
    StructField("last_name", StringType()),
    StructField("salary", FloatType()),
    StructField("nationality", StringType())
])
employeesSchema
StructType(List(StructField(employee_id,IntegerType,true),StructField(first_name,StringType,true),StructField(last_name,StringType,true),StructField(salary,FloatType,true),StructField(nationality,StringType,true)))
help(employeesSchema)
employeesSchema.simpleString()
'struct<employee_id:int,first_name:string,last_name:string,salary:float,nationality:string>'
spark.sql('DROP TABLE IF EXISTS employees')
  • Create table by passing StructType object as schema.

spark.catalog.createTable("employees", schema=employeesSchema)
employee_idfirst_namelast_namesalarynationality
  • List the tables from database created.

spark.catalog.listTables()
[Table(name='employees', database='itversity_hr_db', description=None, tableType='MANAGED', isTemporary=False)]
spark.catalog.listColumns('employees')
[Column(name='employee_id', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
 Column(name='first_name', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='last_name', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='salary', description=None, dataType='float', nullable=True, isPartition=False, isBucket=False),
 Column(name='nationality', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False)]
spark.sql('DESCRIBE FORMATTED employees').show(100, truncate=False)
+----------------------------+-----------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                          |comment|
+----------------------------+-----------------------------------------------------------------------------------+-------+
|employee_id                 |int                                                                                |null   |
|first_name                  |string                                                                             |null   |
|last_name                   |string                                                                             |null   |
|salary                      |float                                                                              |null   |
|nationality                 |string                                                                             |null   |
|                            |                                                                                   |       |
|# Detailed Table Information|                                                                                   |       |
|Database                    |itversity_hr_db                                                                    |       |
|Table                       |employees                                                                          |       |
|Owner                       |itversity                                                                          |       |
|Created Time                |Fri Mar 12 11:13:36 EST 2021                                                       |       |
|Last Access                 |Wed Dec 31 19:00:00 EST 1969                                                       |       |
|Created By                  |Spark 2.4.7                                                                        |       |
|Type                        |MANAGED                                                                            |       |
|Provider                    |parquet                                                                            |       |
|Table Properties            |[transient_lastDdlTime=1615565616]                                                 |       |
|Location                    |hdfs://m01.itversity.com:9000/user/itversity/warehouse/itversity_hr_db.db/employees|       |
|Serde Library               |org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe                        |       |
|InputFormat                 |org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat                      |       |
|OutputFormat                |org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat                     |       |
|Storage Properties          |[serialization.format=1]                                                           |       |
+----------------------------+-----------------------------------------------------------------------------------+-------+