Left or Right Outer Join

Let us understand left and right outer joins using Spark.

  • Often we want the rows from both data sets that satisfy the join condition, along with the rows from the driving table that do not satisfy it.

  • We can use either a left or a right outer join to fulfill this requirement, depending on which side of the join the driving table is on.

  • Here is a classic example.

    • We have customer data in the customers folder or table and order data in the orders folder or table.

    • There is a one-to-many relationship between customers and orders.

    • There might be customers who never placed any orders.

    • We might want to get the revenue or the number of orders placed by each customer.

    • If we need to display the count or revenue as 0 for such customers, we need to perform an outer join between customers and orders with customers as the driving table.
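The behaviour described above can be sketched in plain Python before running it on Spark. This is only an illustration of left outer join semantics, not how Spark implements joins. The sample rows and the `left_outer_join` helper are made up for this sketch.

```python
# Toy data; the ids and emails below are made up for illustration.
customers = [
    {"customer_id": 1, "customer_email": "a@example.com"},
    {"customer_id": 2, "customer_email": "b@example.com"},
    {"customer_id": 3, "customer_email": "c@example.com"},
]
orders = [
    {"order_id": 101, "order_customer_id": 1},
    {"order_id": 102, "order_customer_id": 1},
    {"order_id": 103, "order_customer_id": 3},
]


def left_outer_join(left, right, left_key, right_key):
    """Keep every left row; pad unmatched rows with None for the right columns."""
    right_columns = list(right[0].keys()) if right else []
    joined = []
    for l_row in left:
        matches = [r for r in right if r[right_key] == l_row[left_key]]
        if matches:
            # One output row per matching right row (one-to-many).
            joined.extend({**l_row, **r} for r in matches)
        else:
            # The driving table's row survives with nulls on the right side.
            joined.append({**l_row, **dict.fromkeys(right_columns)})
    return joined


result = left_outer_join(customers, orders, "customer_id", "order_customer_id")
for row in result:
    print(row)

# Count orders per customer; customers with no matching order stay at 0.
order_counts = {c["customer_id"]: 0 for c in customers}
for row in result:
    if row["order_id"] is not None:
        order_counts[row["customer_id"]] += 1
print(order_counts)
```

Customer 2 has no orders, so it appears once in the result with `None` in the order columns and gets an order count of 0.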

Let us start the Spark session for this notebook so that we can execute the code provided.

from pyspark.sql import SparkSession

import getpass
username = getpass.getuser()

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Joining Data Sets'). \
    master('yarn'). \
    getOrCreate()

If you are going to use CLIs, you can launch Spark SQL using one of the following 3 approaches.

Using Spark SQL

spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Scala

spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Pyspark

pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
  • Perform an outer join between customers and orders and compare its count with the inner join count.

spark.conf.set("spark.sql.shuffle.partitions", "2")
orders = spark.read.json('/public/retail_db_json/orders')
customers = spark.read.json('/public/retail_db_json/customers')
orders.printSchema()
root
 |-- order_customer_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_id: long (nullable = true)
 |-- order_status: string (nullable = true)
customers.printSchema()
root
 |-- customer_city: string (nullable = true)
 |-- customer_email: string (nullable = true)
 |-- customer_fname: string (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- customer_lname: string (nullable = true)
 |-- customer_password: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- customer_street: string (nullable = true)
 |-- customer_zipcode: string (nullable = true)
help(customers.join)
Help on method join in module pyspark.sql.dataframe:

join(other, on=None, how=None) method of pyspark.sql.dataframe.DataFrame instance
    Joins with another :class:`DataFrame`, using the given join expression.
    
    :param other: Right side of the join
    :param on: a string for the join column name, a list of column names,
        a join expression (Column), or a list of Columns.
        If `on` is a string or a list of strings indicating the name of the join column(s),
        the column(s) must exist on both sides, and this performs an equi-join.
    :param how: str, default ``inner``. Must be one of: ``inner``, ``cross``, ``outer``,
        ``full``, ``full_outer``, ``left``, ``left_outer``, ``right``, ``right_outer``,
        ``left_semi``, and ``left_anti``.
    
    The following performs a full outer join between ``df1`` and ``df2``.
    
    >>> df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height).collect()
    [Row(name=None, height=80), Row(name='Bob', height=85), Row(name='Alice', height=None)]
    
    >>> df.join(df2, 'name', 'outer').select('name', 'height').collect()
    [Row(name='Tom', height=80), Row(name='Bob', height=85), Row(name='Alice', height=None)]
    
    >>> cond = [df.name == df3.name, df.age == df3.age]
    >>> df.join(df3, cond, 'outer').select(df.name, df3.age).collect()
    [Row(name='Alice', age=2), Row(name='Bob', age=5)]
    
    >>> df.join(df2, 'name').select(df.name, df2.height).collect()
    [Row(name='Bob', height=85)]
    
    >>> df.join(df4, ['name', 'age']).select(df.name, df.age).collect()
    [Row(name='Bob', age=5)]
    
    .. versionadded:: 1.3
customer_order_details = customers.join(
    orders, 
    on=customers['customer_id'] == orders['order_customer_id'],
    how='inner'
)
orders.count()
68883
customer_order_details.count()
68883
customer_order_details_left = customers.join(
    orders, 
    on=customers['customer_id'] == orders['order_customer_id'],
    how='left'
)
# 'left_outer' is a synonym for 'left'; this produces the same result as the previous cell.
customer_order_details_left = customers.join(
    orders, 
    on=customers['customer_id'] == orders['order_customer_id'],
    how='left_outer'
)
customer_order_details_left.printSchema()
root
 |-- customer_city: string (nullable = true)
 |-- customer_email: string (nullable = true)
 |-- customer_fname: string (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- customer_lname: string (nullable = true)
 |-- customer_password: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- customer_street: string (nullable = true)
 |-- customer_zipcode: string (nullable = true)
 |-- order_customer_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_id: long (nullable = true)
 |-- order_status: string (nullable = true)
customer_order_details_left.count()
68913
customer_order_details_right = orders.join(
    customers, 
    on=customers['customer_id'] == orders['order_customer_id'],
    how='right'
)
customer_order_details_right.count()
68913
customer_order_details_right.printSchema()
root
 |-- order_customer_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_id: long (nullable = true)
 |-- order_status: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_email: string (nullable = true)
 |-- customer_fname: string (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- customer_lname: string (nullable = true)
 |-- customer_password: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- customer_street: string (nullable = true)
 |-- customer_zipcode: string (nullable = true)
customer_order_details_left. \
    select(customers.customer_id, customers.customer_email, orders['*']). \
    show()
+-----------+--------------+-----------------+--------------------+--------+---------------+
|customer_id|customer_email|order_customer_id|          order_date|order_id|   order_status|
+-----------+--------------+-----------------+--------------------+--------+---------------+
|          1|     XXXXXXXXX|                1|2013-12-13 00:00:...|   22945|       COMPLETE|
|          2|     XXXXXXXXX|                2|2013-11-30 00:00:...|   67863|       COMPLETE|
|          2|     XXXXXXXXX|                2|2013-08-02 00:00:...|   57963|        ON_HOLD|
|          2|     XXXXXXXXX|                2|2014-02-18 00:00:...|   33865|       COMPLETE|
|          2|     XXXXXXXXX|                2|2013-10-29 00:00:...|   15192|PENDING_PAYMENT|
|          3|     XXXXXXXXX|                3|2013-12-14 00:00:...|   61453|       COMPLETE|
|          3|     XXXXXXXXX|                3|2014-07-24 00:00:...|   57617|       COMPLETE|
|          3|     XXXXXXXXX|                3|2014-07-15 00:00:...|   56178|        PENDING|
|          3|     XXXXXXXXX|                3|2014-05-09 00:00:...|   46399|     PROCESSING|
|          3|     XXXXXXXXX|                3|2014-02-26 00:00:...|   35158|       COMPLETE|
|          3|     XXXXXXXXX|                3|2013-12-19 00:00:...|   23662|       COMPLETE|
|          3|     XXXXXXXXX|                3|2013-12-11 00:00:...|   22646|       COMPLETE|
|          4|     XXXXXXXXX|                4|2014-06-10 00:00:...|   51157|         CLOSED|
|          4|     XXXXXXXXX|                4|2014-05-28 00:00:...|   49339|       COMPLETE|
|          4|     XXXXXXXXX|                4|2014-03-15 00:00:...|   37878|       COMPLETE|
|          4|     XXXXXXXXX|                4|2013-11-09 00:00:...|   17253|PENDING_PAYMENT|
|          4|     XXXXXXXXX|                4|2013-09-24 00:00:...|    9704|       COMPLETE|
|          4|     XXXXXXXXX|                4|2013-09-19 00:00:...|    9023|       COMPLETE|
|          5|     XXXXXXXXX|                5|2014-05-05 00:00:...|   45832|PENDING_PAYMENT|
|          5|     XXXXXXXXX|                5|2014-04-05 00:00:...|   41333|       COMPLETE|
+-----------+--------------+-----------------+--------------------+--------+---------------+
only showing top 20 rows
orders.count()
68883
customers.count()
12435
customer_order_details.count()
68883
customer_order_details_left.count()
68913
  • Using customer_order_details_left, get all the customers who never placed orders. To do so, we check whether the orders primary key (order_id) is null in the left or right outer join results.

customer_order_details_left. \
    filter(orders.order_id.isNull()). \
    select(customers.customer_id, customers.customer_email, orders['*']). \
    show()
+-----------+--------------+-----------------+----------+--------+------------+
|customer_id|customer_email|order_customer_id|order_date|order_id|order_status|
+-----------+--------------+-----------------+----------+--------+------------+
|        219|     XXXXXXXXX|             null|      null|    null|        null|
|        339|     XXXXXXXXX|             null|      null|    null|        null|
|        469|     XXXXXXXXX|             null|      null|    null|        null|
|       1187|     XXXXXXXXX|             null|      null|    null|        null|
|       1481|     XXXXXXXXX|             null|      null|    null|        null|
|       1808|     XXXXXXXXX|             null|      null|    null|        null|
|       2073|     XXXXXXXXX|             null|      null|    null|        null|
|       2096|     XXXXXXXXX|             null|      null|    null|        null|
|       2450|     XXXXXXXXX|             null|      null|    null|        null|
|       4555|     XXXXXXXXX|             null|      null|    null|        null|
|       4927|     XXXXXXXXX|             null|      null|    null|        null|
|       6072|     XXXXXXXXX|             null|      null|    null|        null|
|       6613|     XXXXXXXXX|             null|      null|    null|        null|
|       7011|     XXXXXXXXX|             null|      null|    null|        null|
|       7552|     XXXXXXXXX|             null|      null|    null|        null|
|       8243|     XXXXXXXXX|             null|      null|    null|        null|
|       8343|     XXXXXXXXX|             null|      null|    null|        null|
|       8575|     XXXXXXXXX|             null|      null|    null|        null|
|       8778|     XXXXXXXXX|             null|      null|    null|        null|
|       8882|     XXXXXXXXX|             null|      null|    null|        null|
+-----------+--------------+-----------------+----------+--------+------------+
only showing top 20 rows
customer_order_details_left. \
    filter(orders.order_id.isNull()). \
    count()
30
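The counts reported so far can be reconciled with simple arithmetic. The sketch below only restates the numbers printed by the cells above; the variable names are chosen for illustration.

```python
# Counts copied from the outputs above.
orders_count = 68883
inner_join_count = 68883
left_join_count = 68913
customers_without_orders = 30

# The inner join returns as many rows as there are orders, which is
# consistent with every order matching exactly one customer.
assert inner_join_count == orders_count

# The left outer join adds one extra row for each customer without orders.
assert left_join_count == inner_join_count + customers_without_orders

print(left_join_count - inner_join_count)
```

The difference between the left outer join count and the inner join count is exactly the number of customers who never placed an order.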
  • An alternative approach is to assign aliases to the data frames as part of the join and refer to the columns through those aliases.

customers. \
    alias('c'). \
    join(
        orders.alias('o'), 
        on=customers['customer_id'] == orders['order_customer_id'],
        how='left'
    ). \
    filter('o.order_id IS NULL'). \
    selectExpr('c.customer_id', 'c.customer_email', 'o.*'). \
    show()
+-----------+--------------+-----------------+----------+--------+------------+
|customer_id|customer_email|order_customer_id|order_date|order_id|order_status|
+-----------+--------------+-----------------+----------+--------+------------+
|        219|     XXXXXXXXX|             null|      null|    null|        null|
|        339|     XXXXXXXXX|             null|      null|    null|        null|
|        469|     XXXXXXXXX|             null|      null|    null|        null|
|       1187|     XXXXXXXXX|             null|      null|    null|        null|
|       1481|     XXXXXXXXX|             null|      null|    null|        null|
|       1808|     XXXXXXXXX|             null|      null|    null|        null|
|       2073|     XXXXXXXXX|             null|      null|    null|        null|
|       2096|     XXXXXXXXX|             null|      null|    null|        null|
|       2450|     XXXXXXXXX|             null|      null|    null|        null|
|       4555|     XXXXXXXXX|             null|      null|    null|        null|
|       4927|     XXXXXXXXX|             null|      null|    null|        null|
|       6072|     XXXXXXXXX|             null|      null|    null|        null|
|       6613|     XXXXXXXXX|             null|      null|    null|        null|
|       7011|     XXXXXXXXX|             null|      null|    null|        null|
|       7552|     XXXXXXXXX|             null|      null|    null|        null|
|       8243|     XXXXXXXXX|             null|      null|    null|        null|
|       8343|     XXXXXXXXX|             null|      null|    null|        null|
|       8575|     XXXXXXXXX|             null|      null|    null|        null|
|       8778|     XXXXXXXXX|             null|      null|    null|        null|
|       8882|     XXXXXXXXX|             null|      null|    null|        null|
+-----------+--------------+-----------------+----------+--------+------------+
only showing top 20 rows
customers. \
    alias('c'). \
    join(
        orders.alias('o'), 
        on=customers['customer_id'] == orders['order_customer_id'],
        how='left'
    ). \
    filter('o.order_id IS NULL'). \
    count()
30
  • Get the number of orders placed by each customer in the year 2013. If a customer has not placed any orders, the order count for that customer should be 0.

orders = spark.read.json('/public/retail_db_json/orders')
customers = spark.read.json('/public/retail_db_json/customers')
orders_filtered = orders.filter("order_date LIKE '2013%'")
customer_order_details_left = customers.alias('c'). \
    join(
    orders_filtered.alias('o'), 
    on=customers['customer_id'] == orders_filtered['order_customer_id'],
    how='left'
)
orders_filtered.count()
30662
customer_order_details_left.count()
31746
orders_filtered.select('order_customer_id').distinct().count()
11351
from pyspark.sql.functions import col, lit, expr, sum
customer_order_details_left. \
    groupBy('customer_id', 'customer_email'). \
    agg(sum(expr('CASE WHEN order_id IS NULL THEN 0 ELSE 1 END')).alias('order_count')). \
    orderBy('order_count', 'customer_id'). \
    show()
+-----------+--------------+-----------+
|customer_id|customer_email|order_count|
+-----------+--------------+-----------+
|         10|     XXXXXXXXX|          0|
|         28|     XXXXXXXXX|          0|
|         43|     XXXXXXXXX|          0|
|         47|     XXXXXXXXX|          0|
|         55|     XXXXXXXXX|          0|
|         56|     XXXXXXXXX|          0|
|         59|     XXXXXXXXX|          0|
|         78|     XXXXXXXXX|          0|
|         91|     XXXXXXXXX|          0|
|         98|     XXXXXXXXX|          0|
|        129|     XXXXXXXXX|          0|
|        143|     XXXXXXXXX|          0|
|        144|     XXXXXXXXX|          0|
|        186|     XXXXXXXXX|          0|
|        200|     XXXXXXXXX|          0|
|        212|     XXXXXXXXX|          0|
|        219|     XXXXXXXXX|          0|
|        238|     XXXXXXXXX|          0|
|        253|     XXXXXXXXX|          0|
|        261|     XXXXXXXXX|          0|
+-----------+--------------+-----------+
only showing top 20 rows
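The CASE WHEN expression works because it contributes 0 for the null order_id of an unmatched customer. Counting the order_id column directly should give the same numbers, since SQL count over a column skips nulls, while counting rows would report 1 for customers with no orders. In PySpark the corresponding aggregate would be count('order_id') from pyspark.sql.functions. The sketch below uses Python's built-in sqlite3 module as a stand-in for Spark SQL so that it runs without a cluster; the table contents are made up for illustration.

```python
import sqlite3

# In-memory SQLite database standing in for Spark SQL (an assumption made
# for this sketch; the rows below are toy data).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE customers (customer_id INTEGER, customer_email TEXT);
    CREATE TABLE orders (order_id INTEGER, order_customer_id INTEGER);
    INSERT INTO customers VALUES
        (1, 'a@example.com'), (2, 'b@example.com'), (3, 'c@example.com');
    INSERT INTO orders VALUES (101, 1), (102, 1), (103, 3);
""")

rows = conn.execute("""
    SELECT c.customer_id,
           sum(CASE WHEN o.order_id IS NULL THEN 0 ELSE 1 END) AS case_count,
           count(o.order_id) AS order_count,  -- skips null order ids
           count(*) AS row_count              -- counts the padded row as well
    FROM customers c
        LEFT OUTER JOIN orders o
            ON c.customer_id = o.order_customer_id
    GROUP BY c.customer_id
    ORDER BY order_count, c.customer_id
""").fetchall()

for row in rows:
    print(row)

conn.close()
```

Customer 2 has no orders: both case_count and order_count are 0, but row_count is 1 because the outer join still produces one row for that customer.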
customer_order_details_left. \
    groupBy('customer_id', 'customer_email'). \
    agg(sum(expr('CASE WHEN order_id IS NULL THEN 0 ELSE 1 END')).alias('order_count')). \
    orderBy('order_count', 'customer_id'). \
    count()
12435
customer_order_details_left.filter('order_id IS NULL').count()
1084
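The 2013 counts reported above are consistent with each other, which can be checked with simple arithmetic. The sketch only restates the numbers printed by the cells above; the variable names are chosen for illustration.

```python
# Counts copied from the outputs above.
orders_2013 = 30662
left_join_2013 = 31746
customers_with_2013_orders = 11351
total_customers = 12435
customers_without_2013_orders = 1084

# Each customer without a 2013 order contributes one null-padded row.
assert left_join_2013 == orders_2013 + customers_without_2013_orders

# Every customer either placed an order in 2013 or did not.
assert total_customers == customers_with_2013_orders + customers_without_2013_orders

print(left_join_2013 - orders_2013)
```

This also explains why the grouped result has 12435 rows: the left outer join keeps every customer, including the 1084 with an order count of 0.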