diff --git a/dask_ml/feature_extraction/text.py b/dask_ml/feature_extraction/text.py
index 9710d03a6..2c261fa8b 100644
--- a/dask_ml/feature_extraction/text.py
+++ b/dask_ml/feature_extraction/text.py
@@ -3,6 +3,8 @@
 """
 import itertools
+import math
+from typing import Literal, Optional
 
 import dask
 import dask.array as da
@@ -13,8 +15,10 @@ import scipy.sparse
 import sklearn.base
 import sklearn.feature_extraction.text
+import sparse
 from dask.delayed import Delayed
 from distributed import get_client, wait
+from sklearn.base import BaseEstimator, OneToOneFeatureMixin, TransformerMixin
 from sklearn.utils.validation import check_is_fitted
@@ -280,3 +284,114 @@ def _merge_vocabulary(*vocabularies):
         )
     }
     return vocabulary
+
+
+def _handle_zeros_in_scale(scale: da.Array) -> da.Array:
+    """Replace near-zero norms with 1.0 so the subsequent division is a no-op."""
+    constant_mask = scale < 10 * np.finfo(scale.dtype).eps
+    return da.where(constant_mask, 1.0, scale)
+
+
+def _normalize(X: da.Array, norm: Literal["l2", "l1", "max"] = "l2") -> da.Array:
+    """Scale each sample (row) of ``X`` to unit norm."""
+    if norm not in ("l1", "l2", "max"):
+        raise ValueError("'%s' is not a supported norm" % norm)
+
+    if norm == "l1":
+        norms = da.abs(X).sum(axis=1)
+    elif norm == "l2":
+        norms = da.sqrt(da.square(X).sum(axis=1))
+    elif norm == "max":
+        norms = da.max(da.abs(X), axis=1)
+
+    norms = _handle_zeros_in_scale(norms)
+    X = X / norms[:, np.newaxis]
+
+    return X
+
+
+def _get_lazy_row_count_as_dask_array(a: da.Array) -> da.Array:
+    """Count the rows of ``a`` lazily, for arrays whose shape is unknown (NaN)."""
+    # Map every block to a 1x1 array holding that block's row count.
+    chunk_len = a.map_blocks(
+        lambda a: np.asarray(a.shape[0], dtype=int).reshape(1, 1),
+        dtype=int,
+        chunks=tuple(len(c) * (1,) for c in a.chunks),
+    )
+
+    # Each row chunk appears once per column chunk, so summing the first
+    # column counts every row exactly once.
+    return chunk_len[:, 0].sum()
+
+
+class TfidfTransformer(
+    OneToOneFeatureMixin, TransformerMixin, BaseEstimator, auto_wrap_output_keys=None
+):
+    """Transform a count matrix to a normalized tf or tf-idf representation.
+
+    Dask counterpart of
+    :class:`sklearn.feature_extraction.text.TfidfTransformer`: input blocks
+    are converted to pydata/sparse ``COO`` arrays, and ``idf_`` is computed
+    eagerly during ``fit``.
+    """
+
+    def __init__(
+        self,
+        *,
+        norm: Optional[Literal["l2", "l1"]] = "l2",
+        use_idf: bool = True,
+        smooth_idf: bool = True,
+        sublinear_tf: bool = False,
+    ):
+        self.norm = norm
+        self.use_idf = use_idf
+        self.smooth_idf = smooth_idf
+        self.sublinear_tf = sublinear_tf
+
+    def fit(self, X, y=None):
+        """Learn the idf vector (global term weights)."""
+        if not self.use_idf:
+            return self
+
+        X = X.map_blocks(lambda a: sparse.as_coo(a).astype(np.float64))
+
+        # Fall back to a lazy row count when the shape is unknown (NaN),
+        # e.g. for arrays built from a dask bag by CountVectorizer.
+        n_samples = (
+            X.shape[0]
+            if not math.isnan(X.shape[0])
+            else _get_lazy_row_count_as_dask_array(X)
+        )
+
+        # Document frequency: the number of documents containing each term.
+        df = da.count_nonzero(X, axis=0)
+
+        if self.smooth_idf:
+            # Add one to numerator and denominator, as if an extra document
+            # containing every term had been seen; prevents zero divisions.
+            n_samples += 1
+            df += 1
+
+        idf_ = da.log(n_samples / df) + 1
+
+        self.idf_ = idf_.compute()
+
+        return self
+
+    def transform(self, X):
+        """Transform a count matrix to a tf or tf-idf representation."""
+        if self.use_idf:
+            check_is_fitted(self, "idf_")
+
+        X = X.map_blocks(lambda a: sparse.as_coo(a).astype(np.float64))
+
+        if self.sublinear_tf:
+            # Sublinear tf scaling: 1 + log(tf) for nonzero entries.
+            X = da.where(X != 0, da.log(X) + 1, X)
+
+        tf_idf = X * self.idf_ if self.use_idf else X
+
+        if self.norm:
+            tf_idf = _normalize(tf_idf, norm=self.norm)
+
+        return tf_idf
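Note on the idf computation in `fit`: it mirrors scikit-learn's formula,
idf(t) = ln((1 + n) / (1 + df(t))) + 1 with smooth_idf=True, and
ln(n / df(t)) + 1 otherwise. A minimal NumPy sketch of that arithmetic on a
toy count matrix (names below are illustrative, not part of the patch):

    import numpy as np

    counts = np.array([[1, 0], [2, 1], [0, 1]])  # 3 documents, 2 terms
    n = counts.shape[0]                          # 3
    df = np.count_nonzero(counts, axis=0)        # [2, 2]
    idf_smooth = np.log((n + 1) / (df + 1)) + 1  # log(4/3) + 1 per term
    idf_plain = np.log(n / df) + 1               # log(3/2) + 1 per term

The test below checks exactly this parity against
sklearn.feature_extraction.text.TfidfTransformer across all parameter
combinations.
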
diff --git a/tests/feature_extraction/test_text.py b/tests/feature_extraction/test_text.py
index e833a30d3..2560fcb0c 100644
--- a/tests/feature_extraction/test_text.py
+++ b/tests/feature_extraction/test_text.py
@@ -188,3 +188,34 @@ def test_count_vectorizer_remote_vocabulary():
     )
     m.fit_transform(b)
     assert m.vocabulary_ is remote_vocabulary
+
+
+@pytest.mark.parametrize("norm", [None, "l2", "l1"])
+@pytest.mark.parametrize("smooth_idf", [True, False])
+@pytest.mark.parametrize("sublinear_tf", [True, False])
+@pytest.mark.parametrize("use_idf", [True, False])
+@pytest.mark.parametrize("rechunk", [True, False])
+def test_tf_idf(norm, smooth_idf, sublinear_tf, use_idf, rechunk):
+    corpus = db.from_sequence(JUNK_FOOD_DOCS)
+    x_d = dask_ml.feature_extraction.text.CountVectorizer().fit_transform(corpus)
+    if rechunk:
+        x_d.compute_chunk_sizes()
+        x_d = x_d.rechunk((1, 6))
+        # forget chunk_sizes/shape, to trigger _get_lazy_row_count_as_dask_array
+        x_d._chunks = ((np.nan, np.nan, np.nan, np.nan, np.nan, np.nan), (6,))
+
+    d_tf_idf = dask_ml.feature_extraction.text.TfidfTransformer(
+        norm=norm, smooth_idf=smooth_idf, sublinear_tf=sublinear_tf, use_idf=use_idf
+    )
+    d_tf_idf_result = d_tf_idf.fit_transform(x_d).compute()
+
+    x_n = x_d.compute()
+    sk_tf_idf = sklearn.feature_extraction.text.TfidfTransformer(
+        norm=norm, smooth_idf=smooth_idf, sublinear_tf=sublinear_tf, use_idf=use_idf
+    )
+    sk_tf_idf_result = sk_tf_idf.fit_transform(x_n)
+
+    np.testing.assert_array_almost_equal(
+        d_tf_idf_result.todense().astype(np.float64),
+        sk_tf_idf_result.todense().astype(np.float64),
+    )
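
Usage sketch for the new transformer (a minimal example assuming this branch
is installed; the corpus and variable names are illustrative only):

    import dask.bag as db
    from dask_ml.feature_extraction.text import CountVectorizer, TfidfTransformer

    corpus = db.from_sequence(
        ["cats eat fish", "dogs eat meat", "fish eat plankton"]
    )
    counts = CountVectorizer().fit_transform(corpus)  # dask array, sparse blocks
    tfidf = TfidfTransformer().fit_transform(counts)  # lazy tf-idf weighting
    result = tfidf.compute()                          # pydata/sparse COO matrix

Note that `fit` calls `idf_.compute()` eagerly, so fitting triggers one pass
over the data even though `transform` itself stays lazy.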