Simplify PySpark testing with DataFrame equality functions

The DataFrame equality test functions were introduced in Apache Spark™ 3.5 and Databricks Runtime 14.2 to simplify PySpark unit testing. The full set of capabilities described in this blog post will be available starting with the upcoming Apache Spark 4.0 and Databricks Runtime 14.3.

Write more confident DataFrame transformations with DataFrame equality test functions

Working with data in PySpark involves applying transformations, aggregations, and manipulations to DataFrames. As transformations accumulate, how can you be confident that your code works as expected? The PySpark equality test utility functions let you check your data against expected outcomes, helping you identify unexpected differences and catch errors early in the analysis process. When a check fails, they report exactly which rows or schema fields differ, so you can act immediately instead of spending time debugging.

Using DataFrame equality test functions

Two equality test functions for PySpark DataFrames were introduced in Apache Spark 3.5: assertDataFrameEqual and assertSchemaEqual. Let’s take a look at how to use each of them.

assertDataFrameEqual: This function allows you to compare two PySpark DataFrames for equality with a single line of code, checking whether the data and schemas match. When they differ, it raises a PySparkAssertionError with a descriptive message.

Let’s walk through an example. First, we’ll create two DataFrames, intentionally introducing a difference in the first row:

df_expected = spark.createDataFrame(
    data=[("Alfred", 1500), ("Alfred", 2500), ("Anna", 500), ("Anna", 3000)],
    schema=["name", "amount"],
)

df_actual = spark.createDataFrame(
    data=[("Alfred", 1200), ("Alfred", 2500), ("Anna", 500), ("Anna", 3000)],
    schema=["name", "amount"],
)

Then we’ll call assertDataFrameEqual with the two DataFrames:

from pyspark.testing import assertDataFrameEqual

assertDataFrameEqual(df_actual, df_expected)

The function raises a PySparkAssertionError with a descriptive message indicating that the first row in the two DataFrames is different. In this example, the first amounts listed for Alfred are not the same (expected: 1500, actual: 1200).

With this information, you immediately know the problem with the DataFrame your code generated and can target your debugging based on that.

The function also has several options to control the strictness of the DataFrame comparison, such as checkRowOrder for row ordering and rtol and atol for floating-point tolerance, so that you can adjust it to your specific use case.

assertSchemaEqual: This function compares only the schemas of two DataFrames; it does not compare row data. It lets you validate whether the column names, data types, and nullable property are the same for two different DataFrames.

Let’s look at an example. First, we’ll create two DataFrames with different schemas:

schema_actual = "name STRING, amount DOUBLE"

data_expected = [["Alfred", 1500], ["Alfred", 2500], ["Anna", 500], ["Anna", 3000]]
data_actual = [["Alfred", 1500.0], ["Alfred", 2500.0], ["Anna", 500.0], ["Anna", 3000.0]]

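# No schema given: column names and types are inferred from the data
df_expected = spark.createDataFrame(data=data_expected)
# Explicit schema: columns are name (STRING) and amount (DOUBLE)
df_actual = spark.createDataFrame(data=data_actual, schema=schema_actual)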

Now, let’s call assertSchemaEqual with these two DataFrame schemas:

from pyspark.testing import assertSchemaEqual

assertSchemaEqual(df_actual.schema, df_expected.schema)

The function determines that the schemas of the two DataFrames are different, and the error output indicates where they diverge.

In this example, there are two differences: the data type of the amount column is DOUBLE in the actual DataFrame but LONG in the expected DataFrame, and because we created the expected DataFrame without specifying a schema, its column names are inferred (_1 and _2) and therefore also differ.

Both of these differences are highlighted in the function output.

assertPandasOnSparkEqual is not covered in this blog post because it is deprecated as of Apache Spark 3.5.1 and scheduled to be removed in the upcoming Apache Spark 4.0.0. For testing Pandas API on Spark, see the Pandas API on Spark equality test functions section below.

Structured output for debugging differences in PySpark DataFrames

While the assertDataFrameEqual and assertSchemaEqual functions are primarily aimed at unit testing, where you typically use smaller datasets to test your PySpark functions, you might use them with DataFrames with more than just a few rows and columns. In such scenarios, you can easily retrieve the row data for rows that are different to make further debugging easier.

Let’s take a look at how to do that. We’ll use the same data we used earlier to create two DataFrames:

df_expected = spark.createDataFrame(
    data=[("Alfred", 1500), ("Alfred", 2500), ("Anna", 500), ("Anna", 3000)],
    schema=["name", "amount"],
)
df_actual = spark.createDataFrame(
    data=[("Alfred", 1200), ("Alfred", 2500), ("Anna", 500), ("Anna", 3000)],
    schema=["name", "amount"],
)

And now we’ll grab the data that differs between the two DataFrames from the assertion error object raised by assertDataFrameEqual:

from pyspark.testing import assertDataFrameEqual
from pyspark.errors import PySparkAssertionError

try:
    assertDataFrameEqual(df_actual, df_expected, includeDiffRows=True)
except PySparkAssertionError as e:
    # `e.data` here looks like:
    # [(Row(name='Alfred', amount=1200), Row(name='Alfred', amount=1500))]
    spark.createDataFrame(e.data, schema=["Actual", "Expected"]).show() 

Creating a DataFrame from the rows that are different and showing it, as we’ve done in this example, illustrates how easy it is to access this information: the result has one row, with the actual Row in the first column and the expected Row in the second.

As you can see, information on the rows that are different is immediately available for further analysis. You no longer have to write code to extract this information from the actual and expected DataFrames for debugging purposes.

This feature will be available from the upcoming Apache Spark 4.0 and DBR 14.3.

Pandas API on Spark equality test functions

In addition to the functions for testing the equality of PySpark DataFrames, Pandas API on Spark users will have access to the following DataFrame equality test functions:

  • assert_frame_equal
  • assert_series_equal
  • assert_index_equal

The functions provide options for controlling the strictness of comparisons and are great for unit testing your Pandas API on Spark DataFrames. They provide the exact same API as the pandas test utility functions, so you can use them without changing existing pandas test code that you want to run using Pandas API on Spark.

Here are a couple of examples demonstrating the use of assert_frame_equal with different parameters, comparing Pandas API on Spark DataFrames:

from pyspark.pandas.testing import assert_frame_equal
import pyspark.pandas as ps

# Create two slightly different Pandas API on Spark DataFrames
df1 = ps.DataFrame({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]})
df2 = ps.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})  # 'b' column as integers

# Validate DataFrame equality with strict data type checking
assert_frame_equal(df1, df2, check_dtype=True)

In this example, the schemas of the two DataFrames are different: column b holds floats in df1 and integers in df2. The assertion fails, and the function output lists the differences.

Using the check_dtype argument, we can tell the function to compare column data even when the columns do not have the same data type, as in this example:

# DataFrames are equal with check_dtype=False
assert_frame_equal(df1, df2, check_dtype=False)

Since we specified that assert_frame_equal should ignore column data types, it now considers the two DataFrames equal.

These functions also allow comparisons between Pandas API on Spark objects and pandas objects, facilitating compatibility checks between different DataFrame libraries, as illustrated in this example:

import pandas as pd
from pyspark.pandas.testing import assert_index_equal, assert_series_equal

# Pandas DataFrame
df_pandas = pd.DataFrame({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]})

# Comparing Pandas API on Spark DataFrame with the Pandas DataFrame
assert_frame_equal(df1, df_pandas)

# Comparing Pandas API on Spark Series with the Pandas Series
assert_series_equal(df1.a, df_pandas.a)

# Comparing Pandas API on Spark Index with the Pandas Index
assert_index_equal(df1.index, df_pandas.index)

Using the new PySpark DataFrame and Pandas API on Spark equality test functions is a great way to make sure your PySpark code works as expected. These functions help you not only catch errors but also understand exactly what has gone wrong, enabling you to quickly and easily identify where the problem is. Check out the Testing PySpark page for more information.

These functions will be available starting with the upcoming Apache Spark 4.0. DBR 14.2 already supports them.


