Using first and last functions
Let us understand the usage of first and last value functions.
Let us start the Spark context for this Notebook so that we can execute the code provided.
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
builder. \
config('spark.ui.port', '0'). \
config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
enableHiveSupport(). \
appName(f'{username} | Python - Windowing Functions'). \
master('yarn'). \
getOrCreate()
If you are going to use CLIs, you can launch Spark SQL using one of the following three approaches.
Using Spark SQL
spark2-sql \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Scala
spark2-shell \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Pyspark
pyspark2 \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
spark.conf.set('spark.sql.shuffle.partitions', '2')
Let us get the highest paid employee and the least paid employee within each department, against each employee record, using the employees data set.
You can also use max to get the max salary for each department, but you cannot get the other attributes related to that max salary, such as employee id, name, etc. With first or last, you can get those details as well (a quick contrast is sketched after the data preview below).
employeesPath = '/public/hr_db/employees'
employees = spark. \
read. \
format('csv'). \
option('sep', '\t'). \
schema('''employee_id INT,
first_name STRING,
last_name STRING,
email STRING,
phone_number STRING,
hire_date STRING,
job_id STRING,
salary FLOAT,
commission_pct STRING,
manager_id STRING,
department_id STRING
'''). \
load(employeesPath)
from pyspark.sql.functions import col
employees. \
select('employee_id',
col('department_id').cast('int').alias('department_id'),
'salary'
). \
orderBy('department_id', 'salary'). \
show()
+-----------+-------------+-------+
|employee_id|department_id| salary|
+-----------+-------------+-------+
|        178|         null| 7000.0|
|        200|           10| 4400.0|
|        202|           20| 6000.0|
|        201|           20|13000.0|
|        119|           30| 2500.0|
|        118|           30| 2600.0|
|        117|           30| 2800.0|
|        116|           30| 2900.0|
|        115|           30| 3100.0|
|        114|           30|11000.0|
|        203|           40| 6500.0|
|        132|           50| 2100.0|
|        136|           50| 2200.0|
|        128|           50| 2200.0|
|        127|           50| 2400.0|
|        135|           50| 2400.0|
|        140|           50| 2500.0|
|        144|           50| 2500.0|
|        191|           50| 2500.0|
|        182|           50| 2500.0|
+-----------+-------------+-------+
only showing top 20 rows
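As noted above, plain aggregation with max can produce each department's top salary, but the matching employee attributes are lost. A minimal sketch of that limitation, using the same employees DataFrame:

from pyspark.sql.functions import max as sql_max

# Aggregation collapses each department to a single row, so there is no
# employee_id (or name) left in the result to say who earns the max salary.
employees. \
    groupBy('department_id'). \
    agg(sql_max('salary').alias('max_salary')). \
    orderBy('department_id'). \
    show()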
from pyspark.sql.window import Window
spec = Window. \
partitionBy('department_id'). \
orderBy(col('salary').desc())
from pyspark.sql.functions import first
employees. \
select('employee_id',
col('department_id').cast('int').alias('department_id'),
'salary'
). \
withColumn("highest_salary", first('salary').over(spec)). \
withColumn("highest_employee_id", first('employee_id').over(spec)). \
orderBy("department_id", col("salary").desc()). \
show()
+-----------+-------------+-------+--------------+-------------------+
|employee_id|department_id| salary|highest_salary|highest_employee_id|
+-----------+-------------+-------+--------------+-------------------+
|        178|         null| 7000.0|        7000.0|                178|
|        200|           10| 4400.0|        4400.0|                200|
|        201|           20|13000.0|       13000.0|                201|
|        202|           20| 6000.0|       13000.0|                201|
|        114|           30|11000.0|       11000.0|                114|
|        115|           30| 3100.0|       11000.0|                114|
|        116|           30| 2900.0|       11000.0|                114|
|        117|           30| 2800.0|       11000.0|                114|
|        118|           30| 2600.0|       11000.0|                114|
|        119|           30| 2500.0|       11000.0|                114|
|        203|           40| 6500.0|        6500.0|                203|
|        121|           50| 8200.0|        8200.0|                121|
|        120|           50| 8000.0|        8200.0|                121|
|        122|           50| 7900.0|        8200.0|                121|
|        123|           50| 6500.0|        8200.0|                121|
|        124|           50| 5800.0|        8200.0|                121|
|        184|           50| 4200.0|        8200.0|                121|
|        185|           50| 4100.0|        8200.0|                121|
|        192|           50| 4000.0|        8200.0|                121|
|        193|           50| 3900.0|        8200.0|                121|
+-----------+-------------+-------+--------------+-------------------+
only showing top 20 rows
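A side note on nulls: pyspark's first and last accept an optional ignorenulls argument. A minimal sketch, assuming commission_pct contains nulls in this extract; with ignorenulls=True the window returns the first non-null commission per department instead of a possible null from the top-ranked row.

from pyspark.sql.functions import col, first

# Same window spec as above; ignorenulls=True makes first() skip null
# commission_pct values within each department's frame.
employees. \
    select('employee_id',
           col('department_id').cast('int').alias('department_id'),
           'salary',
           'commission_pct'
          ). \
    withColumn('first_commission',
               first('commission_pct', ignorenulls=True).over(spec)). \
    show()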
By default, the last function uses a frame spanning from unbounded preceding to the current row, so it simply returns the current row's value. To get the true last value in each partition, we need to change the frame to span from unbounded preceding to unbounded following. The next example demonstrates the default behavior; the corrected frame follows after a look at the WindowSpec API.
from pyspark.sql.functions import last
spec = Window. \
partitionBy('department_id'). \
orderBy(col('salary').desc())
employees. \
select('employee_id',
col('department_id').cast('int').alias('department_id'),
'salary'
). \
withColumn("highest_salary", last('salary').over(spec)). \
withColumn("highest_employee_id", last('employee_id').over(spec)). \
orderBy("department_id", col("salary").desc()). \
show()
+-----------+-------------+-------+--------------+-------------------+
|employee_id|department_id| salary|highest_salary|highest_employee_id|
+-----------+-------------+-------+--------------+-------------------+
|        178|         null| 7000.0|        7000.0|                178|
|        200|           10| 4400.0|        4400.0|                200|
|        201|           20|13000.0|       13000.0|                201|
|        202|           20| 6000.0|        6000.0|                202|
|        114|           30|11000.0|       11000.0|                114|
|        115|           30| 3100.0|        3100.0|                115|
|        116|           30| 2900.0|        2900.0|                116|
|        117|           30| 2800.0|        2800.0|                117|
|        118|           30| 2600.0|        2600.0|                118|
|        119|           30| 2500.0|        2500.0|                119|
|        203|           40| 6500.0|        6500.0|                203|
|        121|           50| 8200.0|        8200.0|                121|
|        120|           50| 8000.0|        8000.0|                120|
|        122|           50| 7900.0|        7900.0|                122|
|        123|           50| 6500.0|        6500.0|                123|
|        124|           50| 5800.0|        5800.0|                124|
|        184|           50| 4200.0|        4200.0|                184|
|        185|           50| 4100.0|        4100.0|                185|
|        192|           50| 4000.0|        4000.0|                192|
|        193|           50| 3900.0|        3900.0|                193|
+-----------+-------------+-------+--------------+-------------------+
only showing top 20 rows
help(spec)
Help on WindowSpec in module pyspark.sql.window object:
class WindowSpec(builtins.object)
| A window specification that defines the partitioning, ordering,
| and frame boundaries.
|
| Use the static methods in :class:`Window` to create a :class:`WindowSpec`.
|
| .. note:: Experimental
|
| .. versionadded:: 1.4
|
| Methods defined here:
|
| __init__(self, jspec)
| Initialize self. See help(type(self)) for accurate signature.
|
| orderBy(self, *cols)
| Defines the ordering columns in a :class:`WindowSpec`.
|
| :param cols: names of columns or expressions
|
| .. versionadded:: 1.4
|
| partitionBy(self, *cols)
| Defines the partitioning columns in a :class:`WindowSpec`.
|
| :param cols: names of columns or expressions
|
| .. versionadded:: 1.4
|
| rangeBetween(self, start, end)
| Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).
|
| Both `start` and `end` are relative from the current row. For example,
| "0" means "current row", while "-1" means one off before the current row,
| and "5" means the five off after the current row.
|
| We recommend users use ``Window.unboundedPreceding``, ``Window.unboundedFollowing``,
| and ``Window.currentRow`` to specify special boundary values, rather than using integral
| values directly.
|
| :param start: boundary start, inclusive.
| The frame is unbounded if this is ``Window.unboundedPreceding``, or
| any value less than or equal to max(-sys.maxsize, -9223372036854775808).
| :param end: boundary end, inclusive.
| The frame is unbounded if this is ``Window.unboundedFollowing``, or
| any value greater than or equal to min(sys.maxsize, 9223372036854775807).
|
| .. versionadded:: 1.4
|
| rowsBetween(self, start, end)
| Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).
|
| Both `start` and `end` are relative positions from the current row.
| For example, "0" means "current row", while "-1" means the row before
| the current row, and "5" means the fifth row after the current row.
|
| We recommend users use ``Window.unboundedPreceding``, ``Window.unboundedFollowing``,
| and ``Window.currentRow`` to specify special boundary values, rather than using integral
| values directly.
|
| :param start: boundary start, inclusive.
| The frame is unbounded if this is ``Window.unboundedPreceding``, or
| any value less than or equal to max(-sys.maxsize, -9223372036854775808).
| :param end: boundary end, inclusive.
| The frame is unbounded if this is ``Window.unboundedFollowing``, or
| any value greater than or equal to min(sys.maxsize, 9223372036854775807).
|
| .. versionadded:: 1.4
|
| ----------------------------------------------------------------------
| Data descriptors defined here:
|
| __dict__
| dictionary for instance variables (if defined)
|
| __weakref__
| list of weak references to the object (if defined)
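The help output covers both rowsBetween and rangeBetween. The difference shows up when the ordering column has ties: rowsBetween counts physical rows, while rangeBetween is based on the ordering value, so rows with the same salary fall inside the same frame. A minimal sketch contrasting the two (the spec and column names here are illustrative):

from pyspark.sql.window import Window
from pyspark.sql.functions import col, count, lit

rows_spec = Window. \
    partitionBy('department_id'). \
    orderBy('salary'). \
    rowsBetween(Window.unboundedPreceding, Window.currentRow)

range_spec = Window. \
    partitionBy('department_id'). \
    orderBy('salary'). \
    rangeBetween(Window.unboundedPreceding, Window.currentRow)

# With duplicate salaries (e.g. 2500.0 in department 50), range_cnt also
# counts the peers that share the current salary, while rows_cnt stops
# at the current physical row.
employees. \
    select('employee_id',
           col('department_id').cast('int').alias('department_id'),
           'salary'
          ). \
    withColumn('rows_cnt', count(lit(1)).over(rows_spec)). \
    withColumn('range_cnt', count(lit(1)).over(range_spec)). \
    orderBy('department_id', 'salary'). \
    show()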
Now let us redefine the window spec using rowsBetween so that the frame spans from unbounded preceding to unbounded following; with the descending order, last now returns the least paid employee in each department.
spec = Window. \
    partitionBy('department_id'). \
    orderBy(col('salary').desc()). \
    rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
employees. \
    select('employee_id',
           col('department_id').cast('int').alias('department_id'),
           'salary'
          ). \
    withColumn("least_salary", last('salary').over(spec)). \
    withColumn("least_employee_id", last('employee_id').over(spec)). \
    orderBy("department_id", col("salary").desc()). \
    show()
+-----------+-------------+-------+------------+-----------------+
|employee_id|department_id| salary|least_salary|least_employee_id|
+-----------+-------------+-------+------------+-----------------+
|        178|         null| 7000.0|      7000.0|              178|
|        200|           10| 4400.0|      4400.0|              200|
|        201|           20|13000.0|      6000.0|              202|
|        202|           20| 6000.0|      6000.0|              202|
|        114|           30|11000.0|      2500.0|              119|
|        115|           30| 3100.0|      2500.0|              119|
|        116|           30| 2900.0|      2500.0|              119|
|        117|           30| 2800.0|      2500.0|              119|
|        118|           30| 2600.0|      2500.0|              119|
|        119|           30| 2500.0|      2500.0|              119|
|        203|           40| 6500.0|      6500.0|              203|
|        121|           50| 8200.0|      2100.0|              132|
|        120|           50| 8000.0|      2100.0|              132|
|        122|           50| 7900.0|      2100.0|              132|
|        123|           50| 6500.0|      2100.0|              132|
|        124|           50| 5800.0|      2100.0|              132|
|        184|           50| 4200.0|      2100.0|              132|
|        185|           50| 4100.0|      2100.0|              132|
|        192|           50| 4000.0|      2100.0|              132|
|        193|           50| 3900.0|      2100.0|              132|
+-----------+-------------+-------+------------+-----------------+
only showing top 20 rows
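Putting the two functions together: since the frame now spans the whole partition, first and last can be combined in a single pass to get both the highest and the least paid employee per department. A minimal sketch along the lines of the examples above:

from pyspark.sql.window import Window
from pyspark.sql.functions import col, first, last

# One full-partition frame: first() picks the top of the descending
# order (highest salary), last() picks the bottom (least salary).
full_spec = Window. \
    partitionBy('department_id'). \
    orderBy(col('salary').desc()). \
    rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

employees. \
    select('employee_id',
           col('department_id').cast('int').alias('department_id'),
           'salary'
          ). \
    withColumn('highest_salary', first('salary').over(full_spec)). \
    withColumn('least_salary', last('salary').over(full_spec)). \
    orderBy('department_id', col('salary').desc()). \
    show()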