66
77import concurrent .futures
88from collections .abc import Iterator
9+ from functools import partial
910from multiprocessing .context import BaseContext
1011from typing import TYPE_CHECKING , Any , TypeVar
1112
2324 P = TypeVar ("P" , bound = pd .DataFrame | pd .Series [Any ])
2425
2526
27+ def apply_pandas_op_parallel (
28+ obj ,
29+ op ,
30+ n_processes : int ,
31+ progress : bool = False ,
32+ progress_nested : bool = False ,
33+ mp_context : BaseContext | None = None ,
34+ ):
35+ iterator = get_chunks (obj , n_chunks = n_processes )
36+ if progress :
37+ try :
38+ from tqdm .auto import tqdm
39+ except ImportError as exc :
40+ raise MissingOptionalDependencyError ( # noqa: TRY003
41+ "apply_pandas_op_parallel(..., progress=True)" , requirement = "tdqm"
42+ ) from exc
43+
44+ iterator = tqdm (iterator , desc = "submitting to pool" )
45+
46+ with concurrent .futures .ProcessPoolExecutor (
47+ max_workers = n_processes , mp_context = mp_context
48+ ) as pool :
49+ futures = [
50+ pool .submit (
51+ op ,
52+ chunk ,
53+ progress = progress_nested ,
54+ progress_bar_position = i ,
55+ )
56+ for i , chunk in enumerate (iterator )
57+ ]
58+
59+ iterator_results = concurrent .futures .as_completed (futures )
60+ if progress :
61+ iterator_results = tqdm (
62+ iterator_results ,
63+ desc = "Retrieving parallel results" ,
64+ total = len (futures ),
65+ )
66+
67+ res_l = [future .result () for future in iterator_results ]
68+
69+ # Late import to avoid hard dependency on pandas
70+ try :
71+ import pandas as pd
72+ except ImportError as exc :
73+ raise MissingOptionalDependencyError (
74+ "apply_pandas_op_parallel" , requirement = "pandas"
75+ ) from exc
76+
77+ # This assumes that the index isn't mangled.
78+ # Using pix.concat might be safer,
79+ # or we make the concatenation injectable.
80+ res = pd .concat (res_l )
81+
82+ return res
83+
84+
85+ def differentiate_parallel_helper (
86+ series : pd .Series [Timeseries ],
87+ progress : bool = False ,
88+ progress_bar_position : int = 0 ,
89+ ) -> pd .Series [Timeseries ]:
90+ if progress :
91+ try :
92+ from tqdm .auto import tqdm
93+ except ImportError as exc :
94+ raise MissingOptionalDependencyError ( # noqa: TRY003
95+ "dist(..., progress=True)" , requirement = "tdqm"
96+ ) from exc
97+
98+ tqdm_kwargs = dict (position = progress_bar_position )
99+ tqdm .pandas (** tqdm_kwargs )
100+ meth_to_call = "progress_map"
101+ # No-one knows why this is needed, but it is in jupyter notebooks
102+ print (end = " " )
103+
104+ else :
105+ meth_to_call = "map"
106+
107+ res = getattr (series , meth_to_call )(
108+ lambda x : x .differentiate (),
109+ # name="injectable?",
110+ )
111+
112+ return res
113+
114+
26115class SeriesCTAccessor :
27116 """
28117 [`pd.Series`][pandas.Series] accessors
@@ -89,7 +178,7 @@ def to_df(self, increase_resolution: int | None = None) -> pd.DataFrame:
89178 return df
90179
91180 # TODO: add this to DataFrame accessor to allow for time filtering in the middle
92- def to_sns_df (self , increase_resolution : int = 100 ):
181+ def to_sns_df (self , increase_resolution : int = 100 ) -> pd . DataFrame :
93182 # TODO: progress bar and parallelisation
94183 # TODO: time_units and out_units passing
95184 return (
@@ -102,6 +191,33 @@ def to_sns_df(self, increase_resolution: int = 100):
102191 .reset_index ()
103192 )
104193
194+ def differentiate (
195+ self ,
196+ # res_name: str = "ts",
197+ progress : bool = False ,
198+ progress_nested : bool = False ,
199+ n_processes : int = 1 ,
200+ mp_context : BaseContext | None = None ,
201+ ) -> pd .Series [Timeseries ]: # type: ignore
202+ if n_processes == 1 :
203+ res = differentiate_parallel_helper (
204+ self ._series ,
205+ progress = progress ,
206+ )
207+
208+ return res
209+
210+ res = apply_pandas_op_parallel (
211+ self ._series ,
212+ op = differentiate_parallel_helper ,
213+ n_processes = n_processes ,
214+ progress = progress ,
215+ progress_nested = progress_nested ,
216+ mp_context = mp_context ,
217+ )
218+
219+ return res
220+
105221 def plot (
106222 self ,
107223 label : str | tuple [str , ...] | None = None ,
@@ -215,8 +331,7 @@ def get_timeseries_parallel_helper(
215331 tqdm_kwargs = dict (position = progress_bar_position )
216332 tqdm .pandas (** tqdm_kwargs )
217333 meth_to_call = "progress_apply"
218- # No-one knows why this is needed, but it is
219- # jupyter notebooks
334+ # No-one knows why this is needed, but it is in jupyter notebooks
220335 print (end = " " )
221336
222337 else :
@@ -288,56 +403,21 @@ def to_timeseries( # noqa: PLR0913
288403
289404 return res
290405
291- # I think it should be possible to split out a
292- # `apply_pandas_op_parallel` or similar function.
293- iterator = get_chunks (self ._df , n_chunks = n_processes )
294- if progress :
295- try :
296- from tqdm .auto import tqdm
297- except ImportError as exc :
298- raise MissingOptionalDependencyError ( # noqa: TRY003
299- "to_timeseries(..., progress=True)" , requirement = "tdqm"
300- ) from exc
301-
302- iterator = tqdm (iterator , desc = "submitting to pool" )
303-
304- with concurrent .futures .ProcessPoolExecutor (
305- max_workers = n_processes , mp_context = mp_context
306- ) as pool :
307- futures = [
308- pool .submit (
309- get_timeseries_parallel_helper ,
310- chunk ,
311- interpolation = interpolation ,
312- time_units = time_units ,
313- units_col = units_col ,
314- idx_separator = idx_separator ,
315- ur = ur ,
316- progress = progress_nested ,
317- progress_bar_position = i ,
318- )
319- for i , chunk in enumerate (iterator )
320- ]
321-
322- iterator_results = concurrent .futures .as_completed (futures )
323- if progress :
324- iterator_results = tqdm (
325- iterator_results ,
326- desc = "Retrieving parallel results" ,
327- total = len (futures ),
328- )
329-
330- res_l = [future .result () for future in iterator_results ]
331-
332- # Late import to avoid hard dependency on pandas
333- try :
334- import pandas as pd
335- except ImportError as exc :
336- raise MissingOptionalDependencyError (
337- "interpolate" , requirement = "pandas"
338- ) from exc
339-
340- res = pd .concat (res_l )
406+ res = apply_pandas_op_parallel (
407+ self ._df ,
408+ op = partial (
409+ get_timeseries_parallel_helper ,
410+ interpolation = interpolation ,
411+ time_units = time_units ,
412+ units_col = units_col ,
413+ idx_separator = idx_separator ,
414+ ur = ur ,
415+ ),
416+ n_processes = n_processes ,
417+ progress = progress ,
418+ progress_nested = progress_nested ,
419+ mp_context = mp_context ,
420+ )
341421
342422 return res
343423
0 commit comments