Using Inner Join
Let us understand inner joins in Spark. Here are the steps we typically follow for joining Data Frames.
Read the data sets that are supposed to be joined from files into respective data frames.
Optionally, filter the data if the requirements call for it.
Join both data sets using an inner join. The result is a new Data Frame containing only the rows that satisfy the join condition.
Once the Data Frame with the join results is created, we can refer to columns from both data sets through the original Data Frames (for example, orders['order_id'] or order_items['order_item_subtotal']) for further processing.
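The steps above can be illustrated without Spark. The following is a minimal plain-Python sketch, not how Spark executes joins: it assumes tiny in-memory lists of dicts standing in for the two data sets (values copied from rows displayed later in this notebook) and a hypothetical inner_join helper that uses a simple nested loop.

```python
# Hypothetical in-memory stand-ins for the orders and order_items data sets.
# Values are copied from rows displayed later in this notebook.
orders = [
    {"order_id": 1, "order_customer_id": 11599, "order_status": "CLOSED"},
    {"order_id": 2, "order_customer_id": 256, "order_status": "PENDING_PAYMENT"},
    {"order_id": 4, "order_customer_id": 8827, "order_status": "CLOSED"},
]
order_items = [
    {"order_item_id": 1, "order_item_order_id": 1, "order_item_subtotal": 299.98},
    {"order_item_id": 2, "order_item_order_id": 2, "order_item_subtotal": 199.99},
    {"order_item_id": 3, "order_item_order_id": 2, "order_item_subtotal": 250.0},
    {"order_item_id": 5, "order_item_order_id": 4, "order_item_subtotal": 49.98},
]


def inner_join(left, right, condition):
    """Hypothetical helper: one merged row per (left, right) pair that satisfies condition."""
    return [{**l, **r} for l in left for r in right if condition(l, r)]


# Optional step: filter before joining.
closed_orders = [o for o in orders if o["order_status"] == "CLOSED"]

# Inner join on the common key; rows without a match on the other side are dropped.
joined = inner_join(
    closed_orders,
    order_items,
    lambda o, oi: o["order_id"] == oi["order_item_order_id"],
)

# Columns from both sides are available in every result row.
for row in joined:
    print(row["order_id"], row["order_status"], row["order_item_subtotal"])
```

The nested loop is chosen only for readability; it compares every pair of rows, which is exactly the meaning of an inner join but far too slow for real data volumes.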
Let us start the Spark session for this notebook so that we can execute the code provided.
from pyspark.sql import SparkSession
import getpass
# The OS user name is used for the per-user warehouse directory and the app name
username = getpass.getuser()
spark = SparkSession. \
builder. \
config('spark.ui.port', '0'). \
config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
enableHiveSupport(). \
appName(f'{username} | Python - Joining Data Sets'). \
master('yarn'). \
getOrCreate()
If you are going to use the CLIs, you can launch Spark in one of the following 3 ways.
Using Spark SQL
spark2-sql \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Scala
spark2-shell \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using PySpark
pyspark2 \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Read orders and order_items data sets
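# Keep the number of shuffle partitions small for these small data sets (the default is 200)
spark.conf.set("spark.sql.shuffle.partitions", "2")
# Read both data sets from JSON files into Data Frames
orders = spark.read.json('/public/retail_db_json/orders')
order_items = spark.read.json('/public/retail_db_json/order_items')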
orders.printSchema()
root
|-- order_customer_id: long (nullable = true)
|-- order_date: string (nullable = true)
|-- order_id: long (nullable = true)
|-- order_status: string (nullable = true)
order_items.printSchema()
root
|-- order_item_id: long (nullable = true)
|-- order_item_order_id: long (nullable = true)
|-- order_item_product_id: long (nullable = true)
|-- order_item_product_price: double (nullable = true)
|-- order_item_quantity: long (nullable = true)
|-- order_item_subtotal: double (nullable = true)
Join orders and order_items based on the common key. In orders it is called order_id, and in order_items it is called order_item_order_id.
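Conceptually, an equi-join on keys with different names just compares order_id on one side with order_item_order_id on the other. The plain-Python sketch below assumes made-up sample rows and a hypothetical hash_inner_join helper. It mirrors the build/probe idea behind a hash join, which is one strategy Spark may choose (it can also use a sort-merge join), but it is not Spark's implementation.

```python
from collections import defaultdict

# Made-up sample rows for illustration only.
orders = [
    {"order_id": 1, "order_status": "CLOSED"},
    {"order_id": 2, "order_status": "PENDING_PAYMENT"},
    {"order_id": 3, "order_status": "COMPLETE"},  # no items in this sample
]
order_items = [
    {"order_item_id": 10, "order_item_order_id": 1},
    {"order_item_id": 11, "order_item_order_id": 2},
    {"order_item_id": 12, "order_item_order_id": 2},
    {"order_item_id": 13, "order_item_order_id": 99},  # no such order in this sample
]


def hash_inner_join(left, right, left_key, right_key):
    """Hypothetical helper: equi-join two lists of dicts whose key columns have different names."""
    # Build phase: index the left side by its join key.
    index = defaultdict(list)
    for row in left:
        index[row[left_key]].append(row)
    # Probe phase: emit each right row once per matching left row;
    # rows without a match on the other side are dropped, as in an inner join.
    result = []
    for r in right:
        for l in index.get(r[right_key], []):
            result.append({**l, **r})
    return result


joined = hash_inner_join(orders, order_items, "order_id", "order_item_order_id")
for row in joined:
    print(row)
```

In this sample, order 3 (no items) and order item 13 (no order) both disappear from the result, which is the defining behaviour of an inner join.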
help(orders.join)
Help on method join in module pyspark.sql.dataframe:
join(other, on=None, how=None) method of pyspark.sql.dataframe.DataFrame instance
Joins with another :class:`DataFrame`, using the given join expression.
:param other: Right side of the join
:param on: a string for the join column name, a list of column names,
a join expression (Column), or a list of Columns.
If `on` is a string or a list of strings indicating the name of the join column(s),
the column(s) must exist on both sides, and this performs an equi-join.
:param how: str, default ``inner``. Must be one of: ``inner``, ``cross``, ``outer``,
``full``, ``full_outer``, ``left``, ``left_outer``, ``right``, ``right_outer``,
``left_semi``, and ``left_anti``.
The following performs a full outer join between ``df1`` and ``df2``.
>>> df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height).collect()
[Row(name=None, height=80), Row(name='Bob', height=85), Row(name='Alice', height=None)]
>>> df.join(df2, 'name', 'outer').select('name', 'height').collect()
[Row(name='Tom', height=80), Row(name='Bob', height=85), Row(name='Alice', height=None)]
>>> cond = [df.name == df3.name, df.age == df3.age]
>>> df.join(df3, cond, 'outer').select(df.name, df3.age).collect()
[Row(name='Alice', age=2), Row(name='Bob', age=5)]
>>> df.join(df2, 'name').select(df.name, df2.height).collect()
[Row(name='Bob', height=85)]
>>> df.join(df4, ['name', 'age']).select(df.name, df.age).collect()
[Row(name='Bob', age=5)]
.. versionadded:: 1.3
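All four of the following statements produce the same inner join: the condition can be passed positionally or as on=, columns can be referenced as attributes (orders.order_id) or by name (orders['order_id']), and how defaults to 'inner'.
orders_join = orders.join(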
order_items,
orders.order_id == order_items.order_item_order_id
)
orders_join = orders.join(
order_items,
on=orders.order_id == order_items.order_item_order_id
)
orders_join = orders.join(
order_items,
on=orders['order_id'] == order_items['order_item_order_id']
)
orders_join = orders.join(
order_items,
on=orders['order_id'] == order_items['order_item_order_id'],
how='inner'
)
orders_join.printSchema()
root
|-- order_customer_id: long (nullable = true)
|-- order_date: string (nullable = true)
|-- order_id: long (nullable = true)
|-- order_status: string (nullable = true)
|-- order_item_id: long (nullable = true)
|-- order_item_order_id: long (nullable = true)
|-- order_item_product_id: long (nullable = true)
|-- order_item_product_price: double (nullable = true)
|-- order_item_quantity: long (nullable = true)
|-- order_item_subtotal: double (nullable = true)
orders_join.show()
+-----------------+--------------------+--------+---------------+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
|order_customer_id| order_date|order_id| order_status|order_item_id|order_item_order_id|order_item_product_id|order_item_product_price|order_item_quantity|order_item_subtotal|
+-----------------+--------------------+--------+---------------+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
| 11599|2013-07-25 00:00:...| 1| CLOSED| 1| 1| 957| 299.98| 1| 299.98|
| 256|2013-07-25 00:00:...| 2|PENDING_PAYMENT| 2| 2| 1073| 199.99| 1| 199.99|
| 256|2013-07-25 00:00:...| 2|PENDING_PAYMENT| 3| 2| 502| 50.0| 5| 250.0|
| 256|2013-07-25 00:00:...| 2|PENDING_PAYMENT| 4| 2| 403| 129.99| 1| 129.99|
| 8827|2013-07-25 00:00:...| 4| CLOSED| 5| 4| 897| 24.99| 2| 49.98|
| 8827|2013-07-25 00:00:...| 4| CLOSED| 6| 4| 365| 59.99| 5| 299.95|
| 8827|2013-07-25 00:00:...| 4| CLOSED| 7| 4| 502| 50.0| 3| 150.0|
| 8827|2013-07-25 00:00:...| 4| CLOSED| 8| 4| 1014| 49.98| 4| 199.92|
| 11318|2013-07-25 00:00:...| 5| COMPLETE| 9| 5| 957| 299.98| 1| 299.98|
| 11318|2013-07-25 00:00:...| 5| COMPLETE| 10| 5| 365| 59.99| 5| 299.95|
| 11318|2013-07-25 00:00:...| 5| COMPLETE| 11| 5| 1014| 49.98| 2| 99.96|
| 11318|2013-07-25 00:00:...| 5| COMPLETE| 12| 5| 957| 299.98| 1| 299.98|
| 11318|2013-07-25 00:00:...| 5| COMPLETE| 13| 5| 403| 129.99| 1| 129.99|
| 4530|2013-07-25 00:00:...| 7| COMPLETE| 14| 7| 1073| 199.99| 1| 199.99|
| 4530|2013-07-25 00:00:...| 7| COMPLETE| 15| 7| 957| 299.98| 1| 299.98|
| 4530|2013-07-25 00:00:...| 7| COMPLETE| 16| 7| 926| 15.99| 5| 79.95|
| 2911|2013-07-25 00:00:...| 8| PROCESSING| 17| 8| 365| 59.99| 3| 179.97|
| 2911|2013-07-25 00:00:...| 8| PROCESSING| 18| 8| 365| 59.99| 5| 299.95|
| 2911|2013-07-25 00:00:...| 8| PROCESSING| 19| 8| 1014| 49.98| 4| 199.92|
| 2911|2013-07-25 00:00:...| 8| PROCESSING| 20| 8| 502| 50.0| 1| 50.0|
+-----------------+--------------------+--------+---------------+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
only showing top 20 rows
orders.count()
68883
order_items.count()
172198
orders_join.count()
172198
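The inner join returned 172198 rows, the same number as order_items. For an equi-join, each key value k contributes (rows in orders with k) × (rows in order_items with k) output rows, and keys present on only one side contribute nothing. Assuming order_id is unique in orders, the equal counts imply that every order item found exactly one matching order. The plain-Python sketch below uses a hypothetical inner_join_row_count helper and made-up key lists to show this arithmetic; it only counts rows and does not involve Spark.

```python
from collections import Counter


def inner_join_row_count(left_keys, right_keys):
    """Hypothetical helper: rows produced by an inner equi-join, from key frequencies."""
    left_counts = Counter(left_keys)
    right_counts = Counter(right_keys)
    # Each key contributes (matches on left) x (matches on right) rows;
    # Counter returns 0 for keys missing on the right.
    return sum(n * right_counts[k] for k, n in left_counts.items())


# Unique keys on the left: every right row matches at most once.
print(inner_join_row_count([1, 2, 3, 4], [1, 2, 2, 2, 4, 4, 4, 4]))  # 8

# A duplicated key on the left fans out: key 2 gives 2 x 2 rows.
print(inner_join_row_count([1, 2, 2], [2, 2]))  # 4
```

The second call shows why a join count larger than either input usually signals duplicate keys on both sides.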
Project all the fields from orders, followed by order_item_subtotal from order_items.
orders. \
join(
order_items,
on=orders['order_id'] == order_items['order_item_order_id'],
how='inner'
). \
select(orders['*'], order_items['order_item_subtotal']). \
show()
+-----------------+--------------------+--------+---------------+-------------------+
|order_customer_id| order_date|order_id| order_status|order_item_subtotal|
+-----------------+--------------------+--------+---------------+-------------------+
| 11599|2013-07-25 00:00:...| 1| CLOSED| 299.98|
| 256|2013-07-25 00:00:...| 2|PENDING_PAYMENT| 199.99|
| 256|2013-07-25 00:00:...| 2|PENDING_PAYMENT| 250.0|
| 256|2013-07-25 00:00:...| 2|PENDING_PAYMENT| 129.99|
| 8827|2013-07-25 00:00:...| 4| CLOSED| 49.98|
| 8827|2013-07-25 00:00:...| 4| CLOSED| 299.95|
| 8827|2013-07-25 00:00:...| 4| CLOSED| 150.0|
| 8827|2013-07-25 00:00:...| 4| CLOSED| 199.92|
| 11318|2013-07-25 00:00:...| 5| COMPLETE| 299.98|
| 11318|2013-07-25 00:00:...| 5| COMPLETE| 299.95|
| 11318|2013-07-25 00:00:...| 5| COMPLETE| 99.96|
| 11318|2013-07-25 00:00:...| 5| COMPLETE| 299.98|
| 11318|2013-07-25 00:00:...| 5| COMPLETE| 129.99|
| 4530|2013-07-25 00:00:...| 7| COMPLETE| 199.99|
| 4530|2013-07-25 00:00:...| 7| COMPLETE| 299.98|
| 4530|2013-07-25 00:00:...| 7| COMPLETE| 79.95|
| 2911|2013-07-25 00:00:...| 8| PROCESSING| 179.97|
| 2911|2013-07-25 00:00:...| 8| PROCESSING| 299.95|
| 2911|2013-07-25 00:00:...| 8| PROCESSING| 199.92|
| 2911|2013-07-25 00:00:...| 8| PROCESSING| 50.0|
+-----------------+--------------------+--------+---------------+-------------------+
only showing top 20 rows
Project order_id, order_date, order_status from orders and order_item_subtotal from order_items.
orders. \
join(
order_items,
orders['order_id'] == order_items['order_item_order_id']
). \
select(orders.order_id, orders.order_date, orders.order_status, order_items.order_item_subtotal). \
show()
+--------+--------------------+---------------+-------------------+
|order_id| order_date| order_status|order_item_subtotal|
+--------+--------------------+---------------+-------------------+
| 1|2013-07-25 00:00:...| CLOSED| 299.98|
| 2|2013-07-25 00:00:...|PENDING_PAYMENT| 199.99|
| 2|2013-07-25 00:00:...|PENDING_PAYMENT| 250.0|
| 2|2013-07-25 00:00:...|PENDING_PAYMENT| 129.99|
| 4|2013-07-25 00:00:...| CLOSED| 49.98|
| 4|2013-07-25 00:00:...| CLOSED| 299.95|
| 4|2013-07-25 00:00:...| CLOSED| 150.0|
| 4|2013-07-25 00:00:...| CLOSED| 199.92|
| 5|2013-07-25 00:00:...| COMPLETE| 299.98|
| 5|2013-07-25 00:00:...| COMPLETE| 299.95|
| 5|2013-07-25 00:00:...| COMPLETE| 99.96|
| 5|2013-07-25 00:00:...| COMPLETE| 299.98|
| 5|2013-07-25 00:00:...| COMPLETE| 129.99|
| 7|2013-07-25 00:00:...| COMPLETE| 199.99|
| 7|2013-07-25 00:00:...| COMPLETE| 299.98|
| 7|2013-07-25 00:00:...| COMPLETE| 79.95|
| 8|2013-07-25 00:00:...| PROCESSING| 179.97|
| 8|2013-07-25 00:00:...| PROCESSING| 299.95|
| 8|2013-07-25 00:00:...| PROCESSING| 199.92|
| 8|2013-07-25 00:00:...| PROCESSING| 50.0|
+--------+--------------------+---------------+-------------------+
only showing top 20 rows
orders. \
join(
order_items,
on=orders['order_id'] == order_items['order_item_order_id']
). \
select(orders['order_id'], orders['order_date'], orders['order_status'], order_items['order_item_subtotal']). \
show()
+--------+--------------------+---------------+-------------------+
|order_id| order_date| order_status|order_item_subtotal|
+--------+--------------------+---------------+-------------------+
| 1|2013-07-25 00:00:...| CLOSED| 299.98|
| 2|2013-07-25 00:00:...|PENDING_PAYMENT| 199.99|
| 2|2013-07-25 00:00:...|PENDING_PAYMENT| 250.0|
| 2|2013-07-25 00:00:...|PENDING_PAYMENT| 129.99|
| 4|2013-07-25 00:00:...| CLOSED| 49.98|
| 4|2013-07-25 00:00:...| CLOSED| 299.95|
| 4|2013-07-25 00:00:...| CLOSED| 150.0|
| 4|2013-07-25 00:00:...| CLOSED| 199.92|
| 5|2013-07-25 00:00:...| COMPLETE| 299.98|
| 5|2013-07-25 00:00:...| COMPLETE| 299.95|
| 5|2013-07-25 00:00:...| COMPLETE| 99.96|
| 5|2013-07-25 00:00:...| COMPLETE| 299.98|
| 5|2013-07-25 00:00:...| COMPLETE| 129.99|
| 7|2013-07-25 00:00:...| COMPLETE| 199.99|
| 7|2013-07-25 00:00:...| COMPLETE| 299.98|
| 7|2013-07-25 00:00:...| COMPLETE| 79.95|
| 8|2013-07-25 00:00:...| PROCESSING| 179.97|
| 8|2013-07-25 00:00:...| PROCESSING| 299.95|
| 8|2013-07-25 00:00:...| PROCESSING| 199.92|
| 8|2013-07-25 00:00:...| PROCESSING| 50.0|
+--------+--------------------+---------------+-------------------+
only showing top 20 rows