From 0bc84df5f0ef6d4d616e43d73b5489c786de0583 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 10 Oct 2025 03:16:55 -0600 Subject: [PATCH 1/2] Pass Spark conf to native --- native/core/src/execution/jni_api.rs | 40 ++++++++++-------- native/core/src/execution/mod.rs | 1 + native/core/src/execution/spark_config.rs | 42 +++++++++++++++++++ .../org/apache/comet/CometExecIterator.scala | 8 +--- .../main/scala/org/apache/comet/Native.scala | 4 -- 5 files changed, 67 insertions(+), 28 deletions(-) create mode 100644 native/core/src/execution/spark_config.rs diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index b76108ad96..b17cfa1d9b 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -78,6 +78,10 @@ use crate::execution::spark_plan::SparkPlan; use crate::execution::tracing::{log_memory_usage, trace_begin, trace_end, with_trace}; +use crate::execution::spark_config::{ + SparkConfig, COMET_DEBUG_ENABLED, COMET_EXPLAIN_NATIVE_ENABLED, COMET_MAX_TEMP_DIRECTORY_SIZE, + COMET_TRACING_ENABLED, +}; use crate::parquet::encryption_support::{CometEncryptionFactory, ENCRYPTION_FACTORY_ID}; use datafusion_comet_proto::spark_operator::operator::OpStruct; use log::info; @@ -168,14 +172,23 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( memory_limit: jlong, memory_limit_per_task: jlong, task_attempt_id: jlong, - debug_native: jboolean, - explain_native: jboolean, - tracing_enabled: jboolean, - max_temp_directory_size: jlong, key_unwrapper_obj: JObject, ) -> jlong { try_unwrap_or_throw(&e, |mut env| { - with_trace("createPlan", tracing_enabled != JNI_FALSE, || { + // Deserialize Spark configs + let array = unsafe { JPrimitiveArray::from_raw(serialized_spark_configs) }; + let bytes = env.convert_byte_array(array)?; + let spark_configs = serde::deserialize_config(bytes.as_slice())?; + let spark_config: HashMap<String, String> = spark_configs.entries.into_iter().collect(); + + // Access Comet 
configs + let debug_native = spark_config.get_bool(COMET_DEBUG_ENABLED); + let explain_native = spark_config.get_bool(COMET_EXPLAIN_NATIVE_ENABLED); + let tracing_enabled = spark_config.get_bool(COMET_TRACING_ENABLED); + let max_temp_directory_size = + spark_config.get_u64(COMET_MAX_TEMP_DIRECTORY_SIZE, 100 * 1024 * 1024 * 1024); + + with_trace("createPlan", tracing_enabled, || { // Init JVM classes JVMClasses::init(&mut env); @@ -186,15 +199,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( let bytes = env.convert_byte_array(array)?; let spark_plan = serde::deserialize_op(bytes.as_slice())?; - // Deserialize Spark configs - let array = unsafe { JPrimitiveArray::from_raw(serialized_spark_configs) }; - let bytes = env.convert_byte_array(array)?; - let spark_configs = serde::deserialize_config(bytes.as_slice())?; - - // Convert Spark configs to HashMap - let _spark_config_map: HashMap<String, String> = - spark_configs.entries.into_iter().collect(); - let metrics = Arc::new(jni_new_global_ref!(env, metrics_node)?); // Get the global references of input sources @@ -238,7 +242,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( batch_size as usize, memory_pool, local_dirs, - max_temp_directory_size as u64, + max_temp_directory_size, )?; let plan_creation_time = start.elapsed(); @@ -274,10 +278,10 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( metrics_last_update_time: Instant::now(), plan_creation_time, session_ctx: Arc::new(session), - debug_native: debug_native == 1, - explain_native: explain_native == 1, + debug_native, + explain_native, memory_pool_config, - tracing_enabled: tracing_enabled != JNI_FALSE, + tracing_enabled, }); Ok(Box::into_raw(exec_context) as i64) diff --git a/native/core/src/execution/mod.rs b/native/core/src/execution/mod.rs index c55b96f2a9..b8a3d546b3 100644 --- a/native/core/src/execution/mod.rs +++ b/native/core/src/execution/mod.rs @@ -27,6 +27,7 @@ pub(crate) mod sort; pub(crate) 
mod spark_plan; pub use datafusion_comet_spark_expr::timezone; mod memory_pools; +pub(crate) mod spark_config; pub(crate) mod tracing; pub(crate) mod utils; diff --git a/native/core/src/execution/spark_config.rs b/native/core/src/execution/spark_config.rs new file mode 100644 index 0000000000..60ebb2ff8b --- /dev/null +++ b/native/core/src/execution/spark_config.rs @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::collections::HashMap; + +pub(crate) const COMET_TRACING_ENABLED: &str = "spark.comet.tracing.enabled"; +pub(crate) const COMET_DEBUG_ENABLED: &str = "spark.comet.debug.enabled"; +pub(crate) const COMET_EXPLAIN_NATIVE_ENABLED: &str = "spark.comet.explain.native.enabled"; +pub(crate) const COMET_MAX_TEMP_DIRECTORY_SIZE: &str = "spark.comet.maxTempDirectorySize"; + +pub(crate) trait SparkConfig { + fn get_bool(&self, name: &str) -> bool; + fn get_u64(&self, name: &str, default_value: u64) -> u64; +} + +impl SparkConfig for HashMap<String, String> { + fn get_bool(&self, name: &str) -> bool { + self.get(name) + .and_then(|str_val| str_val.parse::<bool>().ok()) + .unwrap_or(false) + } + + fn get_u64(&self, name: &str, default_value: u64) -> u64 { + self.get(name) + .and_then(|str_val| str_val.parse::<u64>().ok()) + .unwrap_or(default_value) + } +} diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala index 8603a7b9a8..43e2001382 100644 --- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala +++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala @@ -96,9 +96,9 @@ class CometExecIterator( CometSparkSessionExtensions.getCometMemoryOverhead(conf) } - // serialize Spark conf in protobuf format + // serialize Comet related Spark configs in protobuf format val builder = ConfigMap.newBuilder() - conf.getAll.foreach { case (k, v) => + conf.getAll.filter(_._1.startsWith(CometConf.COMET_PREFIX)).foreach { case (k, v) => builder.putEntries(k, v) } val protobufSparkConfigs = builder.build().toByteArray @@ -140,10 +140,6 @@ class CometExecIterator( memoryLimit, memoryLimitPerTask, taskAttemptId, - debug = COMET_DEBUG_ENABLED.get(), - explain = COMET_EXPLAIN_NATIVE_ENABLED.get(), - tracingEnabled, - maxTempDirectorySize = CometConf.COMET_MAX_TEMP_DIRECTORY_SIZE.get(), keyUnwrapper) } diff --git a/spark/src/main/scala/org/apache/comet/Native.scala 
b/spark/src/main/scala/org/apache/comet/Native.scala index fb24dce0d3..6ef92d0a67 100644 --- a/spark/src/main/scala/org/apache/comet/Native.scala +++ b/spark/src/main/scala/org/apache/comet/Native.scala @@ -68,10 +68,6 @@ class Native extends NativeBase { memoryLimit: Long, memoryLimitPerTask: Long, taskAttemptId: Long, - debug: Boolean, - explain: Boolean, - tracingEnabled: Boolean, - maxTempDirectorySize: Long, keyUnwrapper: CometFileKeyUnwrapper): Long // scalastyle:on From 82e8691b24ef38469857ad905b8d99c820a37718 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 10 Oct 2025 03:24:47 -0600 Subject: [PATCH 2/2] fix --- common/src/main/scala/org/apache/comet/CometConf.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 5931eb25bf..05c0750571 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -63,9 +63,11 @@ object CometConf extends ShimCometConf { def conf(key: String): ConfigBuilder = ConfigBuilder(key) - val COMET_EXEC_CONFIG_PREFIX = "spark.comet.exec"; + val COMET_PREFIX = "spark.comet"; - val COMET_EXPR_CONFIG_PREFIX = "spark.comet.expression"; + val COMET_EXEC_CONFIG_PREFIX: String = s"$COMET_PREFIX.exec"; + + val COMET_EXPR_CONFIG_PREFIX: String = s"$COMET_PREFIX.expression"; val COMET_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.enabled") .doc(