diff --git a/datafusion/spark/src/function/math/ceil.rs b/datafusion/spark/src/function/math/ceil.rs new file mode 100644 index 000000000000..adbdb14d0e47 --- /dev/null +++ b/datafusion/spark/src/function/math/ceil.rs @@ -0,0 +1,461 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{Array, ArrayRef, AsArray, Float64Array}; +use arrow::datatypes::DataType::{ + Decimal128, Float32, Float64, Int16, Int32, Int64, Int8, UInt16, UInt32, UInt64, + UInt8, +}; +use arrow::datatypes::{ArrowNativeTypeOp, DataType, DECIMAL128_MAX_PRECISION}; +use datafusion_common::{exec_err, Result}; +use datafusion_expr::{ + ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, + Volatility, +}; +use datafusion_functions::utils::make_scalar_function; +use std::any::Any; +use std::sync::Arc; + +/// Spark-compatible CEIL function implementation. +/// Returns the smallest integer that is greater than or equal to the input value. +/// Optionally takes a scale parameter to control decimal precision. +/// Reference: +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkCeil { + signature: Signature, +} + +impl Default for SparkCeil { + fn default() -> Self { + Self::new() + } +} + +impl SparkCeil { + pub fn new() -> Self { + Self { + signature: Signature::one_of( + vec![ + // Single argument: ceil(expr) for basic numeric types + TypeSignature::Uniform( + 1, + vec![Float32, Float64, Int64, Decimal128(38, 10)], + ), + // Two arguments: ceil(expr, scale) where scale can be any integer type + // Float32 with various integer scale types + TypeSignature::Exact(vec![Float32, Int8]), + TypeSignature::Exact(vec![Float32, Int16]), + TypeSignature::Exact(vec![Float32, Int32]), + TypeSignature::Exact(vec![Float32, Int64]), + // Float64 with various integer scale types + TypeSignature::Exact(vec![Float64, Int8]), + TypeSignature::Exact(vec![Float64, Int16]), + TypeSignature::Exact(vec![Float64, Int32]), + TypeSignature::Exact(vec![Float64, Int64]), + // Int64 with various integer scale types (scale has no effect on integers) + TypeSignature::Exact(vec![Int64, Int8]), + TypeSignature::Exact(vec![Int64, Int16]), + TypeSignature::Exact(vec![Int64, Int32]), + TypeSignature::Exact(vec![Int64, Int64]), + // Decimal128 with various integer scale types + TypeSignature::Exact(vec![Decimal128(38, 10), Int8]), + TypeSignature::Exact(vec![Decimal128(38, 10), Int16]), + TypeSignature::Exact(vec![Decimal128(38, 10), Int32]), + TypeSignature::Exact(vec![Decimal128(38, 10), Int64]), + ], + Volatility::Immutable, + ), + } + } +} + +impl ScalarUDFImpl for SparkCeil { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "ceil" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + /// Determines the return type based on input argument types. + /// For single argument (no scale): floats return Int64, integers stay Int64, decimals adjust precision/scale. + /// For two arguments (with scale): floats keep their type, decimals become Float64. + fn return_type(&self, arg_types: &[DataType]) -> Result { + if arg_types.is_empty() { + return exec_err!("ceil expects at least 1 argument"); + } + + let value_type = &arg_types[0]; + let has_scale = arg_types.len() == 2; + + match (value_type, has_scale) { + (Float32, false) => Ok(Int64), + (Float32, true) => Ok(Float32), + (Float64, false) => Ok(Int64), + (Float64, true) => Ok(Float64), + (Int64, _) => Ok(Int64), + (Decimal128(precision, scale), false) => { + // For decimals without scale, compute new precision/scale for integer result + let (new_precision, new_scale) = + round_decimal_base(*precision as i32, *scale as i32, 0); + Ok(Decimal128(new_precision, new_scale)) + } + (Decimal128(_precision, _scale), true) => Ok(Float64), // With scale, convert to float + _ => Ok(Int64), // Fallback for unsupported types + } + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + make_scalar_function(spark_ceil, vec![])(&args.args) + } +} + +/// Calculates the new precision and scale for decimal operations. +/// Used to determine the appropriate decimal representation after ceiling operations. +/// Ensures the result fits within Decimal128 constraints. +fn round_decimal_base(precision: i32, _scale: i32, target_scale: i32) -> (u8, i8) { + // Clamp target scale to valid range and ensure non-negative + let scale = if target_scale < -38 { + 0 + } else { + target_scale.max(0) as i8 + }; + // Calculate new precision based on target scale, ensuring it doesn't exceed max + let new_precision = precision + .max(target_scale + 1) + .min(DECIMAL128_MAX_PRECISION as i32) as u8; + (new_precision, scale) +} + +/// Core implementation of the Spark CEIL function. +/// Handles ceiling operations for different data types with optional scale parameter. +/// Supports Float32, Float64, Int64, and Decimal128 types. +fn spark_ceil(args: &[ArrayRef]) -> Result { + // Validate argument count + if args.is_empty() || args.len() > 2 { + return exec_err!("ceil expects 1 or 2 arguments, got {}", args.len()); + } + + let value_array: &dyn Array = args[0].as_ref(); + + // Extract scale parameter if provided (second argument) + let scale = if args.len() == 2 { + let scale_array = args[1].as_ref(); + // Scale must be a single scalar value, not an array + if scale_array.is_empty() || scale_array.len() != 1 { + return exec_err!( + "scale parameter must be a single integer value, got array of length {}", + scale_array.len() + ); + } + // Extract the scale value from the array, supporting various integer types + let s = match scale_array.data_type() { + Int8 => scale_array + .as_primitive::() + .value(0) as i32, // Cast to i32 for uniform handling + Int16 => scale_array + .as_primitive::() + .value(0) as i32, + Int32 => scale_array + .as_primitive::() + .value(0), // Already i32 + Int64 => scale_array + .as_primitive::() + .value(0) as i32, + UInt8 => scale_array + .as_primitive::() + .value(0) as i32, + UInt16 => scale_array + .as_primitive::() + .value(0) as i32, + UInt32 => scale_array + .as_primitive::() + .value(0) as i32, + UInt64 => scale_array + .as_primitive::() + .value(0) as i32, + other => { + return exec_err!("scale parameter must be an integer, got {:?}", other) + } + }; + Some(s) + } else { + None // No scale provided + }; + + // Perform ceiling operation based on data type and scale parameter + match (args[0].data_type(), scale) { + // Float32 without scale: ceil to nearest integer, return as Int64 + (Float32, None) => { + let array = value_array + .as_primitive::() + .unary::<_, arrow::datatypes::Int64Type>(|value: f32| { + value.ceil() as i64 + }); + Ok(Arc::new(array)) + } + // Float32 with scale: ceil to specified decimal places, return as Float32 + (Float32, Some(s)) => { + let scale_factor = 10_f32.powi(s); // 10^scale for decimal place adjustment + let array = value_array + .as_primitive::() + .unary::<_, arrow::datatypes::Float32Type>(|value: f32| { + (value * scale_factor).ceil() / scale_factor // Scale, ceil, then unscale + }); + Ok(Arc::new(array)) + } + // Float64 without scale: ceil to nearest integer, return as Int64 + (Float64, None) => { + let array = value_array + .as_primitive::() + .unary::<_, arrow::datatypes::Int64Type>(|value: f64| { + value.ceil() as i64 + }); + Ok(Arc::new(array)) + } + // Float64 with scale: ceil to specified decimal places, return as Float64 + (Float64, Some(s)) => { + let scale_factor = 10_f64.powi(s); // 10^scale for decimal place adjustment + let array = value_array + .as_primitive::() + .unary::<_, arrow::datatypes::Float64Type>(|value: f64| { + (value * scale_factor).ceil() / scale_factor // Scale, ceil, then unscale + }); + Ok(Arc::new(array)) + } + // Int64: integers are already "ceiled", return unchanged regardless of scale + (Int64, None) => Ok(Arc::clone(&args[0])), + (Int64, Some(_)) => Ok(Arc::clone(&args[0])), + // Decimal128: handle decimals with scale > 0 (fractional part exists) + (Decimal128(precision, value_scale), scale_param) => { + if *value_scale > 0 { + match scale_param { + // Without scale parameter: ceil to integer + None => { + let decimal_array = value_array + .as_primitive::(); + // Calculate divisor to separate integer and fractional parts + let div = 10_i128.pow_wrapping((*value_scale) as u32); + // Ceil by dividing, applying ceil to quotient, then converting to i64 + let result_array = decimal_array + .unary::<_, arrow::datatypes::Int64Type>(|value: i128| { + div_ceil(value, div) as i64 + }); + Ok(Arc::new(result_array)) + } + // With scale parameter: ceil to specified decimal places + Some(s) => { + // Validate that target scale doesn't exceed input scale + if s > *value_scale as i32 { + return exec_err!( + "scale {} cannot be greater than input scale {}", + s, + *value_scale + ); + } + // Calculate new precision and scale for the result + let (new_precision, new_scale) = + round_decimal_base(*precision as i32, *value_scale as i32, s); + let decimal_array = value_array + .as_primitive::(); + + // Handle positive scale (decimal places) + if s >= 0 { + let s_i8 = s as i8; + if s_i8 > *value_scale { + return exec_err!( + "output scale {} cannot exceed input scale {}", + s_i8, + *value_scale + ); + } + let factor = + 10_i128.pow_wrapping((*value_scale - s_i8) as u32); + let result_array = decimal_array + .unary::<_, arrow::datatypes::Decimal128Type>( + |value: i128| div_ceil(value, factor), + ); + let decimal_result = result_array + .with_precision_and_scale(new_precision, new_scale)?; + let scale_factor = 10_f64.powi(new_scale as i32); + let float_values: Vec> = decimal_result + .iter() + .map(|v| v.map(|x| (x as f64) / scale_factor)) + .collect(); + Ok(Arc::new(Float64Array::from(float_values))) + } else { + let s_i8 = s as i8; + let factor = + 10_i128.pow_wrapping((*value_scale - s_i8) as u32); + let result_array = decimal_array + .unary::<_, arrow::datatypes::Decimal128Type>( + |value: i128| div_ceil(value, factor), + ); + let decimal_result = result_array + .with_precision_and_scale(new_precision, 0)?; + let float_values: Vec> = decimal_result + .iter() + .map(|v| v.map(|x| x as f64)) + .collect(); + Ok(Arc::new(Float64Array::from(float_values))) + } + } + } + } else { + Ok(Arc::clone(&args[0])) + } + } + _ => exec_err!( + "ceil expects a numeric argument, got {}", + args[0].data_type() + ), + } +} + +#[inline] +fn div_ceil(a: i128, b: i128) -> i128 { + if b == 0 { + panic!("division by zero"); + } + let div = a / b; + let rem = a % b; + if rem != 0 && ((b > 0 && a > 0) || (b < 0 && a < 0)) { + div + 1 + } else { + div + } +} + +#[cfg(test)] +mod test { + use super::*; + use arrow::array::{Decimal128Array, Float32Array, Float64Array, Int64Array}; + use datafusion_common::Result; + use std::sync::Arc; + + #[test] + fn test_ceil_f32_array() -> Result<()> { + let input = vec![Some(125.2345_f32), Some(-1.1_f32), None]; + let array = Arc::new(Float32Array::from(input)) as ArrayRef; + let result = spark_ceil(&[array])?; + let result_array = result.as_any().downcast_ref::().unwrap(); + assert_eq!(result_array.value(0), 126); + assert_eq!(result_array.value(1), -1); + Ok(()) + } + + #[test] + fn test_ceil_f64_array() -> Result<()> { + let input = vec![Some(3.3281_f64), Some(-2.1_f64), None]; + let array = Arc::new(Float64Array::from(input)) as ArrayRef; + let result = spark_ceil(&[array])?; + let result_array = result.as_any().downcast_ref::().unwrap(); + assert_eq!(result_array.value(0), 4); + assert_eq!(result_array.value(1), -2); + Ok(()) + } + + #[test] + fn test_ceil_i64_array() -> Result<()> { + let input = vec![Some(42_i64), Some(-15_i64), None]; + let array = Arc::new(Int64Array::from(input)) as ArrayRef; + let result = spark_ceil(&[array])?; + let result_array = result.as_any().downcast_ref::().unwrap(); + assert_eq!(result_array.value(0), 42); + assert_eq!(result_array.value(1), -15); + Ok(()) + } + + #[test] + fn test_ceil_decimal_array() -> Result<()> { + let input = vec![Some(115_i128), Some(-267_i128), None]; + let array = + Arc::new(Decimal128Array::from(input).with_precision_and_scale(10, 2)?) + as ArrayRef; + let result = spark_ceil(&[array])?; + let result_array = result.as_any().downcast_ref::().unwrap(); + assert_eq!(result_array.value(0), 2); + assert_eq!(result_array.value(1), -2); + Ok(()) + } + + #[test] + fn test_ceil_with_scale() -> Result<()> { + let input = vec![Some(3.24792_f64), Some(2.71324_f64)]; + let value_array = Arc::new(Float64Array::from(input)) as ArrayRef; + let scale_array = Arc::new(Int64Array::from(vec![Some(2_i64)])) as ArrayRef; + let result = spark_ceil(&[value_array, scale_array])?; + let result_array = result.as_any().downcast_ref::().unwrap(); + assert_eq!(result_array.value(0), 3.25); + assert_eq!(result_array.value(1), 2.72); + Ok(()) + } + + #[test] + fn test_ceil_float32_with_scale() -> Result<()> { + let input = vec![Some(1.234_f32), Some(-2.567_f32)]; + let value_array = Arc::new(Float32Array::from(input)) as ArrayRef; + let scale_array = Arc::new(Int64Array::from(vec![Some(1_i64)])) as ArrayRef; + let result = spark_ceil(&[value_array, scale_array])?; + let result_array = result.as_any().downcast_ref::().unwrap(); + assert_eq!(result_array.value(0), 1.3); + assert_eq!(result_array.value(1), -2.5); + Ok(()) + } + + #[test] + fn test_ceil_float64_with_scale_3() -> Result<()> { + let input = vec![Some(4.1418_f64)]; + let value_array = Arc::new(Float64Array::from(input)) as ArrayRef; + let scale_array = Arc::new(Int64Array::from(vec![Some(3_i64)])) as ArrayRef; + let result = spark_ceil(&[value_array, scale_array])?; + let result_array = result.as_any().downcast_ref::().unwrap(); + assert_eq!(result_array.value(0), 4.142); + Ok(()) + } + + #[test] + fn test_ceil_with_negative_scale() -> Result<()> { + let input = vec![Some(-12.345_f64)]; + let value_array = Arc::new(Float64Array::from(input)) as ArrayRef; + let scale_array = Arc::new(Int64Array::from(vec![Some(1_i64)])) as ArrayRef; + let result = spark_ceil(&[value_array, scale_array])?; + let result_array = result.as_any().downcast_ref::().unwrap(); + assert_eq!(result_array.value(0), -12.3); + Ok(()) + } + + #[test] + // this scenario passes as UT but fails in slt. + fn test_ceil_decimal_with_scale() -> Result<()> { + let input = vec![Some(31411_i128), Some(-12345_i128)]; + let value_array = + Arc::new(Decimal128Array::from(input).with_precision_and_scale(5, 4)?) + as ArrayRef; + let scale_array = Arc::new(Int64Array::from(vec![Some(3_i64)])) as ArrayRef; + let result = spark_ceil(&[value_array, scale_array])?; + let result_array = result.as_any().downcast_ref::().unwrap(); + assert_eq!(result_array.value(0), 3.142); // ceil(3.1411, 3) = 3.142 + assert_eq!(result_array.value(1), -1.234); // ceil(-1.2345, 3) = -1.234 + Ok(()) + } +} diff --git a/datafusion/spark/src/function/math/mod.rs b/datafusion/spark/src/function/math/mod.rs index 092335e4aa18..ee9a1621fe59 100644 --- a/datafusion/spark/src/function/math/mod.rs +++ b/datafusion/spark/src/function/math/mod.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +pub mod ceil; pub mod expm1; pub mod factorial; pub mod hex; @@ -33,6 +34,7 @@ make_udf_function!(modulus::SparkMod, modulus); make_udf_function!(modulus::SparkPmod, pmod); make_udf_function!(rint::SparkRint, rint); make_udf_function!(width_bucket::SparkWidthBucket, width_bucket); +make_udf_function!(ceil::SparkCeil, ceil); pub mod expr_fn { use datafusion_functions::export_functions; @@ -48,6 +50,11 @@ pub mod expr_fn { export_functions!((pmod, "Returns the positive remainder of division of the first argument by the second argument.", arg1 arg2)); export_functions!((rint, "Returns the double value that is closest in value to the argument and is equal to a mathematical integer.", arg1)); export_functions!((width_bucket, "Returns the bucket number into which the value of this expression would fall after being evaluated.", arg1 arg2 arg3 arg4)); + export_functions!(( + ceil, + "Returns the smallest whole number that is greater than the input value.", + arg1 + )); } pub fn functions() -> Vec> { @@ -59,5 +66,6 @@ pub fn functions() -> Vec> { pmod(), rint(), width_bucket(), + ceil(), ] } diff --git a/datafusion/sqllogictest/test_files/spark/math/ceil.slt b/datafusion/sqllogictest/test_files/spark/math/ceil.slt index c87a29b61fd4..6bf1078cdb90 100644 --- a/datafusion/sqllogictest/test_files/spark/math/ceil.slt +++ b/datafusion/sqllogictest/test_files/spark/math/ceil.slt @@ -15,28 +15,151 @@ # specific language governing permissions and limitations # under the License. -# This file was originally created by a porting script from: -# https://github.com/lakehq/sail/tree/43b6ed8221de5c4c4adbedbb267ae1351158b43c/crates/sail-spark-connect/tests/gold_data/function -# This file is part of the implementation of the datafusion-spark function library. -# For more information, please see: -# https://github.com/apache/datafusion/issues/15914 - -## Original Query: SELECT ceil(-0.1); -## PySpark 3.5.5 Result: {'CEIL(-0.1)': Decimal('0'), 'typeof(CEIL(-0.1))': 'decimal(1,0)', 'typeof(-0.1)': 'decimal(1,1)'} -#query -#SELECT ceil(-0.1::decimal(1,1)); - -## Original Query: SELECT ceil(3.1411, -3); -## PySpark 3.5.5 Result: {'ceil(3.1411, -3)': Decimal('1000'), 'typeof(ceil(3.1411, -3))': 'decimal(4,0)', 'typeof(3.1411)': 'decimal(5,4)', 'typeof(-3)': 'int'} -#query -#SELECT ceil(3.1411::decimal(5,4), -3::int); - -## Original Query: SELECT ceil(3.1411, 3); -## PySpark 3.5.5 Result: {'ceil(3.1411, 3)': Decimal('3.142'), 'typeof(ceil(3.1411, 3))': 'decimal(5,3)', 'typeof(3.1411)': 'decimal(5,4)', 'typeof(3)': 'int'} -#query -#SELECT ceil(3.1411::decimal(5,4), 3::int); - -## Original Query: SELECT ceil(5); -## PySpark 3.5.5 Result: {'CEIL(5)': 5, 'typeof(CEIL(5))': 'bigint', 'typeof(5)': 'int'} -#query -#SELECT ceil(5::int); +# Test cases for the ceil function + +# Test basic positive float values +query I +SELECT ceil(8.11::float); +---- +9 + +query I +SELECT ceil(2.116::float); +---- +3 + +# Test basic positive double values +query I +SELECT ceil(2.1); +---- +3 + +query I +SELECT ceil(5.953); +---- +6 + +# Test negative float values +query I +SELECT ceil(-3.14::float); +---- +-3 + +query I +SELECT ceil(-5.9::float); +---- +-5 + +# Test negative double values +query I +SELECT ceil(-3.14159); +---- +-3 + +query I +SELECT ceil(-5.999); +---- +-5 + +# Test integer values +query I +SELECT ceil(52); +---- +52 + +query I +SELECT ceil(-39); +---- +-39 + +# Test zero values +query I +SELECT ceil(0.0); +---- +0 + +query I +SELECT ceil(-3.0::float); +---- +-3 + +# Test very small decimal values +query I +SELECT ceil(-0.0001); +---- +0 + +# Test large numbers +query I +SELECT ceil(123456.789); +---- +123457 + +# Test with NULL values +query I +SELECT ceil(NULL); +---- +NULL + +# Test array inputs +query I +SELECT ceil(column1::float) FROM (VALUES (1.5), (2.7), (-1.3), (NULL)) AS t(column1); +---- +2 +3 +-1 +NULL + +# Test edge cases +query I +SELECT ceil(0.999999999999); +---- +1 + +# Test boundary values +query I +SELECT ceil(922337203685477.0); +---- +922337203685477 + +# Test by casting to a datatype +query I +SELECT ceil(CAST(5 as DOUBLE)); +---- +5 + +query I +SELECT ceil(CAST(528.3683 as DECIMAL(6, 3))); +---- +529 + +# Test with two arguments +query R +SELECT ceil(5.4, -1); +---- +10 + +query R +SELECT ceil(CAST(5 as INT), -11); +---- +100000000000 + +query I +SELECT ceil(-12.345, 1); +---- +-12.3 + +query R +SELECT ceil(CAST(5 as SMALLINT), -6); +---- +1000000 + +query I +SELECT ceil(3.1411, 3); +---- +3.142 + +query R +SELECT ceil(3345.1, -2); +---- +3400