diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py index f2c06a2350a90..6ccffc718d064 100644 --- a/python/pyspark/sql/connect/session.py +++ b/python/pyspark/sql/connect/session.py @@ -619,7 +619,7 @@ def createDataFrame( safecheck = configs["spark.sql.execution.pandas.convertToArrowArraySafely"] - ser = ArrowStreamPandasSerializer(cast(str, timezone), safecheck == "true") + ser = ArrowStreamPandasSerializer(cast(str, timezone), safecheck == "true", False) _table = pa.Table.from_batches( [ diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index 18360fd813921..0a1b1c917c36d 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -739,7 +739,7 @@ def _create_from_pandas_with_arrow( jsparkSession = self._jsparkSession safecheck = self._jconf.arrowSafeTypeConversion() - ser = ArrowStreamPandasSerializer(timezone, safecheck) + ser = ArrowStreamPandasSerializer(timezone, safecheck, False) @no_type_check def reader_func(temp_filename): diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py index a6a1504e6b50a..8367dca0228c4 100644 --- a/python/pyspark/sql/pandas/serializers.py +++ b/python/pyspark/sql/pandas/serializers.py @@ -19,6 +19,7 @@ Serializers for PyArrow and pandas conversions. See `pyspark.serializers` for more details. """ +from decimal import Decimal from itertools import groupby from typing import TYPE_CHECKING, Optional @@ -251,12 +252,50 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer): If True, conversion from Arrow to Pandas checks for overflow/truncation assign_cols_by_name : bool If True, then Pandas DataFrames will get columns by name + int_to_decimal_coercion_enabled : bool + If True, applies additional coercions in Python before converting to Arrow + This has performance penalties. """ - def __init__(self, timezone, safecheck): + def __init__(self, timezone, safecheck, int_to_decimal_coercion_enabled): super(ArrowStreamPandasSerializer, self).__init__() self._timezone = timezone self._safecheck = safecheck + self._int_to_decimal_coercion_enabled = int_to_decimal_coercion_enabled + + @staticmethod + def _apply_python_coercions(series, arrow_type): + """ + Apply additional coercions to the series in Python before converting to Arrow: + - Convert integer series to decimal type. + When we have a pandas series of integers that needs to be converted to + pyarrow.decimal128 (with precision < 20), PyArrow fails with precision errors. + Explicitly cast to Decimal first. 
+ + Parameters + ---------- + series : pandas.Series + The series to potentially convert + arrow_type : pyarrow.DataType + The target arrow type + + Returns + ------- + pandas.Series + The potentially converted pandas series + """ + import pyarrow.types as types + import pandas as pd + + # Convert integer series to Decimal objects + if ( + types.is_decimal(arrow_type) + and series.dtype.kind in ["i", "u"] # integer types (signed/unsigned) + and not series.empty + ): + series = series.apply(lambda x: Decimal(x) if pd.notna(x) else None) + + return series def arrow_to_pandas( self, arrow_column, idx, struct_in_pandas="dict", ndarray_as_list=False, spark_type=None @@ -326,6 +365,9 @@ def _create_array(self, series, arrow_type, spark_type=None, arrow_cast=False): ) series = conv(series) + if self._int_to_decimal_coercion_enabled: + series = self._apply_python_coercions(series, arrow_type) + if hasattr(series.array, "__arrow_array__"): mask = None else: @@ -444,8 +486,11 @@ def __init__( ndarray_as_list=False, arrow_cast=False, input_types=None, + int_to_decimal_coercion_enabled=False, ): - super(ArrowStreamPandasUDFSerializer, self).__init__(timezone, safecheck) + super(ArrowStreamPandasUDFSerializer, self).__init__( + timezone, safecheck, int_to_decimal_coercion_enabled + ) self._assign_cols_by_name = assign_cols_by_name self._df_for_struct = df_for_struct self._struct_in_pandas = struct_in_pandas @@ -799,7 +844,7 @@ class ArrowStreamPandasUDTFSerializer(ArrowStreamPandasUDFSerializer): Serializer used by Python worker to evaluate Arrow-optimized Python UDTFs. """ - def __init__(self, timezone, safecheck): + def __init__(self, timezone, safecheck, int_to_decimal_coercion_enabled): super(ArrowStreamPandasUDTFSerializer, self).__init__( timezone=timezone, safecheck=safecheck, @@ -819,6 +864,8 @@ def __init__(self, timezone, safecheck): ndarray_as_list=True, # Enables explicit casting for mismatched return types of Arrow Python UDTFs. arrow_cast=True, + # Enable additional coercions for UDTF serialization + int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled, ) self._converter_map = dict() @@ -905,6 +952,9 @@ def _create_array(self, series, arrow_type, spark_type=None, arrow_cast=False): conv = self._get_or_create_converter_from_pandas(dt) series = conv(series) + if self._int_to_decimal_coercion_enabled: + series = self._apply_python_coercions(series, arrow_type) + if hasattr(series.array, "__arrow_array__"): mask = None else: @@ -1036,9 +1086,13 @@ def __init__( state_object_schema, arrow_max_records_per_batch, prefers_large_var_types, + int_to_decimal_coercion_enabled, ): super(ApplyInPandasWithStateSerializer, self).__init__( - timezone, safecheck, assign_cols_by_name + timezone, + safecheck, + assign_cols_by_name, + int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled, ) self.pickleSer = CPickleSerializer() self.utf8_deserializer = UTF8Deserializer() @@ -1406,9 +1460,19 @@ class TransformWithStateInPandasSerializer(ArrowStreamPandasUDFSerializer): Limit of the number of records that can be written to a single ArrowRecordBatch in memory. 
""" - def __init__(self, timezone, safecheck, assign_cols_by_name, arrow_max_records_per_batch): + def __init__( + self, + timezone, + safecheck, + assign_cols_by_name, + arrow_max_records_per_batch, + int_to_decimal_coercion_enabled, + ): super(TransformWithStateInPandasSerializer, self).__init__( - timezone, safecheck, assign_cols_by_name + timezone, + safecheck, + assign_cols_by_name, + int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled, ) self.arrow_max_records_per_batch = arrow_max_records_per_batch self.key_offsets = None @@ -1482,9 +1546,20 @@ class TransformWithStateInPandasInitStateSerializer(TransformWithStateInPandasSe Same as input parameters in TransformWithStateInPandasSerializer. """ - def __init__(self, timezone, safecheck, assign_cols_by_name, arrow_max_records_per_batch): + def __init__( + self, + timezone, + safecheck, + assign_cols_by_name, + arrow_max_records_per_batch, + int_to_decimal_coercion_enabled, + ): super(TransformWithStateInPandasInitStateSerializer, self).__init__( - timezone, safecheck, assign_cols_by_name, arrow_max_records_per_batch + timezone, + safecheck, + assign_cols_by_name, + arrow_max_records_per_batch, + int_to_decimal_coercion_enabled, ) self.init_key_offsets = None diff --git a/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py b/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py index 1f9532352679a..67398a46bce8d 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py @@ -281,6 +281,54 @@ def check_apply_in_pandas_returning_incompatible_type(self): error_message_regex=expected, ) + def test_cogroup_apply_int_to_decimal_coercion(self): + left = self.data1.limit(3) + right = self.data2.limit(3) + + def int_to_decimal_merge(lft, rgt): + return pd.DataFrame( + [ + { + "id": 1, + "decimal_result": 98765, + "left_count": len(lft), + "right_count": len(rgt), + } + ] + ) + + with self.sql_conf( + {"spark.sql.execution.pythonUDF.pandas.intToDecimalCoercionEnabled": True} + ): + result = ( + left.groupby("id") + .cogroup(right.groupby("id")) + .applyInPandas( + int_to_decimal_merge, + "id long, decimal_result decimal(10,2), left_count long, right_count long", + ) + .collect() + ) + self.assertTrue(len(result) > 0) + for row in result: + self.assertEqual(row.decimal_result, 98765.00) + + with self.sql_conf( + {"spark.sql.execution.pythonUDF.pandas.intToDecimalCoercionEnabled": False} + ): + with self.assertRaisesRegex( + PythonException, "Exception thrown when converting pandas.Series" + ): + ( + left.groupby("id") + .cogroup(right.groupby("id")) + .applyInPandas( + int_to_decimal_merge, + "id long, decimal_result decimal(10,2), left_count long, right_count long", + ) + .collect() + ) + def test_mixed_scalar_udfs_followed_by_cogrouby_apply(self): df = self.spark.range(0, 10).toDF("v1") df = df.withColumn("v2", udf(lambda x: x + 1, "int")(df["v1"])).withColumn( diff --git a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py index c3d16cba16390..9965e2acc4b51 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py @@ -387,6 +387,37 @@ def check_apply_in_pandas_returning_incompatible_type(self): output_schema="id long, mean string", ) + def test_apply_in_pandas_int_to_decimal_coercion(self): + def int_to_decimal_func(key, pdf): + return pd.DataFrame([{"id": key[0], "decimal_result": 12345}]) + 
+ with self.sql_conf( + {"spark.sql.execution.pythonUDF.pandas.intToDecimalCoercionEnabled": True} + ): + result = ( + self.data.groupby("id") + .applyInPandas(int_to_decimal_func, schema="id long, decimal_result decimal(10,2)") + .collect() + ) + + self.assertTrue(len(result) > 0) + for row in result: + self.assertEqual(row.decimal_result, 12345.00) + + with self.sql_conf( + {"spark.sql.execution.pythonUDF.pandas.intToDecimalCoercionEnabled": False} + ): + with self.assertRaisesRegex( + PythonException, "Exception thrown when converting pandas.Series" + ): + ( + self.data.groupby("id") + .applyInPandas( + int_to_decimal_func, schema="id long, decimal_result decimal(10,2)" + ) + .collect() + ) + def test_datatype_string(self): df = self.data diff --git a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map_with_state.py b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map_with_state.py index e1b8d7c76d183..e4f307ebc96ae 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map_with_state.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map_with_state.py @@ -23,6 +23,7 @@ import unittest from typing import cast +from decimal import Decimal from pyspark.sql.streaming.state import GroupStateTimeout, GroupState from pyspark.sql.types import ( @@ -31,6 +32,7 @@ StructType, StructField, Row, + DecimalType, ) from pyspark.testing.sqlutils import ( ReusedSQLTestCase, @@ -59,7 +61,12 @@ def conf(cls): cfg.set("spark.sql.shuffle.partitions", "5") return cfg - def _test_apply_in_pandas_with_state_basic(self, func, check_results): + def _test_apply_in_pandas_with_state_basic(self, func, check_results, output_type=None): + if output_type is None: + output_type = StructType( + [StructField("key", StringType()), StructField("countAsString", StringType())] + ) + input_path = tempfile.mkdtemp() def prepare_test_resource(): @@ -75,9 +82,6 @@ def prepare_test_resource(): q.stop() self.assertTrue(df.isStreaming) - output_type = StructType( - [StructField("key", StringType()), StructField("countAsString", StringType())] - ) state_type = StructType([StructField("c", LongType())]) q = ( @@ -314,6 +318,26 @@ def assert_test(): finally: q.stop() + def test_apply_in_pandas_with_state_int_to_decimal_coercion(self): + def func(key, pdf_iter, state): + assert isinstance(state, GroupState) + yield pd.DataFrame({"key": [key[0]], "decimal_sum": [1]}) + + def check_results(batch_df, _): + assert set(batch_df.sort("key").collect()) == { + Row(key="hello", decimal_sum=Decimal("1.00")), + Row(key="this", decimal_sum=Decimal("1.00")), + }, "Decimal coercion failed: " + str(batch_df.sort("key").collect()) + + output_type = StructType( + [StructField("key", StringType()), StructField("decimal_sum", DecimalType(10, 2))] + ) + + with self.sql_conf( + {"spark.sql.execution.pythonUDF.pandas.intToDecimalCoercionEnabled": True} + ): + self._test_apply_in_pandas_with_state_basic(func, check_results, output_type) + class GroupedApplyInPandasWithStateTests( GroupedApplyInPandasWithStateTestsMixin, ReusedSQLTestCase diff --git a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py index 007ed5de2fbd7..00e03c6da19bb 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py @@ -35,6 +35,7 @@ Row, IntegerType, TimestampType, + DecimalType, ) from pyspark.testing import assertDataFrameEqual from pyspark.testing.sqlutils import ( @@ 
-1714,6 +1715,74 @@ def check_basic_state(batch_df, batch_id): and "column family state must be nullable" in error_msg ) + def test_transform_with_state_int_to_decimal_coercion(self): + if not self.use_pandas(): + return + + class IntToDecimalProcessor(StatefulProcessor): + def init(self, handle): + count_schema = StructType([StructField("value", IntegerType(), True)]) + self.count_state = handle.getValueState("count", count_schema) + + def handleInputRows(self, key, rows, timerValues): + if self.count_state.exists(): + count = self.count_state.get()[0] + else: + count = 0 + count += len(list(rows)) + self.count_state.update((count,)) + + import pandas as pd + + yield pd.DataFrame( + {"id": [key[0]], "decimal_result": [12345]} # Integer to be coerced to decimal + ) + + def close(self): + pass + + data = [("1", "a"), ("1", "b"), ("2", "c")] + df = self.spark.createDataFrame(data, ["id", "value"]) + + output_schema = StructType( + [ + StructField("id", StringType(), True), + StructField("decimal_result", DecimalType(10, 2), True), + ] + ) + + with self.sql_conf( + {"spark.sql.execution.pythonUDF.pandas.intToDecimalCoercionEnabled": True} + ): + result = ( + df.groupBy("id") + .transformWithStateInPandas( + statefulProcessor=IntToDecimalProcessor(), + outputStructType=output_schema, + outputMode="Update", + timeMode="None", + ) + .collect() + ) + self.assertTrue(len(result) > 0) + + with self.sql_conf( + {"spark.sql.execution.pythonUDF.pandas.intToDecimalCoercionEnabled": False} + ): + with self.assertRaisesRegex( + Exception, "Exception thrown when converting pandas.Series" + ): + ( + df.groupBy("id") + .transformWithStateInPandas( + statefulProcessor=IntToDecimalProcessor(), + outputStructType=output_schema, + outputMode="Update", + timeMode="None", + ) + .collect() + ) + @unittest.skipIf( not have_pyarrow or os.environ.get("PYTHON_GIL", "?") == "0", diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf.py b/python/pyspark/sql/tests/pandas/test_pandas_udf.py index c2769bf882b6b..d67bed462345a 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_udf.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_udf.py @@ -343,6 +343,61 @@ def udf(column): with self.sql_conf({"spark.sql.execution.pandas.convertToArrowArraySafely": False}): df.withColumn("udf", udf("id")).collect() + def test_pandas_udf_int_to_decimal_coercion(self): + import pandas as pd + from decimal import Decimal + + df = self.spark.range(0, 3) + + @pandas_udf(returnType="decimal(10,2)") + def int_to_decimal_udf(column): + values = [123, 456, 789] + return pd.Series([values[int(val) % len(values)] for val in column]) + + with self.sql_conf( + {"spark.sql.execution.pythonUDF.pandas.intToDecimalCoercionEnabled": True} + ): + result = df.withColumn("decimal_val", int_to_decimal_udf("id")).collect() + self.assertEqual(result[0]["decimal_val"], 123.00) + self.assertEqual(result[1]["decimal_val"], 456.00) + self.assertEqual(result[2]["decimal_val"], 789.00) + + with self.sql_conf( + {"spark.sql.execution.pythonUDF.pandas.intToDecimalCoercionEnabled": False} + ): + self.assertRaisesRegex( + PythonException, + "Exception thrown when converting pandas.Series", + df.withColumn("decimal_val", int_to_decimal_udf("id")).collect, + ) + + @pandas_udf(returnType="decimal(25,1)") + def high_precision_udf(column): + values = [1, 2, 3] + return pd.Series([values[int(val) % len(values)] for val in column]) + + with self.sql_conf( + {"spark.sql.execution.pythonUDF.pandas.intToDecimalCoercionEnabled": True} + ): + result = 
df.withColumn("decimal_val", high_precision_udf("id")).collect() + self.assertEqual(len(result), 3) + self.assertEqual(result[0]["decimal_val"], Decimal("1.0")) + self.assertEqual(result[1]["decimal_val"], Decimal("2.0")) + self.assertEqual(result[2]["decimal_val"], Decimal("3.0")) + + with self.sql_conf( + {"spark.sql.execution.pythonUDF.pandas.intToDecimalCoercionEnabled": False} + ): + # Also not supported. + # This can be fixed by enabling arrow_cast + # This is currently not the case for SQL_SCALAR_PANDAS_UDF and + # SQL_SCALAR_PANDAS_ITER_UDF. + self.assertRaisesRegex( + PythonException, + "Exception thrown when converting pandas.Series", + df.withColumn("decimal_val", high_precision_udf("id")).collect, + ) + def test_pandas_udf_timestamp_ntz(self): # SPARK-36626: Test TimestampNTZ in pandas UDF @pandas_udf(returnType="timestamp_ntz") diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 5ee6c1dafd052..caf4889aa2638 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -1238,8 +1238,16 @@ def read_udtf(pickleSer, infile, eval_type): ).lower() == "true" ) + int_to_decimal_coercion_enabled = ( + runner_conf.get( + "spark.sql.execution.pythonUDF.pandas.intToDecimalCoercionEnabled", "false" + ).lower() + == "true" + ) timezone = runner_conf.get("spark.sql.session.timeZone", None) - ser = ArrowStreamPandasUDTFSerializer(timezone, safecheck) + ser = ArrowStreamPandasUDTFSerializer( + timezone, safecheck, int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled + ) else: ser = ArrowStreamUDTFSerializer() @@ -1960,12 +1968,23 @@ def read_udfs(pickleSer, infile, eval_type): runner_conf.get("spark.sql.execution.pandas.convertToArrowArraySafely", "false").lower() == "true" ) + int_to_decimal_coercion_enabled = ( + runner_conf.get( + "spark.sql.execution.pythonUDF.pandas.intToDecimalCoercionEnabled", "false" + ).lower() + == "true" + ) _assign_cols_by_name = assign_cols_by_name(runner_conf) if eval_type == PythonEvalType.SQL_COGROUPED_MAP_ARROW_UDF: ser = CogroupArrowUDFSerializer(_assign_cols_by_name) elif eval_type == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF: - ser = CogroupPandasUDFSerializer(timezone, safecheck, _assign_cols_by_name) + ser = CogroupPandasUDFSerializer( + timezone, + safecheck, + _assign_cols_by_name, + int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled, + ) elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE: arrow_max_records_per_batch = runner_conf.get( "spark.sql.execution.arrow.maxRecordsPerBatch", 10000 @@ -1979,6 +1998,7 @@ def read_udfs(pickleSer, infile, eval_type): state_object_schema, arrow_max_records_per_batch, prefers_large_var_types, + int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled, ) elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_UDF: arrow_max_records_per_batch = runner_conf.get( @@ -1987,7 +2007,11 @@ def read_udfs(pickleSer, infile, eval_type): arrow_max_records_per_batch = int(arrow_max_records_per_batch) ser = TransformWithStateInPandasSerializer( - timezone, safecheck, _assign_cols_by_name, arrow_max_records_per_batch + timezone, + safecheck, + _assign_cols_by_name, + arrow_max_records_per_batch, + int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled, ) elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_INIT_STATE_UDF: arrow_max_records_per_batch = runner_conf.get( @@ -1996,7 +2020,11 @@ def read_udfs(pickleSer, infile, eval_type): arrow_max_records_per_batch = int(arrow_max_records_per_batch) ser = 
TransformWithStateInPandasInitStateSerializer( - timezone, safecheck, _assign_cols_by_name, arrow_max_records_per_batch + timezone, + safecheck, + _assign_cols_by_name, + arrow_max_records_per_batch, + int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled, ) elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_UDF: arrow_max_records_per_batch = runner_conf.get( @@ -2062,6 +2090,7 @@ def read_udfs(pickleSer, infile, eval_type): ndarray_as_list, arrow_cast, input_types, + int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled, ) else: batch_size = int(os.environ.get("PYTHON_UDF_BATCH_SIZE", "100")) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 5b5051fb4f7a9..ff919d2b278a7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3928,6 +3928,15 @@ object SQLConf { .booleanConf .createWithDefault(false) + val PYTHON_UDF_PANDAS_INT_TO_DECIMAL_COERCION_ENABLED = + buildConf("spark.sql.execution.pythonUDF.pandas.intToDecimalCoercionEnabled") + .doc("When true, convert int to Decimal python objects before converting " + + "Pandas.Series to Arrow array during serialization." + + "Disabled by default, impacts performance.") + .version("4.1.0") + .booleanConf + .createWithDefault(false) + val PYTHON_TABLE_UDF_ARROW_ENABLED = buildConf("spark.sql.execution.pythonUDTF.arrow.enabled") .doc("Enable Arrow optimization for Python UDTFs.") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala index c842705b11620..70d2b17de2e72 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala @@ -136,8 +136,12 @@ object ArrowPythonRunner { val legacyPandasConversionUDF = Seq( SQLConf.PYTHON_UDF_LEGACY_PANDAS_CONVERSION_ENABLED.key -> conf.legacyPandasConversionUDF.toString) + val intToDecimalCoercion = Seq( + SQLConf.PYTHON_UDF_PANDAS_INT_TO_DECIMAL_COERCION_ENABLED.key -> + conf.getConf(SQLConf.PYTHON_UDF_PANDAS_INT_TO_DECIMAL_COERCION_ENABLED, false).toString) Map(timeZoneConf ++ pandasColsByName ++ arrowSafeTypeCheck ++ arrowAyncParallelism ++ useLargeVarTypes ++ + intToDecimalCoercion ++ legacyPandasConversion ++ legacyPandasConversionUDF: _*) } }
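
Illustrative sketch (not part of the patch above): the new coercion exists because PyArrow cannot always build a decimal128 array directly from an integer pandas Series when the target precision is smaller than what an int64 could require, whereas a Series of decimal.Decimal objects converts cleanly. The snippet below is a minimal standalone reproduction assuming only pandas and pyarrow are installed; whether the direct conversion actually raises depends on the installed PyArrow version, which is why it is wrapped in a try/except.

from decimal import Decimal

import pandas as pd
import pyarrow as pa

series = pd.Series([123, 456, 789], dtype="int64")
target = pa.decimal128(10, 2)  # matches the decimal(10,2) schema used in the tests above

try:
    # Direct int64 -> decimal128(10, 2) conversion. On some PyArrow versions this
    # raises a precision/unsupported-cast error, which the Python worker surfaces
    # as "Exception thrown when converting pandas.Series".
    pa.Array.from_pandas(series, type=target, safe=True)
except pa.ArrowException as e:
    print("direct conversion failed:", e)

# Coerce each value to a Decimal object first, in the spirit of what
# _apply_python_coercions does when
# spark.sql.execution.pythonUDF.pandas.intToDecimalCoercionEnabled is true.
coerced = pd.Series([Decimal(int(v)) for v in series], dtype="object")
print(pa.Array.from_pandas(coerced, type=target, safe=True))  # 123.00, 456.00, 789.00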