## Ranking Functions

We can use ranking functions to assign ranks to a particular record within a partition.

* Sparse Rank - rank
* Dense Rank - dense_rank
* Assigning Row Numbers - row_number
* Percentage Rank - percent_rank

Let us start spark context for this Notebook so that we can execute the code provided. You can sign up for our [10 node state of the art cluster/labs](https://labs.itversity.com/plans) to learn Spark SQL using our unique integrated LMS.

In [1]:
from pyspark.sql import SparkSession

import getpass
username = getpass.getuser()

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Windowing Functions'). \
    master('yarn'). \
    getOrCreate()

If you are going to use CLIs, you can use Spark SQL using one of the 3 approaches.

**Using Spark SQL**

```
spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

**Using Scala**

```
spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

**Using Pyspark**

```
pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

In [2]:
spark.conf.set('spark.sql.shuffle.partitions', '2')

* Let us assign ranks based up on departure delay from each of the airport.

In [3]:
airtraffic_path = "/public/airtraffic_all/airtraffic-part/flightmonth=200801"

In [4]:
airtraffic = spark. \
    read. \
    parquet(airtraffic_path)

In [5]:
from pyspark.sql.functions import col, lit, lpad, concat
from pyspark.sql.functions import rank, dense_rank
from pyspark.sql.functions import percent_rank, row_number, round
from pyspark.sql.window import Window

In [6]:
spec = Window. \
    partitionBy("FlightDate", "Origin"). \
    orderBy(col("DepDelay").desc())

In [7]:
airtraffic. \
    filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
    select(concat("Year", 
                  lpad("Month", 2, "0"), 
                  lpad("DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           "Origin",
           "UniqueCarrier",
           "FlightNum",
           "CRSDepTime",
           "IsDepDelayed",
           col("DepDelay").cast("int").alias("DepDelay")
          ). \
    withColumn("srank", rank().over(spec)). \
    withColumn("drank", dense_rank().over(spec)). \
    withColumn("prank", round(percent_rank().over(spec), 2)). \
    withColumn("rn", row_number().over(spec)). \
    orderBy("FlightDate", "Origin", col("DepDelay").desc()). \
    show()

+----------+------+-------------+---------+----------+------------+--------+-----+-----+-----+---+
|FlightDate|Origin|UniqueCarrier|FlightNum|CRSDepTime|IsDepDelayed|DepDelay|srank|drank|prank| rn|
+----------+------+-------------+---------+----------+------------+--------+-----+-----+-----+---+
|  20080101|   ABE|           YV|     7138|      1741|         YES|     175|    1|    1|  0.0|  1|
|  20080101|   ABE|           YV|     7263|      1230|         YES|     137|    2|    2| 0.14|  2|
|  20080101|   ABE|           9E|     2940|      1215|         YES|      70|    3|    3| 0.29|  3|
|  20080101|   ABE|           9E|     2936|      1615|         YES|      34|    4|    4| 0.43|  4|
|  20080101|   ABE|           XE|     2594|      1740|         YES|      34|    4|    4| 0.43|  5|
|  20080101|   ABE|           XE|     2578|      1410|         YES|      22|    6|    5| 0.71|  6|
|  20080101|   ABE|           OH|     5457|      1720|         YES|      14|    7|    6| 0.86|  7|
|  2008010

* Let us assign ranks to each employee with in their respective department based up on their salary.

In [8]:
employeesPath = '/public/hr_db/employees'

In [9]:
employees = spark. \
    read. \
    format('csv'). \
    option('sep', '\t'). \
    schema('''employee_id INT, 
              first_name STRING, 
              last_name STRING, 
              email STRING,
              phone_number STRING, 
              hire_date STRING, 
              job_id STRING, 
              salary FLOAT,
              commission_pct STRING,
              manager_id STRING, 
              department_id STRING
            '''). \
    load(employeesPath)

In [10]:
employees. \
    select('employee_id', 
           col('department_id').cast('int').alias('department_id'), 
           'salary'
          ). \
    orderBy('department_id', 'salary'). \
    show()

+-----------+-------------+-------+
|employee_id|department_id| salary|
+-----------+-------------+-------+
|        178|         null| 7000.0|
|        200|           10| 4400.0|
|        202|           20| 6000.0|
|        201|           20|13000.0|
|        119|           30| 2500.0|
|        118|           30| 2600.0|
|        117|           30| 2800.0|
|        116|           30| 2900.0|
|        115|           30| 3100.0|
|        114|           30|11000.0|
|        203|           40| 6500.0|
|        132|           50| 2100.0|
|        128|           50| 2200.0|
|        136|           50| 2200.0|
|        135|           50| 2400.0|
|        127|           50| 2400.0|
|        131|           50| 2500.0|
|        140|           50| 2500.0|
|        191|           50| 2500.0|
|        144|           50| 2500.0|
+-----------+-------------+-------+
only showing top 20 rows



In [11]:
spec = Window. \
    partitionBy('department_id'). \
    orderBy(col('salary').desc())

In [12]:
employees. \
    select('employee_id', 
           col('department_id').cast('int').alias('department_id'), 
           'salary'
          ). \
    withColumn("srank", rank().over(spec)). \
    withColumn("drank", dense_rank().over(spec)). \
    withColumn("prank", round(percent_rank().over(spec), 2)). \
    withColumn("rn", row_number().over(spec)). \
    orderBy("department_id", col("salary").desc()). \
    show(107)

+-----------+-------------+-------+-----+-----+-----+---+
|employee_id|department_id| salary|srank|drank|prank| rn|
+-----------+-------------+-------+-----+-----+-----+---+
|        178|         null| 7000.0|    1|    1|  0.0|  1|
|        200|           10| 4400.0|    1|    1|  0.0|  1|
|        201|           20|13000.0|    1|    1|  0.0|  1|
|        202|           20| 6000.0|    2|    2|  1.0|  2|
|        114|           30|11000.0|    1|    1|  0.0|  1|
|        115|           30| 3100.0|    2|    2|  0.2|  2|
|        116|           30| 2900.0|    3|    3|  0.4|  3|
|        117|           30| 2800.0|    4|    4|  0.6|  4|
|        118|           30| 2600.0|    5|    5|  0.8|  5|
|        119|           30| 2500.0|    6|    6|  1.0|  6|
|        203|           40| 6500.0|    1|    1|  0.0|  1|
|        121|           50| 8200.0|    1|    1|  0.0|  1|
|        120|           50| 8000.0|    2|    2| 0.02|  2|
|        122|           50| 7900.0|    3|    3| 0.05|  3|
|        123| 