Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1942,7 +1942,12 @@ impl LazyFrame {
}

#[cfg(feature = "merge_sorted")]
pub fn merge_sorted<S>(self, other: LazyFrame, key: S) -> PolarsResult<LazyFrame>
pub fn merge_sorted<S>(
self,
other: LazyFrame,
key: S,
maintain_order: bool,
) -> PolarsResult<LazyFrame>
where
S: Into<PlSmallStr>,
{
Expand All @@ -1952,6 +1957,7 @@ impl LazyFrame {
input_left: Arc::new(self.logical_plan),
input_right: Arc::new(other.logical_plan),
key,
maintain_order,
};
Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
}
Expand Down
1 change: 1 addition & 0 deletions crates/polars-mem-engine/src/planner/lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,7 @@ fn create_physical_plan_impl(
input_left,
input_right,
key,
maintain_order: _,
} => {
let (input_left, input_right) = state.with_new_branch(|new_state| {
(
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-plan/dsl-schema-hashes.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
"Dimension": "68880cdb10230df6c8c1632b073c80bd8ceb5c56a368c0cb438431ca9f3d3b31",
"DistinctOptionsDSL": "41be5ec69ef9a614f2b36ac5deadfecdea5cca847ae1ada9d4bc626ff52a5b38",
"DslFunction": "221f1a46a043c8ed54f57be981bf24509f04f5f91f0f08e0acc180d96f842ebf",
"DslPlan": "14caf5b73e69c4975ff3a57331891521ff5b78c96bbaf8d6cc9be57c82f3ea98",
"DslPlan": "037aeb1be892efd716c6934961e6df74dcd38815064b6d7efa72efe41e6e913d",
"Duration": "44999d59023085cbb592ce94b30d34f9b983081fc72bd6435a49bdf0869c0074",
"Duration2": "f251cb1bee2955a17c6defe1573bce21ddbe6cdf6eb9324a19cd37932ab29347",
"DynListLiteralValue": "2266a553cb4a943f7097f24539eaa802453cf8742675996215235bd682dec0e8",
Expand Down
3 changes: 2 additions & 1 deletion crates/polars-plan/src/dsl/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ pub enum DslPlan {
input_left: Arc<DslPlan>,
input_right: Arc<DslPlan>,
key: PlSmallStr,
maintain_order: bool,
},
IR {
// Keep the original Dsl around as we need that for serialization.
Expand Down Expand Up @@ -211,7 +212,7 @@ impl Clone for DslPlan {
#[cfg(feature = "pivot")]
Self::Pivot { input, on, on_columns, index, values, agg, separator, maintain_order, column_naming } => Self::Pivot { input: input.clone(), on: on.clone(), on_columns: on_columns.clone(), index: index.clone(), values: values.clone(), agg: agg.clone(), separator: separator.clone(), maintain_order: *maintain_order, column_naming: *column_naming },
#[cfg(feature = "merge_sorted")]
Self::MergeSorted { input_left, input_right, key } => Self::MergeSorted { input_left: input_left.clone(), input_right: input_right.clone(), key: key.clone() },
Self::MergeSorted { input_left, input_right, key, maintain_order } => Self::MergeSorted { input_left: input_left.clone(), input_right: input_right.clone(), key: key.clone(), maintain_order: *maintain_order },
Self::IR {node, dsl, version} => Self::IR {node: *node, dsl: dsl.clone(), version: *version},
}
}
Expand Down
5 changes: 5 additions & 0 deletions crates/polars-plan/src/dsl/serializable_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ pub(crate) enum SerializableDslPlanNode {
input_left: DslPlanKey,
input_right: DslPlanKey,
key: PlSmallStr,
maintain_order: bool,
},
IR {
dsl: DslPlanKey,
Expand Down Expand Up @@ -360,10 +361,12 @@ fn convert_dsl_plan_to_serializable_plan(
input_left,
input_right,
key,
maintain_order,
} => SP::MergeSorted {
input_left: dsl_plan_key(input_left, arenas),
input_right: dsl_plan_key(input_right, arenas),
key: key.clone(),
maintain_order: *maintain_order,
},
DP::IR {
dsl,
Expand Down Expand Up @@ -608,10 +611,12 @@ fn try_convert_serializable_plan_to_dsl_plan(
input_left,
input_right,
key,
maintain_order,
} => Ok(DP::MergeSorted {
input_left: get_dsl_plan(*input_left, ser_dsl_plan, arenas)?,
input_right: get_dsl_plan(*input_right, ser_dsl_plan, arenas)?,
key: key.clone(),
maintain_order: *maintain_order,
}),
SP::IR {
dsl: dsl_key,
Expand Down
2 changes: 2 additions & 0 deletions crates/polars-plan/src/plans/conversion/dsl_to_ir/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1502,6 +1502,7 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult
input_left,
input_right,
key,
maintain_order,
} => {
let input_left = to_alp_impl(owned(input_left), ctxt)
.map_err(|e| e.context(failed_here!(merge_sorted)))?;
Expand All @@ -1523,6 +1524,7 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult
input_left,
input_right,
key,
maintain_order,
}
},
DslPlan::IR { node, dsl, version } => {
Expand Down
1 change: 1 addition & 0 deletions crates/polars-plan/src/plans/ir/dot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ impl<'a> IRDotDisplay<'a> {
input_left,
input_right,
key,
..
} => {
recurse!(*input_left);
recurse!(*input_right);
Expand Down
2 changes: 2 additions & 0 deletions crates/polars-plan/src/plans/ir/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ impl<'a> IRDisplay<'a> {
input_left,
input_right,
key: _,
..
} => {
write_ir_non_recursive(f, ir_node, self.lp.expr_arena, output_schema, indent)?;
write!(f, ":")?;
Expand Down Expand Up @@ -1002,6 +1003,7 @@ pub fn write_ir_non_recursive(
input_left: _,
input_right: _,
key,
..
} => write!(f, "{:indent$}MERGE SORTED ON '{key}'", ""),
IR::Invalid => write!(f, "{:indent$}INVALID", ""),
}
Expand Down
1 change: 1 addition & 0 deletions crates/polars-plan/src/plans/ir/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ pub enum IR {
input_left: Node,
input_right: Node,
key: PlSmallStr,
maintain_order: bool,
},
#[default]
Invalid,
Expand Down
1 change: 1 addition & 0 deletions crates/polars-plan/src/plans/ir/tree_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ impl<'a> TreeFmtNode<'a> {
input_left,
input_right,
key,
..
} => ND(
// Node header for the tree display; the key is wrapped in single quotes,
// matching the `MERGE SORTED ON '{key}'` label used by the plain IR format.
wh(h, &format!("MERGE SORTED ON '{key}'")),
[self.lp_node(Some("LEFT PLAN:".to_string()), *input_left)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,7 @@ impl ProjectionPushDown {
input_left,
input_right,
key,
maintain_order,
} => {
if ctx.has_pushed_down() {
// make sure that the filter column is projected
Expand All @@ -792,6 +793,7 @@ impl ProjectionPushDown {
input_left,
input_right,
key,
maintain_order,
})
},
Invalid => unreachable!(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ impl SimplifyIRNodeOrder<'_> {
input_left,
input_right,
key: _,
..
} => {
let ([in_edge_lhs, in_edge_rhs], [out_edge]) = unpack_edges!(3);

Expand Down
2 changes: 2 additions & 0 deletions crates/polars-plan/src/plans/visitor/hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,10 @@ impl Hash for IRHashWrap<'_> {
input_left: _,
input_right: _,
key,
maintain_order,
} => {
key.hash(state);
maintain_order.hash(state);
},
IR::Invalid => unreachable!(),
}
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-python/src/lazyframe/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1503,12 +1503,12 @@ impl PyLazyFrame {
}

#[cfg(feature = "merge_sorted")]
fn merge_sorted(&self, other: Self, key: &str) -> PyResult<Self> {
fn merge_sorted(&self, other: Self, key: &str, maintain_order: bool) -> PyResult<Self> {
let out = self
.ldf
.read()
.clone()
.merge_sorted(other.ldf.into_inner(), key)
.merge_sorted(other.ldf.into_inner(), key, maintain_order)
.map_err(PyPolarsErr::from)?;
Ok(out.into())
}
Expand Down
4 changes: 4 additions & 0 deletions crates/polars-python/src/lazyframe/visitor/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,8 @@ pub struct MergeSorted {
input_right: usize,
#[pyo3(get)]
key: String,
#[pyo3(get)]
maintain_order: bool,
}

#[pyclass(frozen)]
Expand Down Expand Up @@ -744,10 +746,12 @@ pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult<Py<PyAny>> {
input_left,
input_right,
key,
maintain_order,
} => MergeSorted {
input_left: input_left.0,
input_right: input_right.0,
key: key.to_string(),
maintain_order: *maintain_order,
}
.into_py_any(py),
IR::Invalid => Err(PyNotImplementedError::new_err("Invalid")),
Expand Down
27 changes: 24 additions & 3 deletions crates/polars-stream/src/nodes/merge_sorted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,22 @@ pub struct MergeSortedNode {

starting_nulls: bool,

maintain_order: bool,

// Not yet merged buffers.
left_unmerged: VecDeque<DataFrame>,
right_unmerged: VecDeque<DataFrame>,
}

impl MergeSortedNode {
pub fn new() -> Self {
pub fn new(maintain_order: bool) -> Self {
Self {
seq: MorselSeq::default(),

starting_nulls: false,

maintain_order,

left_unmerged: VecDeque::new(),
right_unmerged: VecDeque::new(),
}
Expand All @@ -42,6 +46,7 @@ fn find_mergeable(

is_first: bool,
starting_nulls: &mut bool,
maintain_order: bool,
) -> PolarsResult<Option<(DataFrame, DataFrame)>> {
fn first_non_empty(vd: &mut VecDeque<DataFrame>) -> Option<DataFrame> {
let mut df = vd.pop_front()?;
Expand Down Expand Up @@ -133,13 +138,26 @@ fn find_mergeable(
} else if left_key_last.lt(&right_key_last)?.all() {
// @TODO: This is essentially search sorted, but that does not
// support categoricals at moment.
let gt_mask = right_key.gt(&left_key_last)?;
right_cutoff = gt_mask.first_true_idx().unwrap_or(gt_mask.len());
if maintain_order {
// When maintaining order, hold back right-side rows with keys
// equal to left's max, since more left rows with that key may
// arrive in later morsels.
let gte_mask = right_key.gt_eq(&left_key_last)?;
right_cutoff = gte_mask.first_true_idx().unwrap_or(gte_mask.len());
} else {
let gt_mask = right_key.gt(&left_key_last)?;
right_cutoff = gt_mask.first_true_idx().unwrap_or(gt_mask.len());
}
} else if left_key_last.gt(&right_key_last)?.all() {
// @TODO: This is essentially search sorted, but that does not
// support categoricals at moment.
let gt_mask = left_key.gt(&right_key_last)?;
left_cutoff = gt_mask.first_true_idx().unwrap_or(gt_mask.len());
} else if maintain_order {
// Keys are equal at both maxima. Hold back right-side rows with
// keys equal to the shared maximum to ensure left-biased ordering.
let gte_mask = right_key.gt_eq(&left_key_last)?;
right_cutoff = gte_mask.first_true_idx().unwrap_or(gte_mask.len());
}

let left_mergeable: DataFrame;
Expand Down Expand Up @@ -235,6 +253,7 @@ impl ComputeNode for MergeSortedNode {

let seq = &mut self.seq;
let starting_nulls = &mut self.starting_nulls;
let maintain_order = self.maintain_order;
let left_unmerged = &mut self.left_unmerged;
let right_unmerged = &mut self.right_unmerged;

Expand Down Expand Up @@ -319,6 +338,7 @@ impl ComputeNode for MergeSortedNode {
right_unmerged,
seq.to_u64() == 0,
starting_nulls,
maintain_order,
)? {
let left_mergeable =
Morsel::new(left_mergeable, *seq, source_token.clone());
Expand Down Expand Up @@ -379,6 +399,7 @@ impl ComputeNode for MergeSortedNode {
right_unmerged,
seq.to_u64() == 0,
starting_nulls,
maintain_order,
)? {
let left_mergeable =
Morsel::new(left_mergeable, *seq, source_token.clone());
Expand Down
1 change: 1 addition & 0 deletions crates/polars-stream/src/physical_plan/fmt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,7 @@ fn visualize_plan_rec(
PhysNodeKind::MergeSorted {
input_left,
input_right,
..
} => ("merge-sorted".to_string(), &[*input_left, *input_right][..]),
#[cfg(feature = "ewma")]
PhysNodeKind::EwmMean { input, options: _ } => ("ewm-mean".to_string(), &[*input][..]),
Expand Down
3 changes: 3 additions & 0 deletions crates/polars-stream/src/physical_plan/lower_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,10 +331,12 @@ pub fn lower_ir(
input_left,
input_right,
key,
maintain_order,
} => {
let input_left = *input_left;
let input_right = *input_right;
let key = key.clone();
let maintain_order = *maintain_order;

let mut phys_left = lower_ir!(input_left)?;
let mut phys_right = lower_ir!(input_right)?;
Expand Down Expand Up @@ -379,6 +381,7 @@ pub fn lower_ir(
PhysNodeKind::MergeSorted {
input_left: phys_left,
input_right: phys_right,
maintain_order,
}
},

Expand Down
1 change: 1 addition & 0 deletions crates/polars-stream/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ pub enum PhysNodeKind {
MergeSorted {
input_left: PhysStream,
input_right: PhysStream,
maintain_order: bool,
},

#[cfg(feature = "ewma")]
Expand Down
3 changes: 2 additions & 1 deletion crates/polars-stream/src/physical_plan/to_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1318,11 +1318,12 @@ fn to_graph_rec<'a>(
MergeSorted {
input_left,
input_right,
maintain_order,
} => {
let left_input_key = to_graph_rec(input_left.node, ctx)?;
let right_input_key = to_graph_rec(input_right.node, ctx)?;
ctx.graph.add_node(
nodes::merge_sorted::MergeSortedNode::new(),
nodes::merge_sorted::MergeSortedNode::new(*maintain_order),
[
(left_input_key, input_left.port),
(right_input_key, input_right.port),
Expand Down
4 changes: 3 additions & 1 deletion py-polars/src/polars/_plr.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1154,7 +1154,9 @@ class PyLazyFrame:
def collect_schema(self) -> dict[str, Any]: ...
def unnest(self, columns: PySelector, separator: str | None) -> PyLazyFrame: ...
def count(self) -> PyLazyFrame: ...
def merge_sorted(self, other: PyLazyFrame, key: str) -> PyLazyFrame: ...
def merge_sorted(
self, other: PyLazyFrame, key: str, maintain_order: bool
) -> PyLazyFrame: ...
def hint_sorted(
self, columns: list[str], descending: list[bool], nulls_last: list[bool]
) -> PyLazyFrame: ...
Expand Down
Loading
Loading