Overview of Data Frame APIs¶
Let us get an overview of Data Frame APIs to process data in Data Frames.
Row level transformations or projection of data can be done using select, selectExpr, withColumn, or drop on a Data Frame. We typically apply functions from pyspark.sql.functions on columns using select and withColumn.
Filtering is typically done using either filter or where on a Data Frame. We can pass the condition to filter or where in either SQL style or Programming Language style.
Global aggregations can be performed directly on the Data Frame.
By key or grouping aggregations are typically performed using groupBy and then aggregate functions using agg.
We can sort the data in a Data Frame using sort or orderBy. A short preview sketch of filtering, grouping, and sorting appears at the end of this notebook.
We will talk about Window Functions later. We can use Window Functions for some advanced aggregations and ranking.
Let us start the Spark context for this Notebook so that we can execute the code provided. You can sign up for our 10 node state-of-the-art cluster/labs to learn Spark SQL using our unique integrated LMS.
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Data Processing - Overview'). \
    master('yarn'). \
    getOrCreate()
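A brief note on the configurations used above: setting spark.ui.port to 0 makes Spark pick a random available port for its UI, which avoids port clashes on a shared cluster, and spark.sql.warehouse.dir points the warehouse for managed tables to a per-user location.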
If you are going to use CLIs, you can launch Spark SQL using one of the following three approaches.
Using Spark SQL
spark2-sql \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Scala
spark2-shell \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Pyspark
pyspark2 \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Tasks¶
Let us understand how to project the data using different options such as select, selectExpr, withColumn, and drop.
Create a Data Frame named employees using a collection.
employees = [(1, "Scott", "Tiger", 1000.0, "united states"),
             (2, "Henry", "Ford", 1250.0, "India"),
             (3, "Nick", "Junior", 750.0, "united KINGDOM"),
             (4, "Bill", "Gomes", 1500.0, "AUSTRALIA")
            ]
type(employees)
employees[0]
type(employees[0])
spark.createDataFrame?
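The trailing ? is IPython/Jupyter help syntax: it displays the signature and docstring of spark.createDataFrame so we can check the supported arguments such as schema.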
employeesDF = spark. \
    createDataFrame(employees,
                    schema="""employee_id INT, first_name STRING,
                              last_name STRING, salary FLOAT, nationality STRING"""
                   )
employeesDF.printSchema()
employeesDF.show()
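The schema above is specified as a DDL-style string. As an illustrative sketch (not part of the original notebook), the same schema can also be built programmatically using StructType; the field names and types below simply mirror the string version.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
# Programmatic equivalent of the DDL schema string used above (illustrative only)
employees_schema = StructType([
    StructField('employee_id', IntegerType()),
    StructField('first_name', StringType()),
    StructField('last_name', StringType()),
    StructField('salary', FloatType()),
    StructField('nationality', StringType())
])
# employeesDF = spark.createDataFrame(employees, schema=employees_schema)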
Project employee first name and last name.
employeesDF. \
    select("first_name", "last_name"). \
    show()
Project all the fields except for nationality.
employeesDF. \
    drop("nationality"). \
    show()
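If a column name passed to drop does not exist in the Data Frame, it is simply ignored; drop does not raise an error for missing columns.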
from pyspark.sql.functions import *
employeesDF. \
    withColumn('full_name', concat('first_name', lit(' '), 'last_name')). \
    show()
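Note that lit(' ') is needed here because concat interprets plain strings as column names; wrapping the space in lit makes it a literal value that can be placed between first_name and last_name.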
employeesDF.selectExpr('*', 'concat(first_name, " ", last_name) AS full_name').show()
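As a quick preview of the filtering, grouping, and sorting APIs mentioned in the overview, here is a minimal sketch using the employeesDF created above. It is not part of the original notebook and only uses standard functions from pyspark.sql.functions.
from pyspark.sql.functions import col, count, avg
# Filtering: SQL style (condition as a string) and Programming Language style (Column expression)
employeesDF.filter("salary >= 1000.0").show()
employeesDF.where(col('salary') >= 1000.0).show()
# Grouping aggregations: groupBy followed by aggregate functions via agg
employeesDF. \
    groupBy('nationality'). \
    agg(count('*').alias('employee_count'), avg('salary').alias('avg_salary')). \
    show()
# Sorting: sort or orderBy (descending by salary here)
employeesDF.orderBy(col('salary').desc()).show()
Both styles of filtering produce the same result; the SQL style is concise, while the Column style composes more naturally with Python variables and functions.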
We will explore most of the APIs to process data in Data Frames as we get into data processing at a later point in time.