74 changes: 39 additions & 35 deletions python/pyspark/ml/connect/feature.py
@@ -15,11 +15,11 @@
 # limitations under the License.
 #

-import pickle
 from typing import Any, Union, List, Tuple, Callable, Dict, Optional

 import numpy as np
 import pandas as pd
+import pyarrow as pa

 from pyspark import keyword_only
 from pyspark.sql import DataFrame
@@ -133,27 +133,29 @@ def map_value(x: "np.ndarray") -> "np.ndarray":
         return transform_fn

     def _get_core_model_filename(self) -> str:
-        return self.__class__.__name__ + ".sklearn.pkl"
+        return self.__class__.__name__ + ".arrow.parquet"

     def _save_core_model(self, path: str) -> None:
-        from sklearn.preprocessing import MaxAbsScaler as sk_MaxAbsScaler
-
-        sk_model = sk_MaxAbsScaler()
-        sk_model.scale_ = self.scale_values
-        sk_model.max_abs_ = self.max_abs_values
-        sk_model.n_features_in_ = len(self.max_abs_values)  # type: ignore[arg-type]
-        sk_model.n_samples_seen_ = self.n_samples_seen
-
-        with open(path, "wb") as fp:
-            pickle.dump(sk_model, fp)
+        import pyarrow.parquet as pq
+
+        table = pa.Table.from_arrays(
+            [
+                pa.array([self.scale_values], pa.list_(pa.float64())),
+                pa.array([self.max_abs_values], pa.list_(pa.float64())),
+                pa.array([self.n_samples_seen], pa.int64()),
+            ],
+            names=["scale", "max_abs", "n_samples"],
+        )
+        pq.write_table(table, path)

     def _load_core_model(self, path: str) -> None:
-        with open(path, "rb") as fp:
-            sk_model = pickle.load(fp)
+        import pyarrow.parquet as pq
+
+        table = pq.read_table(path)

-        self.max_abs_values = sk_model.max_abs_
-        self.scale_values = sk_model.scale_
-        self.n_samples_seen = sk_model.n_samples_seen_
self.max_abs_values = np.array(table.column("scale")[0].as_py())
self.scale_values = np.array(table.column("max_abs")[0].as_py())
self.n_samples_seen = table.column("n_samples")[0].as_py()


class StandardScaler(Estimator, HasInputCol, HasOutputCol, ParamsReadWrite):
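Note (illustrative, not part of the diff): the new `_save_core_model` / `_load_core_model` pair stores the fitted model as a single-row Parquet table, with list-typed columns holding the vector attributes. A minimal standalone sketch of that round trip, using made-up values and a hypothetical output path:

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

# Hypothetical fitted state for a MaxAbsScalerModel (illustrative values).
scale_values = np.array([0.5, 2.0, 4.0])
max_abs_values = np.array([2.0, 0.5, 0.25])
n_samples_seen = 3

# Save: one row, list<double> columns for the vectors, int64 for the count.
table = pa.Table.from_arrays(
    [
        pa.array([scale_values], pa.list_(pa.float64())),
        pa.array([max_abs_values], pa.list_(pa.float64())),
        pa.array([n_samples_seen], pa.int64()),
    ],
    names=["scale", "max_abs", "n_samples"],
)
pq.write_table(table, "MaxAbsScalerModel.arrow.parquet")

# Load: read the single row back and rebuild the numpy arrays.
loaded = pq.read_table("MaxAbsScalerModel.arrow.parquet")
np.testing.assert_allclose(np.array(loaded.column("scale")[0].as_py()), scale_values)
np.testing.assert_allclose(np.array(loaded.column("max_abs")[0].as_py()), max_abs_values)
assert loaded.column("n_samples")[0].as_py() == n_samples_seen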
@@ -254,29 +256,31 @@ def map_value(x: "np.ndarray") -> "np.ndarray":
         return transform_fn

     def _get_core_model_filename(self) -> str:
-        return self.__class__.__name__ + ".sklearn.pkl"
+        return self.__class__.__name__ + ".arrow.parquet"

     def _save_core_model(self, path: str) -> None:
-        from sklearn.preprocessing import StandardScaler as sk_StandardScaler
-
-        sk_model = sk_StandardScaler(with_mean=True, with_std=True)
-        sk_model.scale_ = self.scale_values
-        sk_model.var_ = self.std_values * self.std_values  # type: ignore[operator]
-        sk_model.mean_ = self.mean_values
-        sk_model.n_features_in_ = len(self.std_values)  # type: ignore[arg-type]
-        sk_model.n_samples_seen_ = self.n_samples_seen
-
-        with open(path, "wb") as fp:
-            pickle.dump(sk_model, fp)
+        import pyarrow.parquet as pq
+
+        table = pa.Table.from_arrays(
+            [
+                pa.array([self.scale_values], pa.list_(pa.float64())),
+                pa.array([self.mean_values], pa.list_(pa.float64())),
+                pa.array([self.std_values], pa.list_(pa.float64())),
+                pa.array([self.n_samples_seen], pa.int64()),
+            ],
+            names=["scale", "mean", "std", "n_samples"],
+        )
+        pq.write_table(table, path)

     def _load_core_model(self, path: str) -> None:
-        with open(path, "rb") as fp:
-            sk_model = pickle.load(fp)
+        import pyarrow.parquet as pq
+
+        table = pq.read_table(path)

-        self.std_values = np.sqrt(sk_model.var_)
-        self.scale_values = sk_model.scale_
-        self.mean_values = sk_model.mean_
-        self.n_samples_seen = sk_model.n_samples_seen_
+        self.scale_values = np.array(table.column("scale")[0].as_py())
+        self.mean_values = np.array(table.column("mean")[0].as_py())
+        self.std_values = np.array(table.column("std")[0].as_py())
+        self.n_samples_seen = table.column("n_samples")[0].as_py()


 class ArrayAssembler(
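Note (illustrative, not part of the diff): where the old pickle path reconstructed `std_values` from sklearn's `var_` via `np.sqrt`, the Arrow layout stores `std` directly alongside `scale` and `mean`. A quick way to inspect a saved file, assuming pyarrow is installed and using a hypothetical path:

import pyarrow.parquet as pq

# Hypothetical path to a saved StandardScalerModel core file.
table = pq.read_table("StandardScalerModel.arrow.parquet")

# Expect one row, with list<double> columns for scale/mean/std and an
# int64 column for n_samples (schema repr is approximate).
print(table.schema)
print(table.num_rows)  # 1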
13 changes: 0 additions & 13 deletions python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
@@ -17,7 +17,6 @@
 #

 import os
-import pickle
 import tempfile
 import unittest

@@ -85,12 +84,6 @@ def test_max_abs_scaler(self):
         np.testing.assert_allclose(model.max_abs_values, loaded_model.max_abs_values)
         assert model.n_samples_seen == loaded_model.n_samples_seen

-        # Test loading core model as scikit-learn model
-        with open(os.path.join(model_path, "MaxAbsScalerModel.sklearn.pkl"), "rb") as f:
-            sk_model = pickle.load(f)
-        sk_result = sk_model.transform(np.stack(list(local_df1.features)))
-        np.testing.assert_allclose(sk_result, expected_result)
-
     def test_standard_scaler(self):
         df1 = self.spark.createDataFrame(
             [
@@ -141,12 +134,6 @@ def test_standard_scaler(self):
         np.testing.assert_allclose(model.scale_values, loaded_model.scale_values)
         assert model.n_samples_seen == loaded_model.n_samples_seen

-        # Test loading core model as scikit-learn model
-        with open(os.path.join(model_path, "StandardScalerModel.sklearn.pkl"), "rb") as f:
-            sk_model = pickle.load(f)
-        sk_result = sk_model.transform(np.stack(list(local_df1.features)))
-        np.testing.assert_allclose(sk_result, expected_result)
-
     def test_array_assembler(self):
         spark_df = self.spark.createDataFrame(
             [
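Note (illustrative, not part of the diff): the deleted assertions verified that the saved core model could be reloaded as a scikit-learn object, which no longer applies to the Parquet format. A sketch of an equivalent file-level check, assuming the `model` and `model_path` variables from the surrounding test:

import os
import numpy as np
import pyarrow.parquet as pq

# Verify the core model file round-trips the fitted attributes (sketch;
# `model` and `model_path` come from the enclosing test case).
table = pq.read_table(os.path.join(model_path, "StandardScalerModel.arrow.parquet"))
np.testing.assert_allclose(np.array(table.column("scale")[0].as_py()), model.scale_values)
np.testing.assert_allclose(np.array(table.column("mean")[0].as_py()), model.mean_values)
assert table.column("n_samples")[0].as_py() == model.n_samples_seen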