diff --git a/docs/source/user-guide/expressions.md b/docs/source/user-guide/expressions.md
index 970b94a06e..df25bfce30 100644
--- a/docs/source/user-guide/expressions.md
+++ b/docs/source/user-guide/expressions.md
@@ -206,6 +206,7 @@ The following Spark expressions are currently available. Any known compatibility
 | ArrayIntersect | Experimental |
 | ArrayJoin | Experimental |
 | ArrayMax | Experimental |
+| ArrayMin | |
 | ArrayRemove | |
 | ArrayRepeat | Experimental |
 | ArraysOverlap | Experimental |
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 1e95389c8c..23cf9d313e 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -88,6 +88,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
     classOf[ArrayIntersect] -> CometArrayIntersect,
     classOf[ArrayJoin] -> CometArrayJoin,
     classOf[ArrayMax] -> CometArrayMax,
+    classOf[ArrayMin] -> CometArrayMin,
     classOf[ArrayRemove] -> CometArrayRemove,
     classOf[ArrayRepeat] -> CometArrayRepeat,
     classOf[ArraysOverlap] -> CometArraysOverlap,
diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala
index aac4a37fda..2a77d5fa14 100644
--- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala
@@ -21,7 +21,7 @@ package org.apache.comet.serde
 
 import scala.annotation.tailrec
 
-import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayContains, ArrayDistinct, ArrayExcept, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayMax, ArrayRemove, ArrayRepeat, ArraysOverlap, ArrayUnion, Attribute, CreateArray, Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayContains, ArrayDistinct, ArrayExcept, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayMax, ArrayMin, ArrayRemove, ArrayRepeat, ArraysOverlap, ArrayUnion, Attribute, CreateArray, Expression, Literal}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
@@ -189,6 +189,18 @@ object CometArrayMax extends CometExpressionSerde[ArrayMax] {
   }
 }
 
+object CometArrayMin extends CometExpressionSerde[ArrayMin] {
+  override def convert(
+      expr: ArrayMin,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    val arrayExprProto = exprToProto(expr.children.head, inputs, binding)
+
+    val arrayMinScalarExpr = scalarFunctionExprToProto("array_min", arrayExprProto)
+    optExprWithInfo(arrayMinScalarExpr, expr)
+  }
+}
+
 object CometArraysOverlap extends CometExpressionSerde[ArraysOverlap] with IncompatExpr {
   override def convert(
       expr: ArraysOverlap,
diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
index ccece3dde9..6d8435c863 100644
--- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
@@ -409,6 +409,29 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
     }
   }
 
+  test("array_min") {
+    Seq(true, false).foreach { dictionaryEnabled =>
+      withTempDir { dir =>
+        val path = new Path(dir.toURI.toString, "test.parquet")
+        makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled, n = 10000)
+        spark.read.parquet(path.toString).createOrReplaceTempView("t1")
+        checkSparkAnswerAndOperator(spark.sql("SELECT array_min(array(_2, _3, _4)) FROM t1"))
+        checkSparkAnswerAndOperator(
+          spark.sql("SELECT array_min((CASE WHEN _2 =_3 THEN array(_4) END)) FROM t1"))
+        checkSparkAnswerAndOperator(
+          spark.sql("SELECT array_min((CASE WHEN _2 =_3 THEN array(_2, _4) END)) FROM t1"))
+        checkSparkAnswerAndOperator(
+          spark.sql("SELECT array_min(array(CAST(NULL AS INT), CAST(NULL AS INT))) FROM t1"))
+        checkSparkAnswerAndOperator(
+          spark.sql("SELECT array_min(array(_2, CAST(NULL AS INT))) FROM t1"))
+        checkSparkAnswerAndOperator(spark.sql("SELECT array_min(array()) FROM t1"))
+        checkSparkAnswerAndOperator(
+          spark.sql(
+            "SELECT array_min(array(double('-Infinity'), 0.0, double('Infinity'))) FROM t1"))
+      }
+    }
+  }
+
   test("array_intersect") {
     // TODO test fails if scan is auto
     // https://github.com/apache/datafusion-comet/issues/2174