Define Schema for Tables using StructType¶
When we want to create a table using spark.catalog.createTable
or using spark.catalog.createExternalTable
, we need to specify Schema.
Schema can be inferred or we can pass schema using
StructType
object while creating the table..StructType
takes list of objects of typeStructField
.StructField
is built using column name and data type. All the data types are available underpyspark.sql.types
.We need to pass table name and schema for
spark.catalog.createTable
.We have to pass path along with name and schema for
spark.catalog.createExternalTable
.
Let us start spark context for this Notebook so that we can execute the code provided. You can sign up for our 10 node state of the art cluster/labs to learn Spark SQL using our unique integrated LMS.
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
builder. \
config('spark.ui.port', '0'). \
config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
enableHiveSupport(). \
appName(f'{username} | Python - Spark Metastore'). \
master('yarn'). \
getOrCreate()
If you are going to use CLIs, you can use Spark SQL using one of the 3 approaches.
Using Spark SQL
spark2-sql \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Scala
spark2-shell \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Pyspark
pyspark2 \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
spark.conf.set('spark.sql.shuffle.partitions', '2')
Tasks¶
Let us perform tasks to create empty table using spark.catalog.createTable
or using spark.catalog.createExternalTable
.
Create database {username}_hr_db and table employees with following fields. Let us create Database first and then we will see how to create table.
employee_id of type Integer
first_name of type String
last_name of type String
salary of type Float
nationality of type String
import getpass
username = getpass.getuser()
spark.sql(f"CREATE DATABASE IF NOT EXISTS {username}_hr_db")
spark.catalog.setCurrentDatabase(f"{username}_hr_db")
spark.catalog.currentDatabase()
'itversity_hr_db'
spark.catalog.createTable?
Signature:
spark.catalog.createTable(
tableName,
path=None,
source=None,
schema=None,
**options,
)
Docstring:
Creates a table based on the dataset in a data source.
It returns the DataFrame associated with the table.
The data source is specified by the ``source`` and a set of ``options``.
If ``source`` is not specified, the default data source configured by
``spark.sql.sources.default`` will be used. When ``path`` is specified, an external table is
created from the data at the given path. Otherwise a managed table is created.
Optionally, a schema can be provided as the schema of the returned :class:`DataFrame` and
created table.
:return: :class:`DataFrame`
.. versionadded:: 2.2
File: /opt/spark-2.4.7-bin-hadoop2.7/python/pyspark/sql/catalog.py
Type: method
Build StructType object using StructField list.
from pyspark.sql.types import StructField, StructType, \
IntegerType, StringType, FloatType
employeesSchema = StructType([
StructField("employee_id", IntegerType()),
StructField("first_name", StringType()),
StructField("last_name", StringType()),
StructField("salary", FloatType()),
StructField("nationality", StringType())
])
employeesSchema
StructType(List(StructField(employee_id,IntegerType,true),StructField(first_name,StringType,true),StructField(last_name,StringType,true),StructField(salary,FloatType,true),StructField(nationality,StringType,true)))
help(employeesSchema)
employeesSchema.simpleString()
'struct<employee_id:int,first_name:string,last_name:string,salary:float,nationality:string>'
spark.sql('DROP TABLE IF EXISTS employees')
Create table by passing StructType object as schema.
spark.catalog.createTable("employees", schema=employeesSchema)
employee_id | first_name | last_name | salary | nationality |
---|
List the tables from database created.
spark.catalog.listTables()
[Table(name='employees', database='itversity_hr_db', description=None, tableType='MANAGED', isTemporary=False)]
spark.catalog.listColumns('employees')
[Column(name='employee_id', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
Column(name='first_name', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
Column(name='last_name', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
Column(name='salary', description=None, dataType='float', nullable=True, isPartition=False, isBucket=False),
Column(name='nationality', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False)]
spark.sql('DESCRIBE FORMATTED employees').show(100, truncate=False)
+----------------------------+-----------------------------------------------------------------------------------+-------+
|col_name |data_type |comment|
+----------------------------+-----------------------------------------------------------------------------------+-------+
|employee_id |int |null |
|first_name |string |null |
|last_name |string |null |
|salary |float |null |
|nationality |string |null |
| | | |
|# Detailed Table Information| | |
|Database |itversity_hr_db | |
|Table |employees | |
|Owner |itversity | |
|Created Time |Fri Mar 12 11:13:36 EST 2021 | |
|Last Access |Wed Dec 31 19:00:00 EST 1969 | |
|Created By |Spark 2.4.7 | |
|Type |MANAGED | |
|Provider |parquet | |
|Table Properties |[transient_lastDdlTime=1615565616] | |
|Location |hdfs://m01.itversity.com:9000/user/itversity/warehouse/itversity_hr_db.db/employees| |
|Serde Library |org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe | |
|InputFormat |org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat | |
|OutputFormat |org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat | |
|Storage Properties |[serialization.format=1] | |
+----------------------------+-----------------------------------------------------------------------------------+-------+