## Overview of Windowing Functions

Let us get an overview of Windowing Functions using Spark. 
* These are available as part of SQL in most of the traditional databases. 
* In some databases they are also known as Analytical Functions.

Let us start the Spark session for this notebook so that we can execute the code provided. You can sign up for our [10 node state of the art cluster/labs](https://labs.itversity.com/plans) to learn Spark SQL using our unique integrated LMS.

In [1]:
from pyspark.sql import SparkSession

import getpass
username = getpass.getuser()

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Windowing Functions'). \
    master('yarn'). \
    getOrCreate()

If you prefer to work from the command line, you can launch Spark using one of the following 3 approaches.

**Using Spark SQL**

```
spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

**Using Scala**

```
spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

**Using PySpark**

```
pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

* First, let us understand the relevance of these functions using the `employees` data set.

In [2]:
employeesPath = '/public/hr_db/employees'

In [3]:
employees = spark. \
    read. \
    format('csv'). \
    option('sep', '\t'). \
    schema('''employee_id INT, 
              first_name STRING, 
              last_name STRING, 
              email STRING,
              phone_number STRING, 
              hire_date STRING, 
              job_id STRING, 
              salary FLOAT,
              commission_pct STRING,
              manager_id STRING, 
              department_id STRING
            '''). \
    load(employeesPath)

In [4]:
employees.show()

+-----------+----------+----------+--------+------------------+----------+--------+-------+--------------+----------+-------------+
|employee_id|first_name| last_name|   email|      phone_number| hire_date|  job_id| salary|commission_pct|manager_id|department_id|
+-----------+----------+----------+--------+------------------+----------+--------+-------+--------------+----------+-------------+
|        127|     James|    Landry| JLANDRY|      650.124.1334|1999-01-14|ST_CLERK| 2400.0|          null|       120|           50|
|        128|    Steven|    Markle| SMARKLE|      650.124.1434|2000-03-08|ST_CLERK| 2200.0|          null|       120|           50|
|        129|     Laura|    Bissot| LBISSOT|      650.124.5234|1997-08-20|ST_CLERK| 3300.0|          null|       121|           50|
|        130|     Mozhe|  Atkinson|MATKINSO|      650.124.6234|1997-10-30|ST_CLERK| 2800.0|          null|       121|           50|
...

In [5]:
employees.printSchema()

root
 |-- employee_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- hire_date: string (nullable = true)
 |-- job_id: string (nullable = true)
 |-- salary: float (nullable = true)
 |-- commission_pct: string (nullable = true)
 |-- manager_id: string (nullable = true)
 |-- department_id: string (nullable = true)



In [6]:
employees.count()

107

In [7]:
from pyspark.sql.functions import col

In [8]:
employees. \
    select('employee_id', 
           col('department_id').cast('int').alias('department_id'), 
           'salary'
          ). \
    orderBy('department_id', 'salary'). \
    show()

+-----------+-------------+-------+
|employee_id|department_id| salary|
+-----------+-------------+-------+
|        178|         null| 7000.0|
|        200|           10| 4400.0|
|        202|           20| 6000.0|
|        201|           20|13000.0|
|        119|           30| 2500.0|
|        118|           30| 2600.0|
|        117|           30| 2800.0|
|        116|           30| 2900.0|
|        115|           30| 3100.0|
|        114|           30|11000.0|
|        203|           40| 6500.0|
|        132|           50| 2100.0|
|        128|           50| 2200.0|
|        136|           50| 2200.0|
|        135|           50| 2400.0|
|        127|           50| 2400.0|
|        131|           50| 2500.0|
|        140|           50| 2500.0|
|        191|           50| 2500.0|
|        144|           50| 2500.0|
+-----------+-------------+-------+
only showing top 20 rows



* Let us say we want to compare each employee's salary with the total salary expense of their department.
* Here is one approach, which requires a self join:
  * Compute the department-wise expense using `groupBy` and `agg`.
  * Join the result with **employees** again on `department_id`.
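To make the two steps concrete without a cluster, here is a plain-Python sketch of what the aggregate-then-join approach computes. It is an illustration only, not Spark code: the sample rows are a hypothetical subset copied from the department 20 and 30 output above, so the department 30 total reflects only those rows.

```python
from collections import defaultdict

# Hypothetical sample rows (employee_id, department_id, salary), a subset of
# the output above; totals below reflect only these rows.
employees = [
    (202, 20, 6000.0),
    (201, 20, 13000.0),
    (119, 30, 2500.0),
    (118, 30, 2600.0),
]

# Step 1: aggregate per department, like groupBy('department_id').agg(sum('salary')).
department_expense = defaultdict(float)
for _, dept, salary in employees:
    department_expense[dept] += salary

# Step 2: join the aggregate back onto every employee row by department_id.
joined = [
    (emp_id, dept, salary, department_expense[dept])
    for emp_id, dept, salary in employees
]

for row in joined:
    print(row)
```

The data is effectively processed twice: once to build the aggregate and once more to attach it back to each row.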

In [9]:
# Note: this import shadows Python's built-in sum for the rest of the notebook
from pyspark.sql.functions import sum, col

In [10]:
department_expense = employees. \
    groupBy('department_id'). \
    agg(sum('salary').alias('expense'))

In [11]:
department_expense.show()

+-------------+--------+
|department_id| expense|
+-------------+--------+
|           30| 24900.0|
|          110| 20300.0|
|          100| 51600.0|
|           70| 10000.0|
|           90| 58000.0|
|           60| 28800.0|
|           40|  6500.0|
|           20| 19000.0|
|           10|  4400.0|
|           80|304500.0|
|         null|  7000.0|
|           50|156400.0|
+-------------+--------+



In [12]:
employees. \
    select('employee_id', 'department_id', 'salary'). \
    join(department_expense, employees.department_id == department_expense.department_id). \
    orderBy(employees.department_id, col('salary')). \
    show()

+-----------+-------------+-------+-------------+--------+
|employee_id|department_id| salary|department_id| expense|
+-----------+-------------+-------+-------------+--------+
|        200|           10| 4400.0|           10|  4400.0|
|        113|          100| 6900.0|          100| 51600.0|
|        111|          100| 7700.0|          100| 51600.0|
|        112|          100| 7800.0|          100| 51600.0|
|        110|          100| 8200.0|          100| 51600.0|
|        109|          100| 9000.0|          100| 51600.0|
|        108|          100|12000.0|          100| 51600.0|
|        206|          110| 8300.0|          110| 20300.0|
|        205|          110|12000.0|          110| 20300.0|
|        202|           20| 6000.0|           20| 19000.0|
|        201|           20|13000.0|           20| 19000.0|
|        119|           30| 2500.0|           30| 24900.0|
|        118|           30| 2600.0|           30| 24900.0|
|        117|           30| 2800.0|           30| 24900.0|
...

**However, this approach is inefficient and overly complicated: it computes an aggregate and then joins it back to the original data. Windowing functions simplify the logic and typically run more efficiently.**
 
Now let us get into the details of windowing functions.
* The main package is `pyspark.sql.window`.
* It has classes such as `Window` and `WindowSpec`.
* `Window` has APIs such as `partitionBy`, `orderBy`, etc.
* These APIs (such as `partitionBy`) return a `WindowSpec` object. We can pass the `WindowSpec` object to `over` on functions such as `rank()`, `dense_rank()`, `sum()`, etc.
* Syntax: `sum('salary').over(spec)` where `spec = Window.partitionBy('department_id')`
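To show what `sum('salary').over(Window.partitionBy('department_id'))` computes, here is a plain-Python sketch of the semantics. The helper `window_sum` and the sample rows are hypothetical and for illustration only; in PySpark the equivalent would be along the lines of `employees.withColumn('expense', sum('salary').over(Window.partitionBy('department_id')))`. The key point is that a window aggregate keeps every input row and attaches its partition's total, with no separate join.

```python
from collections import defaultdict


def window_sum(rows, partition_key, value_key, out_key):
    """Attach the sum of value_key over each row's partition to that row.

    Hypothetical plain-Python sketch of sum(value_key).over(Window.partitionBy(partition_key)).
    Unlike groupBy + agg, the number of rows is unchanged.
    """
    # Group rows into partitions keyed by partition_key.
    partitions = defaultdict(list)
    for row in rows:
        partitions[row[partition_key]].append(row)

    result = []
    for part_rows in partitions.values():
        # One aggregate per partition (Python's built-in sum here).
        total = sum(r[value_key] for r in part_rows)
        # Every row in the partition keeps its columns and gains the total.
        for r in part_rows:
            result.append({**r, out_key: total})
    return result


# Hypothetical sample rows: a subset of the department 20 and 30 output above.
rows = [
    {'employee_id': 202, 'department_id': 20, 'salary': 6000.0},
    {'employee_id': 201, 'department_id': 20, 'salary': 13000.0},
    {'employee_id': 119, 'department_id': 30, 'salary': 2500.0},
    {'employee_id': 118, 'department_id': 30, 'salary': 2600.0},
]

with_expense = window_sum(rows, 'department_id', 'salary', 'expense')
for r in with_expense:
    print(r)
```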

In [14]:
from pyspark.sql import window

In [15]:
help(window)

Help on module pyspark.sql.window in pyspark.sql:

NAME
    pyspark.sql.window


CLASSES
    builtins.object
        Window
        WindowSpec
   

The functions commonly used with windows fall into these categories:

| Category | Functions |
| ------------- |:-------------:|
| Aggregate Functions | <ul><li>sum</li><li>avg</li><li>min</li><li>max</li></ul> |
| Ranking Functions | <ul><li>rank</li><li>dense_rank</li><li>percent_rank</li><li>row_number</li><li>ntile</li></ul> |
| Analytic Functions | <ul><li>cume_dist</li><li>first</li><li>last</li><li>lead</li><li>lag</li></ul> |
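The differences between `row_number`, `rank` and `dense_rank`, and what `lag` and `lead` return, are easiest to see on a partition that has ties. Below is a plain-Python sketch for a single partition ordered by salary. It is an illustration of the semantics, not PySpark code; in Spark these would be evaluated per partition over a spec like `Window.partitionBy('department_id').orderBy('salary')`. The helper name `ranking_and_offsets` is hypothetical, and the salaries are those of employees 132, 128, 136, 135, 127 and 131 (department 50) from the output above.

```python
def ranking_and_offsets(values):
    """For one partition, return (value, row_number, rank, dense_rank, lag, lead).

    Hypothetical sketch; rows are ordered ascending by value, like orderBy('salary').
    """
    ordered = sorted(values)
    out = []
    rank = dense_rank = 0
    previous = object()  # sentinel that never equals a real value
    for i, v in enumerate(ordered):
        row_number = i + 1             # always increments, even on ties
        if v != previous:
            rank = row_number          # ties share a rank; a gap follows them
            dense_rank += 1            # ties share a rank; no gap
            previous = v
        lag = ordered[i - 1] if i > 0 else None                  # previous row's value
        lead = ordered[i + 1] if i + 1 < len(ordered) else None  # next row's value
        out.append((v, row_number, rank, dense_rank, lag, lead))
    return out


# Department 50 salaries taken from the output above.
salaries = [2100.0, 2200.0, 2200.0, 2400.0, 2400.0, 2500.0]
for row in ranking_and_offsets(salaries):
    print(row)
```

Note that among tied rows, Spark does not guarantee which row receives which `row_number`; only the sequence of numbers is fixed.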