Inserting into Existing Tables
Let us understand how we can insert data into existing tables using insertInto.
We can use modes such as append and overwrite with insertInto. The default is append.
When we use insertInto, the following happens:
If the table does not exist, insertInto will throw an exception.
If the table exists, data will be appended by default.
We can alter this behavior with the keyword argument overwrite. It is False by default; we can pass True to replace the existing data.
Let us start the Spark context for this notebook so that we can execute the code provided.
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Spark Metastore'). \
    master('yarn'). \
    getOrCreate()
If you are going to use CLIs, you can use Spark SQL through one of the following three approaches.
Using Spark SQL
spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Scala
spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Pyspark
pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
spark.conf.set('spark.sql.shuffle.partitions', '2')
Tasks
Let us perform a few tasks to understand how to write a Data Frame into existing tables in the Metastore.
Make sure the hr_db database (prefixed with your username, as in the code below) and the employees table in it have been created.
spark.catalog.setCurrentDatabase(f"{username}_hr_db")
spark.catalog.currentDatabase()
'itversity_hr_db'
spark.catalog.listTables()
[Table(name='employees', database='itversity_hr_db', description=None, tableType='MANAGED', isTemporary=False)]
spark.catalog.listColumns('employees')
[Column(name='employee_id', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
Column(name='first_name', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
Column(name='last_name', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
Column(name='salary', description=None, dataType='float', nullable=True, isPartition=False, isBucket=False),
Column(name='nationality', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False)]
Use the employees Data Frame to insert data into the employees table in the hr_db database. Make sure the existing data is overwritten.
employees = [(1, "Scott", "Tiger", 1000.0, "united states"),
             (2, "Henry", "Ford", 1250.0, "India"),
             (3, "Nick", "Junior", 750.0, "united KINGDOM"),
             (4, "Bill", "Gomes", 1500.0, "AUSTRALIA")
            ]
spark.read.table('employees').schema
StructType(List(StructField(employee_id,IntegerType,true),StructField(first_name,StringType,true),StructField(last_name,StringType,true),StructField(salary,FloatType,true),StructField(nationality,StringType,true)))
employeesDF = spark.createDataFrame(employees,
    schema="""employee_id INT, first_name STRING, last_name STRING,
    salary FLOAT, nationality STRING
    """
)
employeesDF.show()
+-----------+----------+---------+------+--------------+
|employee_id|first_name|last_name|salary|   nationality|
+-----------+----------+---------+------+--------------+
|          1|     Scott|    Tiger|1000.0| united states|
|          2|     Henry|     Ford|1250.0|         India|
|          3|      Nick|   Junior| 750.0|united KINGDOM|
|          4|      Bill|    Gomes|1500.0|     AUSTRALIA|
+-----------+----------+---------+------+--------------+
employeesDF.schema
StructType(List(StructField(employee_id,IntegerType,true),StructField(first_name,StringType,true),StructField(last_name,StringType,true),StructField(salary,FloatType,true),StructField(nationality,StringType,true)))
employeesDF.write.insertInto("employees", overwrite=True)
spark.read.table("employees").show()
+-----------+----------+---------+------+--------------+
|employee_id|first_name|last_name|salary|   nationality|
+-----------+----------+---------+------+--------------+
|          3|      Nick|   Junior| 750.0|united KINGDOM|
|          4|      Bill|    Gomes|1500.0|     AUSTRALIA|
|          1|     Scott|    Tiger|1000.0| united states|
|          2|     Henry|     Ford|1250.0|         India|
+-----------+----------+---------+------+--------------+
spark.sql('SELECT * FROM employees').show()
+-----------+----------+---------+------+--------------+
|employee_id|first_name|last_name|salary|   nationality|
+-----------+----------+---------+------+--------------+
|          3|      Nick|   Junior| 750.0|united KINGDOM|
|          4|      Bill|    Gomes|1500.0|     AUSTRALIA|
|          1|     Scott|    Tiger|1000.0| united states|
|          2|     Henry|     Ford|1250.0|         India|
+-----------+----------+---------+------+--------------+