Validate the memory consumption in SPM created by multi level merge #17029

Draft: @ding-young wants to merge 3 commits into main from verify-mem-multi-level

Conversation

@ding-young (Contributor) commented Aug 4, 2025

Which issue does this PR close?

Rationale for this change

In multi-level merge, we first reserve the estimated memory needed to merge the sorted spill files, and then bypass the global memory pool when creating the SortPreservingMergeStream (SPM for short). The purpose is to ensure that the SPM step can finish without running out of memory, by holding a worst-case memory reservation until the SPM step ends.

  • grow merge_reservation based on max batch memory per spill file

    let mut memory_reservation = self.reservation.new_empty();
    // Don't account for existing streams memory
    // as we are not holding the memory for them
    let mut sorted_streams = mem::take(&mut self.sorted_streams);
    let (sorted_spill_files, buffer_size) = self
        .get_sorted_spill_files_to_merge(
            2,
            // we must have at least 2 streams to merge
            2_usize.saturating_sub(sorted_streams.len()),
            &mut memory_reservation,
        )?;

  • bypass global buffer pool (use unbounded memory pool)

    if !all_in_memory {
        // Don't track memory used by this stream as we reserve that memory for the worst case
        // (reserving memory for the biggest batch in each stream)
        // TODO - avoid this hack as this can be broken easily when `SortPreservingMergeStream`
        // changes the implementation to use more/less memory
        builder = builder.with_bypass_mempool();
    } else {
        // If we are only merging in-memory streams, we need to use the memory reservation
        // because we don't know the maximum size of the batches in the streams
        builder = builder.with_reservation(self.reservation.new_empty());
    }

Since we use an UnboundedMemoryPool as a workaround, we never verify that this memory_reservation is the actual upper bound during the SPM step of multi-level merge. Therefore, we need to validate that the memory consumed by SPM does not exceed the size of memory_reservation.

What changes are included in this PR?

This PR adds a check to SpillReaderStream so that whenever a spill stream is polled, the memory size of the batch being read must not exceed max_record_batch_memory + margin. This allows us to detect cases where we made an incorrect (underestimated) memory reservation, for example when a batch consumes more memory after the write-read cycle than originally expected.
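
A minimal sketch of what such a poll-time check could look like (the margin constant and helper name are illustrative, not the PR's exact code):

```rust
use arrow::record_batch::RecordBatch;
use datafusion_common::{DataFusionError, Result};

/// Illustrative only: slack for small allocator overheads (the exact value is an assumption).
const MEMORY_MARGIN: usize = 64 * 1024;

/// Fail fast if a batch read back from a spill file is larger than the worst
/// case we reserved for its stream.
fn validate_spilled_batch(batch: &RecordBatch, max_record_batch_memory: usize) -> Result<()> {
    let actual = batch.get_array_memory_size();
    if actual > max_record_batch_memory + MEMORY_MARGIN {
        return Err(DataFusionError::ResourcesExhausted(format!(
            "spilled batch uses {actual} bytes, exceeding the {max_record_batch_memory} bytes \
             reserved for this stream (plus {MEMORY_MARGIN} bytes of margin)"
        )));
    }
    Ok(())
}
```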

This PR also creates a separate GreedyMemoryPool sized to memory_reservation, instead of using an UnboundedMemoryPool, when merging spill files (and in-memory streams) in multi-level merge.
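
For the pool swap, a sketch under the assumption that the merge builder can be handed an arbitrary pool (the helper below is hypothetical wiring, not the PR's code):

```rust
use std::sync::Arc;
use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryPool};

/// Build a local pool bounded by the precomputed reservation, so any merge
/// allocation beyond the worst-case estimate fails instead of going untracked.
fn local_merge_pool(reserved_bytes: usize) -> Arc<dyn MemoryPool> {
    Arc::new(GreedyMemoryPool::new(reserved_bytes))
}
```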

Are these changes tested?

Yes, and the following spilling-related tests fail 😢
Maybe our previous worst-case memory estimation was wrong, but I don't understand why at this point; this needs more investigation. I'll put more details in the comments.

Are there any user-facing changes?

@github-actions bot added the execution (Related to the execution crate) and physical-plan (Changes to the physical-plan crate) labels Aug 4, 2025
@ding-young (Contributor Author) commented:

List of failing tests

// fuzz_cases 
fuzz_cases::aggregate_fuzz::test_single_mode_aggregate_single_mode_aggregate_with_spill
fuzz_cases::spilling_fuzz_in_memory_constrained_env::test_aggregate_with_high_cardinality_with_limited_memory_and_different_sizes_of_record_batch
fuzz_cases::spilling_fuzz_in_memory_constrained_env::test_aggregate_with_high_cardinality_with_limited_memory_and_different_sizes_of_record_batch_and_changing_memory_reservation
fuzz_cases::spilling_fuzz_in_memory_constrained_env::test_aggregate_with_high_cardinality_with_limited_memory_and_different_sizes_of_record_batch_and_take_all_memory
fuzz_cases::spilling_fuzz_in_memory_constrained_env::test_aggregate_with_high_cardinality_with_limited_memory_and_large_record_batch
fuzz_cases::spilling_fuzz_in_memory_constrained_env::test_sort_with_limited_memory_and_large_record_batch
// memory_limit
memory_limit::test_stringview_external_sort

Possible cause?

There are two problems discovered while working on this PR.

First, we don't reserve memory for in-memory streams (since we don't know their batch sizes without polling them) when we build a SortPreservingMergeStream over both in-memory streams and sorted spill files. The merge_reservation only accounts for the worst-case memory usage of the sorted spill files, which means we bypass the global memory pool without checking the memory consumed by the in-memory streams.

Second, the actual memory usage reported by the local memory pool exceeds the size of the precomputed memory_reservation even when there are only sorted spill files to merge. The failing tests indicate this, and when I printed out the usage of the UnboundedMemoryPool, it looks like we underestimated the worst-case memory consumption for the merge.

There are 6 spill files to merge, and we reserved 3676800 bytes based on the max record batch size of each spill file, but the peak usage of the UnboundedMemoryPool is 28280376 bytes, roughly 7.7x the reservation. Note that this test, test_stringview_external_sort, uses FairSpillPool as the global memory pool, so I assume the usage log for the UnboundedMemoryPool reflects only the SPM in the multi-level merge.

// modify `with_bypass_mempool` in this PR to use UnboundedMemoryPool
// RUST_LOG=debug cargo test -p datafusion memory_limit::test_stringview_external_sort -- --exact --nocapture
[merge_sorted_runs_within_mem_limit] spill6, inmem0
[merge_sorted_runs_within_mem_limit] memory_reservation:3676800
create new merge sort with bypass mempool
[mem pool] 1394496 used
[mem pool] 2788992 used
[mem pool] 3639880 used
[mem pool] 4570768 used
[mem pool] 5965448 used
[mem pool] 7360128 used
[mem pool] 8754744 used
[mem pool] 10149360 used
[mem pool] 11543832 used
[mem pool] 12938304 used
[mem pool] 14332872 used
[mem pool] 15727440 used
[mem pool] 17122440 used
[mem pool] 18517440 used
[mem pool] 19912376 used
[mem pool] 21307312 used
[mem pool] 22701912 used
[mem pool] 24096512 used
[mem pool] 25491152 used
[mem pool] 26885792 used
[mem pool] 28280376 used

@2010YOUY01 @rluvaton
Could you help me check if the issue is really with estimation or memory management, rather than me logging things incorrectly? I'm not sure where this discrepancy is coming from, since it's not only the string array-related tests that are failing.

@rluvaton (Contributor) commented Aug 4, 2025

Note: sort spills the remaining in-memory batches while row_hash does not; also, sort accounts for memory used by the SortPreservingMergeStream, which row_hash does not.

How much is the difference for the fuzz tests I added that check memory-constrained envs? They only test a couple of simple columns that are easier to reason about.

The kernels used in the sort stream might overestimate, and also note that even if you request X capacity you might get more than that.
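
A small illustration of that last point, assuming current arrow-rs behavior (buffer allocations rounded up, Vec capacity growing by doubling); exact numbers vary by version:

```rust
use arrow::buffer::MutableBuffer;

fn main() {
    // Ask for 10 bytes; the allocation is rounded up well past the request.
    let buf = MutableBuffer::with_capacity(10);
    println!("requested 10 bytes, got capacity {}", buf.capacity());

    // A Vec grown element by element can hold nearly 2x its live data.
    let mut v: Vec<u8> = Vec::new();
    for _ in 0..1_025 {
        v.push(0);
    }
    println!("len = {}, capacity = {}", v.len(), v.capacity());
}
```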

@rluvaton (Contributor) commented Aug 4, 2025

I saw that SortPreservingMergeStream uses a different way to calculate memory, which does not take the sliced data into account; see

let mut counted_buffers: HashSet<NonNull<u8>> = HashSet::new();

I think this is it
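
A minimal, standalone sketch (not DataFusion code) of the difference between per-array accounting and deduplicating by buffer pointer, which is what the counted_buffers set above does:

```rust
use std::collections::HashSet;
use arrow::array::{Array, Int32Array};

fn main() {
    // A 1M-row array and a 10-row slice of it: the slice still references
    // the full shared data buffer.
    let big = Int32Array::from_iter_values(0..1_000_000);
    let tiny = big.slice(0, 10);

    // Per-array accounting charges the whole shared buffer to the slice too.
    println!("slice reports {} bytes", tiny.get_array_memory_size());

    // Deduplicating by buffer pointer counts each allocation once,
    // however many slices reference it.
    let mut seen: HashSet<*const u8> = HashSet::new();
    let mut unique_bytes = 0usize;
    for data in [big.to_data(), tiny.to_data()] {
        for buffer in data.buffers() {
            if seen.insert(buffer.as_ptr()) {
                unique_bytes += buffer.capacity();
            }
        }
    }
    println!("unique buffer bytes across both arrays: {unique_bytes}");
}
```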

@2010YOUY01 (Contributor) commented:

If it fails, I think this approach will make the debugging very painful.

I have an alternative idea to make this validation more fine-grained:
Let's say there are 3 spills to merge, each with an estimated max batch size of 10M, 15M, and 12M. Then during merging we only check that each stream's batch size stays below its estimate ([10M, 15M, 12M] respectively).

Though this approach is less comprehensive, and can be a bit hacky to implement (directly extending the operator for this check), it can make troubleshooting much easier.

@github-actions bot added the core (Core DataFusion crate) label Aug 8, 2025
@ding-young (Contributor Author) commented:

Update: I took an alternative approach similar to what @2010YOUY01 suggested.

> I have an alternative idea to make this validation more fine-grained: let's say there are 3 spills to merge, each with an estimated max batch size of 10M, 15M, and 12M. Then during merging we only check that each stream's batch size stays below its estimate ([10M, 15M, 12M] respectively).

I switched back to using the UnboundedMemoryPool, but added a check to SpillReaderStream so that whenever a spill stream is polled, the memory size of the batch being read must not exceed max_record_batch_memory. This allows us to detect cases where we made an incorrect (underestimated) memory reservation, for example when a batch consumes more memory after the write-read cycle than originally expected.

There is a slight discrepancy due to minor vector allocations, so I added a margin to the check. Fortunately, in most cases, the validation passes. However, for external sorting with string views, the validation currently fails, so further investigation is needed.

@ding-young (Contributor Author) commented Aug 8, 2025

> How much is the difference for the fuzz tests I added that check memory-constrained envs? They only test a couple of simple columns that are easier to reason about.

I also did some additional debugging to understand why SortPreservingMergeStream ends up using more memory than the pre-reserved amount. The root cause I identified is as follows:

Except for slight discrepancies due to vector allocation overhead, I found several key sources of memory underestimation:

  1. In-memory streams not accounted for in SPM:

When performing SPM (SortPreservingMerge) over both spill files and in-memory streams, we only reserve memory using get_reserved_byte_for_record_batch_size(spill.max_record_batch_memory * buffer_len) per spill. However, if there are 2 spill streams and 1 in-memory stream, the reservation for the in-memory stream is not considered at all.

  2. Incorrect logic in get_reserved_byte_for_record_batch_size:

The estimation is based on a fixed 2× multiplier, without considering the differences between sort key and sort payload columns or their data types. In reality, this varies significantly, and the current logic often under- (or over-) estimates. This is a known issue (#14748) and I'm actively working on it.

  3. SPM buffers at most 2 (buffer_len * (batch and cursor)) per stream:

Looking at the implementation, SPM can buffer both the previous and current (cursor, batch) for each stream simultaneously.
See

batches: Vec::with_capacity(stream_count * 2),
and
self.prev_cursors[stream_idx] = self.cursors[stream_idx].take();

That means, in the worst case, SPM can use up to 2 × get_reserved_byte_for_record_batch_size(spill.max_record_batch_memory * buffer_len) per stream, so I think we should double the reservation per stream to be safe.
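
A hypothetical back-of-the-envelope helper for that doubled reservation (names are illustrative, and the real get_reserved_byte_for_record_batch_size adjustment is omitted):

```rust
/// Worst-case reservation if SPM may hold both the previous and the current
/// (cursor, batch) pair for every stream at the same time.
fn worst_case_merge_reservation(max_batch_bytes_per_spill: &[usize], buffer_len: usize) -> usize {
    max_batch_bytes_per_spill
        .iter()
        // factor of 2: previous + current buffered batch per stream
        .map(|max_batch| 2 * max_batch * buffer_len)
        .sum()
}

// e.g. three spills with 10 MB / 15 MB / 12 MB max batches and buffer_len = 1
// would need 2 * (10 + 15 + 12) = 74 MB rather than 37 MB.
```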

@ding-young force-pushed the verify-mem-multi-level branch from 5ec8edd to 68cffc6 on August 8, 2025 05:31
@github-actions bot removed the core (Core DataFusion crate) label Aug 8, 2025
@2010YOUY01 (Contributor) left a review:

Thank you, this approach is a good idea.

I think this PR is ready; however, we should work on fixing the failures first and then come back to this PR.

@@ -54,8 +54,13 @@ use futures::{FutureExt as _, Stream};
struct SpillReaderStream {
    schema: SchemaRef,
    state: SpillReaderStreamState,
    /// how much memory the largest memory batch is taking
@2010YOUY01:

I recommend also explaining that this is used for validation, and linking to the multi-level merge's doc for background.

> max_record_batch_memory + MEMORY_MARGIN
{
    return Poll::Ready(Some(Err(
        DataFusionError::ResourcesExhausted(
@2010YOUY01:

We can use resources_err!(...) macro here

@2010YOUY01 (Contributor) commented:

> I also did some additional debugging to understand why SortPreservingMergeStream ends up using more memory than the pre-reserved amount. [...]
> 1. In-memory streams not accounted for in SPM [...]
> 2. Incorrect logic in get_reserved_byte_for_record_batch_size [...]
> 3. SPM buffers at most 2 (buffer_len * (batch and cursor)) per stream [...] So I think we should double the reservation per stream to be safe.

For point 1: I vaguely remember that in multi-level merge there is logic to re-spill in-memory batches before the final merge, so that we don't need special handling for the mixed in-mem + spills case 🤔 If I'm not remembering it correctly, or we have missed some edge cases, we should do it (spill all in-mems before the final merge) for simplicity now.

For point 2: I was expecting this would be better done after #15380, but it seems that optimization got stuck; I'll look into this issue in the next few days.

@ding-young (Contributor Author) commented:

> For point 1: I vaguely remember that in multi-level merge there is logic to re-spill in-memory batches before the final merge, so that we don't need special handling for the mixed in-mem + spills case 🤔 If I'm not remembering it correctly, or we have missed some edge cases, we should do it (spill all in-mems before the final merge) for simplicity now.

After taking another look, it seems that the in-mem + spill case only happens in the first-round merge. After that, everything gets spilled. So while it's true that this case may use more memory than the reservation, it doesn't seem to be the major case, and I’ll hold off on addressing it for now.

> For point 2: I was expecting this would be better done after #15380, but it seems that optimization got stuck; I'll look into this issue in the next few days.

I’ve opened a new PR to address it. Would appreciate it if you could take a look :)

Besides that, just as a side note: I’m currently looking into a failing test case in this PR (memory validation). It’s related to StringViewArray, and I’m digging into why get_array_memory_size and get_sliced_size are so different even after running gc() before spilling.

Successfully merging this pull request may close this issue: Validate the memory consumption in SortPreservingMergeStream