Inferring Schema for Tables
When we want to create a table using `spark.catalog.createTable` or `spark.catalog.createExternalTable`, we need to specify the schema.
- The schema can be inferred from a DataFrame and then passed as a `StructType` object while creating the table.
- `StructType` takes a list of objects of type `StructField`.
- `StructField` is built using a column name and a data type. All the data types are available under `pyspark.sql.types`.
- We need to pass the table name and schema to `spark.catalog.createTable`.
- We have to pass a path along with the name and schema to `spark.catalog.createExternalTable`.
- We can use `source` to define the file format along with applicable options. For example, if we want to create a table for CSV, then `source` will be `csv` and we can pass applicable options for CSV such as `sep`, `header`, etc.
Let us start the Spark context for this notebook so that we can execute the code provided. You can sign up for our 10-node state-of-the-art cluster/labs to learn Spark SQL using our unique integrated LMS.
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Spark Metastore'). \
    master('yarn'). \
    getOrCreate()
If you are going to use CLIs, you can launch Spark SQL using one of the following three approaches.
Using Spark SQL
spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Scala
spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Pyspark
pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
spark.conf.set('spark.sql.shuffle.partitions', '2')
Tasks
- Create a database by the name `{username}_airtraffic` and create an external table for airport-codes.txt.
  - Data has a header.
  - Fields in each record are delimited by a tab character.
  - We can pass options such as `sep`, `header`, `inferSchema`, etc. to define the schema.
 
spark.catalog.createExternalTable?
Signature:
spark.catalog.createExternalTable(
    tableName,
    path=None,
    source=None,
    schema=None,
    **options,
)
Docstring:
Creates a table based on the dataset in a data source.
It returns the DataFrame associated with the external table.
The data source is specified by the ``source`` and a set of ``options``.
If ``source`` is not specified, the default data source configured by
``spark.sql.sources.default`` will be used.
Optionally, a schema can be provided as the schema of the returned :class:`DataFrame` and
created external table.
:return: :class:`DataFrame`
.. versionadded:: 2.0
File:      /opt/spark-2.4.7-bin-hadoop2.7/python/pyspark/sql/catalog.py
Type:      method
import getpass
username = getpass.getuser()
spark.sql(f"CREATE DATABASE IF NOT EXISTS {username}_airtraffic")
spark.catalog.setCurrentDatabase(f"{username}_airtraffic")
spark.catalog.currentDatabase()
'itversity_airtraffic'
- To create an external table, we need to have write permissions over the path which we want to use.
- As we have only read permissions on /public/airtraffic_all/airport-codes, we cannot use that path while creating the external table.
- Let us copy the data to /user/`whoami`/airtraffic_all/airport-codes.
%%sh
hdfs dfs -mkdir -p /user/`whoami`/airtraffic_all
hdfs dfs -cp -f /public/airtraffic_all/airport-codes /user/`whoami`/airtraffic_all
hdfs dfs -ls /user/`whoami`/airtraffic_all/airport-codes
Found 1 items
-rw-r--r--   3 itversity itversity      11411 2021-03-12 10:57 /user/itversity/airtraffic_all/airport-codes/airport-codes-na.txt
%%sh
hdfs dfs -tail /user/`whoami`/airtraffic_all/airport-codes/airport-codes-na.txt
Yuma	AZ	USA	YUM	Canada	YZFLa	YWK
import getpass
username = getpass.getuser()
airport_codes_path = f'/user/{username}/airtraffic_all/airport-codes'
spark.sql('DROP TABLE IF EXISTS airport_codes')
spark.catalog. \
    createExternalTable("airport_codes",
                        path=airport_codes_path,
                        source="csv",
                        sep="\t",
                        header="true",
                        inferSchema="true"
                       )
| City | State | Country | IATA | 
|---|---|---|---|
| Abbotsford | BC | Canada | YXX | 
| Aberdeen | SD | USA | ABR | 
| Abilene | TX | USA | ABI | 
| Akron | OH | USA | CAK | 
| Alamosa | CO | USA | ALS | 
| Albany | GA | USA | ABY | 
| Albany | NY | USA | ALB | 
| Albuquerque | NM | USA | ABQ | 
| Alexandria | LA | USA | AEX | 
| Allentown | PA | USA | ABE | 
| Alliance | NE | USA | AIA | 
| Alpena | MI | USA | APN | 
| Altoona | PA | USA | AOO | 
| Amarillo | TX | USA | AMA | 
| Anahim Lake | BC | Canada | YAA | 
| Anchorage | AK | USA | ANC | 
| Appleton | WI | USA | ATW | 
| Arviat | NWT | Canada | YEK | 
| Asheville | NC | USA | AVL | 
| Aspen | CO | USA | ASE | 
spark.catalog.listTables()
[Table(name='airport_codes', database='itversity_airtraffic', description=None, tableType='EXTERNAL', isTemporary=False)]
spark.read.table("airport_codes").show()
+-----------+-----+-------+----+
|       City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford|   BC| Canada| YXX|
|   Aberdeen|   SD|    USA| ABR|
|    Abilene|   TX|    USA| ABI|
|      Akron|   OH|    USA| CAK|
|    Alamosa|   CO|    USA| ALS|
|     Albany|   GA|    USA| ABY|
|     Albany|   NY|    USA| ALB|
|Albuquerque|   NM|    USA| ABQ|
| Alexandria|   LA|    USA| AEX|
|  Allentown|   PA|    USA| ABE|
|   Alliance|   NE|    USA| AIA|
|     Alpena|   MI|    USA| APN|
|    Altoona|   PA|    USA| AOO|
|   Amarillo|   TX|    USA| AMA|
|Anahim Lake|   BC| Canada| YAA|
|  Anchorage|   AK|    USA| ANC|
|   Appleton|   WI|    USA| ATW|
|     Arviat|  NWT| Canada| YEK|
|  Asheville|   NC|    USA| AVL|
|      Aspen|   CO|    USA| ASE|
+-----------+-----+-------+----+
only showing top 20 rows
spark.sql('DESCRIBE FORMATTED airport_codes').show(100, False)
+----------------------------+-----------------------------------------------------------------------+-------+
|col_name                    |data_type                                                              |comment|
+----------------------------+-----------------------------------------------------------------------+-------+
|City                        |string                                                                 |null   |
|State                       |string                                                                 |null   |
|Country                     |string                                                                 |null   |
|IATA                        |string                                                                 |null   |
|                            |                                                                       |       |
|# Detailed Table Information|                                                                       |       |
|Database                    |itversity_airtraffic                                                     |       |
|Table                       |airport_codes                                                          |       |
|Owner                       |itversity                                                              |       |
|Created Time                |Fri Mar 12 11:00:09 EST 2021                                           |       |
|Last Access                 |Wed Dec 31 19:00:00 EST 1969                                           |       |
|Created By                  |Spark 2.4.7                                                            |       |
|Type                        |EXTERNAL                                                               |       |
|Provider                    |csv                                                                    |       |
|Table Properties            |[transient_lastDdlTime=1615564809]                                     |       |
|Location                    |hdfs://m01.itversity.com:9000/user/itversity/airtraffic_all/airport-codes|       |
|Serde Library               |org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe                     |       |
|InputFormat                 |org.apache.hadoop.mapred.SequenceFileInputFormat                       |       |
|OutputFormat                |org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat              |       |
|Storage Properties          |[serialization.format=1, inferSchema=true, sep=	, header=true]         |       |
+----------------------------+-----------------------------------------------------------------------+-------+
spark.catalog.listColumns('airport_codes')
[Column(name='City', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='State', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='Country', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='IATA', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False)]