From f665255add2b498dcc92dd56f1bc2619ee8397cc Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Sun, 19 Oct 2025 16:49:22 +0300 Subject: [PATCH 01/21] perf: add optimized zip implementation for scalars This is useful for `IF THEN ELSE END` TODO: - [ ] Need to add comments if missing - [ ] Add benchmark --- arrow-select/src/zip.rs | 384 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 380 insertions(+), 4 deletions(-) diff --git a/arrow-select/src/zip.rs b/arrow-select/src/zip.rs index 2efd2e749921..961f0f068924 100644 --- a/arrow-select/src/zip.rs +++ b/arrow-select/src/zip.rs @@ -18,9 +18,20 @@ //! [`zip`]: Combine values from two arrays based on boolean mask use crate::filter::SlicesIterator; +use arrow_array::cast::AsArray; +use arrow_array::types::{BinaryType, ByteArrayType, LargeBinaryType, LargeUtf8Type, Utf8Type}; use arrow_array::*; +use arrow_buffer::{ + ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, NullBuffer, OffsetBuffer, ScalarBuffer, +}; +use arrow_data::ArrayData; use arrow_data::transform::MutableArrayData; -use arrow_schema::ArrowError; +use arrow_schema::{ArrowError, DataType}; +use std::fmt::{Debug, Formatter}; +use std::hash::Hash; +use std::marker::PhantomData; +use std::ops::{BitAnd, Not}; +use std::sync::Arc; /// Zip two arrays by some boolean mask. /// @@ -86,8 +97,16 @@ pub fn zip( truthy: &dyn Datum, falsy: &dyn Datum, ) -> Result { - let (truthy, truthy_is_scalar) = truthy.get(); - let (falsy, falsy_is_scalar) = falsy.get(); + let (truthy_array, truthy_is_scalar) = truthy.get(); + let (falsy_array, falsy_is_scalar) = falsy.get(); + + if falsy_is_scalar && truthy_is_scalar { + let zipper = ScalarZipper::try_new(truthy, falsy)?; + return zipper.zip_impl.create_output(mask); + } + + let truthy = truthy_array; + let falsy = falsy_array; if truthy.data_type() != falsy.data_type() { return Err(ArrowError::InvalidArgumentError( @@ -119,7 +138,17 @@ pub fn zip( let falsy = falsy.to_data(); let truthy = truthy.to_data(); - let mut mutable = MutableArrayData::new(vec![&truthy, &falsy], false, truthy.len()); + zip_impl(mask, &truthy, truthy_is_scalar, &falsy, falsy_is_scalar) +} + +fn zip_impl( + mask: &BooleanArray, + truthy: &ArrayData, + truthy_is_scalar: bool, + falsy: &ArrayData, + falsy_is_scalar: bool, +) -> Result { + let mut mutable = MutableArrayData::new(vec![truthy, falsy], false, truthy.len()); // the SlicesIterator slices only the true values. So the gaps left by this iterator we need to // fill with falsy values @@ -166,6 +195,353 @@ pub fn zip( Ok(make_array(data)) } +/// Zipper for 2 scalars +/// +/// Useful for using in `IF THEN ELSE END` expressions +/// +#[derive(Debug, Clone)] +pub struct ScalarZipper { + zip_impl: Arc, +} + +impl ScalarZipper { + pub fn try_new(truthy: &dyn Datum, falsy: &dyn Datum) -> Result { + let (truthy, truthy_is_scalar) = truthy.get(); + let (falsy, falsy_is_scalar) = falsy.get(); + + if truthy.data_type() != falsy.data_type() { + return Err(ArrowError::InvalidArgumentError( + "arguments need to have the same data type".into(), + )); + } + + if !truthy_is_scalar { + return Err(ArrowError::InvalidArgumentError( + "only scalar arrays are supported".into(), + )); + } + + if !falsy_is_scalar { + return Err(ArrowError::InvalidArgumentError( + "only scalar arrays are supported".into(), + )); + } + + if truthy.len() != 1 { + return Err(ArrowError::InvalidArgumentError( + "scalar arrays must have 1 element".into(), + )); + } + if falsy.len() != 1 { + return Err(ArrowError::InvalidArgumentError( + "scalar arrays must have 1 element".into(), + )); + } + + macro_rules! primitive_size_helper { + ($t:ty) => { + Arc::new(PrimitiveScalarImpl::<$t>::new(truthy, falsy)) as Arc + }; + } + + let zip_impl = downcast_primitive! { + truthy.data_type() => (primitive_size_helper), + DataType::Utf8 => { + Arc::new(BytesScalarImpl::::new(truthy, falsy)) as Arc + }, + DataType::LargeUtf8 => { + Arc::new(BytesScalarImpl::::new(truthy, falsy)) as Arc + }, + DataType::Binary => { + Arc::new(BytesScalarImpl::::new(truthy, falsy)) as Arc + }, + DataType::LargeBinary => { + Arc::new(BytesScalarImpl::::new(truthy, falsy)) as Arc + }, + _ => { + Arc::new(FallbackImpl::new(truthy, falsy)) as Arc + }, + }; + + Ok(Self { zip_impl }) + } +} + +/// Impl for creating output array based on input boolean array +trait ZipImpl: Debug { + /// Creating output array based on input boolean array + fn create_output(&self, input: &BooleanArray) -> Result; +} + +#[derive(Debug, PartialEq)] +struct FallbackImpl { + truthy: ArrayData, + falsy: ArrayData, +} + +impl FallbackImpl { + fn new(left: &dyn Array, right: &dyn Array) -> Self { + Self { + truthy: left.to_data(), + falsy: right.to_data(), + } + } +} + +impl ZipImpl for FallbackImpl { + fn create_output(&self, predicate: &BooleanArray) -> Result { + zip_impl(predicate, &self.truthy, false, &self.falsy, false) + } +} + +struct PrimitiveScalarImpl { + data_type: DataType, + truthy: Option, + falsy: Option, +} + +impl Debug for PrimitiveScalarImpl { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PrimitiveScalarImpl") + .field("data_type", &self.data_type) + .field("then_value", &self.truthy) + .field("else_value", &self.falsy) + .finish() + } +} + +impl PrimitiveScalarImpl { + fn new(then_value: &dyn Array, else_value: &dyn Array) -> Self { + Self { + data_type: then_value.data_type().clone(), + truthy: Self::get_value_from_scalar(then_value), + falsy: Self::get_value_from_scalar(else_value), + } + } + + fn get_value_from_scalar(scalar: &dyn Array) -> Option { + if scalar.is_null(0) { + None + } else { + let value = scalar.as_primitive::().value(0); + + Some(value) + } + } +} + +impl PrimitiveScalarImpl { + fn get_scalar_and_null_buffer_for_single_non_nullable( + predicate: BooleanBuffer, + value: T::Native, + ) -> (Vec, Option) { + let result_len = predicate.len(); + let nulls = NullBuffer::new(predicate); + let scalars = vec![value; result_len]; + + (scalars, Some(nulls)) + } +} + +impl ZipImpl for PrimitiveScalarImpl { + fn create_output(&self, predicate: &BooleanArray) -> Result { + let result_len = predicate.len(); + // Nulls are treated as false + let predicate = combine_nulls_and_false(predicate); + + let (scalars, nulls): (Vec, Option) = match (self.truthy, self.falsy) + { + (Some(then_val), Some(else_val)) => { + let scalars: Vec = predicate + .iter() + .map(|b| if b { then_val } else { else_val }) + .collect(); + + (scalars, None) + } + (Some(then_val), None) => { + // If a value is true we need the TRUTHY and the null buffer will have 1 (meaning not null) + // If a value is false we need the FALSY and the null buffer will have 0 (meaning null) + + Self::get_scalar_and_null_buffer_for_single_non_nullable(predicate, then_val) + } + (None, Some(else_val)) => { + // Flipping the boolean buffer as we want the opposite of the THEN case + // + // if the condition is true we want null so we need to NOT the value so we get 0 (meaning null) + // if the condition is false we want the FALSY value so we need to NOT the value so we get 1 (meaning not null) + let predicate = predicate.not(); + + Self::get_scalar_and_null_buffer_for_single_non_nullable(predicate, else_val) + } + (None, None) => { + // All values are null + let nulls = NullBuffer::new_null(result_len); + let scalars = vec![T::default_value(); result_len]; + + (scalars, Some(nulls)) + } + }; + + let scalars = ScalarBuffer::::from(scalars); + let output = PrimitiveArray::::try_new(scalars, nulls)?; + + // Keep decimal precisions, scales or timestamps timezones + let output = output.with_data_type(self.data_type.clone()); + + Ok(Arc::new(output)) + } +} + +#[derive(PartialEq, Hash)] +struct BytesScalarImpl { + truthy: Option>, + falsy: Option>, + phantom: PhantomData, +} + +impl Debug for BytesScalarImpl { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BytesScalarImpl") + .field("then_value", &self.truthy) + .field("else_value", &self.falsy) + .finish() + } +} + +impl BytesScalarImpl { + fn new(then_value: &dyn Array, else_value: &dyn Array) -> Self { + Self { + truthy: Self::get_value_from_scalar(then_value), + falsy: Self::get_value_from_scalar(else_value), + phantom: PhantomData, + } + } + + fn get_value_from_scalar(scalar: &dyn Array) -> Option> { + if scalar.is_null(0) { + None + } else { + let bytes: &[u8] = scalar.as_bytes::().value(0).as_ref(); + + Some(bytes.to_vec()) + } + } + + fn get_scalar_and_null_buffer_for_single_non_nullable( + predicate: BooleanBuffer, + value: &[u8], + ) -> (Buffer, OffsetBuffer, Option) { + let value_length = value.len(); + let offsets = OffsetBuffer::::from_lengths( + predicate.iter().map(|b| if b { value_length } else { 0 }), + ); + + let length = offsets.last().map(|o| o.as_usize()).unwrap_or(0); + + let bytes_iter = predicate + .iter() + .flat_map(|b| if b { value } else { &[] }) + .copied(); + + let bytes = unsafe { + // Safety: the iterator is trusted length as we limit it to the known length + MutableBuffer::from_trusted_len_iter( + bytes_iter + // Limiting the bytes so the iterator will be trusted length + .take(length), + ) + }; + + // If a value is true we need the TRUTHY and the null buffer will have 1 (meaning not null) + // If a value is false we need the FALSY and the null buffer will have 0 (meaning null) + let nulls = NullBuffer::new(predicate); + + (bytes.into(), offsets, Some(nulls)) + } +} + +impl ZipImpl for BytesScalarImpl { + fn create_output(&self, predicate: &BooleanArray) -> Result { + let result_len = predicate.len(); + // Nulls are treated as false + let predicate = combine_nulls_and_false(predicate); + + let (bytes, offsets, nulls): (Buffer, OffsetBuffer, Option) = + match (self.truthy.as_deref(), self.falsy.as_deref()) { + (Some(then_val), Some(else_val)) => { + let then_length = then_val.len(); + let else_length = else_val.len(); + let offsets = OffsetBuffer::::from_lengths(predicate.iter().map( + |b| { + if b { then_length } else { else_length } + }, + )); + + let length = offsets.last().map(|o| o.as_usize()).unwrap_or(0); + + let bytes_iter = predicate + .iter() + .flat_map(|b| if b { then_val } else { else_val }) + .copied(); + + let bytes = unsafe { + // Safety: the iterator is trusted length as we limit it to the known length + MutableBuffer::from_trusted_len_iter( + bytes_iter + // Limiting the bytes so the iterator will be trusted length + .take(length), + ) + }; + + (bytes.into(), offsets, None) + } + (Some(then_val), None) => { + Self::get_scalar_and_null_buffer_for_single_non_nullable(predicate, then_val) + } + (None, Some(else_val)) => { + // Flipping the boolean buffer as we want the opposite of the THEN case + // + // if the condition is true we want null so we need to NOT the value so we get 0 (meaning null) + // if the condition is false we want the ELSE value so we need to NOT the value so we get 1 (meaning not null) + let predicate = predicate.not(); + Self::get_scalar_and_null_buffer_for_single_non_nullable(predicate, else_val) + } + (None, None) => { + // All values are null + let nulls = NullBuffer::new_null(result_len); + + ( + // Empty bytes + Buffer::from(&[]), + // All nulls so all lengths are 0 + OffsetBuffer::::from_lengths(std::iter::repeat_n(0, result_len)), + Some(nulls), + ) + } + }; + + let output = unsafe { + // Safety: the values are based on valid inputs + // and `try_new` is expensive for strings as it validate that the input is valid utf8 + GenericByteArray::::new_unchecked(offsets, Buffer::from(bytes), nulls) + }; + + Ok(Arc::new(output)) + } +} + +fn combine_nulls_and_false(predicate: &BooleanArray) -> BooleanBuffer { + if let Some(nulls) = predicate.nulls().filter(|n| n.null_count() > 0) { + predicate.values().bitand( + // nulls are represented as 0 (false) in the values buffer + nulls.inner(), + ) + } else { + predicate.values().clone() + } +} + #[cfg(test)] mod test { use super::*; From a0bbe7faaad6303355c5e9461f91a177e267861f Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Sun, 19 Oct 2025 23:46:35 +0300 Subject: [PATCH 02/21] improve long string performance by a lot (compared to my prev impl) --- arrow-buffer/src/buffer/mutable.rs | 69 ++++++++++++ arrow-select/src/filter.rs | 8 +- arrow-select/src/zip.rs | 174 +++++++++++++++++++++-------- 3 files changed, 206 insertions(+), 45 deletions(-) diff --git a/arrow-buffer/src/buffer/mutable.rs b/arrow-buffer/src/buffer/mutable.rs index 93d9d6b9ad84..8d5880cbdb64 100644 --- a/arrow-buffer/src/buffer/mutable.rs +++ b/arrow-buffer/src/buffer/mutable.rs @@ -222,6 +222,75 @@ impl MutableBuffer { } } + /// Creates a new [`MutableBuffer`] by repeating the contents of `slice_to_repeat` + /// `repeat_count` times. + pub fn new_repeated(repeat_count: usize, slice_to_repeat: &[T]) -> Self { + if slice_to_repeat.is_empty() || repeat_count == 0 { + return Self::new(0); + } + + // If we keep extending from ourself we will reach it pretty fast + let value_len = slice_to_repeat.len(); + let final_len = repeat_count * value_len; + let mut mutable = Self::with_capacity(final_len); + + mutable.push_slice_repeated(repeat_count, slice_to_repeat); + + mutable + } + + /// Adding to this mutable buffer `slice_to_repeat` repeated `repeat_count` times. + pub fn push_slice_repeated(&mut self, repeat_count: usize, slice_to_repeat: &[T]) { + if repeat_count == 0 || slice_to_repeat.is_empty() { + return; + } + + // Ensure capacity + let additional = repeat_count * mem::size_of_val(slice_to_repeat); + self.reserve(additional); + + // No need to special case small repeat counts + if repeat_count <= 3 { + for _ in 0..repeat_count { + self.extend_from_slice(slice_to_repeat); + } + + return; + } + + // If we keep extending from ourself we will reach it pretty fast + let value_len = slice_to_repeat.len(); + let final_len_to_repeat = repeat_count * value_len; + + let length_before = self.len; + + self.extend_from_slice(slice_to_repeat); + let mut added_repeats_length = mem::size_of_val(slice_to_repeat); + + // Copy in doubling steps to reduce number of copy calls + while added_repeats_length * 2 <= final_len_to_repeat { + unsafe { + let src = self.data.as_ptr().add(length_before) as *const u8; + let dst = self.data.as_ptr().add(self.len); + std::ptr::copy_nonoverlapping(src, dst, added_repeats_length) + } + self.len += added_repeats_length; + added_repeats_length *= 2; + } + + // Copy the rest of the required data in one go + let last_amount_to_copy = final_len_to_repeat - added_repeats_length; + assert!(last_amount_to_copy <= final_len_to_repeat, "the last copy should not overlap"); + + unsafe { + let src = self.data.as_ptr().add(length_before) as *const u8; + let dst = self.data.as_ptr().add(self.len); + + std::ptr::copy_nonoverlapping(src, dst, last_amount_to_copy) + } + self.len += last_amount_to_copy; + } + #[cold] fn reallocate(&mut self, capacity: usize) { let new_layout = Layout::from_size_align(capacity, self.layout.align()).unwrap(); diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index dace2bab728f..32c4be7d3e9d 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -58,7 +58,13 @@ pub struct SlicesIterator<'a>(BitSliceIterator<'a>); impl<'a> SlicesIterator<'a> { /// Creates a new iterator from a [BooleanArray] pub fn new(filter: &'a BooleanArray) -> Self { - Self(filter.values().set_slices()) + filter.values().into() + } +} + +impl<'a> From<&'a BooleanBuffer> for SlicesIterator<'a> { + fn from(buffer: &'a BooleanBuffer) -> Self { + Self(buffer.set_slices()) } } diff --git a/arrow-select/src/zip.rs b/arrow-select/src/zip.rs index 961f0f068924..2440930a415d 100644 --- a/arrow-select/src/zip.rs +++ b/arrow-select/src/zip.rs @@ -22,7 +22,8 @@ use arrow_array::cast::AsArray; use arrow_array::types::{BinaryType, ByteArrayType, LargeBinaryType, LargeUtf8Type, Utf8Type}; use arrow_array::*; use arrow_buffer::{ - ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, NullBuffer, OffsetBuffer, ScalarBuffer, + ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, NullBuffer, OffsetBuffer, + OffsetBufferBuilder, ScalarBuffer, }; use arrow_data::ArrayData; use arrow_data::transform::MutableArrayData; @@ -433,25 +434,29 @@ impl BytesScalarImpl { value: &[u8], ) -> (Buffer, OffsetBuffer, Option) { let value_length = value.len(); + + let number_of_true = predicate.count_set_bits(); + + // Fast path for all nulls + if number_of_true == 0 { + // All values are null + let nulls = NullBuffer::new_null(predicate.len()); + + return ( + // Empty bytes + Buffer::from(&[]), + // All nulls so all lengths are 0 + OffsetBuffer::::new_zeroed(predicate.len()), + Some(nulls), + ); + } + let offsets = OffsetBuffer::::from_lengths( predicate.iter().map(|b| if b { value_length } else { 0 }), ); - let length = offsets.last().map(|o| o.as_usize()).unwrap_or(0); - - let bytes_iter = predicate - .iter() - .flat_map(|b| if b { value } else { &[] }) - .copied(); - - let bytes = unsafe { - // Safety: the iterator is trusted length as we limit it to the known length - MutableBuffer::from_trusted_len_iter( - bytes_iter - // Limiting the bytes so the iterator will be trusted length - .take(length), - ) - }; + let bytes = MutableBuffer::new_repeated(number_of_true, value); + let bytes = Buffer::from(bytes); // If a value is true we need the TRUTHY and the null buffer will have 1 (meaning not null) // If a value is false we need the FALSY and the null buffer will have 0 (meaning null) @@ -459,6 +464,21 @@ impl BytesScalarImpl { (bytes.into(), offsets, Some(nulls)) } + + fn get_bytes_and_offset_for_all_same_value( + predicate: &BooleanBuffer, + value: &[u8], + ) -> (Buffer, OffsetBuffer) { + let value_length = value.len(); + + let offsets = + OffsetBuffer::::from_lengths(predicate.iter().map(|b| value_length)); + + let bytes = MutableBuffer::new_repeated(predicate.len(), value); + let bytes = Buffer::from(bytes); + + (bytes.into(), offsets) + } } impl ZipImpl for BytesScalarImpl { @@ -470,40 +490,19 @@ impl ZipImpl for BytesScalarImpl { let (bytes, offsets, nulls): (Buffer, OffsetBuffer, Option) = match (self.truthy.as_deref(), self.falsy.as_deref()) { (Some(then_val), Some(else_val)) => { - let then_length = then_val.len(); - let else_length = else_val.len(); - let offsets = OffsetBuffer::::from_lengths(predicate.iter().map( - |b| { - if b { then_length } else { else_length } - }, - )); - - let length = offsets.last().map(|o| o.as_usize()).unwrap_or(0); - - let bytes_iter = predicate - .iter() - .flat_map(|b| if b { then_val } else { else_val }) - .copied(); - - let bytes = unsafe { - // Safety: the iterator is trusted length as we limit it to the known length - MutableBuffer::from_trusted_len_iter( - bytes_iter - // Limiting the bytes so the iterator will be trusted length - .take(length), - ) - }; - - (bytes.into(), offsets, None) + let (bytes, offsets) = + Self::create_output_on_non_nulls(&predicate, then_val, else_val); + + (bytes, offsets, None) } (Some(then_val), None) => { Self::get_scalar_and_null_buffer_for_single_non_nullable(predicate, then_val) } (None, Some(else_val)) => { - // Flipping the boolean buffer as we want the opposite of the THEN case + // Flipping the boolean buffer as we want the opposite of the TRUE case // // if the condition is true we want null so we need to NOT the value so we get 0 (meaning null) - // if the condition is false we want the ELSE value so we need to NOT the value so we get 1 (meaning not null) + // if the condition is false we want the FALSE value so we need to NOT the value so we get 1 (meaning not null) let predicate = predicate.not(); Self::get_scalar_and_null_buffer_for_single_non_nullable(predicate, else_val) } @@ -515,7 +514,7 @@ impl ZipImpl for BytesScalarImpl { // Empty bytes Buffer::from(&[]), // All nulls so all lengths are 0 - OffsetBuffer::::from_lengths(std::iter::repeat_n(0, result_len)), + OffsetBuffer::::new_zeroed(predicate.len()), Some(nulls), ) } @@ -531,6 +530,93 @@ impl ZipImpl for BytesScalarImpl { } } +impl BytesScalarImpl { + fn create_output_on_non_nulls( + predicate: &BooleanBuffer, + then_val: &[u8], + else_val: &[u8], + ) -> (Buffer, OffsetBuffer<::Offset>) { + let true_count = predicate.count_set_bits(); + + match true_count { + 0 => { + // All values are falsy + + let (bytes, offsets) = + Self::get_bytes_and_offset_for_all_same_value(predicate, else_val); + + return (bytes, offsets); + } + n if n == predicate.len() => { + // All values are truthy + let (bytes, offsets) = + Self::get_bytes_and_offset_for_all_same_value(predicate, then_val); + + return (bytes, offsets); + } + + _ => { + // Fallback + } + } + + let total_number_of_bytes = + true_count * then_val.len() + (predicate.len() - true_count) * else_val.len(); + let mut mutable = MutableBuffer::with_capacity(total_number_of_bytes); + let mut offset_buffer_builder = OffsetBufferBuilder::::new(predicate.len()); + + // keep track of how much is filled + let mut filled = 0; + + let then_len = then_val.len(); + let else_len = else_val.len(); + + SlicesIterator::from(predicate).for_each(|(start, end)| { + // the gap needs to be filled with falsy values + if start > filled { + let false_repeat_count = start - filled; + // Push else value `repeat_count` times + mutable.push_slice_repeated( + false_repeat_count, + else_val, + ); + + for _ in 0..false_repeat_count { + offset_buffer_builder.push_length(else_len) + } + } + + let true_repeat_count = end - start; + // fill with truthy values + mutable.push_slice_repeated( + true_repeat_count, + then_val, + ); + + for _ in 0..true_repeat_count { + offset_buffer_builder.push_length(then_len) + } + filled = end; + }); + // the remaining part is falsy + if filled < predicate.len() { + let false_repeat_count = predicate.len() - filled; + // Copy the first item from the 'falsy' array into the output buffer. + mutable.push_slice_repeated( + false_repeat_count, + else_val, + ); + + for _ in 0..false_repeat_count { + offset_buffer_builder.push_length(else_len) + } + } + + + (mutable.into(), offset_buffer_builder.finish()) + } +} + fn combine_nulls_and_false(predicate: &BooleanArray) -> BooleanBuffer { if let Some(nulls) = predicate.nulls().filter(|n| n.null_count() > 0) { predicate.values().bitand( From f733a258648d89b7c13b710f15b80ac2314f2202 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Sun, 19 Oct 2025 23:46:49 +0300 Subject: [PATCH 03/21] format --- arrow-buffer/src/buffer/mutable.rs | 11 +++++++++-- arrow-select/src/zip.rs | 16 +++------------- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/arrow-buffer/src/buffer/mutable.rs b/arrow-buffer/src/buffer/mutable.rs index 8d5880cbdb64..3225cb492bd1 100644 --- a/arrow-buffer/src/buffer/mutable.rs +++ b/arrow-buffer/src/buffer/mutable.rs @@ -240,7 +240,11 @@ impl MutableBuffer { } /// Adding to this mutable buffer `slice_to_repeat` repeated `repeat_count` times. - pub fn push_slice_repeated(&mut self, repeat_count: usize, slice_to_repeat: &[T]) { + pub fn push_slice_repeated( + &mut self, + repeat_count: usize, + slice_to_repeat: &[T], + ) { if repeat_count == 0 || slice_to_repeat.is_empty() { return; } @@ -280,7 +284,10 @@ impl MutableBuffer { // Copy the rest of the required data in one go let last_amount_to_copy = final_len_to_repeat - added_repeats_length; - assert!(last_amount_to_copy <= final_len_to_repeat, "the last copy should not overlap"); + assert!( + last_amount_to_copy <= final_len_to_repeat, + "the last copy should not overlap" + ); unsafe { let src = self.data.as_ptr().add(length_before) as *const u8; diff --git a/arrow-select/src/zip.rs b/arrow-select/src/zip.rs index 2440930a415d..7668b5802b3e 100644 --- a/arrow-select/src/zip.rs +++ b/arrow-select/src/zip.rs @@ -576,10 +576,7 @@ impl BytesScalarImpl { if start > filled { let false_repeat_count = start - filled; // Push else value `repeat_count` times - mutable.push_slice_repeated( - false_repeat_count, - else_val, - ); + mutable.push_slice_repeated(false_repeat_count, else_val); for _ in 0..false_repeat_count { offset_buffer_builder.push_length(else_len) @@ -588,10 +585,7 @@ impl BytesScalarImpl { let true_repeat_count = end - start; // fill with truthy values - mutable.push_slice_repeated( - true_repeat_count, - then_val, - ); + mutable.push_slice_repeated(true_repeat_count, then_val); for _ in 0..true_repeat_count { offset_buffer_builder.push_length(then_len) @@ -602,17 +596,13 @@ impl BytesScalarImpl { if filled < predicate.len() { let false_repeat_count = predicate.len() - filled; // Copy the first item from the 'falsy' array into the output buffer. - mutable.push_slice_repeated( - false_repeat_count, - else_val, - ); + mutable.push_slice_repeated(false_repeat_count, else_val); for _ in 0..false_repeat_count { offset_buffer_builder.push_length(else_len) } } - (mutable.into(), offset_buffer_builder.finish()) } } From 98aa6bf089814ede72a64c90f423992b7ca0c599 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Sun, 19 Oct 2025 23:53:44 +0300 Subject: [PATCH 04/21] fix lint and format --- arrow-select/src/zip.rs | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/arrow-select/src/zip.rs b/arrow-select/src/zip.rs index 7668b5802b3e..30191d753857 100644 --- a/arrow-select/src/zip.rs +++ b/arrow-select/src/zip.rs @@ -22,8 +22,8 @@ use arrow_array::cast::AsArray; use arrow_array::types::{BinaryType, ByteArrayType, LargeBinaryType, LargeUtf8Type, Utf8Type}; use arrow_array::*; use arrow_buffer::{ - ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, NullBuffer, OffsetBuffer, - OffsetBufferBuilder, ScalarBuffer, + BooleanBuffer, Buffer, MutableBuffer, NullBuffer, OffsetBuffer, OffsetBufferBuilder, + ScalarBuffer, }; use arrow_data::ArrayData; use arrow_data::transform::MutableArrayData; @@ -206,6 +206,13 @@ pub struct ScalarZipper { } impl ScalarZipper { + /// Try to create a new ScalarZipper from two scalar Datum + /// + /// # Errors + /// returns error if: + /// - the two Datum have different data types + /// - either Datum is not a scalar (or has more than 1 element) + /// pub fn try_new(truthy: &dyn Datum, falsy: &dyn Datum) -> Result { let (truthy, truthy_is_scalar) = truthy.get(); let (falsy, falsy_is_scalar) = falsy.get(); @@ -462,7 +469,7 @@ impl BytesScalarImpl { // If a value is false we need the FALSY and the null buffer will have 0 (meaning null) let nulls = NullBuffer::new(predicate); - (bytes.into(), offsets, Some(nulls)) + (bytes, offsets, Some(nulls)) } fn get_bytes_and_offset_for_all_same_value( @@ -471,13 +478,15 @@ impl BytesScalarImpl { ) -> (Buffer, OffsetBuffer) { let value_length = value.len(); - let offsets = - OffsetBuffer::::from_lengths(predicate.iter().map(|b| value_length)); + let offsets = OffsetBuffer::::from_lengths(std::iter::repeat_n( + value_length, + predicate.len(), + )); let bytes = MutableBuffer::new_repeated(predicate.len(), value); let bytes = Buffer::from(bytes); - (bytes.into(), offsets) + (bytes, offsets) } } @@ -523,7 +532,7 @@ impl ZipImpl for BytesScalarImpl { let output = unsafe { // Safety: the values are based on valid inputs // and `try_new` is expensive for strings as it validate that the input is valid utf8 - GenericByteArray::::new_unchecked(offsets, Buffer::from(bytes), nulls) + GenericByteArray::::new_unchecked(offsets, bytes, nulls) }; Ok(Arc::new(output)) From 7ccb30d47ad344cdbd801f9f0b58bb5adab3e492 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Mon, 20 Oct 2025 00:03:38 +0300 Subject: [PATCH 05/21] update comments --- arrow-select/src/zip.rs | 76 ++++++++++++++++++++--------------------- 1 file changed, 38 insertions(+), 38 deletions(-) diff --git a/arrow-select/src/zip.rs b/arrow-select/src/zip.rs index 30191d753857..54626f790eea 100644 --- a/arrow-select/src/zip.rs +++ b/arrow-select/src/zip.rs @@ -312,18 +312,18 @@ impl Debug for PrimitiveScalarImpl { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("PrimitiveScalarImpl") .field("data_type", &self.data_type) - .field("then_value", &self.truthy) - .field("else_value", &self.falsy) + .field("truthy", &self.truthy) + .field("falsy", &self.falsy) .finish() } } impl PrimitiveScalarImpl { - fn new(then_value: &dyn Array, else_value: &dyn Array) -> Self { + fn new(truthy: &dyn Array, falsy: &dyn Array) -> Self { Self { - data_type: then_value.data_type().clone(), - truthy: Self::get_value_from_scalar(then_value), - falsy: Self::get_value_from_scalar(else_value), + data_type: truthy.data_type().clone(), + truthy: Self::get_value_from_scalar(truthy), + falsy: Self::get_value_from_scalar(falsy), } } @@ -359,28 +359,28 @@ impl ZipImpl for PrimitiveScalarImpl { let (scalars, nulls): (Vec, Option) = match (self.truthy, self.falsy) { - (Some(then_val), Some(else_val)) => { + (Some(truthy_val), Some(falsy_val)) => { let scalars: Vec = predicate .iter() - .map(|b| if b { then_val } else { else_val }) + .map(|b| if b { truthy_val } else { falsy_val }) .collect(); (scalars, None) } - (Some(then_val), None) => { + (Some(truthy_val), None) => { // If a value is true we need the TRUTHY and the null buffer will have 1 (meaning not null) // If a value is false we need the FALSY and the null buffer will have 0 (meaning null) - Self::get_scalar_and_null_buffer_for_single_non_nullable(predicate, then_val) + Self::get_scalar_and_null_buffer_for_single_non_nullable(predicate, truthy_val) } - (None, Some(else_val)) => { - // Flipping the boolean buffer as we want the opposite of the THEN case + (None, Some(falsy_val)) => { + // Flipping the boolean buffer as we want the opposite of the TRUE case // // if the condition is true we want null so we need to NOT the value so we get 0 (meaning null) // if the condition is false we want the FALSY value so we need to NOT the value so we get 1 (meaning not null) let predicate = predicate.not(); - Self::get_scalar_and_null_buffer_for_single_non_nullable(predicate, else_val) + Self::get_scalar_and_null_buffer_for_single_non_nullable(predicate, falsy_val) } (None, None) => { // All values are null @@ -411,17 +411,17 @@ struct BytesScalarImpl { impl Debug for BytesScalarImpl { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("BytesScalarImpl") - .field("then_value", &self.truthy) - .field("else_value", &self.falsy) + .field("truthy", &self.truthy) + .field("falsy", &self.falsy) .finish() } } impl BytesScalarImpl { - fn new(then_value: &dyn Array, else_value: &dyn Array) -> Self { + fn new(truthy_value: &dyn Array, falsy_value: &dyn Array) -> Self { Self { - truthy: Self::get_value_from_scalar(then_value), - falsy: Self::get_value_from_scalar(else_value), + truthy: Self::get_value_from_scalar(truthy_value), + falsy: Self::get_value_from_scalar(falsy_value), phantom: PhantomData, } } @@ -498,22 +498,22 @@ impl ZipImpl for BytesScalarImpl { let (bytes, offsets, nulls): (Buffer, OffsetBuffer, Option) = match (self.truthy.as_deref(), self.falsy.as_deref()) { - (Some(then_val), Some(else_val)) => { + (Some(truthy_val), Some(falsy_val)) => { let (bytes, offsets) = - Self::create_output_on_non_nulls(&predicate, then_val, else_val); + Self::create_output_on_non_nulls(&predicate, truthy_val, falsy_val); (bytes, offsets, None) } - (Some(then_val), None) => { - Self::get_scalar_and_null_buffer_for_single_non_nullable(predicate, then_val) + (Some(truthy_val), None) => { + Self::get_scalar_and_null_buffer_for_single_non_nullable(predicate, truthy_val) } - (None, Some(else_val)) => { + (None, Some(falsy_val)) => { // Flipping the boolean buffer as we want the opposite of the TRUE case // // if the condition is true we want null so we need to NOT the value so we get 0 (meaning null) // if the condition is false we want the FALSE value so we need to NOT the value so we get 1 (meaning not null) let predicate = predicate.not(); - Self::get_scalar_and_null_buffer_for_single_non_nullable(predicate, else_val) + Self::get_scalar_and_null_buffer_for_single_non_nullable(predicate, falsy_val) } (None, None) => { // All values are null @@ -542,8 +542,8 @@ impl ZipImpl for BytesScalarImpl { impl BytesScalarImpl { fn create_output_on_non_nulls( predicate: &BooleanBuffer, - then_val: &[u8], - else_val: &[u8], + truthy_val: &[u8], + falsy_val: &[u8], ) -> (Buffer, OffsetBuffer<::Offset>) { let true_count = predicate.count_set_bits(); @@ -552,14 +552,14 @@ impl BytesScalarImpl { // All values are falsy let (bytes, offsets) = - Self::get_bytes_and_offset_for_all_same_value(predicate, else_val); + Self::get_bytes_and_offset_for_all_same_value(predicate, falsy_val); return (bytes, offsets); } n if n == predicate.len() => { // All values are truthy let (bytes, offsets) = - Self::get_bytes_and_offset_for_all_same_value(predicate, then_val); + Self::get_bytes_and_offset_for_all_same_value(predicate, truthy_val); return (bytes, offsets); } @@ -570,34 +570,34 @@ impl BytesScalarImpl { } let total_number_of_bytes = - true_count * then_val.len() + (predicate.len() - true_count) * else_val.len(); + true_count * truthy_val.len() + (predicate.len() - true_count) * falsy_val.len(); let mut mutable = MutableBuffer::with_capacity(total_number_of_bytes); let mut offset_buffer_builder = OffsetBufferBuilder::::new(predicate.len()); // keep track of how much is filled let mut filled = 0; - let then_len = then_val.len(); - let else_len = else_val.len(); + let truthy_len = truthy_val.len(); + let falsy_len = falsy_val.len(); SlicesIterator::from(predicate).for_each(|(start, end)| { // the gap needs to be filled with falsy values if start > filled { let false_repeat_count = start - filled; - // Push else value `repeat_count` times - mutable.push_slice_repeated(false_repeat_count, else_val); + // Push false value `repeat_count` times + mutable.push_slice_repeated(false_repeat_count, falsy_val); for _ in 0..false_repeat_count { - offset_buffer_builder.push_length(else_len) + offset_buffer_builder.push_length(falsy_len) } } let true_repeat_count = end - start; // fill with truthy values - mutable.push_slice_repeated(true_repeat_count, then_val); + mutable.push_slice_repeated(true_repeat_count, truthy_val); for _ in 0..true_repeat_count { - offset_buffer_builder.push_length(then_len) + offset_buffer_builder.push_length(truthy_len) } filled = end; }); @@ -605,10 +605,10 @@ impl BytesScalarImpl { if filled < predicate.len() { let false_repeat_count = predicate.len() - filled; // Copy the first item from the 'falsy' array into the output buffer. - mutable.push_slice_repeated(false_repeat_count, else_val); + mutable.push_slice_repeated(false_repeat_count, falsy_val); for _ in 0..false_repeat_count { - offset_buffer_builder.push_length(else_len) + offset_buffer_builder.push_length(falsy_len) } } From 5ac98798f22f80c0bc90a9139e2b46fe3d1d098f Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Mon, 20 Oct 2025 00:41:20 +0300 Subject: [PATCH 06/21] perf: add optimized function to create offset with same length --- arrow-array/src/array/list_array.rs | 2 +- arrow-buffer/src/buffer/offset.rs | 60 +++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) diff --git a/arrow-array/src/array/list_array.rs b/arrow-array/src/array/list_array.rs index ec7983c303f0..55a3a4c0d678 100644 --- a/arrow-array/src/array/list_array.rs +++ b/arrow-array/src/array/list_array.rs @@ -466,7 +466,7 @@ impl From for GenericListArray< _ => unreachable!(), }; - let offsets = OffsetBuffer::from_lengths(std::iter::repeat_n(size, value.len())); + let offsets = OffsetBuffer::from_repeated_length(size, value.len()); Self { data_type: Self::DATA_TYPE_CONSTRUCTOR(field.clone()), diff --git a/arrow-buffer/src/buffer/offset.rs b/arrow-buffer/src/buffer/offset.rs index fe3a57a38248..de10e8993b8a 100644 --- a/arrow-buffer/src/buffer/offset.rs +++ b/arrow-buffer/src/buffer/offset.rs @@ -112,6 +112,9 @@ impl OffsetBuffer { /// assert_eq!(offsets.as_ref(), &[0, 1, 4, 9]); /// ``` /// + /// If you want to create an [`OffsetBuffer`] where each slice has the same length, + /// consider using the faster [`OffsetBuffer::from_repeated_length`] instead. + /// /// # Panics /// /// Panics on overflow @@ -133,6 +136,39 @@ impl OffsetBuffer { Self(out.into()) } + /// Create a new [`OffsetBuffer`] where each slice has the same length + /// `length`, repeated `n` times. + /// + /// + /// Example + /// ``` + /// # use arrow_buffer::OffsetBuffer; + /// let offsets = OffsetBuffer::::from_repeated_length(4, 3); + /// assert_eq!(offsets.as_ref(), &[0, 4, 8, 12]); + /// ``` + /// + /// # Panics + /// + /// Panics on overflow + pub fn from_repeated_length(length: usize, n: usize) -> Self { + if n == 0 { + return Self::new_empty(); + } + + if length == 0 { + return Self::new_zeroed(n); + } + + // Check for overflow + O::from_usize(length * n).expect("offset overflow"); + + let offsets = (0..=n) + .map(|index| O::usize_as(index * length)) + .collect::>(); + + Self(ScalarBuffer::from(offsets)) + } + /// Get an Iterator over the lengths of this [`OffsetBuffer`] /// /// ``` @@ -283,6 +319,30 @@ mod tests { OffsetBuffer::::from_lengths([usize::MAX, 1]); } + #[test] + #[should_panic(expected = "offset overflow")] + fn from_repeated_lengths_offset_length_overflow() { + OffsetBuffer::::from_repeated_length(i32::MAX as usize, 1); + } + + #[test] + #[should_panic(expected = "offset overflow")] + fn from_repeated_lengths_offset_repeat_overflow() { + OffsetBuffer::::from_repeated_length(1, i32::MAX as usize); + } + + #[test] + #[should_panic(expected = "usize overflow")] + fn from_repeated_lengths_usize_length_overflow() { + OffsetBuffer::::from_repeated_length(usize::MAX, 1); + } + + #[test] + #[should_panic(expected = "usize overflow")] + fn from_repeated_lengths_usize_repeat_overflow() { + OffsetBuffer::::from_repeated_length(1, usize::MAX); + } + #[test] fn get_lengths() { let offsets = OffsetBuffer::::new(ScalarBuffer::::from(vec![0, 1, 4, 9])); From e172f2e8d8939fe36b3b7739a2608f5a4d07a254 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Mon, 20 Oct 2025 00:58:54 +0300 Subject: [PATCH 07/21] add tests --- arrow-buffer/src/buffer/offset.rs | 88 +++++++++++++++++++++++++++++-- 1 file changed, 85 insertions(+), 3 deletions(-) diff --git a/arrow-buffer/src/buffer/offset.rs b/arrow-buffer/src/buffer/offset.rs index de10e8993b8a..307f027f681c 100644 --- a/arrow-buffer/src/buffer/offset.rs +++ b/arrow-buffer/src/buffer/offset.rs @@ -159,6 +159,10 @@ impl OffsetBuffer { return Self::new_zeroed(n); } + // Check for overflow + // Making sure we don't overflow usize or O when calculating the total length + length.checked_mul(n).expect("usize overflow"); + // Check for overflow O::from_usize(length * n).expect("offset overflow"); @@ -322,23 +326,29 @@ mod tests { #[test] #[should_panic(expected = "offset overflow")] fn from_repeated_lengths_offset_length_overflow() { - OffsetBuffer::::from_repeated_length(i32::MAX as usize, 1); + OffsetBuffer::::from_repeated_length(i32::MAX as usize / 4, 5); } #[test] #[should_panic(expected = "offset overflow")] fn from_repeated_lengths_offset_repeat_overflow() { - OffsetBuffer::::from_repeated_length(1, i32::MAX as usize); + OffsetBuffer::::from_repeated_length(1, i32::MAX as usize + 1); } #[test] - #[should_panic(expected = "usize overflow")] + #[should_panic(expected = "offset overflow")] fn from_repeated_lengths_usize_length_overflow() { OffsetBuffer::::from_repeated_length(usize::MAX, 1); } #[test] #[should_panic(expected = "usize overflow")] + fn from_repeated_lengths_usize_length_usize_overflow() { + OffsetBuffer::::from_repeated_length(usize::MAX, 2); + } + + #[test] + #[should_panic(expected = "offset overflow")] fn from_repeated_lengths_usize_repeat_overflow() { OffsetBuffer::::from_repeated_length(1, usize::MAX); } @@ -383,4 +393,76 @@ mod tests { let default = OffsetBuffer::::default(); assert_eq!(default.as_ref(), &[0]); } + + #[test] + fn from_repeated_length_basic() { + // Basic case with length 4, repeated 3 times + let buffer = OffsetBuffer::::from_repeated_length(4, 3); + assert_eq!(buffer.as_ref(), &[0, 4, 8, 12]); + + // Verify the lengths are correct + let lengths: Vec = buffer.lengths().collect(); + assert_eq!(lengths, vec![4, 4, 4]); + } + + #[test] + fn from_repeated_length_single_repeat() { + // Length 5, repeated once + let buffer = OffsetBuffer::::from_repeated_length(5, 1); + assert_eq!(buffer.as_ref(), &[0, 5]); + + let lengths: Vec = buffer.lengths().collect(); + assert_eq!(lengths, vec![5]); + } + + #[test] + fn from_repeated_length_zero_repeats() { + let buffer = OffsetBuffer::::from_repeated_length(10, 0); + assert_eq!(buffer, OffsetBuffer::::new_empty()); + } + + #[test] + fn from_repeated_length_zero_length() { + // Zero length, repeated 5 times (all zeros) + let buffer = OffsetBuffer::::from_repeated_length(0, 5); + assert_eq!(buffer.as_ref(), &[0, 0, 0, 0, 0, 0]); + + // All lengths should be 0 + let lengths: Vec = buffer.lengths().collect(); + assert_eq!(lengths, vec![0, 0, 0, 0, 0]); + } + + #[test] + fn from_repeated_length_large_values() { + // Test with larger values that don't overflow + let buffer = OffsetBuffer::::from_repeated_length(1000, 100); + assert_eq!(buffer[0], 0); + + // Verify all lengths are 1000 + let lengths: Vec = buffer.lengths().collect(); + assert_eq!(lengths.len(), 100); + assert!(lengths.iter().all(|&len| len == 1000)); + } + + #[test] + fn from_repeated_length_unit_length() { + // Length 1, repeated multiple times + let buffer = OffsetBuffer::::from_repeated_length(1, 10); + assert_eq!(buffer.as_ref(), &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + + let lengths: Vec = buffer.lengths().collect(); + assert_eq!(lengths, vec![1; 10]); + } + + #[test] + fn from_repeated_length_max_safe_values() { + // Test with maximum safe values for i32 + // i32::MAX / 3 ensures we don't overflow when repeated twice + let third_max = (i32::MAX / 3) as usize; + let buffer = OffsetBuffer::::from_repeated_length(third_max, 2); + assert_eq!( + buffer.as_ref(), + &[0, third_max as i32, (third_max * 2) as i32] + ); + } } From 3317a39583b0b738ef882e5b4395f44d6e541880 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Mon, 20 Oct 2025 01:00:11 +0300 Subject: [PATCH 08/21] updated comment --- arrow-buffer/src/buffer/offset.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-buffer/src/buffer/offset.rs b/arrow-buffer/src/buffer/offset.rs index 307f027f681c..66fa7dd22ec5 100644 --- a/arrow-buffer/src/buffer/offset.rs +++ b/arrow-buffer/src/buffer/offset.rs @@ -112,7 +112,7 @@ impl OffsetBuffer { /// assert_eq!(offsets.as_ref(), &[0, 1, 4, 9]); /// ``` /// - /// If you want to create an [`OffsetBuffer`] where each slice has the same length, + /// If you want to create an [`OffsetBuffer`] where all lengths are the same, /// consider using the faster [`OffsetBuffer::from_repeated_length`] instead. /// /// # Panics From 5f01d05c1cacfc5fb80edb14ae85b0182a28e3a3 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Mon, 20 Oct 2025 09:49:52 +0300 Subject: [PATCH 09/21] perf: add `repeat_slice_n_times` to `MutableBuffer` this will be used in: - https://github.com/apache/arrow-rs/pull/8653 --- arrow-buffer/src/buffer/mutable.rs | 219 +++++++++++++++++++++++++++++ 1 file changed, 219 insertions(+) diff --git a/arrow-buffer/src/buffer/mutable.rs b/arrow-buffer/src/buffer/mutable.rs index 93d9d6b9ad84..5239b7e46736 100644 --- a/arrow-buffer/src/buffer/mutable.rs +++ b/arrow-buffer/src/buffer/mutable.rs @@ -222,6 +222,104 @@ impl MutableBuffer { } } + /// Adding to this mutable buffer `slice_to_repeat` repeated `repeat_count` times. + /// + /// # Example + /// + /// ## Repeat the same string bytes multiple times + /// ``` + /// # use arrow_buffer::buffer::MutableBuffer; + /// let mut buffer = MutableBuffer::new(0); + /// let bytes_to_repeat = b"ab"; + /// buffer.repeat_slice_n_times(bytes_to_repeat, 3); + /// assert_eq!(buffer.as_slice(), b"ababab"); + /// ``` + pub fn repeat_slice_n_times( + &mut self, + slice_to_repeat: &[T], + repeat_count: usize, + ) { + if repeat_count == 0 || slice_to_repeat.is_empty() { + return; + } + + let bytes_to_repeat = size_of_val(slice_to_repeat); + + // Ensure capacity + self.reserve(repeat_count * bytes_to_repeat); + + // For smaller number of repeats, just extend directly as the overhead of the + // doubling strategy is not worth it + if repeat_count <= 3 { + for _ in 0..repeat_count { + self.extend_from_slice(slice_to_repeat); + } + + return; + } + + // We will use doubling strategy to fill the buffer in log(repeat_count) steps + + // If we keep extending from ourself we will reach it pretty fast + let final_len_to_repeat = repeat_count * bytes_to_repeat; + + // Save the length before we do all the copies to know where to start from + let length_before = self.len; + + // Copy the initial slice once + self.extend_from_slice(slice_to_repeat); + + // This tracks how much bytes we have added by repeating so far + let mut added_repeats_length = bytes_to_repeat; + assert_eq!( + self.len - length_before, + added_repeats_length, + "should copy exactly the same number of bytes" + ); + + // Copy in doubling steps to make the number of calls logarithmic and fast (as we copy large chunks at a time) + while added_repeats_length * 2 <= final_len_to_repeat { + unsafe { + // Get to the start of the data before we started copying anything + let src = self.data.as_ptr().add(length_before) as *const u8; + + // Go to the current location to copy to (end of current data) + let dst = self.data.as_ptr().add(self.len); + + // SAFETY: the pointers are not overlapping as there is `added_repeats_length` exactly + // between them + std::ptr::copy_nonoverlapping(src, dst, added_repeats_length) + } + + // Advance the length by the amount of data we just copied (doubled) + self.len += added_repeats_length; + + // Double the amount of data we have added so far + added_repeats_length *= 2; + } + + // Handle the remainder in single copy. + + // the amount left to copy is guaranteed to be less than what we have already copied + let last_amount_to_copy = final_len_to_repeat - added_repeats_length; + assert!( + last_amount_to_copy <= final_len_to_repeat, + "the last copy should not overlap" + ); + + if last_amount_to_copy == 0 { + return; + } + + unsafe { + let src = self.data.as_ptr().add(length_before) as *const u8; + let dst = self.data.as_ptr().add(self.len); + + std::ptr::copy_nonoverlapping(src, dst, last_amount_to_copy) + } + self.len += last_amount_to_copy; + } + #[cold] fn reallocate(&mut self, capacity: usize) { let new_layout = Layout::from_size_align(capacity, self.layout.align()).unwrap(); @@ -1184,4 +1282,125 @@ mod tests { assert_eq!(pool.used(), 0); } } + + fn create_expected_repeated_slice( + slice_to_repeat: &[T], + repeat_count: usize, + ) -> Buffer { + let mut expected = MutableBuffer::new(size_of_val(slice_to_repeat) * repeat_count); + for _ in 0..repeat_count { + // Not using push_slice_repeated as this is the function under test + expected.extend_from_slice(slice_to_repeat); + } + expected.into() + } + + // Helper to test a specific repeat count with various slice sizes + fn test_repeat_count( + repeat_count: usize, + test_data: &[T], + ) { + let mut buffer = MutableBuffer::new(0); + buffer.repeat_slice_n_times(test_data, repeat_count); + + let expected = create_expected_repeated_slice(test_data, repeat_count); + let result: Buffer = buffer.into(); + + assert_eq!( + result, + expected, + "Failed for repeat_count={}, slice_len={}", + repeat_count, + test_data.len() + ); + } + + #[test] + fn test_repeat_slice_count_edge_cases() { + // Empty slice + test_repeat_count(100, &[] as &[i32]); + + // Zero repeats + test_repeat_count(0, &[1i32, 2, 3]); + } + + #[test] + fn test_small_repeats_counts() { + // test any special implementation for small repeat counts + let data = &[1u8, 2, 3, 4, 5]; + + for _ in 1..=10 { + test_repeat_count(2, data); + } + } + + #[test] + fn test_different_size_of_i32_repeat_slice() { + let data: &[i32] = &[1, 2, 3]; + let data_with_single_item: &[i32] = &[42]; + + for data in &[data, data_with_single_item] { + for item in 1..=9 { + let base_repeat_count = 2_usize.pow(item); + test_repeat_count(base_repeat_count - 1, data); + test_repeat_count(base_repeat_count, data); + test_repeat_count(base_repeat_count + 1, data); + } + } + } + + #[test] + fn test_different_size_of_u8_repeat_slice() { + let data: &[u8] = &[1, 2, 3]; + let data_with_single_item: &[u8] = &[10]; + + for data in &[data, data_with_single_item] { + for item in 1..=9 { + let base_repeat_count = 2_usize.pow(item); + test_repeat_count(base_repeat_count - 1, data); + test_repeat_count(base_repeat_count, data); + test_repeat_count(base_repeat_count + 1, data); + } + } + } + + #[test] + fn test_different_size_of_u16_repeat_slice() { + let data: &[u16] = &[1, 2, 3]; + let data_with_single_item: &[u16] = &[10]; + + for data in &[data, data_with_single_item] { + for item in 1..=9 { + let base_repeat_count = 2_usize.pow(item); + test_repeat_count(base_repeat_count - 1, data); + test_repeat_count(base_repeat_count, data); + test_repeat_count(base_repeat_count + 1, data); + } + } + } + + #[test] + fn test_various_slice_lengths() { + // Test different slice lengths with same repeat pattern + let repeat_count = 37; // Arbitrary non-power-of-2 + + // Single element + test_repeat_count(repeat_count, &[42i32]); + + // Small slices + test_repeat_count(repeat_count, &[1i32, 2]); + test_repeat_count(repeat_count, &[1i32, 2, 3]); + test_repeat_count(repeat_count, &[1i32, 2, 3, 4]); + test_repeat_count(repeat_count, &[1i32, 2, 3, 4, 5]); + + // Larger slices + let data_10: Vec = (0..10).collect(); + test_repeat_count(repeat_count, &data_10); + + let data_100: Vec = (0..100).collect(); + test_repeat_count(repeat_count, &data_100); + + let data_1000: Vec = (0..1000).collect(); + test_repeat_count(repeat_count, &data_1000); + } } From 694eb722c5ee06327008c1d40d3f573253fcd0e3 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Mon, 20 Oct 2025 14:08:01 +0300 Subject: [PATCH 10/21] updated with #8658 changes --- arrow-select/src/zip.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/arrow-select/src/zip.rs b/arrow-select/src/zip.rs index 54626f790eea..2694d7634cf5 100644 --- a/arrow-select/src/zip.rs +++ b/arrow-select/src/zip.rs @@ -462,7 +462,9 @@ impl BytesScalarImpl { predicate.iter().map(|b| if b { value_length } else { 0 }), ); - let bytes = MutableBuffer::new_repeated(number_of_true, value); + let mut bytes = MutableBuffer::with_capacity(0); + bytes.repeat_slice_n_times(value, number_of_true); + let bytes = Buffer::from(bytes); // If a value is true we need the TRUTHY and the null buffer will have 1 (meaning not null) @@ -483,7 +485,8 @@ impl BytesScalarImpl { predicate.len(), )); - let bytes = MutableBuffer::new_repeated(predicate.len(), value); + let mut bytes = MutableBuffer::with_capacity(0); + bytes.repeat_slice_n_times(value, predicate.len()); let bytes = Buffer::from(bytes); (bytes, offsets) @@ -585,7 +588,7 @@ impl BytesScalarImpl { if start > filled { let false_repeat_count = start - filled; // Push false value `repeat_count` times - mutable.push_slice_repeated(false_repeat_count, falsy_val); + mutable.repeat_slice_n_times(falsy_val, false_repeat_count); for _ in 0..false_repeat_count { offset_buffer_builder.push_length(falsy_len) @@ -594,7 +597,7 @@ impl BytesScalarImpl { let true_repeat_count = end - start; // fill with truthy values - mutable.push_slice_repeated(true_repeat_count, truthy_val); + mutable.repeat_slice_n_times(truthy_val, true_repeat_count); for _ in 0..true_repeat_count { offset_buffer_builder.push_length(truthy_len) @@ -605,7 +608,7 @@ impl BytesScalarImpl { if filled < predicate.len() { let false_repeat_count = predicate.len() - filled; // Copy the first item from the 'falsy' array into the output buffer. - mutable.push_slice_repeated(false_repeat_count, falsy_val); + mutable.repeat_slice_n_times(falsy_val, false_repeat_count); for _ in 0..false_repeat_count { offset_buffer_builder.push_length(falsy_len) From 11f72a027e1e61d8476003eeb4cd0e5aefb52c89 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Mon, 20 Oct 2025 14:09:50 +0300 Subject: [PATCH 11/21] updated with #8656 changes --- arrow-select/src/zip.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/arrow-select/src/zip.rs b/arrow-select/src/zip.rs index 2694d7634cf5..2abcdcd510f4 100644 --- a/arrow-select/src/zip.rs +++ b/arrow-select/src/zip.rs @@ -480,10 +480,7 @@ impl BytesScalarImpl { ) -> (Buffer, OffsetBuffer) { let value_length = value.len(); - let offsets = OffsetBuffer::::from_lengths(std::iter::repeat_n( - value_length, - predicate.len(), - )); + let offsets = OffsetBuffer::::from_repeated_length(value_length, predicate.len()); let mut bytes = MutableBuffer::with_capacity(0); bytes.repeat_slice_n_times(value, predicate.len()); From 0787956a3396429d75f3c4ed36d2590f55e0d8ec Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Mon, 20 Oct 2025 14:10:11 +0300 Subject: [PATCH 12/21] format --- arrow-select/src/zip.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/arrow-select/src/zip.rs b/arrow-select/src/zip.rs index 2abcdcd510f4..fca7355bda45 100644 --- a/arrow-select/src/zip.rs +++ b/arrow-select/src/zip.rs @@ -480,7 +480,8 @@ impl BytesScalarImpl { ) -> (Buffer, OffsetBuffer) { let value_length = value.len(); - let offsets = OffsetBuffer::::from_repeated_length(value_length, predicate.len()); + let offsets = + OffsetBuffer::::from_repeated_length(value_length, predicate.len()); let mut bytes = MutableBuffer::with_capacity(0); bytes.repeat_slice_n_times(value, predicate.len()); From 96cd9ccdb1940f4b09c72b44d4b6b060f83711f9 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Mon, 20 Oct 2025 14:46:06 +0300 Subject: [PATCH 13/21] add tests --- arrow-select/src/zip.rs | 119 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) diff --git a/arrow-select/src/zip.rs b/arrow-select/src/zip.rs index fca7355bda45..e4807b904830 100644 --- a/arrow-select/src/zip.rs +++ b/arrow-select/src/zip.rs @@ -741,4 +741,123 @@ mod test { let expected = Int32Array::from(vec![None, None, Some(42), Some(42), None]); assert_eq!(actual, &expected); } + + #[test] + fn test_zip_kernel_scalar_strings() { + let scalar_truthy = Scalar::new(StringArray::from(vec!["hello"])); + let scalar_falsy = Scalar::new(StringArray::from(vec!["world"])); + + let mask = BooleanArray::from(vec![true, false, true, false, true]); + let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap(); + let actual = out.as_string::(); + let expected = StringArray::from(vec![ + Some("hello"), + Some("world"), + Some("hello"), + Some("world"), + Some("hello"), + ]); + assert_eq!(actual, &expected); + } + + #[test] + fn test_zip_kernel_scalar_binary() { + let truthy_bytes: &[u8] = b"\xFF\xFE\xFD"; + let falsy_bytes: &[u8] = b"world"; + let scalar_truthy = Scalar::new(BinaryArray::from_iter_values( + // Non valid UTF8 bytes + vec![truthy_bytes], + )); + let scalar_falsy = Scalar::new(BinaryArray::from_iter_values(vec![falsy_bytes])); + + let mask = BooleanArray::from(vec![true, false, true, false, true]); + let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap(); + let actual = out.as_binary::(); + let expected = BinaryArray::from(vec![ + Some(truthy_bytes), + Some(falsy_bytes), + Some(truthy_bytes), + Some(falsy_bytes), + Some(truthy_bytes), + ]); + assert_eq!(actual, &expected); + } + + // Test to ensure that the precision and scale are kept when zipping Decimal128 data + #[test] + fn test_zip_decimal_with_custom_precision_and_scale() { + let arr = Decimal128Array::from_iter_values([12345, 456, 7890, -123223423432432]) + .with_precision_and_scale(20, 2) + .unwrap(); + + let arr: ArrayRef = Arc::new(arr); + + let scalar_1 = Scalar::new(arr.slice(0, 1)); + let scalar_2 = Scalar::new(arr.slice(1, 1)); + let null_scalar = Scalar::new(new_null_array(arr.data_type(), 1)); + let array_1: ArrayRef = arr.slice(0, 2); + let array_2: ArrayRef = arr.slice(2, 2); + + test_zip_output_data_types_for_input(scalar_1, scalar_2, null_scalar, array_1, array_2); + } + + // Test to ensure that the timezone is kept when zipping TimestampArray data + #[test] + fn test_zip_timestamp_with_timezone() { + let arr = TimestampSecondArray::from(vec![0, 1000, 2000, 4000]) + .with_timezone("+01:00".to_string()); + + let arr: ArrayRef = Arc::new(arr); + + let scalar_1 = Scalar::new(arr.slice(0, 1)); + let scalar_2 = Scalar::new(arr.slice(1, 1)); + let null_scalar = Scalar::new(new_null_array(arr.data_type(), 1)); + let array_1: ArrayRef = arr.slice(0, 2); + let array_2: ArrayRef = arr.slice(2, 2); + + test_zip_output_data_types_for_input(scalar_1, scalar_2, null_scalar, array_1, array_2); + } + + fn test_zip_output_data_types_for_input( + scalar_1: Scalar, + scalar_2: Scalar, + null_scalar: Scalar, + array_1: ArrayRef, + array_2: ArrayRef, + ) { + // non null Scalar vs non null Scalar + test_zip_output_data_type(&scalar_1, &scalar_2, 10); + + // null Scalar vs non-null Scalar (and vice versa) + test_zip_output_data_type(&null_scalar, &scalar_1, 10); + test_zip_output_data_type(&scalar_1, &null_scalar, 10); + + // non-null Scalar and array (and vice versa) + test_zip_output_data_type(&array_1.as_ref(), &scalar_1, array_1.len()); + test_zip_output_data_type(&scalar_1, &array_1.as_ref(), array_1.len()); + + // Array and null scalar (and vice versa) + test_zip_output_data_type(&array_1.as_ref(), &null_scalar, array_1.len()); + + test_zip_output_data_type(&null_scalar, &array_1.as_ref(), array_1.len()); + + // Both arrays + test_zip_output_data_type(&array_1.as_ref(), &array_2.as_ref(), array_1.len()); + } + + fn test_zip_output_data_type(truthy: &dyn Datum, falsy: &dyn Datum, mask_length: usize) { + let expected_data_type = truthy.get().0.data_type().clone(); + assert_eq!(&expected_data_type, falsy.get().0.data_type()); + + // Try different masks to test different paths + let mask_all_true = BooleanArray::from(vec![true; mask_length]); + let mask_all_false = BooleanArray::from(vec![false; mask_length]); + let mask_some_true_and_false = + BooleanArray::from((0..mask_length).map(|i| i % 2 == 0).collect::>()); + + for mask in [&mask_all_true, &mask_all_false, &mask_some_true_and_false] { + let out = zip(mask, truthy, falsy).unwrap(); + assert_eq!(out.data_type(), &expected_data_type); + } + } } From 72070dc54216b37c789d17f0464c8e655dc31194 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Mon, 20 Oct 2025 14:59:42 +0300 Subject: [PATCH 14/21] simplify implementation --- arrow-buffer/src/buffer/mutable.rs | 51 ++++++++++-------------------- 1 file changed, 16 insertions(+), 35 deletions(-) diff --git a/arrow-buffer/src/buffer/mutable.rs b/arrow-buffer/src/buffer/mutable.rs index 5239b7e46736..986e221db7c6 100644 --- a/arrow-buffer/src/buffer/mutable.rs +++ b/arrow-buffer/src/buffer/mutable.rs @@ -258,11 +258,6 @@ impl MutableBuffer { return; } - // We will use doubling strategy to fill the buffer in log(repeat_count) steps - - // If we keep extending from ourself we will reach it pretty fast - let final_len_to_repeat = repeat_count * bytes_to_repeat; - // Save the length before we do all the copies to know where to start from let length_before = self.len; @@ -270,15 +265,24 @@ impl MutableBuffer { self.extend_from_slice(slice_to_repeat); // This tracks how much bytes we have added by repeating so far - let mut added_repeats_length = bytes_to_repeat; + let added_repeats_length = bytes_to_repeat; assert_eq!( self.len - length_before, added_repeats_length, "should copy exactly the same number of bytes" ); - // Copy in doubling steps to make the number of calls logarithmic and fast (as we copy large chunks at a time) - while added_repeats_length * 2 <= final_len_to_repeat { + // Number of times the slice was repeated + let mut already_repeated_times = 1; + + // We will use doubling strategy to fill the buffer in log(repeat_count) steps + while already_repeated_times < repeat_count { + // How many slices can we copy in this iteration + // (either double what we have, or just the remaining ones) + let number_of_slices_to_copy = + already_repeated_times.min(repeat_count - already_repeated_times); + let number_of_bytes_to_copy = number_of_slices_to_copy * bytes_to_repeat; + unsafe { // Get to the start of the data before we started copying anything let src = self.data.as_ptr().add(length_before) as *const u8; @@ -286,38 +290,15 @@ impl MutableBuffer { // Go to the current location to copy to (end of current data) let dst = self.data.as_ptr().add(self.len); - // SAFETY: the pointers are not overlapping as there is `added_repeats_length` exactly - // between them - std::ptr::copy_nonoverlapping(src, dst, added_repeats_length) + // SAFETY: the pointers are not overlapping as there is `number_of_bytes_to_copy` or less between them + std::ptr::copy_nonoverlapping(src, dst, number_of_bytes_to_copy) } // Advance the length by the amount of data we just copied (doubled) - self.len += added_repeats_length; - - // Double the amount of data we have added so far - added_repeats_length *= 2; - } - - // Handle the remainder in single copy. - - // the amount left to copy is guaranteed to be less than what we have already copied - let last_amount_to_copy = final_len_to_repeat - added_repeats_length; - assert!( - last_amount_to_copy <= final_len_to_repeat, - "the last copy should not overlap" - ); - - if last_amount_to_copy == 0 { - return; - } - - unsafe { - let src = self.data.as_ptr().add(length_before) as *const u8; - let dst = self.data.as_ptr().add(self.len); + self.len += number_of_bytes_to_copy; - std::ptr::copy_nonoverlapping(src, dst, last_amount_to_copy) + already_repeated_times += number_of_slices_to_copy; } - self.len += last_amount_to_copy; } #[cold] From 37b2cdaa4977c77a494c369f7b4d4139502783b5 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Mon, 20 Oct 2025 15:12:49 +0300 Subject: [PATCH 15/21] add example and test for scalar zipper --- arrow-select/src/zip.rs | 47 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) diff --git a/arrow-select/src/zip.rs b/arrow-select/src/zip.rs index e4807b904830..ed8e00873c64 100644 --- a/arrow-select/src/zip.rs +++ b/arrow-select/src/zip.rs @@ -200,6 +200,23 @@ fn zip_impl( /// /// Useful for using in `IF THEN ELSE END` expressions /// +/// # Example +/// ``` +/// # use std::sync::Arc; +/// # use arrow_array::{ArrayRef, BooleanArray, Int32Array, Scalar, cast::AsArray, types::Int32Type}; +/// +/// # use arrow_select::zip::ScalarZipper; +/// let scalar_truthy = Scalar::new(Int32Array::from_value(42, 1)); +/// let scalar_falsy = Scalar::new(Int32Array::from_value(123, 1)); +/// let zipper = ScalarZipper::try_new(&scalar_truthy, &scalar_falsy).unwrap(); +/// +/// // Later when we have a boolean mask +/// let mask = BooleanArray::from(vec![true, false, true, false, true]); +/// let result = zipper.zip(&mask).unwrap(); +/// let actual = result.as_primitive::(); +/// let expected = Int32Array::from(vec![Some(42), Some(123), Some(42), Some(123), Some(42)]); +/// ``` +/// #[derive(Debug, Clone)] pub struct ScalarZipper { zip_impl: Arc, @@ -273,9 +290,15 @@ impl ScalarZipper { Ok(Self { zip_impl }) } + + /// Creating output array based on input boolean array and the two scalar values the zipper was created with + /// See struct level documentation for examples. + pub fn zip(&self, mask: &BooleanArray) -> Result { + self.zip_impl.create_output(mask) + } } -/// Impl for creating output array based on input boolean array +/// Impl for creating output array based on a mask trait ZipImpl: Debug { /// Creating output array based on input boolean array fn create_output(&self, input: &BooleanArray) -> Result; @@ -631,6 +654,7 @@ fn combine_nulls_and_false(predicate: &BooleanArray) -> BooleanBuffer { #[cfg(test)] mod test { use super::*; + use arrow_array::types::Int32Type; #[test] fn test_zip_kernel_one() { @@ -742,6 +766,27 @@ mod test { assert_eq!(actual, &expected); } + #[test] + fn test_scalar_zipper() { + let scalar_truthy = Scalar::new(Int32Array::from_value(42, 1)); + let scalar_falsy = Scalar::new(Int32Array::from_value(123, 1)); + + let mask = BooleanArray::from(vec![false, false, true, true, false]); + + let scalar_zipper = ScalarZipper::try_new(&scalar_truthy, &scalar_falsy).unwrap(); + let out = scalar_zipper.zip(&mask).unwrap(); + let actual = out.as_primitive::(); + let expected = Int32Array::from(vec![Some(123), Some(123), Some(42), Some(42), Some(123)]); + assert_eq!(actual, &expected); + + // test with different mask length as well + let mask = BooleanArray::from(vec![true, false, true]); + let out = scalar_zipper.zip(&mask).unwrap(); + let actual = out.as_primitive::(); + let expected = Int32Array::from(vec![Some(42), Some(123), Some(42)]); + assert_eq!(actual, &expected); + } + #[test] fn test_zip_kernel_scalar_strings() { let scalar_truthy = Scalar::new(StringArray::from(vec!["hello"])); From dd453942ae379d453bb2269b6adbbe00f45a13db Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Mon, 20 Oct 2025 17:23:17 +0300 Subject: [PATCH 16/21] add send and sync --- arrow-select/src/zip.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-select/src/zip.rs b/arrow-select/src/zip.rs index ed8e00873c64..98fceff0523e 100644 --- a/arrow-select/src/zip.rs +++ b/arrow-select/src/zip.rs @@ -299,7 +299,7 @@ impl ScalarZipper { } /// Impl for creating output array based on a mask -trait ZipImpl: Debug { +trait ZipImpl: Debug + Send + Sync { /// Creating output array based on input boolean array fn create_output(&self, input: &BooleanArray) -> Result; } From 8046f262ff05265009c6ee5e730905e0b9b6f15a Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Sun, 26 Oct 2025 14:42:21 +0200 Subject: [PATCH 17/21] add more tests --- arrow-select/src/zip.rs | 403 ++++++++++++++++++++++++++++++++-------- 1 file changed, 330 insertions(+), 73 deletions(-) diff --git a/arrow-select/src/zip.rs b/arrow-select/src/zip.rs index 98fceff0523e..db2aac9db22b 100644 --- a/arrow-select/src/zip.rs +++ b/arrow-select/src/zip.rs @@ -17,7 +17,7 @@ //! [`zip`]: Combine values from two arrays based on boolean mask -use crate::filter::SlicesIterator; +use crate::filter::{SlicesIterator, prep_null_mask_filter}; use arrow_array::cast::AsArray; use arrow_array::types::{BinaryType, ByteArrayType, LargeBinaryType, LargeUtf8Type, Utf8Type}; use arrow_array::*; @@ -31,7 +31,7 @@ use arrow_schema::{ArrowError, DataType}; use std::fmt::{Debug, Formatter}; use std::hash::Hash; use std::marker::PhantomData; -use std::ops::{BitAnd, Not}; +use std::ops::Not; use std::sync::Arc; /// Zip two arrays by some boolean mask. @@ -149,6 +149,7 @@ fn zip_impl( falsy: &ArrayData, falsy_is_scalar: bool, ) -> Result { + let mask_buffer = maybe_prep_null_mask_filter(mask); let mut mutable = MutableArrayData::new(vec![truthy, falsy], false, truthy.len()); // the SlicesIterator slices only the true values. So the gaps left by this iterator we need to @@ -157,7 +158,7 @@ fn zip_impl( // keep track of how much is filled let mut filled = 0; - SlicesIterator::new(mask).for_each(|(start, end)| { + SlicesIterator::from(&mask_buffer).for_each(|(start, end)| { // the gap needs to be filled with falsy values if start > filled { if falsy_is_scalar { @@ -359,9 +360,10 @@ impl PrimitiveScalarImpl { Some(value) } } -} -impl PrimitiveScalarImpl { + /// return an output array that has + /// `value` in all locations where predicate is true + /// `null` otherwise fn get_scalar_and_null_buffer_for_single_non_nullable( predicate: BooleanBuffer, value: T::Native, @@ -378,7 +380,7 @@ impl ZipImpl for PrimitiveScalarImpl { fn create_output(&self, predicate: &BooleanArray) -> Result { let result_len = predicate.len(); // Nulls are treated as false - let predicate = combine_nulls_and_false(predicate); + let predicate = maybe_prep_null_mask_filter(predicate); let (scalars, nulls): (Vec, Option) = match (self.truthy, self.falsy) { @@ -459,6 +461,9 @@ impl BytesScalarImpl { } } + /// return an output array that has + /// `value` in all locations where predicate is true + /// `null` otherwise fn get_scalar_and_null_buffer_for_single_non_nullable( predicate: BooleanBuffer, value: &[u8], @@ -497,73 +502,24 @@ impl BytesScalarImpl { (bytes, offsets, Some(nulls)) } + /// Create a [`Buffer`] where `value` slice is repeated `number_of_values` times + /// and [`OffsetBuffer`] where there are `number_of_values` lengths, and all equals to `value` length fn get_bytes_and_offset_for_all_same_value( - predicate: &BooleanBuffer, + number_of_values: usize, value: &[u8], ) -> (Buffer, OffsetBuffer) { let value_length = value.len(); let offsets = - OffsetBuffer::::from_repeated_length(value_length, predicate.len()); + OffsetBuffer::::from_repeated_length(value_length, number_of_values); let mut bytes = MutableBuffer::with_capacity(0); - bytes.repeat_slice_n_times(value, predicate.len()); + bytes.repeat_slice_n_times(value, number_of_values); let bytes = Buffer::from(bytes); (bytes, offsets) } -} - -impl ZipImpl for BytesScalarImpl { - fn create_output(&self, predicate: &BooleanArray) -> Result { - let result_len = predicate.len(); - // Nulls are treated as false - let predicate = combine_nulls_and_false(predicate); - - let (bytes, offsets, nulls): (Buffer, OffsetBuffer, Option) = - match (self.truthy.as_deref(), self.falsy.as_deref()) { - (Some(truthy_val), Some(falsy_val)) => { - let (bytes, offsets) = - Self::create_output_on_non_nulls(&predicate, truthy_val, falsy_val); - - (bytes, offsets, None) - } - (Some(truthy_val), None) => { - Self::get_scalar_and_null_buffer_for_single_non_nullable(predicate, truthy_val) - } - (None, Some(falsy_val)) => { - // Flipping the boolean buffer as we want the opposite of the TRUE case - // - // if the condition is true we want null so we need to NOT the value so we get 0 (meaning null) - // if the condition is false we want the FALSE value so we need to NOT the value so we get 1 (meaning not null) - let predicate = predicate.not(); - Self::get_scalar_and_null_buffer_for_single_non_nullable(predicate, falsy_val) - } - (None, None) => { - // All values are null - let nulls = NullBuffer::new_null(result_len); - - ( - // Empty bytes - Buffer::from(&[]), - // All nulls so all lengths are 0 - OffsetBuffer::::new_zeroed(predicate.len()), - Some(nulls), - ) - } - }; - - let output = unsafe { - // Safety: the values are based on valid inputs - // and `try_new` is expensive for strings as it validate that the input is valid utf8 - GenericByteArray::::new_unchecked(offsets, bytes, nulls) - }; - - Ok(Arc::new(output)) - } -} -impl BytesScalarImpl { fn create_output_on_non_nulls( predicate: &BooleanBuffer, truthy_val: &[u8], @@ -576,14 +532,14 @@ impl BytesScalarImpl { // All values are falsy let (bytes, offsets) = - Self::get_bytes_and_offset_for_all_same_value(predicate, falsy_val); + Self::get_bytes_and_offset_for_all_same_value(predicate.len(), falsy_val); return (bytes, offsets); } n if n == predicate.len() => { // All values are truthy let (bytes, offsets) = - Self::get_bytes_and_offset_for_all_same_value(predicate, truthy_val); + Self::get_bytes_and_offset_for_all_same_value(predicate.len(), truthy_val); return (bytes, offsets); } @@ -594,7 +550,7 @@ impl BytesScalarImpl { } let total_number_of_bytes = - true_count * truthy_val.len() + (predicate.len() - true_count) * falsy_val.len(); + true_count * truthy_val.len() + (predicate.len() - true_count) * falsy_val.len(); let mut mutable = MutableBuffer::with_capacity(total_number_of_bytes); let mut offset_buffer_builder = OffsetBufferBuilder::::new(predicate.len()); @@ -640,14 +596,63 @@ impl BytesScalarImpl { } } -fn combine_nulls_and_false(predicate: &BooleanArray) -> BooleanBuffer { - if let Some(nulls) = predicate.nulls().filter(|n| n.null_count() > 0) { - predicate.values().bitand( - // nulls are represented as 0 (false) in the values buffer - nulls.inner(), - ) - } else { +impl ZipImpl for BytesScalarImpl { + fn create_output(&self, predicate: &BooleanArray) -> Result { + let result_len = predicate.len(); + // Nulls are treated as false + let predicate = maybe_prep_null_mask_filter(predicate); + + let (bytes, offsets, nulls): (Buffer, OffsetBuffer, Option) = + match (self.truthy.as_deref(), self.falsy.as_deref()) { + (Some(truthy_val), Some(falsy_val)) => { + let (bytes, offsets) = + Self::create_output_on_non_nulls(&predicate, truthy_val, falsy_val); + + (bytes, offsets, None) + } + (Some(truthy_val), None) => { + Self::get_scalar_and_null_buffer_for_single_non_nullable(predicate, truthy_val) + } + (None, Some(falsy_val)) => { + // Flipping the boolean buffer as we want the opposite of the TRUE case + // + // if the condition is true we want null so we need to NOT the value so we get 0 (meaning null) + // if the condition is false we want the FALSE value so we need to NOT the value so we get 1 (meaning not null) + let predicate = predicate.not(); + Self::get_scalar_and_null_buffer_for_single_non_nullable(predicate, falsy_val) + } + (None, None) => { + // All values are null + let nulls = NullBuffer::new_null(result_len); + + ( + // Empty bytes + Buffer::from(&[]), + // All nulls so all lengths are 0 + OffsetBuffer::::new_zeroed(predicate.len()), + Some(nulls), + ) + } + }; + + let output = unsafe { + // Safety: the values are based on valid inputs + // and `try_new` is expensive for strings as it validate that the input is valid utf8 + GenericByteArray::::new_unchecked(offsets, bytes, nulls) + }; + + Ok(Arc::new(output)) + } +} + +fn maybe_prep_null_mask_filter(predicate: &BooleanArray) -> BooleanBuffer { + // Nulls are treated as false + if predicate.null_count() == 0 { predicate.values().clone() + } else { + let cleaned = prep_null_mask_filter(predicate); + let (boolean_buffer, _) = cleaned.into_parts(); + boolean_buffer } } @@ -731,7 +736,7 @@ mod test { } #[test] - fn test_zip_kernel_scalar_both() { + fn test_zip_kernel_scalar_both_mask_ends_with_true() { let scalar_truthy = Scalar::new(Int32Array::from_value(42, 1)); let scalar_falsy = Scalar::new(Int32Array::from_value(123, 1)); @@ -743,7 +748,49 @@ mod test { } #[test] - fn test_zip_kernel_scalar_none_1() { + fn test_zip_kernel_scalar_both_mask_ends_with_false() { + let scalar_truthy = Scalar::new(Int32Array::from_value(42, 1)); + let scalar_falsy = Scalar::new(Int32Array::from_value(123, 1)); + + let mask = BooleanArray::from(vec![true, true, false, true, false, false]); + let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap(); + let actual = out.as_any().downcast_ref::().unwrap(); + let expected = Int32Array::from(vec![Some(42), Some(42), Some(123), Some(42), Some(123), Some(123)]); + assert_eq!(actual, &expected); + } + + #[test] + fn test_zip_kernel_primitive_scalar_with_boolean_array_mask_with_nulls_should_be_treated_as_false() { + let scalar_truthy = Scalar::new(Int32Array::from_value(42, 1)); + let scalar_falsy = Scalar::new(Int32Array::from_value(123, 1)); + + let mask = { + let booleans = BooleanBuffer::from(vec![true, true, false, true, false, false]); + let nulls = NullBuffer::from(vec![ + true, + true, + true, + false, // null treated as false even though in the original mask it was true + true, + true + ]); + BooleanArray::new(booleans, Some(nulls)) + }; + let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap(); + let actual = out.as_any().downcast_ref::().unwrap(); + let expected = Int32Array::from(vec![ + Some(42), + Some(42), + Some(123), + Some(123), // true in mask but null + Some(123), + Some(123) + ]); + assert_eq!(actual, &expected); + } + + #[test] + fn test_zip_kernel_primitive_scalar_none_1() { let scalar_truthy = Scalar::new(Int32Array::from_value(42, 1)); let scalar_falsy = Scalar::new(Int32Array::new_null(1)); @@ -755,7 +802,7 @@ mod test { } #[test] - fn test_zip_kernel_scalar_none_2() { + fn test_zip_kernel_primitive_scalar_none_2() { let scalar_truthy = Scalar::new(Int32Array::from_value(42, 1)); let scalar_falsy = Scalar::new(Int32Array::new_null(1)); @@ -766,6 +813,168 @@ mod test { assert_eq!(actual, &expected); } + #[test] + fn test_zip_kernel_primitive_scalar_both_null() { + let scalar_truthy = Scalar::new(Int32Array::new_null(1)); + let scalar_falsy = Scalar::new(Int32Array::new_null(1)); + + let mask = BooleanArray::from(vec![false, false, true, true, false]); + let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap(); + let actual = out.as_any().downcast_ref::().unwrap(); + let expected = Int32Array::from(vec![None, None, None, None, None]); + assert_eq!(actual, &expected); + } + + #[test] + fn test_zip_kernel_large_string_scalar_with_boolean_array_mask_with_nulls_should_be_treated_as_false() { + let scalar_truthy = Scalar::new(LargeStringArray::from_iter_values(&["test"])); + let scalar_falsy = Scalar::new(LargeStringArray::from_iter_values(&["something else"])); + + let mask = { + let booleans = BooleanBuffer::from(vec![true, true, false, true, false, false]); + let nulls = NullBuffer::from(vec![ + true, + true, + true, + false, // null treated as false even though in the original mask it was true + true, + true + ]); + BooleanArray::new(booleans, Some(nulls)) + }; + let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap(); + let actual = out.as_any().downcast_ref::().unwrap(); + let expected = LargeStringArray::from_iter(vec![ + Some("test"), + Some("test"), + Some("something else"), + Some("something else"), // true in mask but null + Some("something else"), + Some("something else") + ]); + assert_eq!(actual, &expected); + } + + #[test] + fn test_zip_kernel_bytes_scalar_none_1() { + let scalar_truthy = Scalar::new(StringArray::from_iter_values(&["hello"])); + let scalar_falsy = Scalar::new(StringArray::new_null(1)); + + let mask = BooleanArray::from(vec![true, true, false, false, true]); + let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap(); + let actual = out.as_any().downcast_ref::().unwrap(); + let expected = StringArray::from_iter(vec![Some("hello"), Some("hello"), None, None, Some("hello")]); + assert_eq!(actual, &expected); + } + + #[test] + fn test_zip_kernel_bytes_scalar_none_2() { + let scalar_truthy = Scalar::new(StringArray::new_null(1)); + let scalar_falsy = Scalar::new(StringArray::from_iter_values(&["hello"])); + + let mask = BooleanArray::from(vec![true, true, false, false, true]); + let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap(); + let actual = out.as_any().downcast_ref::().unwrap(); + let expected = StringArray::from_iter(vec![None, None, Some("hello"), Some("hello"), None]); + assert_eq!(actual, &expected); + } + + #[test] + fn test_zip_kernel_bytes_scalar_both() { + let scalar_truthy = Scalar::new(StringArray::from_iter_values(&["test"])); + let scalar_falsy = Scalar::new(StringArray::from_iter_values(&["something else"])); + + // mask ends with false + let mask = BooleanArray::from(vec![true, true, false, true, false, false]); + let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap(); + let actual = out.as_any().downcast_ref::().unwrap(); + let expected = StringArray::from_iter(vec![ + Some("test"), + Some("test"), + Some("something else"), + Some("test"), + Some("something else"), + Some("something else"), + ]); + assert_eq!(actual, &expected); + } + + #[test] + fn test_zip_scalar_bytes_only_taking_one_side() { + let mask_len = 5; + let all_true_mask = BooleanArray::from(vec![true; mask_len]); + let all_false_mask = BooleanArray::from(vec![false; mask_len]); + + let null_scalar = Scalar::new(StringArray::new_null(1)); + let non_null_scalar_1 = Scalar::new(StringArray::from_iter_values(&["test"])); + let non_null_scalar_2 = Scalar::new(StringArray::from_iter_values(&["something else"])); + + { + // 1. Test where left is null and right is non-null + // and mask is all true + let out = zip(&all_true_mask, &null_scalar, &non_null_scalar_1).unwrap(); + let actual = out.as_string::(); + let expected = StringArray::from_iter(std::iter::repeat_n(None::<&str>, mask_len)); + assert_eq!(actual, &expected); + } + + { + // 2. Test where left is null and right is non-null + // and mask is all false + let out = zip(&all_false_mask, &null_scalar, &non_null_scalar_1).unwrap(); + let actual = out.as_string::(); + let expected = StringArray::from_iter(std::iter::repeat_n(Some("test"), mask_len)); + assert_eq!(actual, &expected); + } + + { + // 3. Test where left is non-null and right is null + // and mask is all true + let out = zip(&all_true_mask, &non_null_scalar_1, &null_scalar).unwrap(); + let actual = out.as_string::(); + let expected = StringArray::from_iter(std::iter::repeat_n(Some("test"), mask_len)); + assert_eq!(actual, &expected); + } + + { + // 4. Test where left is non-null and right is null + // and mask is all false + let out = zip(&all_false_mask, &non_null_scalar_1, &null_scalar).unwrap(); + let actual = out.as_string::(); + let expected = StringArray::from_iter(std::iter::repeat_n(None::<&str>, mask_len)); + assert_eq!(actual, &expected); + } + + { + // 5. Test where both left and right are not null + // and mask is all true + let out = zip(&all_true_mask, &non_null_scalar_1, &non_null_scalar_2).unwrap(); + let actual = out.as_string::(); + let expected = StringArray::from_iter(std::iter::repeat_n(Some("test"), mask_len)); + assert_eq!(actual, &expected); + } + + { + // 6. Test where both left and right are not null + // and mask is all false + let out = zip(&all_false_mask, &non_null_scalar_1, &non_null_scalar_2).unwrap(); + let actual = out.as_string::(); + let expected = StringArray::from_iter(std::iter::repeat_n(Some("something else"), mask_len)); + assert_eq!(actual, &expected); + } + + { + // 7. Test where both left and right are null + // and mask is random + let mask = BooleanArray::from(vec![true, false, true, false, true]); + let out = zip(&mask, &null_scalar, &null_scalar).unwrap(); + let actual = out.as_string::(); + let expected = StringArray::from_iter(std::iter::repeat_n(None::<&str>, mask_len)); + assert_eq!(actual, &expected); + } + + } + #[test] fn test_scalar_zipper() { let scalar_truthy = Scalar::new(Int32Array::from_value(42, 1)); @@ -828,6 +1037,28 @@ mod test { assert_eq!(actual, &expected); } + #[test] + fn test_zip_kernel_scalar_large_binary() { + let truthy_bytes: &[u8] = b"hey"; + let falsy_bytes: &[u8] = b"world"; + let scalar_truthy = Scalar::new(LargeBinaryArray::from_iter_values( + vec![truthy_bytes], + )); + let scalar_falsy = Scalar::new(LargeBinaryArray::from_iter_values(vec![falsy_bytes])); + + let mask = BooleanArray::from(vec![true, false, true, false, true]); + let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap(); + let actual = out.as_binary::(); + let expected = LargeBinaryArray::from(vec![ + Some(truthy_bytes), + Some(falsy_bytes), + Some(truthy_bytes), + Some(falsy_bytes), + Some(truthy_bytes), + ]); + assert_eq!(actual, &expected); + } + // Test to ensure that the precision and scale are kept when zipping Decimal128 data #[test] fn test_zip_decimal_with_custom_precision_and_scale() { @@ -905,4 +1136,30 @@ mod test { assert_eq!(out.data_type(), &expected_data_type); } } + + #[test] + fn zip_scalar_fallback_impl() { + let truthy_list_item_scalar = Some(vec![Some(1), None, Some(3)]); + let truthy_list_array_scalar = Scalar::new(ListArray::from_iter_primitive::(vec![ + truthy_list_item_scalar.clone() + ])); + let falsy_list_item_scalar = Some(vec![None, Some(2), Some(4)]); + let falsy_list_array_scalar = Scalar::new(ListArray::from_iter_primitive::(vec![ + falsy_list_item_scalar.clone() + ])); + let mask = BooleanArray::from(vec![true, false, true, false, false, true, false]); + let out = zip(&mask, &truthy_list_array_scalar, &falsy_list_array_scalar).unwrap(); + let actual = out.as_list::(); + + let expected = ListArray::from_iter_primitive::(vec![ + truthy_list_item_scalar.clone(), + falsy_list_item_scalar.clone(), + truthy_list_item_scalar.clone(), + falsy_list_item_scalar.clone(), + falsy_list_item_scalar.clone(), + truthy_list_item_scalar.clone(), + falsy_list_item_scalar.clone(), + ]); + assert_eq!(actual, &expected); + } } From f9439509002e57e40281ce1bb62e300a2f4aebce Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Sun, 26 Oct 2025 15:04:41 +0200 Subject: [PATCH 18/21] pass true in fallback zip impl --- arrow-select/src/zip.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-select/src/zip.rs b/arrow-select/src/zip.rs index db2aac9db22b..caefac0dfd97 100644 --- a/arrow-select/src/zip.rs +++ b/arrow-select/src/zip.rs @@ -322,7 +322,7 @@ impl FallbackImpl { impl ZipImpl for FallbackImpl { fn create_output(&self, predicate: &BooleanArray) -> Result { - zip_impl(predicate, &self.truthy, false, &self.falsy, false) + zip_impl(predicate, &self.truthy, true, &self.falsy, true) } } From aa276c1fc7fcf6616ab877589c55f2d388417f68 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Sun, 26 Oct 2025 15:09:21 +0200 Subject: [PATCH 19/21] lint and format --- arrow-select/src/zip.rs | 85 +++++++++++++++++++++++------------------ 1 file changed, 47 insertions(+), 38 deletions(-) diff --git a/arrow-select/src/zip.rs b/arrow-select/src/zip.rs index caefac0dfd97..c0ff7b3b6fba 100644 --- a/arrow-select/src/zip.rs +++ b/arrow-select/src/zip.rs @@ -532,14 +532,14 @@ impl BytesScalarImpl { // All values are falsy let (bytes, offsets) = - Self::get_bytes_and_offset_for_all_same_value(predicate.len(), falsy_val); + Self::get_bytes_and_offset_for_all_same_value(predicate.len(), falsy_val); return (bytes, offsets); } n if n == predicate.len() => { // All values are truthy let (bytes, offsets) = - Self::get_bytes_and_offset_for_all_same_value(predicate.len(), truthy_val); + Self::get_bytes_and_offset_for_all_same_value(predicate.len(), truthy_val); return (bytes, offsets); } @@ -550,7 +550,7 @@ impl BytesScalarImpl { } let total_number_of_bytes = - true_count * truthy_val.len() + (predicate.len() - true_count) * falsy_val.len(); + true_count * truthy_val.len() + (predicate.len() - true_count) * falsy_val.len(); let mut mutable = MutableBuffer::with_capacity(total_number_of_bytes); let mut offset_buffer_builder = OffsetBufferBuilder::::new(predicate.len()); @@ -755,24 +755,29 @@ mod test { let mask = BooleanArray::from(vec![true, true, false, true, false, false]); let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap(); let actual = out.as_any().downcast_ref::().unwrap(); - let expected = Int32Array::from(vec![Some(42), Some(42), Some(123), Some(42), Some(123), Some(123)]); + let expected = Int32Array::from(vec![ + Some(42), + Some(42), + Some(123), + Some(42), + Some(123), + Some(123), + ]); assert_eq!(actual, &expected); } #[test] - fn test_zip_kernel_primitive_scalar_with_boolean_array_mask_with_nulls_should_be_treated_as_false() { + fn test_zip_kernel_primitive_scalar_with_boolean_array_mask_with_nulls_should_be_treated_as_false() + { let scalar_truthy = Scalar::new(Int32Array::from_value(42, 1)); let scalar_falsy = Scalar::new(Int32Array::from_value(123, 1)); let mask = { let booleans = BooleanBuffer::from(vec![true, true, false, true, false, false]); let nulls = NullBuffer::from(vec![ - true, - true, - true, + true, true, true, false, // null treated as false even though in the original mask it was true - true, - true + true, true, ]); BooleanArray::new(booleans, Some(nulls)) }; @@ -784,7 +789,7 @@ mod test { Some(123), Some(123), // true in mask but null Some(123), - Some(123) + Some(123), ]); assert_eq!(actual, &expected); } @@ -826,19 +831,17 @@ mod test { } #[test] - fn test_zip_kernel_large_string_scalar_with_boolean_array_mask_with_nulls_should_be_treated_as_false() { - let scalar_truthy = Scalar::new(LargeStringArray::from_iter_values(&["test"])); - let scalar_falsy = Scalar::new(LargeStringArray::from_iter_values(&["something else"])); + fn test_zip_kernel_large_string_scalar_with_boolean_array_mask_with_nulls_should_be_treated_as_false() + { + let scalar_truthy = Scalar::new(LargeStringArray::from_iter_values(["test"])); + let scalar_falsy = Scalar::new(LargeStringArray::from_iter_values(["something else"])); let mask = { let booleans = BooleanBuffer::from(vec![true, true, false, true, false, false]); let nulls = NullBuffer::from(vec![ - true, - true, - true, + true, true, true, false, // null treated as false even though in the original mask it was true - true, - true + true, true, ]); BooleanArray::new(booleans, Some(nulls)) }; @@ -850,27 +853,33 @@ mod test { Some("something else"), Some("something else"), // true in mask but null Some("something else"), - Some("something else") + Some("something else"), ]); assert_eq!(actual, &expected); } #[test] fn test_zip_kernel_bytes_scalar_none_1() { - let scalar_truthy = Scalar::new(StringArray::from_iter_values(&["hello"])); + let scalar_truthy = Scalar::new(StringArray::from_iter_values(["hello"])); let scalar_falsy = Scalar::new(StringArray::new_null(1)); let mask = BooleanArray::from(vec![true, true, false, false, true]); let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap(); let actual = out.as_any().downcast_ref::().unwrap(); - let expected = StringArray::from_iter(vec![Some("hello"), Some("hello"), None, None, Some("hello")]); + let expected = StringArray::from_iter(vec![ + Some("hello"), + Some("hello"), + None, + None, + Some("hello"), + ]); assert_eq!(actual, &expected); } #[test] fn test_zip_kernel_bytes_scalar_none_2() { let scalar_truthy = Scalar::new(StringArray::new_null(1)); - let scalar_falsy = Scalar::new(StringArray::from_iter_values(&["hello"])); + let scalar_falsy = Scalar::new(StringArray::from_iter_values(["hello"])); let mask = BooleanArray::from(vec![true, true, false, false, true]); let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap(); @@ -881,8 +890,8 @@ mod test { #[test] fn test_zip_kernel_bytes_scalar_both() { - let scalar_truthy = Scalar::new(StringArray::from_iter_values(&["test"])); - let scalar_falsy = Scalar::new(StringArray::from_iter_values(&["something else"])); + let scalar_truthy = Scalar::new(StringArray::from_iter_values(["test"])); + let scalar_falsy = Scalar::new(StringArray::from_iter_values(["something else"])); // mask ends with false let mask = BooleanArray::from(vec![true, true, false, true, false, false]); @@ -906,8 +915,8 @@ mod test { let all_false_mask = BooleanArray::from(vec![false; mask_len]); let null_scalar = Scalar::new(StringArray::new_null(1)); - let non_null_scalar_1 = Scalar::new(StringArray::from_iter_values(&["test"])); - let non_null_scalar_2 = Scalar::new(StringArray::from_iter_values(&["something else"])); + let non_null_scalar_1 = Scalar::new(StringArray::from_iter_values(["test"])); + let non_null_scalar_2 = Scalar::new(StringArray::from_iter_values(["something else"])); { // 1. Test where left is null and right is non-null @@ -959,7 +968,8 @@ mod test { // and mask is all false let out = zip(&all_false_mask, &non_null_scalar_1, &non_null_scalar_2).unwrap(); let actual = out.as_string::(); - let expected = StringArray::from_iter(std::iter::repeat_n(Some("something else"), mask_len)); + let expected = + StringArray::from_iter(std::iter::repeat_n(Some("something else"), mask_len)); assert_eq!(actual, &expected); } @@ -972,7 +982,6 @@ mod test { let expected = StringArray::from_iter(std::iter::repeat_n(None::<&str>, mask_len)); assert_eq!(actual, &expected); } - } #[test] @@ -1041,9 +1050,7 @@ mod test { fn test_zip_kernel_scalar_large_binary() { let truthy_bytes: &[u8] = b"hey"; let falsy_bytes: &[u8] = b"world"; - let scalar_truthy = Scalar::new(LargeBinaryArray::from_iter_values( - vec![truthy_bytes], - )); + let scalar_truthy = Scalar::new(LargeBinaryArray::from_iter_values(vec![truthy_bytes])); let scalar_falsy = Scalar::new(LargeBinaryArray::from_iter_values(vec![falsy_bytes])); let mask = BooleanArray::from(vec![true, false, true, false, true]); @@ -1140,13 +1147,15 @@ mod test { #[test] fn zip_scalar_fallback_impl() { let truthy_list_item_scalar = Some(vec![Some(1), None, Some(3)]); - let truthy_list_array_scalar = Scalar::new(ListArray::from_iter_primitive::(vec![ - truthy_list_item_scalar.clone() - ])); + let truthy_list_array_scalar = + Scalar::new(ListArray::from_iter_primitive::(vec![ + truthy_list_item_scalar.clone(), + ])); let falsy_list_item_scalar = Some(vec![None, Some(2), Some(4)]); - let falsy_list_array_scalar = Scalar::new(ListArray::from_iter_primitive::(vec![ - falsy_list_item_scalar.clone() - ])); + let falsy_list_array_scalar = + Scalar::new(ListArray::from_iter_primitive::(vec![ + falsy_list_item_scalar.clone(), + ])); let mask = BooleanArray::from(vec![true, false, true, false, false, true, false]); let out = zip(&mask, &truthy_list_array_scalar, &falsy_list_array_scalar).unwrap(); let actual = out.as_list::(); From 2f74165685805d6a46913e6683aaef1a4e075061 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Mon, 27 Oct 2025 23:07:25 +0200 Subject: [PATCH 20/21] add comment --- arrow-select/src/zip.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/arrow-select/src/zip.rs b/arrow-select/src/zip.rs index c0ff7b3b6fba..aa634bcaece4 100644 --- a/arrow-select/src/zip.rs +++ b/arrow-select/src/zip.rs @@ -284,6 +284,7 @@ impl ScalarZipper { DataType::LargeBinary => { Arc::new(BytesScalarImpl::::new(truthy, falsy)) as Arc }, + // TODO: Handle Utf8View https://github.com/apache/arrow-rs/issues/8724 _ => { Arc::new(FallbackImpl::new(truthy, falsy)) as Arc }, From 53619458d887c2cc4419ec66cfd44cf6839bccf4 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Mon, 27 Oct 2025 23:11:46 +0200 Subject: [PATCH 21/21] merge conflicts --- arrow-select/src/zip.rs | 108 +++++++++++++++++++++++++++++----------- 1 file changed, 79 insertions(+), 29 deletions(-) diff --git a/arrow-select/src/zip.rs b/arrow-select/src/zip.rs index aa634bcaece4..e45b817dc6e8 100644 --- a/arrow-select/src/zip.rs +++ b/arrow-select/src/zip.rs @@ -149,7 +149,6 @@ fn zip_impl( falsy: &ArrayData, falsy_is_scalar: bool, ) -> Result { - let mask_buffer = maybe_prep_null_mask_filter(mask); let mut mutable = MutableArrayData::new(vec![truthy, falsy], false, truthy.len()); // the SlicesIterator slices only the true values. So the gaps left by this iterator we need to @@ -158,6 +157,7 @@ fn zip_impl( // keep track of how much is filled let mut filled = 0; + let mask_buffer = maybe_prep_null_mask_filter(mask); SlicesIterator::from(&mask_buffer).for_each(|(start, end)| { // the gap needs to be filled with falsy values if start > filled { @@ -767,34 +767,6 @@ mod test { assert_eq!(actual, &expected); } - #[test] - fn test_zip_kernel_primitive_scalar_with_boolean_array_mask_with_nulls_should_be_treated_as_false() - { - let scalar_truthy = Scalar::new(Int32Array::from_value(42, 1)); - let scalar_falsy = Scalar::new(Int32Array::from_value(123, 1)); - - let mask = { - let booleans = BooleanBuffer::from(vec![true, true, false, true, false, false]); - let nulls = NullBuffer::from(vec![ - true, true, true, - false, // null treated as false even though in the original mask it was true - true, true, - ]); - BooleanArray::new(booleans, Some(nulls)) - }; - let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap(); - let actual = out.as_any().downcast_ref::().unwrap(); - let expected = Int32Array::from(vec![ - Some(42), - Some(42), - Some(123), - Some(123), // true in mask but null - Some(123), - Some(123), - ]); - assert_eq!(actual, &expected); - } - #[test] fn test_zip_kernel_primitive_scalar_none_1() { let scalar_truthy = Scalar::new(Int32Array::from_value(42, 1)); @@ -831,6 +803,84 @@ mod test { assert_eq!(actual, &expected); } + #[test] + fn test_zip_primitive_array_with_nulls_is_mask_should_be_treated_as_false() { + let truthy = Int32Array::from_iter_values(vec![1, 2, 3, 4, 5, 6]); + let falsy = Int32Array::from_iter_values(vec![7, 8, 9, 10, 11, 12]); + + let mask = { + let booleans = BooleanBuffer::from(vec![true, true, false, true, false, false]); + let nulls = NullBuffer::from(vec![ + true, true, true, + false, // null treated as false even though in the original mask it was true + true, true, + ]); + BooleanArray::new(booleans, Some(nulls)) + }; + let out = zip(&mask, &truthy, &falsy).unwrap(); + let actual = out.as_any().downcast_ref::().unwrap(); + let expected = Int32Array::from(vec![ + Some(1), + Some(2), + Some(9), + Some(10), // true in mask but null + Some(11), + Some(12), + ]); + assert_eq!(actual, &expected); + } + + #[test] + fn test_zip_kernel_primitive_scalar_with_boolean_array_mask_with_nulls_should_be_treated_as_false() + { + let scalar_truthy = Scalar::new(Int32Array::from_value(42, 1)); + let scalar_falsy = Scalar::new(Int32Array::from_value(123, 1)); + + let mask = { + let booleans = BooleanBuffer::from(vec![true, true, false, true, false, false]); + let nulls = NullBuffer::from(vec![ + true, true, true, + false, // null treated as false even though in the original mask it was true + true, true, + ]); + BooleanArray::new(booleans, Some(nulls)) + }; + let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap(); + let actual = out.as_any().downcast_ref::().unwrap(); + let expected = Int32Array::from(vec![ + Some(42), + Some(42), + Some(123), + Some(123), // true in mask but null + Some(123), + Some(123), + ]); + assert_eq!(actual, &expected); + } + + #[test] + fn test_zip_string_array_with_nulls_is_mask_should_be_treated_as_false() { + let truthy = StringArray::from_iter_values(vec!["1", "2", "3", "4", "5", "6"]); + let falsy = StringArray::from_iter_values(vec!["7", "8", "9", "10", "11", "12"]); + + let mask = { + let booleans = BooleanBuffer::from(vec![true, true, false, true, false, false]); + let nulls = NullBuffer::from(vec![ + true, true, true, + false, // null treated as false even though in the original mask it was true + true, true, + ]); + BooleanArray::new(booleans, Some(nulls)) + }; + let out = zip(&mask, &truthy, &falsy).unwrap(); + let actual = out.as_string::(); + let expected = StringArray::from_iter_values(vec![ + "1", "2", "9", "10", // true in mask but null + "11", "12", + ]); + assert_eq!(actual, &expected); + } + #[test] fn test_zip_kernel_large_string_scalar_with_boolean_array_mask_with_nulls_should_be_treated_as_false() {