Skip to content

Commit 6e861d0

Browse files
sfc-gh-jkew, sfc-gh-joshi, sfc-gh-mvashishtha
authored
FEAT-#7676: in-place casting between DataFrame engines (#7666)
Addresses a performance issue with hybrid execution where a sequence of merge operations using the same DataFrames could result in transfer thrashing. In this PR we have the arguments of the operation move in-place so that subsequent operations all stay on the same backend. This behavior can be turned off by setting the `BackendMergeCastInPlace` variable to false. <!-- Please give a short brief about these changes. --> - [x] first commit message and PR title follow format outlined [here](https://modin.readthedocs.io/en/latest/development/contributing.html#commit-message-formatting) > **_NOTE:_** If you edit the PR title to match this format, you need to add another commit (even if it's empty) or amend your last commit for the CI job that checks the PR title to pick up the new PR title. - [x] passes `flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py` - [x] passes `black --check modin/ asv_bench/benchmarks scripts/doc_checker.py` - [x] signed commit with `git commit -s` <!-- you can amend your commit with a signature via `git commit -amend -s` --> - [x] Resolves #7676 <!-- issue must be created for each patch --> - [x] tests added and passing - [x] module layout described at `docs/development/architecture.rst` is up-to-date <!-- if you have added, renamed or removed files or directories please update the documentation accordingly --> --------- Co-authored-by: Jonathan Shi <149419494+sfc-gh-joshi@users.noreply.github.com> Co-authored-by: Mahesh Vashishtha <mahesh.vashishtha@snowflake.com>
1 parent 86107d4 commit 6e861d0

File tree

8 files changed

+115
-22
lines changed

8 files changed

+115
-22
lines changed

modin/config/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
AsyncReadMode,
2020
AutoSwitchBackend,
2121
Backend,
22+
BackendMergeCastInPlace,
2223
BenchmarkMode,
2324
CIAWSAccessKeyID,
2425
CIAWSSecretAccessKey,
@@ -78,6 +79,7 @@
7879
"GpuCount",
7980
"Memory",
8081
"Backend",
82+
"BackendMergeCastInPlace",
8183
"Execution",
8284
"AutoSwitchBackend",
8385
"ShowBackendSwitchProgress",

modin/config/envvars.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1385,6 +1385,30 @@ def disable(cls) -> None:
13851385
cls.put(False)
13861386

13871387

1388+
class BackendMergeCastInPlace(EnvironmentVariable, type=bool):
1389+
"""
1390+
Whether to cast a DataFrame in-place when performing a merge when using hybrid mode.
1391+
1392+
This flag modifies the behavior of a cast performed on operations involving more
1393+
than one type of query compiler. If enabled the actual cast will be performed in-place
1394+
and the input DataFrame will have a new backend. If disabled the original DataFrame
1395+
will remain on the same underlying engine.
1396+
"""
1397+
1398+
varname = "MODIN_BACKEND_MERGE_CAST_IN_PLACE"
1399+
default = True
1400+
1401+
@classmethod
1402+
def enable(cls) -> None:
1403+
"""Enable casting in place when performing a merge operation between two different compilers."""
1404+
cls.put(True)
1405+
1406+
@classmethod
1407+
def disable(cls) -> None:
1408+
"""Disable casting in place when performing a merge operation between two different compilers."""
1409+
cls.put(False)
1410+
1411+
13881412
class DynamicPartitioning(EnvironmentVariable, type=bool):
13891413
"""
13901414
Set to true to use Modin's dynamic-partitioning implementation where possible.

modin/core/storage_formats/pandas/query_compiler.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2717,6 +2717,8 @@ def fillna(self, **kwargs):
27172717
full_axis = method is not None or limit is not None
27182718
new_dtypes = None
27192719
if isinstance(value, BaseQueryCompiler):
2720+
# This code assumes that the operation occurs with the same query compiler
2721+
assert isinstance(value, PandasQueryCompiler)
27202722
if squeeze_self:
27212723
# Self is a Series type object
27222724
if full_axis:

modin/core/storage_formats/pandas/query_compiler_caster.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
from pandas.core.indexes.frozen import FrozenList
3232
from typing_extensions import Self
3333

34-
from modin.config import AutoSwitchBackend, Backend
34+
from modin.config import AutoSwitchBackend, Backend, BackendMergeCastInPlace
3535
from modin.config import context as config_context
3636
from modin.core.storage_formats.base.query_compiler import (
3737
BaseQueryCompiler,
@@ -92,12 +92,12 @@ def _normalize_class_name(class_of_wrapped_fn: Optional[str]) -> str:
9292
"_getattr__from_extension_impl",
9393
"get_backend",
9494
"move_to",
95-
"_update_inplace",
9695
"set_backend",
9796
"_get_extension",
9897
"_query_compiler",
9998
"_get_query_compiler",
10099
"_copy_into",
100+
"_update_inplace",
101101
"is_backend_pinned",
102102
"_set_backend_pinned",
103103
"pin_backend",
@@ -121,6 +121,7 @@ def _normalize_class_name(class_of_wrapped_fn: Optional[str]) -> str:
121121
"_set_backend_pinned",
122122
"pin_backend",
123123
"unpin_backend",
124+
"_update_inplace",
124125
}
125126

126127

@@ -1120,10 +1121,20 @@ def cast_to_qc(arg):
11201121
and arg.get_backend() != result_backend
11211122
):
11221123
return arg
1123-
cast = arg.set_backend(
1124-
result_backend,
1125-
switch_operation=f"{_normalize_class_name(class_of_wrapped_fn)}.{name}",
1126-
)
1124+
if BackendMergeCastInPlace.get():
1125+
arg.set_backend(
1126+
result_backend,
1127+
switch_operation=f"{_normalize_class_name(class_of_wrapped_fn)}.{name}",
1128+
inplace=True,
1129+
)
1130+
assert arg.get_backend() == result_backend
1131+
cast = arg
1132+
else:
1133+
cast = arg.set_backend(
1134+
result_backend,
1135+
switch_operation=f"{_normalize_class_name(class_of_wrapped_fn)}.{name}",
1136+
inplace=False,
1137+
)
11271138
inplace_update_trackers.append(
11281139
InplaceUpdateTracker(
11291140
input_castable=arg,
@@ -1156,7 +1167,7 @@ def cast_to_qc(arg):
11561167
new_castable,
11571168
) in inplace_update_trackers:
11581169
new_qc = new_castable._get_query_compiler()
1159-
if original_qc is not new_qc:
1170+
if BackendMergeCastInPlace.get() or original_qc is not new_qc:
11601171
new_castable._copy_into(original_castable)
11611172

11621173
return _maybe_switch_backend_post_op(

modin/pandas/series.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2925,6 +2925,17 @@ def set_backend(
29252925
*,
29262926
switch_operation: Optional[str] = None,
29272927
) -> Optional[Self]:
2928+
# A series which is moved, potentially without its parent needs to
2929+
# have its parent reset. This is aligned with CoW chained assignment
2930+
# semantics as well, but it is a little different from existing modin
2931+
# semantics. This is why we only do this for hybrid and inplace
2932+
# modification.
2933+
if (
2934+
inplace
2935+
and self._parent is not None
2936+
and backend != self._parent.get_backend()
2937+
):
2938+
self._parent = None
29282939
return super().set_backend(
29292940
backend=backend, inplace=inplace, switch_operation=switch_operation
29302941
)

modin/tests/pandas/extensions/test_pd_extensions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,10 @@ def python_concat(*args, **kwargs):
163163
== "pandas_concat_result"
164164
)
165165

166+
# With inplace casting we need to reset the original dataframes
167+
modin_on_pandas_df.move_to("Pandas", inplace=True)
168+
modin_on_python_df.move_to("Python_Test", inplace=True)
169+
166170
assert (
167171
pd.concat([modin_on_python_df, modin_on_pandas_df])
168172
== "python_concat_result"

modin/tests/pandas/native_df_interoperability/test_compiler_caster.py

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,8 @@ def test_two_same_backend(pico_df):
413413

414414
def test_cast_to_second_backend_with_concat(pico_df, cluster_df, caplog):
415415
with caplog.at_level(level=logging.INFO, logger=DEFAULT_LOGGER_NAME):
416-
df3 = pd.concat([pico_df, cluster_df], axis=1)
416+
# We have to copy the input dataframes because of inplace merging
417+
df3 = pd.concat([pico_df.copy(), cluster_df.copy()], axis=1)
417418
assert pico_df.get_backend() == "Pico"
418419
assert cluster_df.get_backend() == "Cluster"
419420
assert df3.get_backend() == "Cluster" # result should be on cluster
@@ -431,7 +432,10 @@ def test_cast_to_second_backend_with_concat_uses_second_backend_api_override(
431432
register_pd_accessor(name="concat", backend="Cluster")(
432433
lambda *args, **kwargs: "custom_concat_result"
433434
)
434-
assert pd.concat([pico_df, cluster_df], axis=1) == "custom_concat_result"
435+
# copy dataframes for concat to allow for in-place merging
436+
assert (
437+
pd.concat([pico_df.copy(), cluster_df.copy()], axis=1) == "custom_concat_result"
438+
)
435439
assert pico_df.get_backend() == "Pico"
436440
assert cluster_df.get_backend() == "Cluster"
437441

@@ -449,14 +453,16 @@ def test_moving_pico_to_cluster_in_place_calls_set_backend_only_once_github_issu
449453

450454
def test_cast_to_second_backend_with___init__(pico_df, cluster_df):
451455
df3 = pd.DataFrame({"pico": pico_df.iloc[:, 0], "cluster": cluster_df.iloc[:, 0]})
452-
assert pico_df.get_backend() == "Pico"
456+
assert (
457+
pico_df.get_backend() == "Pico"
458+
) # pico stays despite in-place casting by iloc
453459
assert cluster_df.get_backend() == "Cluster"
454460
assert df3.get_backend() == "Cluster" # result should be on cluster
455461

456462

457463
def test_cast_to_first_backend(pico_df, cluster_df):
458464
df3 = pd.concat([cluster_df, pico_df], axis=1)
459-
assert pico_df.get_backend() == "Pico"
465+
assert pico_df.get_backend() == "Cluster" # pico_df was cast in place by concat
460466
assert cluster_df.get_backend() == "Cluster"
461467
assert df3.get_backend() == cluster_df.get_backend() # result should be on cluster
462468

@@ -468,7 +474,7 @@ def test_cast_to_first_backend_with_concat_uses_first_backend_api_override(
468474
lambda *args, **kwargs: "custom_concat_result"
469475
)
470476
assert pd.concat([cluster_df, pico_df], axis=1) == "custom_concat_result"
471-
assert pico_df.get_backend() == "Pico"
477+
assert pico_df.get_backend() == "Cluster" # pico was cast in place by concat
472478
assert cluster_df.get_backend() == "Cluster"
473479

474480

@@ -479,7 +485,7 @@ def test_cast_to_first_backend_with___init__(pico_df, cluster_df):
479485
"pico": pico_df.iloc[:, 0],
480486
}
481487
)
482-
assert pico_df.get_backend() == "Pico"
488+
assert pico_df.get_backend() == "Pico" # Pico not cast in place by iloc
483489
assert cluster_df.get_backend() == "Cluster"
484490
assert df3.get_backend() == "Cluster" # result should be on cluster
485491

@@ -557,31 +563,33 @@ def test_two_two_qc_types_default_rhs(default_df, cluster_df):
557563
# so we default to the caller
558564
df3 = pd.concat([default_df, cluster_df], axis=1)
559565
assert default_df.get_backend() == "Test_casting_default"
560-
assert cluster_df.get_backend() == "Cluster"
566+
assert (
567+
cluster_df.get_backend() == "Test_casting_default"
568+
) # in place cast to default by concat
561569
assert df3.get_backend() == default_df.get_backend() # should move to default
562570

563571

564572
def test_two_two_qc_types_default_lhs(default_df, cluster_df):
565573
# none of the query compilers know about each other here
566574
# so we default to the caller
567575
df3 = pd.concat([cluster_df, default_df], axis=1)
568-
assert default_df.get_backend() == "Test_casting_default"
576+
assert default_df.get_backend() == "Cluster" # in place cast to Cluster by concat
569577
assert cluster_df.get_backend() == "Cluster"
570578
assert df3.get_backend() == cluster_df.get_backend() # should move to cluster
571579

572580

573581
def test_two_two_qc_types_default_2_rhs(default_df, cloud_df):
574582
# cloud knows a bit about costing; so we prefer moving to there
575583
df3 = pd.concat([default_df, cloud_df], axis=1)
576-
assert default_df.get_backend() == "Test_casting_default"
584+
assert default_df.get_backend() == "Cloud" # inplace cast to Cloud by concat
577585
assert cloud_df.get_backend() == "Cloud"
578586
assert df3.get_backend() == cloud_df.get_backend() # should move to cloud
579587

580588

581589
def test_two_two_qc_types_default_2_lhs(default_df, cloud_df):
582590
# cloud knows a bit about costing; so we prefer moving to there
583591
df3 = pd.concat([cloud_df, default_df], axis=1)
584-
assert default_df.get_backend() == "Test_casting_default"
592+
assert default_df.get_backend() == "Cloud" # inplace cast to Cloud by concat
585593
assert cloud_df.get_backend() == "Cloud"
586594
assert df3.get_backend() == cloud_df.get_backend() # should move to cloud
587595

@@ -651,6 +659,22 @@ def test_qc_mixed_loc(pico_df, cloud_df):
651659
assert cloud_df1[pico_df1[0][0]][pico_df1[0][1]] == 1
652660

653661

662+
def test_merge_in_place(default_df, lazy_df, cloud_df):
663+
# lazy_df tries to pawn off work on other engines
664+
df = default_df.merge(lazy_df)
665+
assert df.get_backend() is default_df.get_backend()
666+
# Both arguments now have the same qc type
667+
assert lazy_df.get_backend() is default_df.get_backend()
668+
669+
with config_context(BackendMergeCastInPlace=False):
670+
lazy_df = lazy_df.move_to("Lazy")
671+
cloud_df = cloud_df.move_to("Cloud")
672+
df = cloud_df.merge(lazy_df)
673+
assert type(df) is type(cloud_df)
674+
assert lazy_df.get_backend() == "Lazy"
675+
assert cloud_df.get_backend() == "Cloud"
676+
677+
654678
def test_information_asymmetry(default_df, cloud_df, eager_df, lazy_df):
655679
# normally, the default query compiler should be chosen
656680
# here, but since eager knows about default, but not
@@ -1487,7 +1511,11 @@ def test_groupby_apply_switches_for_small_input(
14871511
pandas_result = operation(pandas_df)
14881512
df_equals(modin_result, pandas_result)
14891513
assert modin_result.get_backend() == expected_backend
1490-
assert modin_df.get_backend() == expected_backend
1514+
if groupby_class == "DataFrameGroupBy":
1515+
assert modin_df.get_backend() == expected_backend
1516+
# The original dataframe does not move with the SeriesGroupBy
1517+
if groupby_class == "SeriesGroupBy":
1518+
assert modin_df.get_backend() == "Big_Data_Cloud"
14911519

14921520
def test_T_switches(self):
14931521
# Ensure that calling df.T triggers a switch (GH#7653)

modin/tests/pandas/native_df_interoperability/utils.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
from modin import set_execution
1717
from modin.config import Engine, StorageFormat
18+
from modin.config import context as config_context
19+
from modin.config.envvars import Backend
1820
from modin.tests.pandas.utils import (
1921
NoModinException,
2022
create_test_dfs,
@@ -43,9 +45,13 @@ def create_test_df_in_defined_mode(
4345

4446
if not isinstance(native, bool):
4547
raise ValueError("`native` should be True or False.")
46-
48+
hybrid_backend = "Pandas" if native else Backend.get()
4749
with switch_to_native_execution() if native else nullcontext():
48-
return create_test_dfs(*args, post_fn=post_fn, backend=backend, **kwargs)
50+
with config_context(AutoSwitchBackend=False, Backend=hybrid_backend):
51+
modin_df, pandas_df = create_test_dfs(
52+
*args, post_fn=post_fn, backend=backend, **kwargs
53+
)
54+
return modin_df, pandas_df
4955

5056

5157
def create_test_series_in_defined_mode(
@@ -56,8 +62,13 @@ def create_test_series_in_defined_mode(
5662
if not isinstance(native, bool):
5763
raise ValueError("`native` should be True or False.")
5864

65+
hybrid_backend = "Pandas" if native else "Ray"
5966
with switch_to_native_execution() if native else nullcontext():
60-
return create_test_series(vals, sort=sort, backend=backend, **kwargs)
67+
with config_context(AutoSwitchBackend=False, Backend=hybrid_backend):
68+
modin_ser, pandas_ser = create_test_series(
69+
vals, sort=sort, backend=backend, **kwargs
70+
)
71+
return modin_ser, pandas_ser
6172

6273

6374
def eval_general_interop(
@@ -110,7 +121,7 @@ def execute_callable(fn, inplace=False, md_kwargs={}, pd_kwargs={}):
110121
assert (
111122
type(md_e) is type(expected_exception)
112123
and md_e.args == expected_exception.args
113-
), f"not acceptable Modin's exception: [{repr(md_e)}]"
124+
), f"not acceptable Modin's exception: [{repr(md_e)}] expected {expected_exception}"
114125
assert (
115126
pd_e.args == expected_exception.args
116127
), f"not acceptable Pandas' exception: [{repr(pd_e)}]"

0 commit comments

Comments
 (0)