Overview of Data Frame APIs

Let us get an overview of Data Frame APIs to process data in Data Frames.

  • Row Level Transformations or Projection of Data can be done using select, selectExpr, withColumn, drop on Data Frame.

  • We typically apply functions from pyspark.sql.functions on columns using select and withColumn.

  • Filtering is typically done either by using filter or where on Data Frame.

  • We can pass the condition to filter or where either by using SQL Style or Programming Language Style.

  • Global Aggregations can be performed directly on the Data Frame.

  • By Key or Grouping Aggregations are typically performed using groupBy, followed by aggregate functions passed to agg.

  • We can sort the data in Data Frame using sort or orderBy.

  • We can use Window Functions for some advanced Aggregations and Ranking. We will talk about Window Functions later.

Let us start spark context for this Notebook so that we can execute the code provided.

from pyspark.sql import SparkSession

import getpass
username = getpass.getuser()

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Data Processing - Overview'). \
    master('yarn'). \
    getOrCreate()

If you are going to use CLIs, you can launch Spark using one of the following 3 approaches.

Using Spark SQL

spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Scala

spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Pyspark

pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Tasks

Let us understand how to project the data using different options such as select, selectExpr, withColumn, drop.

  • Create Data Frame employees using a Collection.

employees = [(1, "Scott", "Tiger", 1000.0, "united states"),
             (2, "Henry", "Ford", 1250.0, "India"),
             (3, "Nick", "Junior", 750.0, "united KINGDOM"),
             (4, "Bill", "Gomes", 1500.0, "AUSTRALIA")
            ]
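# Inspect the collection: it is a list of tuples
type(employees)
employees[0]
type(employees[0])

# Show the documentation (spark.createDataFrame? also works in IPython or Jupyter)
help(spark.createDataFrame)

# Build the Data Frame using a DDL style schema string
employeesDF = spark. \
    createDataFrame(employees,
                    schema="""employee_id INT, first_name STRING,
                    last_name STRING, salary FLOAT, nationality STRING"""
                   )
employeesDF.printSchema()
employeesDF.show()

  • Project employee first name and last name.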

employeesDF. \
    select("first_name", "last_name"). \
    show()

  • Project all the fields except for Nationality.

employeesDF. \
    drop("nationality"). \
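    show()

  • Derive full name from first name and last name using withColumn.

# Import only the functions that are used instead of a wildcard import
from pyspark.sql.functions import concat, lit
employeesDF. \
    withColumn('full_name', concat('first_name', lit(' '), 'last_name')). \
    show()

  • Derive full name using selectExpr with a SQL Style expression.

employeesDF.selectExpr('*', 'concat(first_name, " ", last_name) AS full_name').show()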

We will explore most of the APIs to process data in Data Frames when we get into data processing later.