Inferring Schema for Tables
When we want to create a table using spark.catalog.createTable or spark.catalog.createExternalTable, we need to specify the schema. The schema can be inferred from a Dataframe and then passed using a StructType object while creating the table. StructType takes a list of objects of type StructField. StructField is built using a column name and a data type. All the data types are available under pyspark.sql.types.

We need to pass the table name and schema for spark.catalog.createTable, and we have to pass the path along with the name and schema for spark.catalog.createExternalTable. We can use source to define the file format along with applicable options. For example, if we want to create a table for CSV data, then source will be csv and we can pass applicable options for CSV such as sep, header, etc.
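As an illustration, here is a minimal sketch of building such a schema by hand with StructType and StructField, and of reusing one inferred from a Dataframe. The file path and table name in the commented lines are placeholders, not part of this lab's dataset.

from pyspark.sql.types import StructType, StructField, StringType

# Define the schema explicitly using StructType and StructField
airport_codes_schema = StructType([
    StructField('City', StringType()),
    StructField('State', StringType()),
    StructField('Country', StringType()),
    StructField('IATA', StringType())
])

# Or infer the schema from a Dataframe and reuse it (path is a placeholder)
# airport_codes_schema = spark.read. \
#     csv('/some/path', sep='\t', header=True, inferSchema=True). \
#     schema

# Pass the schema while creating a table (table name is a placeholder)
# spark.catalog.createTable('some_table', schema=airport_codes_schema, source='csv')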
Let us start the Spark context for this notebook so that we can execute the code provided. You can sign up for our 10-node state-of-the-art cluster/labs to learn Spark SQL using our unique integrated LMS.
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Spark Metastore'). \
    master('yarn'). \
    getOrCreate()
If you are going to use CLIs, you can use Spark SQL with one of the following three approaches.
Using Spark SQL
spark2-sql \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Scala
spark2-shell \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Pyspark
pyspark2 \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
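# Reduce the number of shuffle partitions, as the datasets used in this notebook are small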
spark.conf.set('spark.sql.shuffle.partitions', '2')
Tasks
Create a database named {username}_airtraffic and create an external table for airport-codes.txt.
The data has a header.
Fields in each record are delimited by a tab character.
We can pass options such as sep, header, inferSchema, etc. to define the schema, as shown in the quick preview right after this list.
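Before creating the table, we can sanity check these options by reading the data directly with spark.read. This is a quick sketch, not part of the original flow; it reads from the public location, which only requires read permissions.

# Preview the data with the same options we will pass while creating the table
airport_codes_preview = spark.read.csv(
    '/public/airtraffic_all/airport-codes',
    sep='\t',
    header=True,
    inferSchema=True
)
airport_codes_preview.printSchema()
airport_codes_preview.show(5)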
spark.catalog.createExternalTable?
Signature:
spark.catalog.createExternalTable(
tableName,
path=None,
source=None,
schema=None,
**options,
)
Docstring:
Creates a table based on the dataset in a data source.
It returns the DataFrame associated with the external table.
The data source is specified by the ``source`` and a set of ``options``.
If ``source`` is not specified, the default data source configured by
``spark.sql.sources.default`` will be used.
Optionally, a schema can be provided as the schema of the returned :class:`DataFrame` and
created external table.
:return: :class:`DataFrame`
.. versionadded:: 2.0
File: /opt/spark-2.4.7-bin-hadoop2.7/python/pyspark/sql/catalog.py
Type: method
import getpass
username = getpass.getuser()
spark.sql(f"CREATE DATABASE IF NOT EXISTS {username}_airtraffic")
spark.catalog.setCurrentDatabase(f"{username}_airtraffic")
spark.catalog.currentDatabase()
'itversity_airtraffic'
To create an external table, we need to have write permissions over the path which we want to use.
As we have only read permissions on /public/airtraffic_all/airport-codes, we cannot use that path while creating the external table.
Let us copy the data to /user/`whoami`/airtraffic_all/airport-codes.
%%sh
hdfs dfs -mkdir -p /user/`whoami`/airtraffic_all
hdfs dfs -cp -f /public/airtraffic_all/airport-codes /user/`whoami`/airtraffic_all
hdfs dfs -ls /user/`whoami`/airtraffic_all/airport-codes
Found 1 items
-rw-r--r-- 3 itversity itversity 11411 2021-03-12 10:57 /user/itversity/airtraffic_all/airport-codes/airport-codes-na.txt
%%sh
hdfs dfs -tail /user/`whoami`/airtraffic_all/airport-codes/airport-codes-na.txt
Yuma	AZ	USA	YUM
import getpass
username = getpass.getuser()
airport_codes_path = f'/user/{username}/airtraffic_all/airport-codes'
spark.sql('DROP TABLE IF EXISTS airport_codes')
spark.catalog. \
    createExternalTable("airport_codes",
                        path=airport_codes_path,
                        source="csv",
                        sep="\t",
                        header="true",
                        inferSchema="true"
                       )
| City | State | Country | IATA |
|---|---|---|---|
| Abbotsford | BC | Canada | YXX |
| Aberdeen | SD | USA | ABR |
| Abilene | TX | USA | ABI |
| Akron | OH | USA | CAK |
| Alamosa | CO | USA | ALS |
| Albany | GA | USA | ABY |
| Albany | NY | USA | ALB |
| Albuquerque | NM | USA | ABQ |
| Alexandria | LA | USA | AEX |
| Allentown | PA | USA | ABE |
| Alliance | NE | USA | AIA |
| Alpena | MI | USA | APN |
| Altoona | PA | USA | AOO |
| Amarillo | TX | USA | AMA |
| Anahim Lake | BC | Canada | YAA |
| Anchorage | AK | USA | ANC |
| Appleton | WI | USA | ATW |
| Arviat | NWT | Canada | YEK |
| Asheville | NC | USA | AVL |
| Aspen | CO | USA | ASE |
spark.catalog.listTables()
[Table(name='airport_codes', database='itversity_airtraffic', description=None, tableType='EXTERNAL', isTemporary=False)]
spark.read.table("airport_codes").show()
+-----------+-----+-------+----+
| City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford| BC| Canada| YXX|
| Aberdeen| SD| USA| ABR|
| Abilene| TX| USA| ABI|
| Akron| OH| USA| CAK|
| Alamosa| CO| USA| ALS|
| Albany| GA| USA| ABY|
| Albany| NY| USA| ALB|
|Albuquerque| NM| USA| ABQ|
| Alexandria| LA| USA| AEX|
| Allentown| PA| USA| ABE|
| Alliance| NE| USA| AIA|
| Alpena| MI| USA| APN|
| Altoona| PA| USA| AOO|
| Amarillo| TX| USA| AMA|
|Anahim Lake| BC| Canada| YAA|
| Anchorage| AK| USA| ANC|
| Appleton| WI| USA| ATW|
| Arviat| NWT| Canada| YEK|
| Asheville| NC| USA| AVL|
| Aspen| CO| USA| ASE|
+-----------+-----+-------+----+
only showing top 20 rows
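Since we passed inferSchema as true, Spark sampled the data to derive the column types. As a quick sanity check (the output depends on the data), we can print the schema registered for the table; the detailed table description below shows the same column types.

spark.read.table('airport_codes').printSchema()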
spark.sql('DESCRIBE FORMATTED airport_codes').show(100, False)
+----------------------------+-----------------------------------------------------------------------+-------+
|col_name |data_type |comment|
+----------------------------+-----------------------------------------------------------------------+-------+
|City |string |null |
|State |string |null |
|Country |string |null |
|IATA |string |null |
| | | |
|# Detailed Table Information| | |
|Database |itversity_airtraffic | |
|Table |airport_codes | |
|Owner |itversity | |
|Created Time |Fri Mar 12 11:00:09 EST 2021 | |
|Last Access |Wed Dec 31 19:00:00 EST 1969 | |
|Created By |Spark 2.4.7 | |
|Type |EXTERNAL | |
|Provider |csv | |
|Table Properties |[transient_lastDdlTime=1615564809] | |
|Location |hdfs://m01.itversity.com:9000/user/itversity/airtraffic_all/airport-codes| |
|Serde Library |org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | |
|InputFormat |org.apache.hadoop.mapred.SequenceFileInputFormat | |
|OutputFormat |org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat | |
|Storage Properties |[serialization.format=1, inferSchema=true, sep= , header=true] | |
+----------------------------+-----------------------------------------------------------------------+-------+
spark.catalog.listColumns('airport_codes')
[Column(name='City', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
Column(name='State', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
Column(name='Country', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
Column(name='IATA', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False)]
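Tying back to the start of this section: the inferred schema is itself a StructType made up of StructField objects, and we can retrieve it from the table's Dataframe if we want to reuse it elsewhere. A small sketch; the second table name in the comment is hypothetical.

# The schema of the table's Dataframe is a StructType of StructFields
inferred_schema = spark.read.table('airport_codes').schema
print(inferred_schema)

# The StructType can be reused while creating another table (name is hypothetical)
# spark.catalog.createTable('airport_codes_managed', schema=inferred_schema, source='parquet')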