Overview of Sorting Data Frames
Let us understand how to sort the data in a Data Frame.
We can use orderBy or sort to sort the data.
We can perform composite sorting by passing multiple columns or expressions.
By default data is sorted in ascending order; we can change it to descending by applying the desc() function on the column or expression.
If the sort column contains null values, those will come first by default. We can change the position of nulls to last.
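Here is a minimal sketch of these APIs on a small throwaway Data Frame (the demo data is made up purely for illustration, and the spark session created below is assumed):
from pyspark.sql.functions import col

# Hypothetical demo data, just to illustrate the sorting APIs.
demo = spark.createDataFrame(
    [(1, 'b'), (2, 'a'), (3, None)],
    schema='id INT, grp STRING'
)

demo.orderBy('grp').show()                         # ascending (default); nulls first
demo.sort(col('grp').desc()).show()                # sort is an alias for orderBy
demo.orderBy(col('grp').asc_nulls_last()).show()   # push nulls to the end
demo.orderBy(col('grp'), col('id').desc()).show()  # composite sorting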
Let us start the Spark context for this Notebook so that we can execute the code provided. You can sign up for our 10 node state of the art cluster/labs to learn Spark SQL using our unique integrated LMS.
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
builder. \
config('spark.ui.port', '0'). \
config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
enableHiveSupport(). \
appName(f'{username} | Python - Basic Transformations'). \
master('yarn'). \
getOrCreate()
If you are going to use CLIs, you can launch Spark SQL using one of the following three approaches.
Using Spark SQL
spark2-sql \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Scala
spark2-shell \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Pyspark
pyspark2 \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
airtraffic_path = "/public/airtraffic_all/airtraffic-part/flightmonth=200801"
airtraffic = spark. \
read. \
parquet(airtraffic_path)
Get daily count of cancelled flights where data is sorted in ascending order by count of cancelled flights.
from pyspark.sql.functions import col, concat, lpad, lit, count
airtraffic. \
filter('cancelled = 1'). \
groupBy(
concat(
col("Year"),
lpad(col("Month"), 2, "0"),
lpad(col("DayOfMonth"), 2, "0")
).alias('FlightDate')
). \
agg(count(lit(1)).alias('FlightCount')). \
show()
+----------+-----------+
|FlightDate|FlightCount|
+----------+-----------+
| 20080120| 247|
| 20080130| 694|
| 20080115| 299|
| 20080118| 230|
| 20080122| 788|
| 20080104| 769|
| 20080125| 526|
| 20080102| 511|
| 20080105| 456|
| 20080111| 524|
| 20080109| 377|
| 20080127| 638|
| 20080101| 552|
| 20080128| 654|
| 20080119| 876|
| 20080106| 683|
| 20080123| 530|
| 20080117| 872|
| 20080116| 532|
| 20080112| 226|
+----------+-----------+
only showing top 20 rows
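By default the aggregated data comes back in arbitrary order. Let us add orderBy to sort it in ascending order by FlightCount.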
airtraffic. \
filter('cancelled = 1'). \
groupBy(
concat(
col("Year"),
lpad(col("Month"), 2, "0"),
lpad(col("DayOfMonth"), 2, "0")
).alias('FlightDate')
). \
agg(count(lit(1)).alias('FlightCount')). \
orderBy('FlightCount'). \
show(31)
+----------+-----------+
|FlightDate|FlightCount|
+----------+-----------+
| 20080112| 226|
| 20080118| 230|
| 20080120| 247|
| 20080115| 299|
| 20080124| 322|
| 20080110| 341|
| 20080113| 359|
| 20080109| 377|
| 20080126| 416|
| 20080105| 456|
| 20080108| 463|
| 20080103| 475|
| 20080121| 475|
| 20080102| 511|
| 20080111| 524|
| 20080125| 526|
| 20080123| 530|
| 20080116| 532|
| 20080101| 552|
| 20080107| 579|
| 20080127| 638|
| 20080128| 654|
| 20080106| 683|
| 20080130| 694|
| 20080104| 769|
| 20080122| 788|
| 20080117| 872|
| 20080119| 876|
| 20080129| 889|
| 20080114| 909|
| 20080131| 1081|
+----------+-----------+
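We can make the ascending order explicit by invoking asc() on the sort column; the result is the same as the default behaviour above.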
airtraffic. \
filter('cancelled = 1'). \
groupBy(
concat(
col("Year"),
lpad(col("Month"), 2, "0"),
lpad(col("DayOfMonth"), 2, "0")
).alias('FlightDate')
). \
agg(count(lit(1)).alias('FlightCount')). \
orderBy(col('FlightCount').asc()). \
show(31)
+----------+-----------+
|FlightDate|FlightCount|
+----------+-----------+
| 20080112| 226|
| 20080118| 230|
| 20080120| 247|
| 20080115| 299|
| 20080124| 322|
| 20080110| 341|
| 20080113| 359|
| 20080109| 377|
| 20080126| 416|
| 20080105| 456|
| 20080108| 463|
| 20080103| 475|
| 20080121| 475|
| 20080102| 511|
| 20080111| 524|
| 20080125| 526|
| 20080123| 530|
| 20080116| 532|
| 20080101| 552|
| 20080107| 579|
| 20080127| 638|
| 20080128| 654|
| 20080106| 683|
| 20080130| 694|
| 20080104| 769|
| 20080122| 788|
| 20080117| 872|
| 20080119| 876|
| 20080129| 889|
| 20080114| 909|
| 20080131| 1081|
+----------+-----------+
Get daily count of cancelled flights where data is sorted in descending order by count of cancelled flights.
airtraffic. \
filter('cancelled = 1'). \
groupBy(
concat(
col("Year"),
lpad(col("Month"), 2, "0"),
lpad(col("DayOfMonth"), 2, "0")
).alias('FlightDate')
). \
agg(count(lit(1)).alias('FlightCount')). \
orderBy(col('FlightCount').desc()). \
show(31)
+----------+-----------+
|FlightDate|FlightCount|
+----------+-----------+
| 20080131| 1081|
| 20080114| 909|
| 20080129| 889|
| 20080119| 876|
| 20080117| 872|
| 20080122| 788|
| 20080104| 769|
| 20080130| 694|
| 20080106| 683|
| 20080128| 654|
| 20080127| 638|
| 20080107| 579|
| 20080101| 552|
| 20080116| 532|
| 20080123| 530|
| 20080125| 526|
| 20080111| 524|
| 20080102| 511|
| 20080121| 475|
| 20080103| 475|
| 20080108| 463|
| 20080105| 456|
| 20080126| 416|
| 20080109| 377|
| 20080113| 359|
| 20080110| 341|
| 20080124| 322|
| 20080115| 299|
| 20080120| 247|
| 20080118| 230|
| 20080112| 226|
+----------+-----------+
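The same result can also be produced with sort, which is an alias for orderBy, using the desc function from pyspark.sql.functions in place of the desc() column method. A minimal equivalent sketch:
from pyspark.sql.functions import desc

airtraffic. \
    filter('cancelled = 1'). \
    groupBy(
        concat(
            col("Year"),
            lpad(col("Month"), 2, "0"),
            lpad(col("DayOfMonth"), 2, "0")
        ).alias('FlightDate')
    ). \
    agg(count(lit(1)).alias('FlightCount')). \
    sort(desc('FlightCount')). \
    show(31)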
Project Year, Month, DayOfMonth, CRSDepTime and Origin for this task.
Sort the data based on year, month and day of month, and then by scheduled departure time. Data should be sorted in ascending order by year, month and day of month, and then in descending order by scheduled departure time (CRSDepTime).
airtraffic. \
select('Year', 'Month', 'DayOfMonth', 'CRSDepTime', 'Origin'). \
show()
+----+-----+----------+----------+------+
|Year|Month|DayOfMonth|CRSDepTime|Origin|
+----+-----+----------+----------+------+
|2008| 1| 16| 1735| BGR|
|2008| 1| 17| 1701| SYR|
|2008| 1| 17| 1225| SAV|
|2008| 1| 17| 1530| CVG|
|2008| 1| 17| 1205| STL|
|2008| 1| 18| 1150| STL|
|2008| 1| 18| 1009| MCI|
|2008| 1| 19| 835| TUL|
|2008| 1| 20| 1935| JFK|
|2008| 1| 20| 830| RDU|
|2008| 1| 21| 1640| CVG|
|2008| 1| 21| 1204| MSY|
|2008| 1| 21| 1935| JFK|
|2008| 1| 21| 1830| DCA|
|2008| 1| 21| 700| HSV|
|2008| 1| 22| 1910| ORD|
|2008| 1| 22| 1320| CVG|
|2008| 1| 23| 908| LGA|
|2008| 1| 23| 1252| CLT|
|2008| 1| 23| 635| GSP|
+----+-----+----------+----------+------+
only showing top 20 rows
airtraffic. \
select('Year', 'Month', 'DayOfMonth', 'CRSDepTime', 'Origin'). \
orderBy('Year', 'Month', 'DayOfMonth', 'CRSDepTime'). \
show()
+----+-----+----------+----------+------+
|Year|Month|DayOfMonth|CRSDepTime|Origin|
+----+-----+----------+----------+------+
|2008| 1| 1| 10| LAX|
|2008| 1| 1| 15| SMF|
|2008| 1| 1| 25| SMF|
|2008| 1| 1| 25| PHX|
|2008| 1| 1| 30| ANC|
|2008| 1| 1| 30| LAX|
|2008| 1| 1| 30| LAS|
|2008| 1| 1| 30| ONT|
|2008| 1| 1| 35| MCO|
|2008| 1| 1| 35| SFO|
|2008| 1| 1| 40| LAX|
|2008| 1| 1| 40| LAS|
|2008| 1| 1| 40| LAX|
|2008| 1| 1| 40| SEA|
|2008| 1| 1| 40| SFO|
|2008| 1| 1| 40| SEA|
|2008| 1| 1| 45| PHX|
|2008| 1| 1| 45| LAS|
|2008| 1| 1| 50| ANC|
|2008| 1| 1| 53| PDX|
+----+-----+----------+----------+------+
only showing top 20 rows
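Now let us sort CRSDepTime in descending order while keeping Year, Month and DayOfMonth in ascending order.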
airtraffic. \
select('Year', 'Month', 'DayOfMonth', 'CRSDepTime', 'Origin'). \
orderBy('Year', 'Month', 'DayOfMonth', col('CRSDepTime').desc()). \
show()
+----+-----+----------+----------+------+
|Year|Month|DayOfMonth|CRSDepTime|Origin|
+----+-----+----------+----------+------+
|2008| 1| 1| 2359| PHX|
|2008| 1| 1| 2359| LAS|
|2008| 1| 1| 2359| LAS|
|2008| 1| 1| 2359| PHX|
|2008| 1| 1| 2359| SLC|
|2008| 1| 1| 2359| LAS|
|2008| 1| 1| 2359| LAS|
|2008| 1| 1| 2359| LAS|
|2008| 1| 1| 2359| LAS|
|2008| 1| 1| 2359| LAS|
|2008| 1| 1| 2359| LAS|
|2008| 1| 1| 2359| SLC|
|2008| 1| 1| 2359| SEA|
|2008| 1| 1| 2359| LAS|
|2008| 1| 1| 2359| LAS|
|2008| 1| 1| 2359| TUS|
|2008| 1| 1| 2359| LAS|
|2008| 1| 1| 2358| LAS|
|2008| 1| 1| 2358| LAS|
|2008| 1| 1| 2356| LAS|
+----+-----+----------+----------+------+
only showing top 20 rows
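When the sort specification is built programmatically, the sort columns can be collected in a list first and unpacked into orderBy. This sketch is equivalent to the previous cell:
sort_cols = [col('Year'), col('Month'), col('DayOfMonth'), col('CRSDepTime').desc()]

airtraffic. \
    select('Year', 'Month', 'DayOfMonth', 'CRSDepTime', 'Origin'). \
    orderBy(*sort_cols). \
    show()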
Create an employees Data Frame and get the employees data in ascending order by nationality. However, records related to the United States should always come at the top.
employees = [(1, "Scott", "Tiger", 1000.0, 10,
"united states", "+1 123 456 7890", "123 45 6789"
),
(2, "Henry", "Ford", 1250.0, None,
"India", "+91 234 567 8901", "456 78 9123"
),
(3, "Nick", "Junior", 750.0, '',
"united KINGDOM", "+44 111 111 1111", "222 33 4444"
),
(4, "Bill", "Gomes", 1500.0, 2,
"AUSTRALIA", "+61 987 654 3210", "789 12 6118"
)
]
employeesDF = spark. \
createDataFrame(employees,
schema="""employee_id INT, first_name STRING,
last_name STRING, salary FLOAT, bonus STRING, nationality STRING,
phone_number STRING, ssn STRING"""
)
employeesDF.show()
+-----------+----------+---------+------+-----+--------------+----------------+-----------+
|employee_id|first_name|last_name|salary|bonus| nationality| phone_number| ssn|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+
| 1| Scott| Tiger|1000.0| 10| united states| +1 123 456 7890|123 45 6789|
| 2| Henry| Ford|1250.0| null| India|+91 234 567 8901|456 78 9123|
| 3| Nick| Junior| 750.0| |united KINGDOM|+44 111 111 1111|222 33 4444|
| 4| Bill| Gomes|1500.0| 2| AUSTRALIA|+61 987 654 3210|789 12 6118|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+
from pyspark.sql.functions import col, upper, when
when?
Signature: when(condition, value)
Docstring:
Evaluates a list of conditions and returns one of multiple possible result expressions.
If :func:`Column.otherwise` is not invoked, None is returned for unmatched conditions.
:param condition: a boolean :class:`Column` expression.
:param value: a literal value, or a :class:`Column` expression.
>>> df.select(when(df['age'] == 2, 3).otherwise(4).alias("age")).collect()
[Row(age=3), Row(age=4)]
>>> df.select(when(df.age == 2, df.age + 1).alias("age")).collect()
[Row(age=3), Row(age=None)]
.. versionadded:: 1.4
File: /opt/spark-2.4.7-bin-hadoop2.7/python/pyspark/sql/functions.py
Type: function
employeesDF. \
withColumn('sort_column', when(upper(col('nationality')) == 'UNITED STATES', 0).otherwise(1)). \
orderBy('sort_column', 'nationality'). \
show()
+-----------+----------+---------+------+-----+--------------+----------------+-----------+-----------+
|employee_id|first_name|last_name|salary|bonus| nationality| phone_number| ssn|sort_column|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+-----------+
| 1| Scott| Tiger|1000.0| 10| united states| +1 123 456 7890|123 45 6789| 0|
| 4| Bill| Gomes|1500.0| 2| AUSTRALIA|+61 987 654 3210|789 12 6118| 1|
| 2| Henry| Ford|1250.0| null| India|+91 234 567 8901|456 78 9123| 1|
| 3| Nick| Junior| 750.0| |united KINGDOM|+44 111 111 1111|222 33 4444| 1|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+-----------+
from pyspark.sql.functions import expr
employeesDF. \
withColumn(
'sort_column',
expr("""CASE WHEN upper(nationality) = 'UNITED STATES' THEN 0 else 1 END""")
). \
orderBy('sort_column', 'nationality'). \
show()
+-----------+----------+---------+------+-----+--------------+----------------+-----------+-----------+
|employee_id|first_name|last_name|salary|bonus| nationality| phone_number| ssn|sort_column|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+-----------+
| 1| Scott| Tiger|1000.0| 10| united states| +1 123 456 7890|123 45 6789| 0|
| 4| Bill| Gomes|1500.0| 2| AUSTRALIA|+61 987 654 3210|789 12 6118| 1|
| 2| Henry| Ford|1250.0| null| India|+91 234 567 8901|456 78 9123| 1|
| 3| Nick| Junior| 750.0| |united KINGDOM|+44 111 111 1111|222 33 4444| 1|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+-----------+
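Since sort_column is needed only for ordering, we can also pass the when expression to orderBy directly so that no helper column appears in the output. A minimal sketch:
employeesDF. \
    orderBy(
        when(upper(col('nationality')) == 'UNITED STATES', 0).otherwise(1),
        'nationality'
    ). \
    show()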
Sort the data in employeesDF using bonus. Data should be sorted numerically, with null and empty values coming at the end.
employeesDF.show()
+-----------+----------+---------+------+-----+--------------+----------------+-----------+
|employee_id|first_name|last_name|salary|bonus| nationality| phone_number| ssn|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+
| 1| Scott| Tiger|1000.0| 10| united states| +1 123 456 7890|123 45 6789|
| 2| Henry| Ford|1250.0| null| India|+91 234 567 8901|456 78 9123|
| 3| Nick| Junior| 750.0| |united KINGDOM|+44 111 111 1111|222 33 4444|
| 4| Bill| Gomes|1500.0| 2| AUSTRALIA|+61 987 654 3210|789 12 6118|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+
employeesDF.printSchema()
root
|-- employee_id: integer (nullable = true)
|-- first_name: string (nullable = true)
|-- last_name: string (nullable = true)
|-- salary: float (nullable = true)
|-- bonus: string (nullable = true)
|-- nationality: string (nullable = true)
|-- phone_number: string (nullable = true)
|-- ssn: string (nullable = true)
employeesDF. \
orderBy('bonus'). \
show()
+-----------+----------+---------+------+-----+--------------+----------------+-----------+
|employee_id|first_name|last_name|salary|bonus| nationality| phone_number| ssn|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+
| 2| Henry| Ford|1250.0| null| India|+91 234 567 8901|456 78 9123|
| 3| Nick| Junior| 750.0| |united KINGDOM|+44 111 111 1111|222 33 4444|
| 1| Scott| Tiger|1000.0| 10| united states| +1 123 456 7890|123 45 6789|
| 4| Bill| Gomes|1500.0| 2| AUSTRALIA|+61 987 654 3210|789 12 6118|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+
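As bonus is of type string, the values are compared lexicographically, which is why '10' comes before '2' and the null and empty values come first. Casting bonus to int gives the numeric ordering.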
employeesDF. \
orderBy(col('bonus').cast('int')). \
show()
+-----------+----------+---------+------+-----+--------------+----------------+-----------+
|employee_id|first_name|last_name|salary|bonus| nationality| phone_number| ssn|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+
| 3| Nick| Junior| 750.0| |united KINGDOM|+44 111 111 1111|222 33 4444|
| 2| Henry| Ford|1250.0| null| India|+91 234 567 8901|456 78 9123|
| 4| Bill| Gomes|1500.0| 2| AUSTRALIA|+61 987 654 3210|789 12 6118|
| 1| Scott| Tiger|1000.0| 10| united states| +1 123 456 7890|123 45 6789|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+
Casting bonus to int converts the empty string to null, so both of those rows still come first in ascending order. To push nulls to the end we can use asc_nulls_last. Let us review its documentation.
c = col('X')
help(c.asc_nulls_last)
Help on method _ in module pyspark.sql.column:
_() method of pyspark.sql.column.Column instance
Returns a sort expression based on ascending order of the column, and null values
appear after non-null values.
>>> from pyspark.sql import Row
>>> df = spark.createDataFrame([('Tom', 80), (None, 60), ('Alice', None)], ["name", "height"])
>>> df.select(df.name).orderBy(df.name.asc_nulls_last()).collect()
[Row(name='Alice'), Row(name='Tom'), Row(name=None)]
.. versionadded:: 2.4
employeesDF. \
orderBy(employeesDF.bonus.cast('int').asc_nulls_last()). \
show()
+-----------+----------+---------+------+-----+--------------+----------------+-----------+
|employee_id|first_name|last_name|salary|bonus| nationality| phone_number| ssn|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+
| 4| Bill| Gomes|1500.0| 2| AUSTRALIA|+61 987 654 3210|789 12 6118|
| 1| Scott| Tiger|1000.0| 10| united states| +1 123 456 7890|123 45 6789|
| 3| Nick| Junior| 750.0| |united KINGDOM|+44 111 111 1111|222 33 4444|
| 2| Henry| Ford|1250.0| null| India|+91 234 567 8901|456 78 9123|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+
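For the descending counterpart, desc_nulls_last() keeps the null (and, after the cast, empty) bonus values at the end while sorting from highest to lowest. A minimal sketch:
employeesDF. \
    orderBy(employeesDF.bonus.cast('int').desc_nulls_last()). \
    show()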