@@ -1722,6 +1722,7 @@ def shuffle_partitions(
17221722 index ,
17231723 shuffle_functions : "ShuffleFunctions" ,
17241724 final_shuffle_func ,
1725+ right_partitions = None ,
17251726 ):
17261727 """
17271728 Return shuffled partitions.
@@ -1736,6 +1737,9 @@ def shuffle_partitions(
17361737 An object implementing the functions that we will be using to perform this shuffle.
17371738 final_shuffle_func : Callable(pandas.DataFrame) -> pandas.DataFrame
17381739 Function that shuffles the data within each new partition.
1740+ right_partitions : np.ndarray, optional
1741+ Partitions to broadcast to `self` partitions. If specified, the method builds range-partitioning
1742+ for `right_partitions` basing on bins calculated for `partitions`, then performs broadcasting.
17391743
17401744 Returns
17411745 -------
@@ -1774,18 +1778,57 @@ def shuffle_partitions(
17741778 for partition in row_partitions
17751779 ]
17761780 ).T
1777- # We need to convert every partition that came from the splits into a full-axis column partition.
1778- new_partitions = [
1781+
1782+ if right_partitions is None :
1783+ # We need to convert every partition that came from the splits into a column partition.
1784+ return np .array (
1785+ [
1786+ [
1787+ cls ._column_partitions_class (
1788+ row_partition , full_axis = False
1789+ ).apply (final_shuffle_func )
1790+ ]
1791+ for row_partition in split_row_partitions
1792+ ]
1793+ )
1794+
1795+ right_row_parts = cls .row_partitions (right_partitions )
1796+ right_split_row_partitions = np .array (
1797+ [
1798+ partition .split (
1799+ shuffle_functions .split_fn ,
1800+ num_splits = num_bins ,
1801+ extract_metadata = False ,
1802+ )
1803+ for partition in right_row_parts
1804+ ]
1805+ ).T
1806+ return np .array (
17791807 [
17801808 cls ._column_partitions_class (row_partition , full_axis = False ).apply (
1781- final_shuffle_func
1809+ final_shuffle_func ,
1810+ other_axis_partition = cls ._column_partitions_class (
1811+ right_row_partitions
1812+ ),
1813+ )
1814+ for right_row_partitions , row_partition in zip (
1815+ right_split_row_partitions , split_row_partitions
17821816 )
17831817 ]
1784- for row_partition in split_row_partitions
1785- ]
1786- return np .array (new_partitions )
1818+ )
1819+
17871820 else :
17881821 # If there are not pivots we can simply apply the function row-wise
1822+ if right_partitions is None :
1823+ return np .array (
1824+ [row_part .apply (final_shuffle_func ) for row_part in row_partitions ]
1825+ )
1826+ right_row_parts = cls .row_partitions (right_partitions )
17891827 return np .array (
1790- [row_part .apply (final_shuffle_func ) for row_part in row_partitions ]
1828+ [
1829+ row_part .apply (
1830+ final_shuffle_func , other_axis_partition = right_row_part
1831+ )
1832+ for right_row_part , row_part in zip (right_row_parts , row_partitions )
1833+ ]
17911834 )
0 commit comments