Inserting into Existing Tables

Let us understand how we can insert data into existing tables using insertInto.

  • We can use the modes append and overwrite with insertInto. The default is append.

  • When we use insertInto, the following happens:

    • If the table does not exist, insertInto will throw an exception.

    • If the table exists, by default the data will be appended.

    • We can alter this behavior using the keyword argument overwrite. It defaults to False; pass True to replace the existing data, as sketched below.
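
As a quick illustration, here is a minimal sketch of both behaviors. df and existing_table are placeholders for whatever Data Frame and table exist in your environment:

# Appends by default; throws an exception if the table does not exist
df.write.insertInto('existing_table')

# Pass overwrite=True to replace the existing data instead
df.write.insertInto('existing_table', overwrite=True)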

Let us start the Spark session for this Notebook so that we can execute the code provided.

from pyspark.sql import SparkSession

import getpass
username = getpass.getuser()

# Create a Spark session with Hive support, pointing the warehouse
# at the current user's directory
spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Spark Metastore'). \
    master('yarn'). \
    getOrCreate()
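
Once the session is created, you can optionally confirm that the metastore is reachable by listing its databases (the output will vary by environment):

spark.catalog.listDatabases()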

If you are going to use CLIs, you can launch Spark SQL in one of the following three ways.

Using Spark SQL

spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Scala

spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using PySpark

pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

We will also set the number of shuffle partitions to 2, since we are dealing with a small data set.

spark.conf.set('spark.sql.shuffle.partitions', '2')

Tasks

Let us perform a few tasks to understand how to write a Data Frame into existing tables in the Metastore.

  • Make sure the hr_db database and the employees table in hr_db are created; if they are not, see the sketch below.
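
If they are not created yet, a minimal sketch along these lines should work. It assumes the five column layout used later in this notebook:

spark.sql(f'CREATE DATABASE IF NOT EXISTS {username}_hr_db')
spark.sql(f'''
    CREATE TABLE IF NOT EXISTS {username}_hr_db.employees (
        employee_id INT,
        first_name STRING,
        last_name STRING,
        salary FLOAT,
        nationality STRING
    )
''')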

spark.catalog.setCurrentDatabase(f"{username}_hr_db")
spark.catalog.currentDatabase()
'itversity_hr_db'
spark.catalog.listTables()
[Table(name='employees', database='itversity_hr_db', description=None, tableType='MANAGED', isTemporary=False)]
spark.catalog.listColumns('employees')
[Column(name='employee_id', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
 Column(name='first_name', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='last_name', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='salary', description=None, dataType='float', nullable=True, isPartition=False, isBucket=False),
 Column(name='nationality', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False)]
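
Note the column order above. insertInto resolves columns by position, not by name, so the Data Frame we write must have its columns in the same order as the table. If they were in a different order, we could reorder with select first; a sketch using the employeesDF created below:

employeesDF. \
    select('employee_id', 'first_name', 'last_name', 'salary', 'nationality'). \
    write. \
    insertInto('employees', overwrite=True)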

Use the employees Data Frame to insert data into the employees table in the hr_db database. Make sure the existing data is overwritten.

employees = [(1, "Scott", "Tiger", 1000.0, "united states"),
             (2, "Henry", "Ford", 1250.0, "India"),
             (3, "Nick", "Junior", 750.0, "united KINGDOM"),
             (4, "Bill", "Gomes", 1500.0, "AUSTRALIA")
            ]
spark.read.table('employees').schema
StructType(List(StructField(employee_id,IntegerType,true),StructField(first_name,StringType,true),StructField(last_name,StringType,true),StructField(salary,FloatType,true),StructField(nationality,StringType,true)))
employeesDF = spark.createDataFrame(employees,
    schema="""employee_id INT, first_name STRING, last_name STRING,
              salary FLOAT, nationality STRING
           """
)
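
The DDL-style schema string is a convenience; the same schema can also be built programmatically with StructType if you prefer explicit types. A sketch of the equivalent definition:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

employeesSchema = StructType([
    StructField('employee_id', IntegerType()),
    StructField('first_name', StringType()),
    StructField('last_name', StringType()),
    StructField('salary', FloatType()),
    StructField('nationality', StringType())
])
# employeesDF = spark.createDataFrame(employees, schema=employeesSchema)
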
employeesDF.show()
+-----------+----------+---------+------+--------------+
|employee_id|first_name|last_name|salary|   nationality|
+-----------+----------+---------+------+--------------+
|          1|     Scott|    Tiger|1000.0| united states|
|          2|     Henry|     Ford|1250.0|         India|
|          3|      Nick|   Junior| 750.0|united KINGDOM|
|          4|      Bill|    Gomes|1500.0|     AUSTRALIA|
+-----------+----------+---------+------+--------------+
employeesDF.schema
StructType(List(StructField(employee_id,IntegerType,true),StructField(first_name,StringType,true),StructField(last_name,StringType,true),StructField(salary,FloatType,true),StructField(nationality,StringType,true)))
employeesDF.write.insertInto("employees", overwrite=True)
spark.read.table("employees").show()
+-----------+----------+---------+------+--------------+
|employee_id|first_name|last_name|salary|   nationality|
+-----------+----------+---------+------+--------------+
|          3|      Nick|   Junior| 750.0|united KINGDOM|
|          4|      Bill|    Gomes|1500.0|     AUSTRALIA|
|          1|     Scott|    Tiger|1000.0| united states|
|          2|     Henry|     Ford|1250.0|         India|
+-----------+----------+---------+------+--------------+
spark.sql('SELECT * FROM employees').show()
+-----------+----------+---------+------+--------------+
|employee_id|first_name|last_name|salary|   nationality|
+-----------+----------+---------+------+--------------+
|          3|      Nick|   Junior| 750.0|united KINGDOM|
|          4|      Bill|    Gomes|1500.0|     AUSTRALIA|
|          1|     Scott|    Tiger|1000.0| united states|
|          2|     Henry|     Ford|1250.0|         India|
+-----------+----------+---------+------+--------------+
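
One caution to close with: running insertInto again without overwrite=True appends the same four rows, leaving duplicates behind. A quick count makes that easy to catch:

# employeesDF.write.insertInto('employees')  # would append, doubling the row count

spark.read.table('employees').count()
# 4 after the overwrite above; 8 if the commented append is run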