## Using LEAD or LAG

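Let us understand the usage of the `LEAD` and `LAG` window functions. Both are used in similar scenarios: `LEAD` returns a value from a row that comes *after* the current row in the window partition, while `LAG` returns a value from a row that comes *before* it. Rows are positioned according to the window's `orderBy` clause, and the result is null when no such row exists in the partition.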
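To make the difference concrete, here is a plain-Python sketch (no Spark required) of what the two functions compute over a single partition that is already ordered. The helpers `lead`/`lag` and the sample delays are hypothetical stand-ins; in PySpark the real functions are `pyspark.sql.functions.lead(col, offset=1, default=None)` and `lag(col, offset=1, default=None)`, evaluated with `.over(window_spec)`.

```python
# Plain-Python sketch of LEAD/LAG semantics over one ordered partition.
# These helpers only mimic the behaviour; they are not the PySpark API.

def lead(values, offset=1, default=None):
    """Value `offset` rows after each row, or `default` past the end."""
    n = len(values)
    return [values[i + offset] if i + offset < n else default
            for i in range(n)]


def lag(values, offset=1, default=None):
    """Value `offset` rows before each row, or `default` before the start."""
    return [values[i - offset] if i >= offset else default
            for i in range(len(values))]


delays = [5, 12, 0, 30]  # hypothetical departure delays, already ordered
print(lead(delays))       # [12, 0, 30, None]
print(lag(delays))        # [None, 5, 12, 0]
print(lag(delays, 2, 0))  # [0, 0, 5, 12]
```

The `None` values at the edges correspond to the nulls Spark returns when the offset runs past the start or end of a partition.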

Let us start the Spark session for this notebook so that we can execute the code provided. You can sign up for our [10 node state of the art cluster/labs](https://labs.itversity.com/plans) to learn Spark SQL using our unique integrated LMS.

In [None]:
from pyspark.sql import SparkSession

import getpass
username = getpass.getuser()

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Windowing Functions'). \
    master('yarn'). \
    getOrCreate()

If you are going to use the CLIs, you can launch Spark using one of the following 3 approaches.

**Using Spark SQL**

```
spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

**Using Scala**

```
spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

**Using PySpark**

```
pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

In [None]:
# The sample data is small, so use 2 shuffle partitions instead of the default 200
spark.conf.set('spark.sql.shuffle.partitions', '2')

In [None]:
airtraffic_path = "/public/airtraffic_all/airtraffic-part/flightmonth=200801"

In [None]:
airtraffic = spark. \
    read. \
    parquet(airtraffic_path)

In [None]:
from pyspark.sql.functions import col, lit, lpad, concat

In [None]:
from pyspark.sql.functions import lead

In [None]:
from pyspark.sql.window import Window

In [None]:
spec = Window. \
    partitionBy("FlightDate", "Origin"). \
    orderBy(col("CRSDepTime"))
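The spec above splits the rows into one partition per `(FlightDate, Origin)` pair and orders each partition by `CRSDepTime`, so `lead` only ever looks at the next scheduled delayed departure from the same airport on the same day. A plain-Python sketch of that grouping, on hypothetical rows, may help; it mimics the window, it does not call Spark.

```python
from collections import defaultdict

# Hypothetical rows: (FlightDate, Origin, CRSDepTime, FlightNum)
rows = [
    ("20080101", "ATL", 600, "F1"),
    ("20080101", "ATL", 545, "F2"),
    ("20080101", "ORD", 700, "F3"),
    ("20080101", "ATL", 830, "F4"),
]

# partitionBy("FlightDate", "Origin"): group rows by the partition key
partitions = defaultdict(list)
for row in rows:
    partitions[(row[0], row[1])].append(row)

# orderBy("CRSDepTime") inside each partition, then look one row ahead
lead_flight = {}
for key, part in partitions.items():
    part.sort(key=lambda r: r[2])
    for i, row in enumerate(part):
        nxt = part[i + 1] if i + 1 < len(part) else None
        lead_flight[row[3]] = nxt[3] if nxt else None

print(lead_flight)
```

The last flight of each partition (`F4` at ATL, `F3` at ORD) gets `None`: the lookup never crosses into another airport or another day.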

In [None]:
airtraffic. \
    filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
    select(concat("Year", 
                  lpad("Month", 2, "0"), 
                  lpad("DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           "Origin",
           "UniqueCarrier",
           "FlightNum",
           "CRSDepTime",
           "IsDepDelayed",
           col("DepDelay").cast("int").alias("DepDelay")
          ). \
    withColumn("LeadUniqueCarrier", lead("UniqueCarrier").over(spec)). \
    withColumn("LeadFlightNum", lead("FlightNum").over(spec)). \
    withColumn("LeadCRSDepTime", lead("CRSDepTime").over(spec)). \
    withColumn("LeadDepDelay", lead("DepDelay").over(spec)). \
    orderBy("FlightDate", "Origin", "CRSDepTime"). \
    show()
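The query above only looks forward. Importing `lag` from `pyspark.sql.functions` and using, for example, `lag("DepDelay", 1, 0).over(spec)` would instead fetch the delay of the previous delayed departure, with the third argument replacing the null on the first row of each partition. The plain-Python sketch below mimics that on one hypothetical partition; the flight numbers and delays are made up.

```python
# Hypothetical delayed departures from one (FlightDate, Origin) partition,
# already ordered by CRSDepTime: (FlightNum, CRSDepTime, DepDelay)
flights = [("F2", 545, 15), ("F1", 600, 40), ("F4", 830, 5)]


def lag(values, offset=1, default=None):
    """Mimic lag(): value `offset` rows earlier, or `default`."""
    return [values[i - offset] if i >= offset else default
            for i in range(len(values))]


delays = [f[2] for f in flights]
prev_delay = lag(delays, 1, 0)  # like lag("DepDelay", 1, 0).over(spec)

for (num, dep, delay), prev in zip(flights, prev_delay):
    # change in delay compared with the previous delayed departure
    print(num, dep, delay, prev, delay - prev)
```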

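### Using LEAD or LAG with an offset of 7

Both functions accept an optional offset as their second argument (the default is 1). In the queries below, `lead(..., 7)` fetches the value from the row seven positions ahead in the same partition. Because the data is aggregated to one row per day per airport, this is the value for the same airport a week later, provided no day is missing.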
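Because `lead` counts rows rather than calendar days, an offset of 7 lands exactly one week ahead only when every day in the partition has a row. The filter `IsDepDelayed = 'YES'` could in principle leave an airport with no row for some day. The hypothetical data in this plain-Python sketch omits 20080104 to show the effect.

```python
# Hypothetical daily TotalDepDelay for one (month, Origin) partition,
# ordered by FlightDate. Note that 20080104 is missing.
daily = [
    ("20080101", 100), ("20080102", 80), ("20080103", 95),
    ("20080105", 60), ("20080106", 70), ("20080107", 90),
    ("20080108", 110), ("20080109", 40), ("20080110", 55),
]


def lead(values, offset=1, default=None):
    """Mimic lead(): value `offset` rows later, or `default`."""
    return [values[i + offset] if i + offset < len(values) else default
            for i in range(len(values))]


dates = [d for d, _ in daily]
lead7_dates = lead(dates, 7)  # like lead("FlightDate", 7).over(spec)

for d, ld in zip(dates, lead7_dates):
    print(d, "->", ld)
```

Here 20080101 is paired with 20080109 rather than 20080108, because the missing day shifts every later row by one position.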

In [None]:
airtraffic_path = "/public/airtraffic_all/airtraffic-part/flightmonth=200801"

In [None]:
airtraffic = spark. \
    read. \
    parquet(airtraffic_path)

In [None]:
from pyspark.sql.functions import col, lit, lpad, concat

In [None]:
from pyspark.sql.functions import sum, lead, substring

In [None]:
from pyspark.sql.window import Window

In [None]:
spec = Window. \
    partitionBy(substring("FlightDate", 1, 6), "Origin"). \
    orderBy("FlightDate", col("TotalDepDelay").desc())
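This spec partitions by `substring("FlightDate", 1, 6)`, the `yyyyMM` prefix of the date, together with `Origin`, so each window holds one row per day for one airport within one month. Spark's `substring` is 1-based for a positive position, unlike Python slicing. The plain-Python sketch below, with a hypothetical helper `spark_substring` and hypothetical keys, shows how rows map to partitions.

```python
from collections import Counter


def spark_substring(s, pos, length):
    """Hypothetical helper mimicking Spark's substring for positive pos.

    Spark positions are 1-based; Python slices are 0-based.
    """
    return s[pos - 1:pos - 1 + length]


# Hypothetical (FlightDate, Origin) rows produced by the groupBy
keys = [("20080101", "ORD"), ("20080114", "ORD"), ("20080101", "ATL")]

# partitionBy(substring("FlightDate", 1, 6), "Origin")
partition_sizes = Counter((spark_substring(d, 1, 6), o) for d, o in keys)
print(partition_sizes)
```

A side effect of this choice is that `lead(..., 7)` near the end of a month returns null instead of reaching into the next month, since that month is a different partition.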

In [None]:
airtraffic. \
    filter("""IsDepDelayed = 'YES' 
              AND Cancelled = 0
              AND concat(Year, 
                         lpad(Month, 2, '0'),
                         lpad(DayOfMonth, 2, '0')
                        ) BETWEEN 20080101 AND 20080114
              AND Origin IN ('ATL', 'DFW', 'JFK', 'LAX', 'SFO', 'ORD')
           """
          ). \
    groupBy(concat("Year", 
                   lpad("Month", 2, "0"), 
                   lpad("DayOfMonth", 2, "0")
                  ).alias("FlightDate"), 
            "Origin"
           ). \
    agg(sum(col("DepDelay").cast("int")).alias("TotalDepDelay")). \
    withColumn("LeadFlightDate", lead("FlightDate", 7).over(spec)). \
    withColumn("LeadOrigin", lead("Origin", 7).over(spec)). \
    withColumn("LeadTotalDepDelay", lead("TotalDepDelay", 7).over(spec)). \
    filter("Origin = 'ORD'"). \
    orderBy("FlightDate", col("TotalDepDelay").desc()). \
    show()

In [None]:
airtraffic. \
    filter("""IsDepDelayed = 'YES' 
              AND Cancelled = 0
              AND concat(Year, 
                         lpad(Month, 2, '0'),
                         lpad(DayOfMonth, 2, '0')
                        ) BETWEEN 20080101 AND 20080114
              AND Origin IN ('ATL', 'DFW', 'JFK', 'LAX', 'SFO', 'ORD')
           """
          ). \
    groupBy(concat("Year", 
                   lpad("Month", 2, "0"), 
                   lpad("DayOfMonth", 2, "0")
                  ).alias("FlightDate"), 
            "Origin"
           ). \
    agg(sum(col("DepDelay").cast("int")).alias("TotalDepDelay")). \
    withColumn("LeadFlightDate", lead("FlightDate", 7).over(spec)). \
    withColumn("LeadOrigin", lead("Origin", 7).over(spec)). \
    withColumn("LeadTotalDepDelay", lead("TotalDepDelay", 7).over(spec)). \
    filter("Origin = 'ORD' AND FlightDate BETWEEN 20080101 AND 20080107"). \
    orderBy("FlightDate", col("TotalDepDelay").desc()). \
    show()
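The order of operations matters in this query: the `FlightDate` filter runs after the window functions, so the rows for 20080108 to 20080114 still exist when `lead(..., 7)` is evaluated, and the first week gets non-null lead values. Applying the same date filter before the window would remove those rows and leave only nulls. A plain-Python sketch on hypothetical ORD totals:

```python
def lead(values, offset=1, default=None):
    """Mimic lead(): value `offset` rows later, or `default`."""
    return [values[i + offset] if i + offset < len(values) else default
            for i in range(len(values))]


# Hypothetical ORD daily totals for 20080101..20080114, one row per day
dates = [f"200801{d:02d}" for d in range(1, 15)]
totals = [10 * d for d in range(1, 15)]

# Window first, filter second (what the query above does)
after = [(d, l) for d, l in zip(dates, lead(totals, 7))
         if "20080101" <= d <= "20080107"]

# Filter first, window second: the week-ahead rows are already gone
kept = [(d, t) for d, t in zip(dates, totals)
        if "20080101" <= d <= "20080107"]
before = list(zip([d for d, _ in kept], lead([t for _, t in kept], 7)))

print(after[0], before[0])
```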

In [None]:
airtraffic. \
    filter("""IsDepDelayed = 'YES' 
              AND Cancelled = 0
              AND concat(Year, 
                         lpad(Month, 2, '0'),
                         lpad(DayOfMonth, 2, '0')
                        ) BETWEEN 20080101 AND 20080114
              AND Origin IN ('ATL', 'DFW', 'JFK', 'LAX', 'SFO', 'ORD')
           """
          ). \
    groupBy(concat("Year", 
                   lpad("Month", 2, "0"), 
                   lpad("DayOfMonth", 2, "0")
                  ).alias("FlightDate"), 
            "Origin"
           ). \
    agg(sum(col("DepDelay").cast("int")).alias("TotalDepDelay")). \
    withColumn("LeadFlightDate", lead("FlightDate", 7).over(spec)). \
    withColumn("LeadOrigin", lead("Origin", 7).over(spec)). \
    withColumn("LeadTotalDepDelay", lead("TotalDepDelay", 7).over(spec)). \
    filter('FlightDate BETWEEN 20080101 AND 20080107'). \
    orderBy("FlightDate", col("TotalDepDelay").desc()). \
    show()
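`lead(col, 7)` pairs each day with the day one week later; the mirror image is `lag(col, 7)`, which pairs each day with the day one week earlier, for example `lag("TotalDepDelay", 7).over(spec)` followed by a filter on the second week. This plain-Python sketch computes such a week-over-week change with the same `(month, Origin)` partitioning and a filter applied after the window. The airports, totals, and the `week_over_week` name are hypothetical.

```python
from collections import defaultdict


def lag(values, offset=1, default=None):
    """Mimic lag(): value `offset` rows earlier, or `default`."""
    return [values[i - offset] if i >= offset else default
            for i in range(len(values))]


# Hypothetical daily totals for two airports, 20080101..20080114
rows = [(f"200801{d:02d}", origin, base + d)
        for origin, base in (("ATL", 100), ("ORD", 200))
        for d in range(1, 15)]

# Partition by (month, Origin); order by FlightDate inside each partition
parts = defaultdict(list)
for date, origin, total in rows:
    parts[(date[:6], origin)].append((date, total))

week_over_week = {}
for (month, origin), part in parts.items():
    part.sort()
    prev = lag([t for _, t in part], 7)  # like lag("TotalDepDelay", 7)
    for (date, total), p in zip(part, prev):
        if "20080108" <= date <= "20080114":  # filter after the window
            week_over_week[(date, origin)] = total - p

print(week_over_week[("20080108", "ATL")])
```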