
Commit a72fd41

xinrong-meng authored and zhengruifeng committed
[SPARK-52394][PS] Fix autocorr divide-by-zero error under ANSI mode
### What changes were proposed in this pull request?

Fix `autocorr` divide-by-zero error under ANSI mode.

### Why are the changes needed?

Ensure pandas on Spark works well with ANSI mode on. Part of https://issues.apache.org/jira/browse/SPARK-52169.

### Does this PR introduce _any_ user-facing change?

Yes. When ANSI mode is on, FROM

```py
>>> s = ps.Series([1, 0, 0, 0])
>>> s.autocorr()
...
25/08/04 13:25:13 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 33)
org.apache.spark.SparkArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error. SQLSTATE: 22012
== DataFrame ==
"corr" was called from
...
```

TO

```py
>>> s = ps.Series([1, 0, 0, 0])
>>> s.autocorr()
nan
```

### How was this patch tested?

Unit tests. The commands below passed:

```
SPARK_ANSI_SQL_MODE=true ./python/run-tests --python-executables=python3.11 --testnames "pyspark.pandas.tests.series.test_stat SeriesStatTests.test_autocorr"
SPARK_ANSI_SQL_MODE=false ./python/run-tests --python-executables=python3.11 --testnames "pyspark.pandas.tests.series.test_stat SeriesStatTests.test_autocorr"
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51192 from xinrong-meng/autocorr.

Authored-by: Xinrong Meng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
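To make the failure mode concrete, here is a minimal NumPy illustration (not part of the PR) of why `corr` divides by zero for this input: with `s = [1, 0, 0, 0]` and lag 1, the non-null pairs align a constant column with the lagged values, so the Pearson denominator `std(x) * std(y)` is exactly zero.

```python
import numpy as np

# Hypothetical illustration of the failure mode. For s = [1, 0, 0, 0] with
# lag 1, the non-null aligned pairs are (0, 1), (0, 0), (0, 0): one side is
# the constant column [0, 0, 0], whose sample standard deviation is 0.
x = np.array([0.0, 0.0, 0.0])  # series values aligned with the lag (constant)
y = np.array([1.0, 0.0, 0.0])  # lagged values

# Pearson's denominator; ANSI mode turns dividing by this into an error.
denominator = np.std(x, ddof=1) * np.std(y, ddof=1)
print(denominator)  # 0.0
```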
1 parent: e4ded08

File tree

2 files changed: +15 −5 lines


python/pyspark/pandas/series.py

Lines changed: 12 additions & 5 deletions

```diff
@@ -3410,11 +3410,18 @@ def autocorr(self, lag: int = 1) -> float:
         else:
             lag_scol = F.lag(scol, lag).over(Window.orderBy(NATURAL_ORDER_COLUMN_NAME))
             lag_col_name = verify_temp_column_name(sdf, "__autocorr_lag_tmp_col__")
-            corr = (
-                sdf.withColumn(lag_col_name, lag_scol)
-                .select(F.corr(scol, F.col(lag_col_name)))
-                .head()[0]
-            )
+
+            sdf_lag = sdf.withColumn(lag_col_name, lag_scol)
+            if is_ansi_mode_enabled(sdf.sparkSession):
+                # Compute covariance between the original and lagged columns.
+                # If the covariance is None or zero (indicating no linear relationship),
+                # return NaN, otherwise, proceeding to compute correlation may raise
+                # DIVIDE_BY_ZERO under ANSI mode.
+                cov_value = sdf_lag.select(F.covar_samp(scol, F.col(lag_col_name))).head()[0]
+                if cov_value is None or cov_value == 0.0:
+                    return np.nan
+            corr = sdf_lag.select(F.corr(scol, F.col(lag_col_name))).head()[0]
         return np.nan if corr is None else corr

     @with_ansi_mode_context
```
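The same guard can be sketched without Spark. The following plain-NumPy helper (`safe_autocorr` is a hypothetical name for illustration, not part of the PR) mirrors the patched logic: align the series with its lagged copy, bail out with NaN when a side is constant (zero sample covariance structure), and only then compute Pearson correlation.

```python
import numpy as np

def safe_autocorr(values, lag=1):
    # Align the series with its lagged copy, mirroring the F.lag join above.
    x = np.asarray(values, dtype=float)[lag:]
    y = np.asarray(values, dtype=float)[:-lag]
    # Guard: if either aligned column is constant, Pearson's denominator
    # std(x) * std(y) is zero, so return NaN instead of dividing by zero --
    # the same idea as the covar_samp pre-check in the patch.
    if np.std(x, ddof=1) == 0.0 or np.std(y, ddof=1) == 0.0:
        return float("nan")
    return float(np.corrcoef(x, y)[0, 1])

print(safe_autocorr([1, 0, 0, 0]))  # nan
```

Note this sketch checks standard deviations rather than the sample covariance the patch uses; both catch the degenerate constant-column case that triggers DIVIDE_BY_ZERO under ANSI mode.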

python/pyspark/pandas/tests/series/test_stat.py

Lines changed: 3 additions & 0 deletions

```diff
@@ -606,6 +606,9 @@ def test_autocorr(self):
         with self.assertRaisesRegex(TypeError, r"lag should be an int; however, got"):
             psser.autocorr(1.0)

+        psser = ps.Series([1, 0, 0, 0])
+        self.assertTrue(bool(np.isnan(psser.autocorr())))
+
     def _test_autocorr(self, pdf):
         psdf = ps.from_pandas(pdf)
         for lag in range(-10, 10):
```
