diff --git a/externals/kyuubi-spark-sql-engine/src/main/resources/META-INF/services/org.apache.kyuubi.engine.spark.udf.KDFRegistry b/externals/kyuubi-spark-sql-engine/src/main/resources/META-INF/services/org.apache.kyuubi.engine.spark.udf.KDFRegistry new file mode 100644 index 00000000000..34c9e0c9690 --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/main/resources/META-INF/services/org.apache.kyuubi.engine.spark.udf.KDFRegistry @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.kyuubi.engine.spark.udf.KyuubiDF diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala index 85c918eb66d..ef9fbf0d896 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala @@ -17,8 +17,11 @@ package org.apache.kyuubi.engine.spark.session +import java.util.ServiceLoader import java.util.concurrent.atomic.AtomicLong +import scala.collection.convert.ImplicitConversions.`iterable AsScalaIterable` + import org.apache.commons.lang3.StringUtils import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.ui.SparkUIUtils.formatDuration @@ -91,7 +94,8 @@ class SparkSessionImpl( otherConf.foreach { case (key, value) => setModifiableConfig(key, value) } - KDFRegistry.registerAll(spark) + ServiceLoader.load(classOf[KDFRegistry]) + .foreach(_.registerAll(spark)) EventBus.post(sessionEvent) super.open() } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/udf/KDFRegistry.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/udf/KDFRegistry.scala index a2d50d1515b..ea4812d9233 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/udf/KDFRegistry.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/udf/KDFRegistry.scala @@ -17,86 +17,8 @@ package org.apache.kyuubi.engine.spark.udf -import scala.collection.mutable.ArrayBuffer - -import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.expressions.UserDefinedFunction -import org.apache.spark.sql.functions.udf - -import org.apache.kyuubi.{KYUUBI_VERSION, Utils} -import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_URL, KYUUBI_SESSION_USER_KEY} - -object KDFRegistry { - - @transient - val registeredFunctions = new ArrayBuffer[KyuubiDefinedFunction]() - - lazy val appName = SparkEnv.get.conf.get("spark.app.name") - lazy val appId = SparkEnv.get.conf.get("spark.app.id") - - val kyuubi_version: KyuubiDefinedFunction = create( - "kyuubi_version", - udf(() => KYUUBI_VERSION).asNonNullable(), - "Return the version of Kyuubi Server", - "string", - "1.3.0") - - val engine_name: KyuubiDefinedFunction = create( - "engine_name", - udf(() => appName).asNonNullable(), - "Return the spark application name for the associated query engine", - "string", - "1.3.0") - - val engine_id: KyuubiDefinedFunction = create( - "engine_id", - udf(() => appId).asNonNullable(), - "Return the spark application id for the associated query engine", - "string", - "1.4.0") - - val system_user: KyuubiDefinedFunction = create( - "system_user", - udf(() => Utils.currentUser).asNonNullable(), - "Return the system user name for the associated query engine", - "string", - "1.3.0") - - val session_user: KyuubiDefinedFunction = create( - "session_user", - udf { () => - Option(TaskContext.get()).map(_.getLocalProperty(KYUUBI_SESSION_USER_KEY)) - .getOrElse(throw new RuntimeException("Unable to get session_user")) - }, - "Return the session username for the associated query engine", - "string", - "1.4.0") - - val engine_url: KyuubiDefinedFunction = create( - "engine_url", - udf { () => - Option(TaskContext.get()).map(_.getLocalProperty(KYUUBI_ENGINE_URL)) - .getOrElse(throw new RuntimeException("Unable to get engine url")) - }, - "Return the engine url for the associated query engine", - "string", - "1.8.0") - - def create( - name: String, - udf: UserDefinedFunction, - description: String, - returnType: String, - since: String): KyuubiDefinedFunction = { - val kdf = KyuubiDefinedFunction(name, udf, description, returnType, since) - registeredFunctions += kdf - kdf - } - def registerAll(spark: SparkSession): Unit = { - for (func <- registeredFunctions) { - spark.udf.register(func.name, func.udf) - } - } +trait KDFRegistry { + def registerAll(spark: SparkSession): Unit } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/udf/KyuubiDF b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/udf/KyuubiDF new file mode 100644 index 00000000000..f12a2174702 --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/udf/KyuubiDF @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.spark.udf + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.expressions.UserDefinedFunction +import org.apache.spark.sql.functions.udf + +import org.apache.kyuubi.{KYUUBI_VERSION, Utils} +import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_URL, KYUUBI_SESSION_USER_KEY} + +class KyuubiDF extends KDFRegistry with Serializable { + + @transient + val registeredFunctions = new ArrayBuffer[KyuubiDefinedFunction]() + + lazy val appName = SparkEnv.get.conf.get("spark.app.name") + lazy val appId = SparkEnv.get.conf.get("spark.app.id") + + val kyuubi_version: KyuubiDefinedFunction = create( + "kyuubi_version", + udf(() => KYUUBI_VERSION).asNonNullable(), + "Return the version of Kyuubi Server", + "string", + "1.3.0") + + val engine_name: KyuubiDefinedFunction = create( + "engine_name", + udf(() => appName).asNonNullable(), + "Return the spark application name for the associated query engine", + "string", + "1.3.0") + + val engine_id: KyuubiDefinedFunction = create( + "engine_id", + udf(() => appId).asNonNullable(), + "Return the spark application id for the associated query engine", + "string", + "1.4.0") + + val system_user: KyuubiDefinedFunction = create( + "system_user", + udf(() => Utils.currentUser).asNonNullable(), + "Return the system user name for the associated query engine", + "string", + "1.3.0") + + val session_user: KyuubiDefinedFunction = create( + "session_user", + udf { () => + Option(TaskContext.get()).map(_.getLocalProperty(KYUUBI_SESSION_USER_KEY)) + .getOrElse(throw new RuntimeException("Unable to get session_user")) + }, + "Return the session username for the associated query engine", + "string", + "1.4.0") + + val engine_url: KyuubiDefinedFunction = create( + "engine_url", + udf { () => + Option(TaskContext.get()).map(_.getLocalProperty(KYUUBI_ENGINE_URL)) + .getOrElse(throw new RuntimeException("Unable to get engine url")) + }, + "Return the engine url for the associated query engine", + "string", + "1.8.0") + + def create( + name: String, + udf: UserDefinedFunction, + description: String, + returnType: String, + since: String): KyuubiDefinedFunction = { + val kdf = KyuubiDefinedFunction(name, udf, description, returnType, since) + registeredFunctions += kdf + kdf + } + + def registerAll(spark: SparkSession): Unit = { + for (func <- registeredFunctions) { + spark.udf.register(func.name, func.udf) + } + } +} diff --git a/externals/kyuubi-spark-sql-engine/src/test/resources/META-INF/services/org.apache.kyuubi.engine.spark.udf.KDFRegistry b/externals/kyuubi-spark-sql-engine/src/test/resources/META-INF/services/org.apache.kyuubi.engine.spark.udf.KDFRegistry new file mode 100644 index 00000000000..b1d20ed7457 --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/test/resources/META-INF/services/org.apache.kyuubi.engine.spark.udf.KDFRegistry @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.kyuubi.engine.spark.udf.SpiUDF diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/udf/KyuubiDefinedFunctionSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/udf/KyuubiDefinedFunctionSuite.scala index ec93db85682..22ab07ffe36 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/udf/KyuubiDefinedFunctionSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/udf/KyuubiDefinedFunctionSuite.scala @@ -56,7 +56,7 @@ class KyuubiDefinedFunctionSuite extends KyuubiFunSuite { | Name | Description | Return Type | Since | --- | --- | --- | --- |""" - KDFRegistry.registeredFunctions.foreach { func => + new KyuubiDF().registeredFunctions.foreach { func => builder += s"${func.name} | ${func.description} | ${func.returnType} | ${func.since}" } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/udf/SpiUDFTests b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/udf/SpiUDFTests new file mode 100644 index 00000000000..c8227efb74d --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/udf/SpiUDFTests @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.spark.udf + +import java.util.ServiceLoader + +import scala.collection.convert.ImplicitConversions.`iterable AsScalaIterable` + +import org.apache.spark.sql.SparkSession + +import org.apache.kyuubi.engine.spark.WithSparkSQLEngine + + +class SpiUDFTests extends WithSparkSQLEngine { + override def withKyuubiConf: Map[String, String] = Map.empty + + test("spi udf register") { + ServiceLoader.load(classOf[KDFRegistry]) + .foreach(_.registerAll(spark)) + + assert(spark.sql("select spi_test()").first().getString(0) == "success") + } +} + +class SpiUDF extends KDFRegistry{ + override def registerAll(spark: SparkSession): Unit = { + spark.udf.register("spi_test", () => "success") + } +}