Skip to content

Conversation

@Kontinuation
Copy link
Member

This patch adds a probe-side partitioned stream provider for repartitioning the probe side, and create probe streams for specified partitions. This partitioned stream provider is integrated into the spatial join execution flow to support larger-than-memory dataset by breaking the data into smaller partitions, where each partition could be fully loaded into memory.

Currently only non-knn joins are supported. We'll add larger-than-memory KNN join support in subsequent patches.

@Kontinuation Kontinuation requested a review from Copilot January 30, 2026 18:38
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR implements spatial partitioned stream processing for non-KNN spatial joins, enabling joins to handle larger-than-memory datasets by partitioning data into smaller chunks that can be loaded into memory. The main additions include a partitioned probe stream provider, updated stream state management for processing multiple partitions sequentially, and bitmap tracking for deduplicating probe rows across partitions in outer joins.

Changes:

  • Added probe-side partitioned stream infrastructure to repartition and spill probe data
  • Extended spatial join stream state machine to iterate through multiple spatial partitions
  • Implemented bitmap tracking for probe-side Multi partition to handle right outer joins correctly

Reviewed changes

Copilot reviewed 11 out of 12 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
rust/sedona-spatial-join/src/utils/join_utils.rs Added functions for tracking visited probe rows across partitions and adjusting indices with bitmap information
rust/sedona-spatial-join/src/stream.rs Extended state machine to process multiple spatial partitions sequentially with proper bitmap tracking
rust/sedona-spatial-join/src/probe/partitioned_stream_provider.rs New probe stream provider for repartitioning and spilling probe data to disk
rust/sedona-spatial-join/src/probe/non_partitioned_stream.rs Wrapper stream for non-partitioned probe processing with metrics
rust/sedona-spatial-join/src/probe/first_pass_stream.rs Stream implementation for first-pass processing that splits and spills probe data
rust/sedona-spatial-join/src/probe.rs Module definition with shared probe metrics structure
rust/sedona-spatial-join/src/prepare.rs Refactored to create probe stream options and spatial partitioners for probe side
rust/sedona-spatial-join/src/operand_evaluator.rs Minor style improvement in conditional logic
rust/sedona-spatial-join/src/lib.rs Added probe module to library exports
rust/sedona-spatial-join/src/index/partitioned_index_provider.rs Exposed wait_for_index method for public use
rust/sedona-spatial-join/src/exec.rs Updated to pass session config and runtime environment to stream constructor

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +67 to +76
pub(crate) fn need_probe_multi_partition_bitmap(join_type: JoinType) -> bool {
matches!(
join_type,
JoinType::Right
| JoinType::RightAnti
| JoinType::RightSemi
| JoinType::RightMark
| JoinType::Full
)
}
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function name need_probe_multi_partition_bitmap is somewhat ambiguous. Consider renaming to need_multi_partition_visited_bitmap or requires_probe_deduplication_bitmap to more clearly indicate that this bitmap is specifically for tracking visited/matched rows across multiple build partitions.

Copilot uses AI. Check for mistakes.
Comment on lines +287 to +298
fn regular(partition_id: u32) -> Self {
Self {
partition_id,
partition: SpatialPartition::Regular(partition_id),
}
}

fn multi(partition_id: u32) -> Self {
Self {
partition_id,
partition: SpatialPartition::Multi,
}
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PartitionDescriptor struct contains both partition_id and partition where partition can be Regular(partition_id). This creates redundancy and potential inconsistency. Consider deriving partition_id from partition when needed, or ensuring these fields are always synchronized.

Suggested change
fn regular(partition_id: u32) -> Self {
Self {
partition_id,
partition: SpatialPartition::Regular(partition_id),
}
}
fn multi(partition_id: u32) -> Self {
Self {
partition_id,
partition: SpatialPartition::Multi,
}
fn new(partition_id: u32, partition: SpatialPartition) -> Self {
if let SpatialPartition::Regular(inner_id) = partition {
debug_assert_eq!(
partition_id, inner_id,
"PartitionDescriptor: partition_id ({}) must match inner id in SpatialPartition::Regular ({})",
partition_id, inner_id
);
}
Self {
partition_id,
partition,
}
}
fn regular(partition_id: u32) -> Self {
Self::new(partition_id, SpatialPartition::Regular(partition_id))
}
fn multi(partition_id: u32) -> Self {
Self::new(partition_id, SpatialPartition::Multi)

Copilot uses AI. Check for mistakes.
Comment on lines +762 to +766
let visited_probe_side = if matches!(partition_desc.partition, SpatialPartition::Multi) {
self.visited_multi_probe_side.clone()
} else {
None
};
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Arc clone operation for visited_multi_probe_side happens on every batch iteration within the Multi partition. Consider storing this value once when entering the Multi partition state instead of cloning it for each batch.

Copilot uses AI. Check for mistakes.
Comment on lines +287 to +295
SpatialPartition::Multi => {
let spilled = self.slots.get_spilled_partition(partition)?;
Ok(Box::pin(
ExternalEvaluatedBatchStream::try_from_spill_files(
Arc::clone(&self.schema),
spilled.into_spill_files(),
)?,
))
}
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the Multi partition, get_spilled_partition is used instead of take_spilled_partition, which allows the Multi partition to be consumed multiple times (as shown in tests). However, calling into_spill_files() on a reference consumes the underlying data. This appears to work because get_spilled_partition likely returns an owned value, but the asymmetry with the Regular partition handling (which uses take_spilled_partition) could lead to confusion or bugs. Verify that this implementation correctly supports multiple consumptions of the Multi partition.

Copilot uses AI. Check for mistakes.
Comment on lines +96 to +98
callback
.call(Err(DataFusionError::Shared(err_arc.clone())))
.ok();
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error returned by callback.call() is silently discarded with .ok(). If the callback fails to propagate the error state, this could lead to silent failures. Consider logging the callback error or combining it with the original error.

Suggested change
callback
.call(Err(DataFusionError::Shared(err_arc.clone())))
.ok();
if let Err(callback_err) =
callback.call(Err(DataFusionError::Shared(err_arc.clone())))
{
eprintln!(
"FirstPassStream callback failed while handling error: {:?}",
callback_err
);
}

Copilot uses AI. Check for mistakes.
Comment on lines +304 to +313
let mut partition_bounds = Vec::with_capacity(num_partitions);
for k in 0..num_partitions {
let partition = SpatialPartition::Regular(k as u32);
let partition_bound = merged_spilled_partitions
.spilled_partition(partition)?
.bounding_box()
.cloned()
.unwrap_or(BoundingBox::empty());
partition_bounds.push(partition_bound);
}
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The loop calls spilled_partition() for each partition, which may involve lookups. Consider collecting all partition bounds in a single pass if the underlying data structure supports batch access.

Copilot uses AI. Check for mistakes.
Comment on lines +612 to +613
use arrow_array::Array;
use arrow_array::RecordBatch;
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Multiple imports from the same crate should be combined into a single use statement with braces for consistency and readability.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant