perf: Add native streaming interpolate #27185
coastalwhite wants to merge 4 commits into pola-rs:main
let source_token = SourceToken::new();

let Some(recv) = recv else {
    // Input exhausted. Flush pending trailing nulls as actual nulls — no right endpoint
AI emdash detected
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##             main   #27185      +/-   ##
==========================================
- Coverage   81.80%   81.22%   -0.58%
==========================================
  Files        1816     1817       +1
  Lines      250592   250730     +138
  Branches     3144     3144
==========================================
- Hits       204985   203646    -1339
- Misses      44803    46280    +1477
  Partials      804      804
Force-pushed from 5c903c5 to c6e5c82.
@given(
    data=series(
        name="a",
        allowed_dtypes=[
Nit: @given(data=series(name="a", allowed_dtypes=NUMERIC_DTYPES))
Or is there a reason to exclude pl.{U,}Int128? It works when I test it locally.
let pending = *pending_nulls;
let mut send = send.serial();
join_handles.push(scope.spawn_task(TaskPriority::High, async move {
    let morsel_size = get_ideal_morsel_size();
Please fix: Could you check stop_requested on the source token (and update pending)? I am concerned that some user might input a very weird dataframe at some point (with many nulls at the end).
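A minimal sketch of what this request could look like, written in Python rather than the node's Rust for brevity. The function name `flush_pending_nulls` and the `send` and `stop_requested` callables are hypothetical stand-ins, not the Polars streaming API. The idea is only that the trailing nulls are flushed one morsel at a time, the stop flag is checked before each morsel, and the remaining count is handed back so the caller can update its pending state.

```python
from typing import Callable, List


def flush_pending_nulls(
    pending: int,
    morsel_size: int,
    send: Callable[[List[None]], None],
    stop_requested: Callable[[], bool],
) -> int:
    """Emit `pending` nulls in morsel-sized pieces.

    Returns how many nulls are still pending, so the caller can store
    that count and resume later if a stop was requested mid-flush.
    """
    while pending > 0:
        # Check the (hypothetical) stop flag before producing each morsel.
        if stop_requested():
            break
        n = min(pending, morsel_size)
        send([None] * n)
        pending -= n
    return pending


# Usage: stop after two morsels have been sent.
sent: List[List[None]] = []
remaining = flush_pending_nulls(10, 4, sent.append, lambda: len(sent) >= 2)
```

With a very long run of trailing nulls, this bounds the work done after a stop request to at most one morsel.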
// Parallel worker threads.
for (mut send, mut recv) in senders.into_iter().zip(distr_receivers) {
    let source_token = source_token.clone();
Please fix: This source_token should not be created locally; it should be passed along by the serial distributing task from the input morsels.
if send.send(morsel).await.is_err() {
    break;
}
wait_group.wait().await;
Hint: This is not strictly necessary, because this task does not generate more morsels than it consumes, but waiting does no harm either.
I think my understanding is a bit off. We merged the pipelines into the serial input and then distributed them again over the pipelines. If we didn't have these wait groups, what is stopping two morsels from entering the same pipeline at the same time?
xref: #20947.
This adds a native interpolate node for the streaming engine. This node is heavily based on the `BackwardFillNode`, but needs a few small adjustments.
I added a parametric test to verify its behavior against the in-memory engine, which uncovered #27184. So this also fixes #27184.
AI was used, and its output was verified.
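To illustrate the kind of state such a node has to carry between morsels, here is a rough pure-Python sketch of linear interpolation over a stream of chunks. It is not the PR's implementation: the names `interpolate_chunks` and `collect` are made up for this example, values are plain floats with `None` as null, and only linear interpolation is assumed. The sketch keeps the last valid value and a count of pending nulls, fills the gap once a right endpoint arrives, and emits trailing nulls as nulls when the input is exhausted.

```python
from typing import Iterable, Iterator, List, Optional


def interpolate_chunks(
    chunks: Iterable[List[Optional[float]]],
) -> Iterator[List[Optional[float]]]:
    """Linearly interpolate None values, including across chunk boundaries."""
    last_valid: Optional[float] = None  # left endpoint, once one has been seen
    pending = 0  # nulls seen since last_valid, still waiting for a right endpoint
    for chunk in chunks:
        out: List[Optional[float]] = []
        for value in chunk:
            if value is None:
                pending += 1
                continue
            if last_valid is None:
                # Leading nulls have no left endpoint, so they stay null.
                out.extend([None] * pending)
            else:
                # Fill the gap with evenly spaced values between the endpoints.
                step = (value - last_valid) / (pending + 1)
                out.extend(last_valid + step * (i + 1) for i in range(pending))
            out.append(value)
            last_valid, pending = value, 0
        if out:
            yield out
    if pending:
        # Input exhausted: trailing nulls have no right endpoint and stay null.
        yield [None] * pending


def collect(chunks: Iterable[List[Optional[float]]]) -> List[Optional[float]]:
    """Flatten the interpolated chunks into a single list."""
    return [v for chunk in interpolate_chunks(chunks) for v in chunk]


result = collect([[1.0, None], [None, 4.0], [None]])
```

Comparing `collect` on chunked input against the same function on the unchunked input gives a small-scale version of the streaming versus in-memory check described above.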