diff --git a/python/pyspark/ml/connect/feature.py b/python/pyspark/ml/connect/feature.py
index b0e2028e43fa..2184b3c7f332 100644
--- a/python/pyspark/ml/connect/feature.py
+++ b/python/pyspark/ml/connect/feature.py
@@ -15,11 +15,11 @@
 # limitations under the License.
 #
 
-import pickle
 from typing import Any, Union, List, Tuple, Callable, Dict, Optional
 
 import numpy as np
 import pandas as pd
+import pyarrow as pa
 
 from pyspark import keyword_only
 from pyspark.sql import DataFrame
@@ -133,27 +133,29 @@ def map_value(x: "np.ndarray") -> "np.ndarray":
 
         return transform_fn
 
     def _get_core_model_filename(self) -> str:
-        return self.__class__.__name__ + ".sklearn.pkl"
+        return self.__class__.__name__ + ".arrow.parquet"
 
     def _save_core_model(self, path: str) -> None:
-        from sklearn.preprocessing import MaxAbsScaler as sk_MaxAbsScaler
-
-        sk_model = sk_MaxAbsScaler()
-        sk_model.scale_ = self.scale_values
-        sk_model.max_abs_ = self.max_abs_values
-        sk_model.n_features_in_ = len(self.max_abs_values)  # type: ignore[arg-type]
-        sk_model.n_samples_seen_ = self.n_samples_seen
-
-        with open(path, "wb") as fp:
-            pickle.dump(sk_model, fp)
+        import pyarrow.parquet as pq
+
+        table = pa.Table.from_arrays(
+            [
+                pa.array([self.scale_values], pa.list_(pa.float64())),
+                pa.array([self.max_abs_values], pa.list_(pa.float64())),
+                pa.array([self.n_samples_seen], pa.int64()),
+            ],
+            names=["scale", "max_abs", "n_samples"],
+        )
+        pq.write_table(table, path)
 
     def _load_core_model(self, path: str) -> None:
-        with open(path, "rb") as fp:
-            sk_model = pickle.load(fp)
+        import pyarrow.parquet as pq
+
+        table = pq.read_table(path)
 
-        self.max_abs_values = sk_model.max_abs_
-        self.scale_values = sk_model.scale_
-        self.n_samples_seen = sk_model.n_samples_seen_
+        self.max_abs_values = np.array(table.column("max_abs")[0].as_py())
+        self.scale_values = np.array(table.column("scale")[0].as_py())
+        self.n_samples_seen = table.column("n_samples")[0].as_py()
 
 
 class StandardScaler(Estimator, HasInputCol, HasOutputCol, ParamsReadWrite):
@@ -254,29 +256,31 @@ def map_value(x: "np.ndarray") -> "np.ndarray":
 
         return transform_fn
 
     def _get_core_model_filename(self) -> str:
-        return self.__class__.__name__ + ".sklearn.pkl"
+        return self.__class__.__name__ + ".arrow.parquet"
 
     def _save_core_model(self, path: str) -> None:
-        from sklearn.preprocessing import StandardScaler as sk_StandardScaler
-
-        sk_model = sk_StandardScaler(with_mean=True, with_std=True)
-        sk_model.scale_ = self.scale_values
-        sk_model.var_ = self.std_values * self.std_values  # type: ignore[operator]
-        sk_model.mean_ = self.mean_values
-        sk_model.n_features_in_ = len(self.std_values)  # type: ignore[arg-type]
-        sk_model.n_samples_seen_ = self.n_samples_seen
-
-        with open(path, "wb") as fp:
-            pickle.dump(sk_model, fp)
+        import pyarrow.parquet as pq
+
+        table = pa.Table.from_arrays(
+            [
+                pa.array([self.scale_values], pa.list_(pa.float64())),
+                pa.array([self.mean_values], pa.list_(pa.float64())),
+                pa.array([self.std_values], pa.list_(pa.float64())),
+                pa.array([self.n_samples_seen], pa.int64()),
+            ],
+            names=["scale", "mean", "std", "n_samples"],
+        )
+        pq.write_table(table, path)
 
     def _load_core_model(self, path: str) -> None:
-        with open(path, "rb") as fp:
-            sk_model = pickle.load(fp)
+        import pyarrow.parquet as pq
+
+        table = pq.read_table(path)
 
-        self.std_values = np.sqrt(sk_model.var_)
-        self.scale_values = sk_model.scale_
-        self.mean_values = sk_model.mean_
-        self.n_samples_seen = sk_model.n_samples_seen_
+        self.scale_values = np.array(table.column("scale")[0].as_py())
+        self.mean_values = np.array(table.column("mean")[0].as_py())
+        self.std_values = np.array(table.column("std")[0].as_py())
+        self.n_samples_seen = table.column("n_samples")[0].as_py()
 
 
 class ArrayAssembler(
diff --git a/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py b/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
index 2d0a37aca5c8..9fbf24d25342 100644
--- a/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
+++ b/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
@@ -17,7 +17,6 @@
 #
 
 import os
-import pickle
 import tempfile
 import unittest
 
@@ -85,12 +84,6 @@ def test_max_abs_scaler(self):
             np.testing.assert_allclose(model.max_abs_values, loaded_model.max_abs_values)
             assert model.n_samples_seen == loaded_model.n_samples_seen
 
-            # Test loading core model as scikit-learn model
-            with open(os.path.join(model_path, "MaxAbsScalerModel.sklearn.pkl"), "rb") as f:
-                sk_model = pickle.load(f)
-            sk_result = sk_model.transform(np.stack(list(local_df1.features)))
-            np.testing.assert_allclose(sk_result, expected_result)
-
     def test_standard_scaler(self):
         df1 = self.spark.createDataFrame(
             [
@@ -141,12 +134,6 @@ def test_standard_scaler(self):
             np.testing.assert_allclose(model.scale_values, loaded_model.scale_values)
             assert model.n_samples_seen == loaded_model.n_samples_seen
 
-            # Test loading core model as scikit-learn model
-            with open(os.path.join(model_path, "StandardScalerModel.sklearn.pkl"), "rb") as f:
-                sk_model = pickle.load(f)
-            sk_result = sk_model.transform(np.stack(list(local_df1.features)))
-            np.testing.assert_allclose(sk_result, expected_result)
-
     def test_array_assembler(self):
         spark_df = self.spark.createDataFrame(
             [
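
For reviewers, a minimal standalone sketch of the Parquet round trip that the patched `_save_core_model` / `_load_core_model` now perform, using `MaxAbsScalerModel` as the example. The sample values and the output path below are illustrative only and are not part of the patch.

```python
import os
import tempfile

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

# Illustrative values; a real MaxAbsScalerModel computes these during fit().
scale_values = np.array([2.0, 4.0, 1.0])
max_abs_values = np.array([2.0, 4.0, 1.0])
n_samples_seen = 3

path = os.path.join(tempfile.mkdtemp(), "MaxAbsScalerModel.arrow.parquet")

# Save: a single-row table; per-feature arrays become list<double> columns.
table = pa.Table.from_arrays(
    [
        pa.array([scale_values], pa.list_(pa.float64())),
        pa.array([max_abs_values], pa.list_(pa.float64())),
        pa.array([n_samples_seen], pa.int64()),
    ],
    names=["scale", "max_abs", "n_samples"],
)
pq.write_table(table, path)

# Load: read the single row back and rebuild the numpy arrays from the list cells.
loaded = pq.read_table(path)
np.testing.assert_allclose(np.array(loaded.column("scale")[0].as_py()), scale_values)
np.testing.assert_allclose(np.array(loaded.column("max_abs")[0].as_py()), max_abs_values)
assert loaded.column("n_samples")[0].as_py() == n_samples_seen
```

Storing the per-feature arrays as list-typed columns in a single-row table keeps the core-model file self-describing and removes the need to unpickle arbitrary Python objects (and the scikit-learn dependency) when loading.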