
Conversation

xinrong-meng (Member) commented Jun 16, 2025

What changes were proposed in this pull request?

Fix autocorr divide-by-zero error under ANSI mode

Why are the changes needed?

Ensure pandas on Spark works well with ANSI mode on.
Part of https://issues.apache.org/jira/browse/SPARK-52169.

Does this PR introduce any user-facing change?

When ANSI mode is on:

FROM

>>> s = ps.Series([1, 0, 0, 0])
>>> s.autocorr()
...
25/08/04 13:25:13 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 33)
org.apache.spark.SparkArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error. SQLSTATE: 22012
== DataFrame ==
"corr" was called from
...

TO

>>> s = ps.Series([1, 0, 0, 0])
>>> s.autocorr()
nan
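
For reference (a quick sanity check, not part of the patch), this matches plain pandas: the lag-1 overlap of the series is [0, 0, 0], whose variance is zero, so the correlation is undefined:

>>> import pandas as pd
>>> pd.Series([1, 0, 0, 0]).autocorr()
nan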

How was this patch tested?

Unit tests.

The commands below passed:

SPARK_ANSI_SQL_MODE=true ./python/run-tests --python-executables=python3.11 --testnames "pyspark.pandas.tests.series.test_stat SeriesStatTests.test_autocorr"

SPARK_ANSI_SQL_MODE=false ./python/run-tests --python-executables=python3.11 --testnames "pyspark.pandas.tests.series.test_stat SeriesStatTests.test_autocorr"

Was this patch authored or co-authored using generative AI tooling?

No.

else:
    lag_scol = F.lag(scol, lag).over(Window.orderBy(NATURAL_ORDER_COLUMN_NAME))
    lag_col_name = verify_temp_column_name(sdf, "__autocorr_lag_tmp_col__")
    corr = (
        sdf.withColumn(lag_col_name, lag_scol)
        .select(F.corr(scol, F.col(lag_col_name)))
Contributor commented:

How is corr affected by ANSI mode?

xinrong-meng (Member Author) replied Aug 4, 2025:

The example below shows how F.corr(col1, col2) fails with a DIVIDE_BY_ZERO error when ANSI mode is on:

>>> df = spark.createDataFrame(
...     [(1, None), (0, 1), (0, 0), (0, 0)],
...     ["val", "lag"]
... )
>>> df.show()
+---+----+
|val| lag|
+---+----+
|  1|NULL|
|  0|   1|
|  0|   0|
|  0|   0|
+---+----+
>>> df.select(F.corr("val", "lag")).show()
...
org.apache.spark.SparkArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error. SQLSTATE: 22012
== DataFrame ==
"corr" was called from
...

>>> spark.conf.set("spark.sql.ansi.enabled", False)
>>> df.select(F.corr("val", "lag")).show()
+--------------+
|corr(val, lag)|
+--------------+
|          NULL|
+--------------+
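
For additional context (my understanding, not taken from the patch itself): corr(x, y) is covar_samp(x, y) / (stddev_samp(x) * stddev_samp(y)), computed over the rows where both columns are non-null. For the data above, those rows have val = [0, 0, 0], so the denominator is zero and ANSI mode raises DIVIDE_BY_ZERO. The pieces can be inspected separately without hitting any division by zero:

>>> pairs = df.dropna()  # corr only considers rows where both columns are non-null
>>> pairs.select(
...     F.covar_samp("val", "lag").alias("cov"),
...     F.stddev_samp("val").alias("std_val"),
...     F.stddev_samp("lag").alias("std_lag"),
... ).show()

Here cov and std_val both come out as 0.0, which is exactly the degenerate case the fix short-circuits before calling F.corr.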

xinrong-meng changed the title [WIP][SPARK-52394][PS] Fix autocorr divide-by-zero error under ANSI mode → [SPARK-52394][PS] Fix autocorr divide-by-zero error under ANSI mode (Aug 4, 2025)
xinrong-meng (Member Author) commented:

May I get a review, @zhengruifeng @ueshin? Thank you!


sdf_lag = sdf.withColumn(lag_col_name, lag_scol)
if is_ansi_mode_enabled(sdf.sparkSession):
    cov_value = sdf_lag.select(F.covar_samp(scol, F.col(lag_col_name))).first()[0]
Member commented:

nit: Why is this using first() instead of head()?

xinrong-meng (Member Author) replied:

Used head for consistency.


sdf_lag = sdf.withColumn(lag_col_name, lag_scol)
if is_ansi_mode_enabled(sdf.sparkSession):
    cov_value = sdf_lag.select(F.covar_samp(scol, F.col(lag_col_name))).first()[0]
Member commented:

I'm not sure about the relationship between these functions and corr. Could you add a comment to explain why it works?

xinrong-meng (Member Author) replied:

Good point, added comments.

sdf_lag = sdf.withColumn(lag_col_name, lag_scol)
if is_ansi_mode_enabled(sdf.sparkSession):
    # Compute covariance between the original and lagged columns.
    # If the covariance is None or zero (indicating no linear relationship),
Contributor commented:

I am not sure about the relationship between corr and covar_samp now. If the ANSI failure throws DIVIDE_BY_ZERO, is it possible to try-catch the error?

xinrong-meng (Member Author) replied:

We might not be able to catch the error because it's a Spark runtime error thrown from the executor. But based on my testing, the calculation of corr fails specifically at covar_samp for the test case ps.Series([1, 0, 0, 0]). According to the formula, when covar_samp is 0, the correlation should be 0 anyway. WDYT?
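
To make this concrete, here is a minimal sketch of the guard this thread is discussing (names such as sdf_lag, scol, and lag_col_name follow the snippet above; corr_value is a placeholder name, and this is illustrative rather than the exact patch):

if is_ansi_mode_enabled(sdf.sparkSession):
    # Probe the covariance first to decide whether it is safe to call corr.
    cov_value = sdf_lag.select(F.covar_samp(scol, F.col(lag_col_name))).head()[0]
    if cov_value is None or cov_value == 0.0:
        # No (or undefined) linear relationship, which covers the case where
        # corr's denominator would be zero under ANSI; short-circuit instead of
        # calling F.corr, so the user sees a null/nan result.
        corr_value = None
    else:
        corr_value = sdf_lag.select(F.corr(scol, F.col(lag_col_name))).head()[0]
else:
    corr_value = sdf_lag.select(F.corr(scol, F.col(lag_col_name))).head()[0]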

zhengruifeng (Contributor) commented:

merged to master

xinrong-meng (Member Author) replied:

Thank you!
