Aggregate data using rollup

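Let us go through advanced aggregations using rollup in Spark. Along with the regular group-level aggregates, rollup produces subtotals for each leading subset of the grouping columns and a grand total.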

Let us start the Spark session for this notebook so that we can execute the code provided.

from pyspark.sql import SparkSession

import getpass
username = getpass.getuser()

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Advanced Aggregations'). \
    master('yarn'). \
    getOrCreate()

If you are going to use the CLIs, you can launch Spark in one of the following 3 ways.

Using Spark SQL

spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Scala

spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using PySpark

pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Let us read the orders data, which is stored in JSON format, and review its contents.

orders = spark.read.json('/public/retail_db_json/orders')
orders.show()
+-----------------+--------------------+--------+---------------+
|order_customer_id|          order_date|order_id|   order_status|
+-----------------+--------------------+--------+---------------+
|            11599|2013-07-25 00:00:...|       1|         CLOSED|
|              256|2013-07-25 00:00:...|       2|PENDING_PAYMENT|
|            12111|2013-07-25 00:00:...|       3|       COMPLETE|
|             8827|2013-07-25 00:00:...|       4|         CLOSED|
|            11318|2013-07-25 00:00:...|       5|       COMPLETE|
|             7130|2013-07-25 00:00:...|       6|       COMPLETE|
|             4530|2013-07-25 00:00:...|       7|       COMPLETE|
|             2911|2013-07-25 00:00:...|       8|     PROCESSING|
|             5657|2013-07-25 00:00:...|       9|PENDING_PAYMENT|
|             5648|2013-07-25 00:00:...|      10|PENDING_PAYMENT|
|              918|2013-07-25 00:00:...|      11| PAYMENT_REVIEW|
|             1837|2013-07-25 00:00:...|      12|         CLOSED|
|             9149|2013-07-25 00:00:...|      13|PENDING_PAYMENT|
|             9842|2013-07-25 00:00:...|      14|     PROCESSING|
|             2568|2013-07-25 00:00:...|      15|       COMPLETE|
|             7276|2013-07-25 00:00:...|      16|PENDING_PAYMENT|
|             2667|2013-07-25 00:00:...|      17|       COMPLETE|
|             1205|2013-07-25 00:00:...|      18|         CLOSED|
|             9488|2013-07-25 00:00:...|      19|PENDING_PAYMENT|
|             9198|2013-07-25 00:00:...|      20|     PROCESSING|
+-----------------+--------------------+--------+---------------+
only showing top 20 rows
orders.printSchema()
root
 |-- order_customer_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_id: long (nullable = true)
 |-- order_status: string (nullable = true)
orders.count()
68883
  • Get the count of orders rolled up by date. As a baseline, first get the daily counts using groupBy.

from pyspark.sql.functions import count, lit
orders. \
    groupBy('order_date'). \
    agg(count(lit(1)).alias('order_count')). \
    show()
+--------------------+-----------+
|          order_date|order_count|
+--------------------+-----------+
|2013-08-13 00:00:...|         73|
|2013-10-12 00:00:...|        162|
|2013-11-15 00:00:...|        135|
|2014-03-19 00:00:...|        130|
|2014-04-26 00:00:...|        251|
|2013-09-16 00:00:...|        121|
|2013-09-20 00:00:...|        139|
|2013-12-31 00:00:...|        266|
|2013-09-06 00:00:...|        276|
|2014-06-15 00:00:...|        128|
|2013-12-24 00:00:...|        170|
|2014-01-07 00:00:...|        163|
|2014-06-07 00:00:...|        191|
|2013-10-14 00:00:...|        139|
|2013-11-11 00:00:...|        246|
|2014-01-27 00:00:...|        163|
|2014-01-29 00:00:...|        158|
|2014-02-14 00:00:...|        174|
|2014-04-15 00:00:...|        180|
|2014-04-22 00:00:...|        144|
+--------------------+-----------+
only showing top 20 rows
orders. \
    groupBy('order_date'). \
    agg(count(lit(1)).alias('order_count')). \
    count()
364
orders.rollup?
Signature: orders.rollup(*cols)
Docstring:
Create a multi-dimensional rollup for the current :class:`DataFrame` using
the specified columns, so we can run aggregation on them.

>>> df.rollup("name", df.age).count().orderBy("name", "age").show()
+-----+----+-----+
| name| age|count|
+-----+----+-----+
| null|null|    2|
|Alice|null|    1|
|Alice|   2|    1|
|  Bob|null|    1|
|  Bob|   5|    1|
+-----+----+-----+

.. versionadded:: 1.4
File:      /usr/hdp/current/spark2-client/python/pyspark/sql/dataframe.py
Type:      method
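As a rough mental model of the docstring output above, rollup over columns c1, ..., cn aggregates at every prefix level: (c1, ..., cn), then (c1, ..., cn-1), and so on down to the empty grouping, which is the grand total. Positions that are rolled up come back as null. The pure-Python sketch below models only these semantics, not how Spark executes the query; rollup_counts and nulls_first are hypothetical helpers and None stands in for Spark's null.

```python
from collections import Counter


def rollup_counts(rows, keys):
    """Count rows at every grouping level that a rollup over `keys` produces.

    Hypothetical helper that models rollup semantics only: level k keeps the
    first k keys and replaces the remaining ones with None (Spark's null).
    """
    result = Counter()
    for row in rows:
        values = tuple(row[k] for k in keys)
        # Walk from the most detailed level (all keys) down to the grand total (no keys).
        for level in range(len(keys), -1, -1):
            rolled_up = values[:level] + (None,) * (len(keys) - level)
            result[rolled_up] += 1
    return dict(result)


def nulls_first(key):
    # Sort helper: None sorts before any value in each position, like Spark's default ascending order.
    return [(value is not None, value) for value in key]


people = [{"name": "Alice", "age": 2}, {"name": "Bob", "age": 5}]
for key, cnt in sorted(rollup_counts(people, ["name", "age"]).items(),
                       key=lambda item: nulls_first(item[0])):
    print(key, cnt)
```

Run on the same two rows as the docstring, it prints the same five groups: the grand total (None, None) with 2, one subtotal per name, and one row per (name, age) pair.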
orders. \
    rollup('order_date'). \
    agg(count(lit(1)).alias('order_count')). \
    orderBy('order_date'). \
    show()
+--------------------+-----------+
|          order_date|order_count|
+--------------------+-----------+
|                null|      68883|
|2013-07-25 00:00:...|        143|
|2013-07-26 00:00:...|        269|
|2013-07-27 00:00:...|        202|
|2013-07-28 00:00:...|        187|
|2013-07-29 00:00:...|        253|
|2013-07-30 00:00:...|        227|
|2013-07-31 00:00:...|        252|
|2013-08-01 00:00:...|        246|
|2013-08-02 00:00:...|        224|
|2013-08-03 00:00:...|        183|
|2013-08-04 00:00:...|        187|
|2013-08-05 00:00:...|        153|
|2013-08-06 00:00:...|        258|
|2013-08-07 00:00:...|        203|
|2013-08-08 00:00:...|        154|
|2013-08-09 00:00:...|        125|
|2013-08-10 00:00:...|        270|
|2013-08-11 00:00:...|        154|
|2013-08-12 00:00:...|        255|
+--------------------+-----------+
only showing top 20 rows
orders. \
    rollup('order_date'). \
    agg(count(lit(1)).alias('order_count')). \
    orderBy('order_date'). \
    count()
365
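With a single column, rollup returns the same per-date rows as groupBy plus one extra row whose order_date is null and whose order_count is the grand total (68883, the same as orders.count()). That is why the count goes from 364 to 365, and the total row appears first because Spark's ascending sort places nulls before non-null values by default. A pure-Python sketch of that single-column case, using made-up sample dates; rollup_single and order_nulls_first are hypothetical helpers:

```python
from collections import Counter


def rollup_single(values):
    """Model of rollup(col).agg(count(lit(1))) for one column (hypothetical helper).

    Returns the per-value counts plus a grand-total entry keyed by None.
    """
    counts = Counter(values)
    rows = dict(counts)
    rows[None] = sum(counts.values())  # the extra "null" row holds the grand total
    return rows


def order_nulls_first(rows):
    # Mimic Spark's ascending sort, which places nulls before non-null values.
    return sorted(rows.items(), key=lambda kv: (kv[0] is not None, kv[0] or ""))


dates = ["2013-07-25", "2013-07-25", "2013-07-26"]
result = rollup_single(dates)
print(order_nulls_first(result))  # [(None, 3), ('2013-07-25', 2), ('2013-07-26', 1)]
print(len(result) == len(set(dates)) + 1)  # True: one extra row, as with 365 vs 364
```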
  • Get the count of orders rolled up by month as well as date. Compared with groupBy, you will see an additional subtotal record per month plus one grand-total record.

from pyspark.sql.functions import date_format
orders. \
    groupBy(date_format('order_date', 'yyyyMM').alias('order_month'), 'order_date'). \
    agg(count(lit(1)).alias('order_count')). \
    orderBy('order_month', 'order_date'). \
    show()
+-----------+--------------------+-----------+
|order_month|          order_date|order_count|
+-----------+--------------------+-----------+
|     201307|2013-07-25 00:00:...|        143|
|     201307|2013-07-26 00:00:...|        269|
|     201307|2013-07-27 00:00:...|        202|
|     201307|2013-07-28 00:00:...|        187|
|     201307|2013-07-29 00:00:...|        253|
|     201307|2013-07-30 00:00:...|        227|
|     201307|2013-07-31 00:00:...|        252|
|     201308|2013-08-01 00:00:...|        246|
|     201308|2013-08-02 00:00:...|        224|
|     201308|2013-08-03 00:00:...|        183|
|     201308|2013-08-04 00:00:...|        187|
|     201308|2013-08-05 00:00:...|        153|
|     201308|2013-08-06 00:00:...|        258|
|     201308|2013-08-07 00:00:...|        203|
|     201308|2013-08-08 00:00:...|        154|
|     201308|2013-08-09 00:00:...|        125|
|     201308|2013-08-10 00:00:...|        270|
|     201308|2013-08-11 00:00:...|        154|
|     201308|2013-08-12 00:00:...|        255|
|     201308|2013-08-13 00:00:...|         73|
+-----------+--------------------+-----------+
only showing top 20 rows
orders. \
    groupBy(date_format('order_date', 'yyyyMM').alias('order_month'), 'order_date'). \
    agg(count(lit(1)).alias('order_count')). \
    orderBy('order_month', 'order_date'). \
    count()
364
orders. \
    rollup(date_format('order_date', 'yyyyMM').alias('order_month'), 'order_date'). \
    agg(count(lit(1)).alias('order_count')). \
    orderBy('order_month', 'order_date'). \
    show()
+-----------+--------------------+-----------+
|order_month|          order_date|order_count|
+-----------+--------------------+-----------+
|       null|                null|      68883|
|     201307|                null|       1533|
|     201307|2013-07-25 00:00:...|        143|
|     201307|2013-07-26 00:00:...|        269|
|     201307|2013-07-27 00:00:...|        202|
|     201307|2013-07-28 00:00:...|        187|
|     201307|2013-07-29 00:00:...|        253|
|     201307|2013-07-30 00:00:...|        227|
|     201307|2013-07-31 00:00:...|        252|
|     201308|                null|       5680|
|     201308|2013-08-01 00:00:...|        246|
|     201308|2013-08-02 00:00:...|        224|
|     201308|2013-08-03 00:00:...|        183|
|     201308|2013-08-04 00:00:...|        187|
|     201308|2013-08-05 00:00:...|        153|
|     201308|2013-08-06 00:00:...|        258|
|     201308|2013-08-07 00:00:...|        203|
|     201308|2013-08-08 00:00:...|        154|
|     201308|2013-08-09 00:00:...|        125|
|     201308|2013-08-10 00:00:...|        270|
+-----------+--------------------+-----------+
only showing top 20 rows
orders. \
    rollup(date_format('order_date', 'yyyyMM').alias('order_month'), 'order_date'). \
    agg(count(lit(1)).alias('order_count')). \
    orderBy('order_month', 'order_date'). \
    count()
378
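The row count grows from 364 to 378 because the rollup adds one subtotal row per distinct order_month and one grand-total row on top of the 364 (order_month, order_date) groups; the numbers imply 378 - 364 - 1 = 13 monthly subtotals. The pure-Python sketch below counts rollup output rows level by level. rollup_row_count and to_month are hypothetical helpers, and to_month assumes order_date strings that start with YYYY-MM-DD, slicing them to mimic date_format(..., 'yyyyMM').

```python
def to_month(order_date):
    # Assumes strings starting with YYYY-MM-DD; mimics date_format(order_date, 'yyyyMM').
    return order_date[:4] + order_date[5:7]


def rollup_row_count(key_tuples, n_keys):
    """Rows a rollup emits for the given full-key tuples (hypothetical helper).

    Level k keeps the first k keys, so it contributes one row per distinct
    prefix of length k; level 0 is the single grand-total row.
    """
    distinct = set(key_tuples)
    return sum(len({t[:k] for t in distinct}) for k in range(n_keys + 1))


# Made-up sample: three distinct dates spread over two months.
dates = ["2013-07-30", "2013-07-31", "2013-08-01", "2013-08-01"]
keys = [(to_month(d), d) for d in dates]
print(rollup_row_count(keys, 2))  # 3 daily rows + 2 monthly subtotals + 1 grand total = 6
```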
  • Get the count of orders rolled up by year, month as well as date. You will see an additional subtotal record per month and per year, plus one grand-total record.

from pyspark.sql.functions import year
orders. \
    groupBy(
        year('order_date').alias('order_year'),
        date_format('order_date', 'yyyyMM').alias('order_month'), 
        'order_date'
    ). \
    agg(count(lit(1)).alias('order_count')). \
    orderBy('order_year', 'order_month', 'order_date'). \
    show()
+----------+-----------+--------------------+-----------+
|order_year|order_month|          order_date|order_count|
+----------+-----------+--------------------+-----------+
|      2013|     201307|2013-07-25 00:00:...|        143|
|      2013|     201307|2013-07-26 00:00:...|        269|
|      2013|     201307|2013-07-27 00:00:...|        202|
|      2013|     201307|2013-07-28 00:00:...|        187|
|      2013|     201307|2013-07-29 00:00:...|        253|
|      2013|     201307|2013-07-30 00:00:...|        227|
|      2013|     201307|2013-07-31 00:00:...|        252|
|      2013|     201308|2013-08-01 00:00:...|        246|
|      2013|     201308|2013-08-02 00:00:...|        224|
|      2013|     201308|2013-08-03 00:00:...|        183|
|      2013|     201308|2013-08-04 00:00:...|        187|
|      2013|     201308|2013-08-05 00:00:...|        153|
|      2013|     201308|2013-08-06 00:00:...|        258|
|      2013|     201308|2013-08-07 00:00:...|        203|
|      2013|     201308|2013-08-08 00:00:...|        154|
|      2013|     201308|2013-08-09 00:00:...|        125|
|      2013|     201308|2013-08-10 00:00:...|        270|
|      2013|     201308|2013-08-11 00:00:...|        154|
|      2013|     201308|2013-08-12 00:00:...|        255|
|      2013|     201308|2013-08-13 00:00:...|         73|
+----------+-----------+--------------------+-----------+
only showing top 20 rows
orders. \
    groupBy(
        year('order_date').alias('order_year'),
        date_format('order_date', 'yyyyMM').alias('order_month'), 
        'order_date'
    ). \
    agg(count(lit(1)).alias('order_count')). \
    orderBy('order_year', 'order_month', 'order_date'). \
    count()
364
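Adding order_year and order_month to groupBy leaves the count at 364 because both are derived from order_date: each date maps to exactly one year and one month, so the extra keys never split a group. A quick pure-Python check of that reasoning, assuming order_date strings that start with YYYY-MM-DD (derived_keys is a hypothetical helper and the sample dates are made up):

```python
def derived_keys(order_date):
    # Assumes strings starting with YYYY-MM-DD; mimics year(), date_format(..., 'yyyyMM') and the date itself.
    return (int(order_date[:4]), order_date[:4] + order_date[5:7], order_date)


# Made-up sample dates, including a repeated one.
sample = ["2013-12-31", "2014-01-01", "2014-01-01", "2014-01-02"]
groups_by_date = set(sample)
groups_by_year_month_date = {derived_keys(d) for d in sample}
# Each date determines its year and month, so the extra keys never split a group.
print(len(groups_by_date), len(groups_by_year_month_date))  # 3 3
```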
orders. \
    rollup(
        year('order_date').alias('order_year'),
        date_format('order_date', 'yyyyMM').alias('order_month'), 
        'order_date'
    ). \
    agg(count(lit(1)).alias('order_count')). \
    orderBy('order_year', 'order_month', 'order_date'). \
    show()
+----------+-----------+--------------------+-----------+
|order_year|order_month|          order_date|order_count|
+----------+-----------+--------------------+-----------+
|      null|       null|                null|      68883|
|      2013|       null|                null|      30662|
|      2013|     201307|                null|       1533|
|      2013|     201307|2013-07-25 00:00:...|        143|
|      2013|     201307|2013-07-26 00:00:...|        269|
|      2013|     201307|2013-07-27 00:00:...|        202|
|      2013|     201307|2013-07-28 00:00:...|        187|
|      2013|     201307|2013-07-29 00:00:...|        253|
|      2013|     201307|2013-07-30 00:00:...|        227|
|      2013|     201307|2013-07-31 00:00:...|        252|
|      2013|     201308|                null|       5680|
|      2013|     201308|2013-08-01 00:00:...|        246|
|      2013|     201308|2013-08-02 00:00:...|        224|
|      2013|     201308|2013-08-03 00:00:...|        183|
|      2013|     201308|2013-08-04 00:00:...|        187|
|      2013|     201308|2013-08-05 00:00:...|        153|
|      2013|     201308|2013-08-06 00:00:...|        258|
|      2013|     201308|2013-08-07 00:00:...|        203|
|      2013|     201308|2013-08-08 00:00:...|        154|
|      2013|     201308|2013-08-09 00:00:...|        125|
+----------+-----------+--------------------+-----------+
only showing top 20 rows
orders. \
    rollup(
        year('order_date').alias('order_year'),
        date_format('order_date', 'yyyyMM').alias('order_month'), 
        'order_date'
    ). \
    agg(count(lit(1)).alias('order_count')). \
    orderBy('order_year', 'order_month', 'order_date'). \
    count()
380
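Going from 378 to 380 rows adds one subtotal per year (2013 and 2014). In this output a null in order_year, order_month or order_date marks a subtotal or the grand total, which would be ambiguous if those columns could themselves be null. Spark's pyspark.sql.functions offers grouping and grouping_id for telling them apart; the Spark documentation defines grouping_id as (grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + ... + grouping(cn), where grouping(c) is 1 when c is aggregated away at that level. The pure-Python sketch below only models that bit arithmetic for the levels of a rollup; grouping_id_value and rollup_grouping_ids are hypothetical helpers, not Spark APIs.

```python
def grouping_id_value(grouping_flags):
    """Combine per-column grouping flags (1 = rolled up) the way Spark documents grouping_id."""
    n = len(grouping_flags)
    return sum(flag << (n - 1 - i) for i, flag in enumerate(grouping_flags))


def rollup_grouping_ids(n_keys):
    # Rollup level k keeps the first k columns and rolls up the rest (flag 1).
    return {k: grouping_id_value([0] * k + [1] * (n_keys - k))
            for k in range(n_keys, -1, -1)}


# Levels of rollup(order_year, order_month, order_date), keyed by number of grouped columns.
print(rollup_grouping_ids(3))  # {3: 0, 2: 1, 1: 3, 0: 7}
```

Read against the output above, daily rows would carry 0, monthly subtotals 1, yearly subtotals 3 and the grand total 7.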
orders. \
    rollup(
        year('order_date').alias('order_year'),
        date_format('order_date', 'yyyyMM').alias('order_month'), 
        'order_date'
    ). \
    agg(count(lit(1)).alias('order_count')). \
    filter("order_month = 201401"). \
    orderBy('order_year', 'order_month', 'order_date'). \
    show(32)
+----------+-----------+--------------------+-----------+
|order_year|order_month|          order_date|order_count|
+----------+-----------+--------------------+-----------+
|      2014|     201401|                null|       5908|
|      2014|     201401|2014-01-01 00:00:...|        135|
|      2014|     201401|2014-01-02 00:00:...|        111|
|      2014|     201401|2014-01-03 00:00:...|        250|
|      2014|     201401|2014-01-04 00:00:...|        129|
|      2014|     201401|2014-01-05 00:00:...|        266|
|      2014|     201401|2014-01-06 00:00:...|        155|
|      2014|     201401|2014-01-07 00:00:...|        163|
|      2014|     201401|2014-01-08 00:00:...|        122|
|      2014|     201401|2014-01-09 00:00:...|        207|
|      2014|     201401|2014-01-10 00:00:...|        241|
|      2014|     201401|2014-01-11 00:00:...|        281|
|      2014|     201401|2014-01-12 00:00:...|        215|
|      2014|     201401|2014-01-13 00:00:...|        179|
|      2014|     201401|2014-01-14 00:00:...|        209|
|      2014|     201401|2014-01-15 00:00:...|        243|
|      2014|     201401|2014-01-16 00:00:...|        194|
|      2014|     201401|2014-01-17 00:00:...|        149|
|      2014|     201401|2014-01-18 00:00:...|        139|
|      2014|     201401|2014-01-19 00:00:...|        217|
|      2014|     201401|2014-01-20 00:00:...|        203|
|      2014|     201401|2014-01-21 00:00:...|        259|
|      2014|     201401|2014-01-22 00:00:...|        209|
|      2014|     201401|2014-01-23 00:00:...|        220|
|      2014|     201401|2014-01-24 00:00:...|        159|
|      2014|     201401|2014-01-25 00:00:...|        104|
|      2014|     201401|2014-01-26 00:00:...|        154|
|      2014|     201401|2014-01-27 00:00:...|        163|
|      2014|     201401|2014-01-28 00:00:...|        197|
|      2014|     201401|2014-01-29 00:00:...|        158|
|      2014|     201401|2014-01-30 00:00:...|        254|
|      2014|     201401|2014-01-31 00:00:...|        223|
+----------+-----------+--------------------+-----------+
orders. \
    rollup(
        year('order_date').alias('order_year'),
        date_format('order_date', 'yyyyMM').alias('order_month'), 
        'order_date'
    ). \
    agg(count(lit(1)).alias('order_count')). \
    filter("order_month = 201401"). \
    orderBy('order_year', 'order_month', 'order_date'). \
    count()
32
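The filter returns 32 rows: the 201401 monthly subtotal plus one row for each of the 31 days of January 2014. The 2014 yearly subtotal and the grand total are not included because their order_month is null, and a comparison with null evaluates to null rather than true, so filter drops those rows. A pure-Python sketch of that step over a few rollup-style keys (made-up rows, counts omitted, None standing in for null); sql_equals is a hypothetical helper modelling SQL's three-valued comparison:

```python
def sql_equals(left, right):
    # SQL three-valued logic: any comparison involving null yields null (None), not True/False.
    if left is None or right is None:
        return None
    return left == right


# Rollup-style keys as (order_year, order_month, order_date).
rollup_rows = [
    (None, None, None),              # grand total
    (2014, None, None),              # year subtotal
    (2014, "201401", None),          # month subtotal
    (2014, "201401", "2014-01-01"),  # a daily row
    (2014, "201402", None),          # another month's subtotal
]

# filter() keeps a row only when the predicate is true; null counts as not true.
kept = [row for row in rollup_rows if sql_equals(row[1], "201401") is True]
print(kept)  # [(2014, '201401', None), (2014, '201401', '2014-01-01')]
```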