Dealing with Nulls
Let us understand how to deal with nulls using functions that are available in Spark.
We can use coalesce to return the first non-null value.
We also have traditional SQL-style functions such as nvl. However, they can be used either with expr or with selectExpr.
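Before running anything on Spark, it may help to see the semantics of these functions in plain Python. The following is only a sketch: the helper names first_non_null, nvl_like and nullif_like are hypothetical and are not part of PySpark, and Python's None stands in for a SQL null.

```python
from typing import Any, Optional


def first_non_null(*values: Any) -> Optional[Any]:
    """Hypothetical stand-in for coalesce: first argument that is not None."""
    for value in values:
        if value is not None:
            return value
    return None


def nvl_like(value: Any, default: Any) -> Any:
    """Hypothetical stand-in for nvl: two-argument form of coalesce."""
    return first_non_null(value, default)


def nullif_like(value: Any, other: Any) -> Optional[Any]:
    """Hypothetical stand-in for nullif: None when both arguments are equal."""
    return None if value == other else value


# An empty string is not null, so nvl alone leaves it untouched.
print(repr(nvl_like(None, 0)))                  # 0
print(repr(nvl_like('', 0)))                    # ''
# Turning the empty string into null first lets the default apply.
print(repr(nvl_like(nullif_like('', ''), 0)))   # 0
```

The same distinction between null and an empty string shows up in the bonus column used throughout this section.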
Let us start the Spark context for this notebook so that we can execute the code provided.
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Processing Column Data'). \
    master('yarn'). \
    getOrCreate()
If you are going to use CLIs, you can use Spark SQL using one of the following three approaches.
Using Spark SQL
spark2-sql \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Scala
spark2-shell \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Pyspark
pyspark2 \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
employees = [
    (1, "Scott", "Tiger", 1000.0, 10,
     "united states", "+1 123 456 7890", "123 45 6789"),
    (2, "Henry", "Ford", 1250.0, None,
     "India", "+91 234 567 8901", "456 78 9123"),
    (3, "Nick", "Junior", 750.0, '',
     "united KINGDOM", "+44 111 111 1111", "222 33 4444"),
    (4, "Bill", "Gomes", 1500.0, 10,
     "AUSTRALIA", "+61 987 654 3210", "789 12 6118")
]
employeesDF = spark. \
    createDataFrame(
        employees,
        schema="""employee_id INT, first_name STRING,
        last_name STRING, salary FLOAT, bonus STRING, nationality STRING,
        phone_number STRING, ssn STRING"""
    )
employeesDF.show()
+-----------+----------+---------+------+-----+--------------+----------------+-----------+
|employee_id|first_name|last_name|salary|bonus| nationality| phone_number| ssn|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+
| 1| Scott| Tiger|1000.0| 10| united states| +1 123 456 7890|123 45 6789|
| 2| Henry| Ford|1250.0| null| India|+91 234 567 8901|456 78 9123|
| 3| Nick| Junior| 750.0| |united KINGDOM|+44 111 111 1111|222 33 4444|
| 4| Bill| Gomes|1500.0| 10| AUSTRALIA|+61 987 654 3210|789 12 6118|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+
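from pyspark.sql.functions import coalesce
# This call fails on purpose: coalesce accepts column names or Column objects,
# so the bare Python literal 0 raises the TypeError shown below.
employeesDF. \
    withColumn('bonus', coalesce('bonus', 0)). \
    show()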
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-6-a3e3c43ef418> in <module>
1 employeesDF. \
----> 2 withColumn('bonus', coalesce('bonus', 0)). \
3 show()
/usr/hdp/current/spark2-client/python/pyspark/sql/functions.py in coalesce(*cols)
323 """
324 sc = SparkContext._active_spark_context
--> 325 jc = sc._jvm.functions.coalesce(_to_seq(sc, cols, _to_java_column))
326 return Column(jc)
327
/usr/hdp/current/spark2-client/python/pyspark/sql/column.py in _to_seq(sc, cols, converter)
64 """
65 if converter:
---> 66 cols = [converter(c) for c in cols]
67 return sc._jvm.PythonUtils.toSeq(cols)
68
/usr/hdp/current/spark2-client/python/pyspark/sql/column.py in <listcomp>(.0)
64 """
65 if converter:
---> 66 cols = [converter(c) for c in cols]
67 return sc._jvm.PythonUtils.toSeq(cols)
68
/usr/hdp/current/spark2-client/python/pyspark/sql/column.py in _to_java_column(col)
52 "{0} of type {1}. "
53 "For column literals, use 'lit', 'array', 'struct' or 'create_map' "
---> 54 "function.".format(col, type(col)))
55 return jcol
56
TypeError: Invalid argument, not a string or column: 0 of type <class 'int'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
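from pyspark.sql.functions import lit
# Wrapping the literal in lit turns it into a Column, so coalesce accepts it.
# Only the null bonus is replaced; the empty string is not null and stays as is.
employeesDF. \
    withColumn('bonus1', coalesce('bonus', lit(0))). \
    show()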
+-----------+----------+---------+------+-----+--------------+----------------+-----------+------+
|employee_id|first_name|last_name|salary|bonus| nationality| phone_number| ssn|bonus1|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+------+
| 1| Scott| Tiger|1000.0| 10| united states| +1 123 456 7890|123 45 6789| 10|
| 2| Henry| Ford|1250.0| null| India|+91 234 567 8901|456 78 9123| 0|
| 3| Nick| Junior| 750.0| |united KINGDOM|+44 111 111 1111|222 33 4444| |
| 4| Bill| Gomes|1500.0| 10| AUSTRALIA|+61 987 654 3210|789 12 6118| 10|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+------+
from pyspark.sql.functions import col
# Casting bonus to int turns the empty string into null, as the output shows.
employeesDF. \
    withColumn('bonus1', col('bonus').cast('int')). \
    show()
+-----------+----------+---------+------+-----+--------------+----------------+-----------+------+
|employee_id|first_name|last_name|salary|bonus| nationality| phone_number| ssn|bonus1|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+------+
| 1| Scott| Tiger|1000.0| 10| united states| +1 123 456 7890|123 45 6789| 10|
| 2| Henry| Ford|1250.0| null| India|+91 234 567 8901|456 78 9123| null|
| 3| Nick| Junior| 750.0| |united KINGDOM|+44 111 111 1111|222 33 4444| null|
| 4| Bill| Gomes|1500.0| 10| AUSTRALIA|+61 987 654 3210|789 12 6118| 10|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+------+
employeesDF. \
withColumn('bonus1', coalesce(col('bonus').cast('int'), lit(0))). \
show()
+-----------+----------+---------+------+-----+--------------+----------------+-----------+------+
|employee_id|first_name|last_name|salary|bonus| nationality| phone_number| ssn|bonus1|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+------+
| 1| Scott| Tiger|1000.0| 10| united states| +1 123 456 7890|123 45 6789| 10|
| 2| Henry| Ford|1250.0| null| India|+91 234 567 8901|456 78 9123| 0|
| 3| Nick| Junior| 750.0| |united KINGDOM|+44 111 111 1111|222 33 4444| 0|
| 4| Bill| Gomes|1500.0| 10| AUSTRALIA|+61 987 654 3210|789 12 6118| 10|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+------+
from pyspark.sql.functions import expr
employeesDF. \
withColumn('bonus', expr("nvl(bonus, 0)")). \
show()
+-----------+----------+---------+------+-----+--------------+----------------+-----------+
|employee_id|first_name|last_name|salary|bonus| nationality| phone_number| ssn|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+
| 1| Scott| Tiger|1000.0| 10| united states| +1 123 456 7890|123 45 6789|
| 2| Henry| Ford|1250.0| 0| India|+91 234 567 8901|456 78 9123|
| 3| Nick| Junior| 750.0| |united KINGDOM|+44 111 111 1111|222 33 4444|
| 4| Bill| Gomes|1500.0| 10| AUSTRALIA|+61 987 654 3210|789 12 6118|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+
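# nullif(bonus, '') returns null when bonus is an empty string,
# so nvl can then replace both nulls and empty strings with 0.
employeesDF. \
    withColumn('bonus', expr("nvl(nullif(bonus, ''), 0)")). \
    show()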
+-----------+----------+---------+------+-----+--------------+----------------+-----------+
|employee_id|first_name|last_name|salary|bonus| nationality| phone_number| ssn|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+
| 1| Scott| Tiger|1000.0| 10| united states| +1 123 456 7890|123 45 6789|
| 2| Henry| Ford|1250.0| 0| India|+91 234 567 8901|456 78 9123|
| 3| Nick| Junior| 750.0| 0|united KINGDOM|+44 111 111 1111|222 33 4444|
| 4| Bill| Gomes|1500.0| 10| AUSTRALIA|+61 987 654 3210|789 12 6118|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+
employeesDF. \
withColumn('payment', col('salary') + (col('salary') * coalesce(col('bonus').cast('int'), lit(0)) / 100)). \
show()
+-----------+----------+---------+------+-----+--------------+----------------+-----------+-------+
|employee_id|first_name|last_name|salary|bonus| nationality| phone_number| ssn|payment|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+-------+
| 1| Scott| Tiger|1000.0| 10| united states| +1 123 456 7890|123 45 6789| 1100.0|
| 2| Henry| Ford|1250.0| null| India|+91 234 567 8901|456 78 9123| 1250.0|
| 3| Nick| Junior| 750.0| |united KINGDOM|+44 111 111 1111|222 33 4444| 750.0|
| 4| Bill| Gomes|1500.0| 10| AUSTRALIA|+61 987 654 3210|789 12 6118| 1650.0|
+-----------+----------+---------+------+-----+--------------+----------------+-----------+-------+
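The payment values above can be checked by hand: payment is salary plus salary times the bonus percentage divided by 100, with a missing or empty bonus treated as 0. The following plain Python sketch repeats that arithmetic for the four rows. The helper to_int_or_none is hypothetical and only approximates the cast to int for this particular data.

```python
def to_int_or_none(text):
    """Hypothetical helper: None and non-numeric strings become None."""
    try:
        return int(text)
    except (TypeError, ValueError):
        return None


# (first_name, salary, bonus as string) taken from the table above.
rows = [
    ("Scott", 1000.0, "10"),
    ("Henry", 1250.0, None),
    ("Nick", 750.0, ""),
    ("Bill", 1500.0, "10"),
]

payments = {}
for first_name, salary, bonus in rows:
    bonus_pct = to_int_or_none(bonus)
    if bonus_pct is None:
        # Same role as coalesce(..., lit(0)) in the Spark expression.
        bonus_pct = 0
    payments[first_name] = salary + salary * bonus_pct / 100

print(payments)
```

Without the default of 0, the rows for Henry and Nick would have no usable bonus value, which is why the Spark expression wraps the cast in coalesce.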