Ranking FunctionsΒΆ
We can use ranking functions to assign ranks to a particular record within a partition.
Sparse Rank - rank
Dense Rank - dense_rank
Assigning Row Numbers - row_number
Percentage Rank - percent_rank
Let us start spark context for this Notebook so that we can execute the code provided. You can sign up for our 10 node state of the art cluster/labs to learn Spark SQL using our unique integrated LMS.
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
builder. \
config('spark.ui.port', '0'). \
config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
enableHiveSupport(). \
appName(f'{username} | Python - Windowing Functions'). \
master('yarn'). \
getOrCreate()
If you are going to use CLIs, you can use Spark SQL using one of the 3 approaches.
Using Spark SQL
spark2-sql \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Scala
spark2-shell \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Pyspark
pyspark2 \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
spark.conf.set('spark.sql.shuffle.partitions', '2')
Let us assign ranks based up on departure delay from each of the airport.
airtraffic_path = "/public/airtraffic_all/airtraffic-part/flightmonth=200801"
airtraffic = spark. \
read. \
parquet(airtraffic_path)
from pyspark.sql.functions import col, lit, lpad, concat
from pyspark.sql.functions import rank, dense_rank
from pyspark.sql.functions import percent_rank, row_number, round
from pyspark.sql.window import Window
spec = Window. \
partitionBy("FlightDate", "Origin"). \
orderBy(col("DepDelay").desc())
airtraffic. \
filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
select(concat("Year",
lpad("Month", 2, "0"),
lpad("DayOfMonth", 2, "0")
).alias("FlightDate"),
"Origin",
"UniqueCarrier",
"FlightNum",
"CRSDepTime",
"IsDepDelayed",
col("DepDelay").cast("int").alias("DepDelay")
). \
withColumn("srank", rank().over(spec)). \
withColumn("drank", dense_rank().over(spec)). \
withColumn("prank", round(percent_rank().over(spec), 2)). \
withColumn("rn", row_number().over(spec)). \
orderBy("FlightDate", "Origin", col("DepDelay").desc()). \
show()
+----------+------+-------------+---------+----------+------------+--------+-----+-----+-----+---+
|FlightDate|Origin|UniqueCarrier|FlightNum|CRSDepTime|IsDepDelayed|DepDelay|srank|drank|prank| rn|
+----------+------+-------------+---------+----------+------------+--------+-----+-----+-----+---+
| 20080101| ABE| YV| 7138| 1741| YES| 175| 1| 1| 0.0| 1|
| 20080101| ABE| YV| 7263| 1230| YES| 137| 2| 2| 0.14| 2|
| 20080101| ABE| 9E| 2940| 1215| YES| 70| 3| 3| 0.29| 3|
| 20080101| ABE| 9E| 2936| 1615| YES| 34| 4| 4| 0.43| 4|
| 20080101| ABE| XE| 2594| 1740| YES| 34| 4| 4| 0.43| 5|
| 20080101| ABE| XE| 2578| 1410| YES| 22| 6| 5| 0.71| 6|
| 20080101| ABE| OH| 5457| 1720| YES| 14| 7| 6| 0.86| 7|
| 20080101| ABE| OO| 5873| 720| YES| 1| 8| 7| 1.0| 8|
| 20080101| ABI| MQ| 3214| 1735| YES| 3| 1| 1| 0.0| 1|
| 20080101| ABQ| WN| 823| 2045| YES| 218| 1| 1| 0.0| 1|
| 20080101| ABQ| WN| 357| 1525| YES| 171| 2| 2| 0.02| 2|
| 20080101| ABQ| AA| 1814| 925| YES| 100| 3| 3| 0.04| 3|
| 20080101| ABQ| WN| 1178| 1845| YES| 94| 4| 4| 0.06| 4|
| 20080101| ABQ| WN| 2497| 1825| YES| 93| 5| 5| 0.09| 5|
| 20080101| ABQ| WN| 3481| 1500| YES| 78| 6| 6| 0.11| 6|
| 20080101| ABQ| WN| 1328| 1245| YES| 78| 6| 6| 0.11| 7|
| 20080101| ABQ| XE| 221| 1850| YES| 68| 8| 7| 0.15| 8|
| 20080101| ABQ| DL| 1601| 740| YES| 65| 9| 8| 0.17| 9|
| 20080101| ABQ| WN| 45| 1435| YES| 64| 10| 9| 0.19| 10|
| 20080101| ABQ| WN| 2788| 1755| YES| 53| 11| 10| 0.21| 11|
+----------+------+-------------+---------+----------+------------+--------+-----+-----+-----+---+
only showing top 20 rows
Let us assign ranks to each employee with in their respective department based up on their salary.
employeesPath = '/public/hr_db/employees'
employees = spark. \
read. \
format('csv'). \
option('sep', '\t'). \
schema('''employee_id INT,
first_name STRING,
last_name STRING,
email STRING,
phone_number STRING,
hire_date STRING,
job_id STRING,
salary FLOAT,
commission_pct STRING,
manager_id STRING,
department_id STRING
'''). \
load(employeesPath)
employees. \
select('employee_id',
col('department_id').cast('int').alias('department_id'),
'salary'
). \
orderBy('department_id', 'salary'). \
show()
+-----------+-------------+-------+
|employee_id|department_id| salary|
+-----------+-------------+-------+
| 178| null| 7000.0|
| 200| 10| 4400.0|
| 202| 20| 6000.0|
| 201| 20|13000.0|
| 119| 30| 2500.0|
| 118| 30| 2600.0|
| 117| 30| 2800.0|
| 116| 30| 2900.0|
| 115| 30| 3100.0|
| 114| 30|11000.0|
| 203| 40| 6500.0|
| 132| 50| 2100.0|
| 128| 50| 2200.0|
| 136| 50| 2200.0|
| 135| 50| 2400.0|
| 127| 50| 2400.0|
| 131| 50| 2500.0|
| 140| 50| 2500.0|
| 191| 50| 2500.0|
| 144| 50| 2500.0|
+-----------+-------------+-------+
only showing top 20 rows
spec = Window. \
partitionBy('department_id'). \
orderBy(col('salary').desc())
employees. \
select('employee_id',
col('department_id').cast('int').alias('department_id'),
'salary'
). \
withColumn("srank", rank().over(spec)). \
withColumn("drank", dense_rank().over(spec)). \
withColumn("prank", round(percent_rank().over(spec), 2)). \
withColumn("rn", row_number().over(spec)). \
orderBy("department_id", col("salary").desc()). \
show(107)
+-----------+-------------+-------+-----+-----+-----+---+
|employee_id|department_id| salary|srank|drank|prank| rn|
+-----------+-------------+-------+-----+-----+-----+---+
| 178| null| 7000.0| 1| 1| 0.0| 1|
| 200| 10| 4400.0| 1| 1| 0.0| 1|
| 201| 20|13000.0| 1| 1| 0.0| 1|
| 202| 20| 6000.0| 2| 2| 1.0| 2|
| 114| 30|11000.0| 1| 1| 0.0| 1|
| 115| 30| 3100.0| 2| 2| 0.2| 2|
| 116| 30| 2900.0| 3| 3| 0.4| 3|
| 117| 30| 2800.0| 4| 4| 0.6| 4|
| 118| 30| 2600.0| 5| 5| 0.8| 5|
| 119| 30| 2500.0| 6| 6| 1.0| 6|
| 203| 40| 6500.0| 1| 1| 0.0| 1|
| 121| 50| 8200.0| 1| 1| 0.0| 1|
| 120| 50| 8000.0| 2| 2| 0.02| 2|
| 122| 50| 7900.0| 3| 3| 0.05| 3|
| 123| 50| 6500.0| 4| 4| 0.07| 4|
| 124| 50| 5800.0| 5| 5| 0.09| 5|
| 184| 50| 4200.0| 6| 6| 0.11| 6|
| 185| 50| 4100.0| 7| 7| 0.14| 7|
| 192| 50| 4000.0| 8| 8| 0.16| 8|
| 193| 50| 3900.0| 9| 9| 0.18| 9|
| 188| 50| 3800.0| 10| 10| 0.2| 10|
| 189| 50| 3600.0| 11| 11| 0.23| 12|
| 137| 50| 3600.0| 11| 11| 0.23| 11|
| 141| 50| 3500.0| 13| 12| 0.27| 13|
| 186| 50| 3400.0| 14| 13| 0.3| 14|
| 133| 50| 3300.0| 15| 14| 0.32| 16|
| 129| 50| 3300.0| 15| 14| 0.32| 15|
| 138| 50| 3200.0| 17| 15| 0.36| 17|
| 125| 50| 3200.0| 17| 15| 0.36| 18|
| 180| 50| 3200.0| 17| 15| 0.36| 19|
| 194| 50| 3200.0| 17| 15| 0.36| 20|
| 196| 50| 3100.0| 21| 16| 0.45| 23|
| 142| 50| 3100.0| 21| 16| 0.45| 21|
| 181| 50| 3100.0| 21| 16| 0.45| 22|
| 197| 50| 3000.0| 24| 17| 0.52| 25|
| 187| 50| 3000.0| 24| 17| 0.52| 24|
| 134| 50| 2900.0| 26| 18| 0.57| 26|
| 190| 50| 2900.0| 26| 18| 0.57| 27|
| 130| 50| 2800.0| 28| 19| 0.61| 28|
| 195| 50| 2800.0| 28| 19| 0.61| 30|
| 183| 50| 2800.0| 28| 19| 0.61| 29|
| 126| 50| 2700.0| 31| 20| 0.68| 32|
| 139| 50| 2700.0| 31| 20| 0.68| 31|
| 198| 50| 2600.0| 33| 21| 0.73| 34|
| 143| 50| 2600.0| 33| 21| 0.73| 33|
| 199| 50| 2600.0| 33| 21| 0.73| 35|
| 144| 50| 2500.0| 36| 22| 0.8| 38|
| 182| 50| 2500.0| 36| 22| 0.8| 39|
| 191| 50| 2500.0| 36| 22| 0.8| 40|
| 140| 50| 2500.0| 36| 22| 0.8| 37|
| 131| 50| 2500.0| 36| 22| 0.8| 36|
| 127| 50| 2400.0| 41| 23| 0.91| 41|
| 135| 50| 2400.0| 41| 23| 0.91| 42|
| 128| 50| 2200.0| 43| 24| 0.95| 43|
| 136| 50| 2200.0| 43| 24| 0.95| 44|
| 132| 50| 2100.0| 45| 25| 1.0| 45|
| 103| 60| 9000.0| 1| 1| 0.0| 1|
| 104| 60| 6000.0| 2| 2| 0.25| 2|
| 105| 60| 4800.0| 3| 3| 0.5| 3|
| 106| 60| 4800.0| 3| 3| 0.5| 4|
| 107| 60| 4200.0| 5| 4| 1.0| 5|
| 204| 70|10000.0| 1| 1| 0.0| 1|
| 145| 80|14000.0| 1| 1| 0.0| 1|
| 146| 80|13500.0| 2| 2| 0.03| 2|
| 147| 80|12000.0| 3| 3| 0.06| 3|
| 168| 80|11500.0| 4| 4| 0.09| 4|
| 148| 80|11000.0| 5| 5| 0.12| 5|
| 174| 80|11000.0| 5| 5| 0.12| 6|
| 162| 80|10500.0| 7| 6| 0.18| 8|
| 149| 80|10500.0| 7| 6| 0.18| 7|
| 169| 80|10000.0| 9| 7| 0.24| 11|
| 156| 80|10000.0| 9| 7| 0.24| 10|
| 150| 80|10000.0| 9| 7| 0.24| 9|
| 170| 80| 9600.0| 12| 8| 0.33| 12|
| 163| 80| 9500.0| 13| 9| 0.36| 15|
| 157| 80| 9500.0| 13| 9| 0.36| 14|
| 151| 80| 9500.0| 13| 9| 0.36| 13|
| 158| 80| 9000.0| 16| 10| 0.45| 17|
| 152| 80| 9000.0| 16| 10| 0.45| 16|
| 175| 80| 8800.0| 18| 11| 0.52| 18|
| 176| 80| 8600.0| 19| 12| 0.55| 19|
| 177| 80| 8400.0| 20| 13| 0.58| 20|
| 159| 80| 8000.0| 21| 14| 0.61| 22|
| 153| 80| 8000.0| 21| 14| 0.61| 21|
| 160| 80| 7500.0| 23| 15| 0.67| 24|
| 154| 80| 7500.0| 23| 15| 0.67| 23|
| 171| 80| 7400.0| 25| 16| 0.73| 25|
| 172| 80| 7300.0| 26| 17| 0.76| 26|
| 164| 80| 7200.0| 27| 18| 0.79| 27|
| 155| 80| 7000.0| 28| 19| 0.82| 28|
| 161| 80| 7000.0| 28| 19| 0.82| 29|
| 165| 80| 6800.0| 30| 20| 0.88| 30|
| 166| 80| 6400.0| 31| 21| 0.91| 31|
| 167| 80| 6200.0| 32| 22| 0.94| 32|
| 179| 80| 6200.0| 32| 22| 0.94| 33|
| 173| 80| 6100.0| 34| 23| 1.0| 34|
| 100| 90|24000.0| 1| 1| 0.0| 1|
| 101| 90|17000.0| 2| 2| 0.5| 2|
| 102| 90|17000.0| 2| 2| 0.5| 3|
| 108| 100|12000.0| 1| 1| 0.0| 1|
| 109| 100| 9000.0| 2| 2| 0.2| 2|
| 110| 100| 8200.0| 3| 3| 0.4| 3|
| 112| 100| 7800.0| 4| 4| 0.6| 4|
| 111| 100| 7700.0| 5| 5| 0.8| 5|
| 113| 100| 6900.0| 6| 6| 1.0| 6|
| 205| 110|12000.0| 1| 1| 0.0| 1|
| 206| 110| 8300.0| 2| 2| 1.0| 2|
+-----------+-------------+-------+-----+-----+-----+---+