Inferring Schema for Tables

When we want to create a table using spark.catalog.createTable or spark.catalog.createExternalTable, we need to specify the schema.

  • The schema can be inferred from a DataFrame and then passed as a StructType object while creating the table.

  • StructType takes a list of StructField objects.

  • A StructField is built from a column name and a data type. All the data types are available under pyspark.sql.types.

  • We need to pass the table name and schema to spark.catalog.createTable.

  • We have to pass the path along with the name and schema to spark.catalog.createExternalTable.

  • We can use source to define the file format along with applicable options. For example, if we want to create a table on top of CSV data, then source will be csv and we can pass applicable CSV options such as sep, header, etc.

Let us start the Spark context for this notebook so that we can execute the code provided. You can sign up for our 10 node state-of-the-art cluster/labs to learn Spark SQL using our unique integrated LMS.

from pyspark.sql import SparkSession

import getpass
username = getpass.getuser()

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Spark Metastore'). \
    master('yarn'). \
    getOrCreate()

If you are going to use CLIs, you can run Spark SQL using one of these three approaches.

Using Spark SQL

spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Scala

spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Pyspark

pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

spark.conf.set('spark.sql.shuffle.partitions', '2')

Tasks

  • Create a database named {username}_airtraffic and create an external table for airport-codes.txt.

    • The data has a header.

    • Fields in each record are delimited by a tab character.

    • We can pass options such as sep, header, inferSchema, etc. to define the schema.

spark.catalog.createExternalTable?
Signature:
spark.catalog.createExternalTable(
    tableName,
    path=None,
    source=None,
    schema=None,
    **options,
)
Docstring:
Creates a table based on the dataset in a data source.

It returns the DataFrame associated with the external table.

The data source is specified by the ``source`` and a set of ``options``.
If ``source`` is not specified, the default data source configured by
``spark.sql.sources.default`` will be used.

Optionally, a schema can be provided as the schema of the returned :class:`DataFrame` and
created external table.

:return: :class:`DataFrame`

.. versionadded:: 2.0
File:      /opt/spark-2.4.7-bin-hadoop2.7/python/pyspark/sql/catalog.py
Type:      method
import getpass
username = getpass.getuser()
spark.sql(f"CREATE DATABASE IF NOT EXISTS {username}_airtraffic")
spark.catalog.setCurrentDatabase(f"{username}_airtraffic")
spark.catalog.currentDatabase()
'itversity_airtraffic'
  • To create an external table, we need write permissions on the path we want to use.

  • As we only have read permissions on /public/airtraffic_all/airport-codes, we cannot use that path while creating the external table.

  • Let us copy the data to /user/`whoami`/airtraffic_all/airport-codes

%%sh

hdfs dfs -mkdir -p /user/`whoami`/airtraffic_all
hdfs dfs -cp -f /public/airtraffic_all/airport-codes /user/`whoami`/airtraffic_all
hdfs dfs -ls /user/`whoami`/airtraffic_all/airport-codes
Found 1 items
-rw-r--r--   3 itversity itversity      11411 2021-03-12 10:57 /user/itversity/airtraffic_all/airport-codes/airport-codes-na.txt
%%sh

hdfs dfs -tail /user/`whoami`/airtraffic_all/airport-codes/airport-codes-na.txt
Yuma	AZ	USA	YUM	Canada	YZFLa	YWK
import getpass
username = getpass.getuser()

airport_codes_path = f'/user/{username}/airtraffic_all/airport-codes'
spark.sql('DROP TABLE IF EXISTS airport_codes')
spark.catalog. \
    createExternalTable("airport_codes",
                        path=airport_codes_path,
                        source="csv",
                        sep="\t",
                        header="true",
                        inferSchema="true"
                       )
+-----------+-----+-------+----+
|       City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford|   BC| Canada| YXX|
|   Aberdeen|   SD|    USA| ABR|
|    Abilene|   TX|    USA| ABI|
|      Akron|   OH|    USA| CAK|
|    Alamosa|   CO|    USA| ALS|
|     Albany|   GA|    USA| ABY|
|     Albany|   NY|    USA| ALB|
|Albuquerque|   NM|    USA| ABQ|
| Alexandria|   LA|    USA| AEX|
|  Allentown|   PA|    USA| ABE|
|   Alliance|   NE|    USA| AIA|
|     Alpena|   MI|    USA| APN|
|    Altoona|   PA|    USA| AOO|
|   Amarillo|   TX|    USA| AMA|
|Anahim Lake|   BC| Canada| YAA|
|  Anchorage|   AK|    USA| ANC|
|   Appleton|   WI|    USA| ATW|
|     Arviat|  NWT| Canada| YEK|
|  Asheville|   NC|    USA| AVL|
|      Aspen|   CO|    USA| ASE|
+-----------+-----+-------+----+
only showing top 20 rows
spark.catalog.listTables()
[Table(name='airport_codes', database='itversity_airtraffic', description=None, tableType='EXTERNAL', isTemporary=False)]
spark.read.table("airport_codes").show()
+-----------+-----+-------+----+
|       City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford|   BC| Canada| YXX|
|   Aberdeen|   SD|    USA| ABR|
|    Abilene|   TX|    USA| ABI|
|      Akron|   OH|    USA| CAK|
|    Alamosa|   CO|    USA| ALS|
|     Albany|   GA|    USA| ABY|
|     Albany|   NY|    USA| ALB|
|Albuquerque|   NM|    USA| ABQ|
| Alexandria|   LA|    USA| AEX|
|  Allentown|   PA|    USA| ABE|
|   Alliance|   NE|    USA| AIA|
|     Alpena|   MI|    USA| APN|
|    Altoona|   PA|    USA| AOO|
|   Amarillo|   TX|    USA| AMA|
|Anahim Lake|   BC| Canada| YAA|
|  Anchorage|   AK|    USA| ANC|
|   Appleton|   WI|    USA| ATW|
|     Arviat|  NWT| Canada| YEK|
|  Asheville|   NC|    USA| AVL|
|      Aspen|   CO|    USA| ASE|
+-----------+-----+-------+----+
only showing top 20 rows
spark.sql('DESCRIBE FORMATTED airport_codes').show(100, False)
+----------------------------+-------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                |comment|
+----------------------------+-------------------------------------------------------------------------+-------+
|City                        |string                                                                   |null   |
|State                       |string                                                                   |null   |
|Country                     |string                                                                   |null   |
|IATA                        |string                                                                   |null   |
|                            |                                                                         |       |
|# Detailed Table Information|                                                                         |       |
|Database                    |itversity_airtraffic                                                     |       |
|Table                       |airport_codes                                                            |       |
|Owner                       |itversity                                                                |       |
|Created Time                |Fri Mar 12 11:00:09 EST 2021                                             |       |
|Last Access                 |Wed Dec 31 19:00:00 EST 1969                                             |       |
|Created By                  |Spark 2.4.7                                                              |       |
|Type                        |EXTERNAL                                                                 |       |
|Provider                    |csv                                                                      |       |
|Table Properties            |[transient_lastDdlTime=1615564809]                                       |       |
|Location                    |hdfs://m01.itversity.com:9000/user/itversity/airtraffic_all/airport-codes|       |
|Serde Library               |org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe                       |       |
|InputFormat                 |org.apache.hadoop.mapred.SequenceFileInputFormat                         |       |
|OutputFormat                |org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat                |       |
|Storage Properties          |[serialization.format=1, inferSchema=true, sep=	, header=true]           |       |
+----------------------------+-------------------------------------------------------------------------+-------+
spark.catalog.listColumns('airport_codes')
[Column(name='City', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='State', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='Country', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='IATA', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False)]