Commits (44), all authored by andygrove:
7252605  Aug 22, 2025  Access Spark configs from native code
d084cfa  Aug 22, 2025  code cleanup
4837935  Aug 22, 2025  revert
ad9c9b8  Oct 3, 2025   debug
f3bb412  Oct 3, 2025   use df release
13f14d3  Oct 3, 2025   cargo update
78f5b4f  Oct 3, 2025   [skip ci]
5a39d3b  Oct 3, 2025   merge other PR [skip-ci]
dc11515  Oct 3, 2025   save [skip-ci]
d2a1ab1  Oct 3, 2025   [skip ci]
31cdbc6  Oct 3, 2025   save [skip ci]
ffb1f71  Oct 3, 2025   Merge remote-tracking branch 'apache/main' into debug-mem
322b4c5  Oct 3, 2025   info logging
89e10ac  Oct 3, 2025   log task id [skip ci]
3b191fd  Oct 3, 2025   println
7c24836  Oct 3, 2025   revert lock file
405f5b7  Oct 3, 2025   prep for review
522238d  Oct 3, 2025   save
36565ca  Oct 3, 2025   Update spark/src/main/scala/org/apache/comet/CometExecIterator.scala
21189a6  Oct 3, 2025   info logging
dfa2c67  Oct 3, 2025   Merge branch 'debug-mem' of github.com:andygrove/datafusion-comet int…
d9817ce  Oct 3, 2025   fix
acba7bc  Oct 3, 2025   log error on try_grow fail
4051d29  Oct 3, 2025   log error on try_grow fail
df69875  Oct 3, 2025   revert
ad891a0  Oct 3, 2025   add Python script to convert log to csv
8756256  Oct 3, 2025   Python script to generate chart
7eb1bc1  Oct 3, 2025   scripts
21bd386  Oct 3, 2025   new script
ec823c2  Oct 3, 2025   show err
a66fa65  Oct 3, 2025   save
12db37f  Oct 3, 2025   Merge branch 'debug-mem' of github.com:andygrove/datafusion-comet int…
2fb336e  Oct 3, 2025   track errors
706f5e7  Oct 3, 2025   format
4faf881  Oct 3, 2025   ASF header
d91abda  Oct 3, 2025   add brief docs
f6128b5  Oct 3, 2025   docs
7d40ac2  Oct 5, 2025   fix
c495897  Oct 6, 2025   cargo fmt
06814b7  Oct 6, 2025   upmerge
e51751f  Oct 6, 2025   format
75e727f  Oct 7, 2025   upmerge
e844287  Oct 7, 2025   fix regression
2884ed3  Oct 10, 2025  upmerge
13 changes: 11 additions & 2 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -63,9 +63,11 @@ object CometConf extends ShimCometConf {

def conf(key: String): ConfigBuilder = ConfigBuilder(key)

val COMET_EXEC_CONFIG_PREFIX = "spark.comet.exec";
val COMET_PREFIX = "spark.comet";

val COMET_EXPR_CONFIG_PREFIX = "spark.comet.expression";
val COMET_EXEC_CONFIG_PREFIX: String = s"$COMET_PREFIX.exec";

val COMET_EXPR_CONFIG_PREFIX: String = s"$COMET_PREFIX.expression";

val COMET_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.enabled")
.doc(
@@ -454,6 +456,13 @@
.booleanConf
.createWithDefault(false)

val COMET_DEBUG_MEMORY_ENABLED: ConfigEntry[Boolean] =
conf(s"$COMET_PREFIX.debug.memory")
.doc("When enabled, log all native memory pool interactions to stdout.")
.internal()
.booleanConf
.createWithDefault(false)

val COMET_EXPLAIN_VERBOSE_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.explain.verbose.enabled")
.doc(
80 changes: 80 additions & 0 deletions dev/scripts/mem_debug_chart.py
@@ -0,0 +1,80 @@
#!/usr/bin/python
##############################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
##############################################################################

import argparse
from pathlib import Path
import pandas as pd
import matplotlib.pyplot as plt
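
# Example usage (hypothetical file name; requires pandas and matplotlib to be installed):
#   ./mem_debug_chart.py task486.csv --title "Task 486 memory usage"
#   ./mem_debug_chart.py task486.csv --instant --bar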

def main():
ap = argparse.ArgumentParser()
ap.add_argument("csv", help="CSV with columns: name,size")
ap.add_argument("--instant", action="store_true",
help="Plot per-step stacked values (not cumulative totals).")
ap.add_argument("--bar", action="store_true",
help="Use stacked bars instead of stacked area.")
ap.add_argument("--title", default=None, help="Optional plot title.")
args = ap.parse_args()

path = Path(args.csv)
df = pd.read_csv(path)

# Validate + clean
need = {"name", "size"}
if not need.issubset(set(df.columns)):
raise SystemExit("CSV must have columns: name,size")

df["size"] = pd.to_numeric(df["size"], errors="coerce").fillna(0)

# Treat each row as the next time step: t = 1..N
df = df.reset_index(drop=True).assign(t=lambda d: d.index + 1)

# Build wide matrix: one column per name, one row per time step
# If multiple entries exist for the same (t, name), they’ll be summed.
wide = (
df.groupby(["t", "name"], as_index=False)["size"].sum()
.pivot(index="t", columns="name", values="size")
.fillna(0.0)
.sort_index()
)

# Running totals unless --instant specified
plot_data = wide if args.instant else wide.cumsum(axis=0)

# Plot
if args.bar:
ax = plot_data.plot(kind="bar", stacked=True, figsize=(12, 6), width=1.0)
else:
ax = plot_data.plot.area(stacked=True, figsize=(12, 6))

ax.set_xlabel("step")
ax.set_ylabel("size" if args.instant else "cumulative size")
ax.set_title(args.title or ("Stacked running totals by name" if not args.instant
else "Stacked per-step values by name"))
ax.legend(title="name", bbox_to_anchor=(1.02, 1), loc="upper left")
plt.tight_layout()

out = path.with_suffix(".stacked.png" if args.instant else ".stacked_cumulative.png")
plt.savefig(out, dpi=150)
print(f"Saved plot to {out}")
plt.show()

if __name__ == "__main__":
main()
69 changes: 69 additions & 0 deletions dev/scripts/mem_debug_to_csv.py
@@ -0,0 +1,69 @@
#!/usr/bin/python
##############################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
##############################################################################

import argparse
import re
import sys
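
# Example usage (hypothetical file names): extract running totals for Spark task 486 from an
# executor log containing "[Task N] MemoryPool[...]" lines, then plot with mem_debug_chart.py:
#   ./mem_debug_to_csv.py --file executor.log --task 486 > task486.csv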

def main(file, task_filter):
# keep track of running total allocation per consumer
alloc = {}

# open file
with open(file) as f:
# iterate over lines in file
print("name,size")
for line in f:
# print(line, file=sys.stderr)

# example line: [Task 486] MemoryPool[HashJoinInput[6]].shrink(1000)
# parse consumer name
            re_match = re.search(r'\[Task (.*)\] MemoryPool\[(.*)\]\.(.*)\((.*)\)', line, re.IGNORECASE)
if re_match:
try:
task = int(re_match.group(1))
if task != task_filter:
continue

consumer = re_match.group(2)
method = re_match.group(3)
size = int(re_match.group(4))
if method == "try_grow":
if "Err" in line:
continue

                    # update the running total for this consumer
                    if consumer not in alloc:
                        alloc[consumer] = 0
                    if method == "grow" or method == "try_grow":
                        alloc[consumer] = alloc[consumer] + size
                    elif method == "shrink":
                        alloc[consumer] = alloc[consumer] - size
                    print(f"{consumer},{alloc[consumer]}")
                except Exception:
                    print("error parsing", line, file=sys.stderr)


if __name__ == "__main__":
ap = argparse.ArgumentParser(description="Generate CSV From memory debug output")
ap.add_argument("--task", default=None, help="Task ID.")
ap.add_argument("--file", default=None, help="Spark log containing memory debug output")
args = ap.parse_args()
main(args.file, int(args.task))
43 changes: 27 additions & 16 deletions native/core/src/execution/jni_api.rs
@@ -78,6 +78,11 @@ use crate::execution::spark_plan::SparkPlan;

use crate::execution::tracing::{log_memory_usage, trace_begin, trace_end, with_trace};

use crate::execution::memory_pools::logging_pool::LoggingPool;
use crate::execution::spark_config::{
SparkConfig, COMET_DEBUG_ENABLED, COMET_DEBUG_MEMORY, COMET_EXPLAIN_NATIVE_ENABLED,
COMET_TRACING_ENABLED,
};
use datafusion_comet_proto::spark_operator::operator::OpStruct;
use log::info;
use once_cell::sync::Lazy;
@@ -167,12 +172,21 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
memory_limit: jlong,
memory_limit_per_task: jlong,
task_attempt_id: jlong,
debug_native: jboolean,
explain_native: jboolean,
tracing_enabled: jboolean,
Review thread on this change:
andygrove (Member, Author): Rather than adding yet another flag to this API call, I am now using the already available Spark config map in native code.
Contributor: +1. The config map should be the preferred method.
) -> jlong {
try_unwrap_or_throw(&e, |mut env| {
with_trace("createPlan", tracing_enabled != JNI_FALSE, || {
// Deserialize Spark configs
let array = unsafe { JPrimitiveArray::from_raw(serialized_spark_configs) };
let bytes = env.convert_byte_array(array)?;
let spark_configs = serde::deserialize_config(bytes.as_slice())?;
let spark_config: HashMap<String, String> = spark_configs.entries.into_iter().collect();

// Access Comet configs
let debug_native = spark_config.get_bool(COMET_DEBUG_ENABLED);
let explain_native = spark_config.get_bool(COMET_EXPLAIN_NATIVE_ENABLED);
let tracing_enabled = spark_config.get_bool(COMET_TRACING_ENABLED);
let logging_memory_pool = spark_config.get_bool(COMET_DEBUG_MEMORY);

with_trace("createPlan", tracing_enabled, || {
// Init JVM classes
JVMClasses::init(&mut env);

@@ -183,15 +197,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
let bytes = env.convert_byte_array(array)?;
let spark_plan = serde::deserialize_op(bytes.as_slice())?;

// Deserialize Spark configs
let array = unsafe { JPrimitiveArray::from_raw(serialized_spark_configs) };
let bytes = env.convert_byte_array(array)?;
let spark_configs = serde::deserialize_config(bytes.as_slice())?;

// Convert Spark configs to HashMap
let _spark_config_map: HashMap<String, String> =
spark_configs.entries.into_iter().collect();

let metrics = Arc::new(jni_new_global_ref!(env, metrics_node)?);

// Get the global references of input sources
@@ -218,6 +223,12 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
let memory_pool =
create_memory_pool(&memory_pool_config, task_memory_manager, task_attempt_id);

let memory_pool = if logging_memory_pool {
Arc::new(LoggingPool::new(task_attempt_id as u64, memory_pool))
} else {
memory_pool
};

// Get local directories for storing spill files
let local_dirs_array = JObjectArray::from_raw(local_dirs);
let num_local_dirs = env.get_array_length(&local_dirs_array)?;
@@ -256,10 +267,10 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
metrics_last_update_time: Instant::now(),
plan_creation_time,
session_ctx: Arc::new(session),
debug_native: debug_native == 1,
explain_native: explain_native == 1,
debug_native,
explain_native,
memory_pool_config,
tracing_enabled: tracing_enabled != JNI_FALSE,
tracing_enabled,
});

Ok(Box::into_raw(exec_context) as i64)
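The new spark_config module referenced by the imports above is not part of the hunks shown in this diff. As a rough sketch of how the pieces could fit together, the trait might look like the following; the trait and constant names come from the imports in jni_api.rs, while the key strings (other than spark.comet.debug.memory, which is defined in CometConf.scala above) and the method body are assumptions rather than the actual implementation.

// Hypothetical sketch of native/core/src/execution/spark_config.rs (not shown in this diff).
use std::collections::HashMap;

// spark.comet.debug.memory matches the new entry in CometConf.scala; the other key strings
// here are assumptions for illustration.
pub(crate) const COMET_DEBUG_ENABLED: &str = "spark.comet.debug.enabled";
pub(crate) const COMET_DEBUG_MEMORY: &str = "spark.comet.debug.memory";
pub(crate) const COMET_EXPLAIN_NATIVE_ENABLED: &str = "spark.comet.explain.native.enabled";
pub(crate) const COMET_TRACING_ENABLED: &str = "spark.comet.tracing.enabled";

/// Convenience accessors over the Spark config map that the JVM serializes into createPlan.
pub(crate) trait SparkConfig {
    /// Returns the boolean value for `name`, or false when the key is missing or unparsable.
    fn get_bool(&self, name: &str) -> bool;
}

impl SparkConfig for HashMap<String, String> {
    fn get_bool(&self, name: &str) -> bool {
        self.get(name)
            .map(|v| v.trim().eq_ignore_ascii_case("true"))
            .unwrap_or(false)
    }
}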
88 changes: 88 additions & 0 deletions native/core/src/execution/memory_pools/logging_pool.rs
@@ -0,0 +1,88 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::execution::memory_pool::{MemoryPool, MemoryReservation};
use log::info;
use std::sync::Arc;

#[derive(Debug)]
pub(crate) struct LoggingPool {
task_attempt_id: u64,
pool: Arc<dyn MemoryPool>,
}

impl LoggingPool {
pub fn new(task_attempt_id: u64, pool: Arc<dyn MemoryPool>) -> Self {
Self {
task_attempt_id,
pool,
}
}
}

impl MemoryPool for LoggingPool {
fn grow(&self, reservation: &MemoryReservation, additional: usize) {
info!(
Review thread on this change:
Contributor: Would it be useful to add a debug! log message which has the backtrace of where this was requested from?
andygrove (Member, Author): Good idea. I updated try_grow to log the Err if it fails. This should contain the backtrace if the backtrace feature is enabled, but I need to test this out locally.
Contributor (@parthchandra, Oct 3, 2025): I was thinking that we do this for every call (not just for the error) so we can trace the precise origins of the allocations. Probably should be a trace message (not a debug) though. This is merely a suggestion though, I'll leave it to you to decide if it is useful. Logging the backtrace on error is definitely useful.
"[Task {}] MemoryPool[{}].grow({})",
self.task_attempt_id,
reservation.consumer().name(),
additional
);
self.pool.grow(reservation, additional);
}

fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
info!(
"[Task {}] MemoryPool[{}].shrink({})",
self.task_attempt_id,
reservation.consumer().name(),
shrink
);
self.pool.shrink(reservation, shrink);
}

fn try_grow(
&self,
reservation: &MemoryReservation,
additional: usize,
) -> datafusion::common::Result<()> {
match self.pool.try_grow(reservation, additional) {
Ok(_) => {
info!(
"[Task {}] MemoryPool[{}].try_grow({}) returning Ok",
self.task_attempt_id,
reservation.consumer().name(),
additional
);
Ok(())
}
Err(e) => {
info!(
"[Task {}] MemoryPool[{}].try_grow({}) returning Err: {e:?}",
self.task_attempt_id,
reservation.consumer().name(),
additional
);
Err(e)
}
}
}

fn reserved(&self) -> usize {
self.pool.reserved()
}
}
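
For context, the wrapper can be exercised against any DataFusion memory pool. The sketch below is illustrative only: the task id, pool size, and consumer name are made-up values, and the lines it emits (via log::info!) are the ones mem_debug_to_csv.py expects to parse.

// Illustrative only: wrap a stock DataFusion pool in LoggingPool and drive a reservation.
use crate::execution::memory_pools::logging_pool::LoggingPool;
use datafusion::common::Result;
use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};
use std::sync::Arc;

fn demo_logging_pool() -> Result<()> {
    // Arbitrary 64 MiB pool and task id 42 for this example.
    let inner: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(64 * 1024 * 1024));
    let pool: Arc<dyn MemoryPool> = Arc::new(LoggingPool::new(42, inner));

    let mut reservation = MemoryConsumer::new("HashJoinInput[6]").register(&pool);
    // Logs: [Task 42] MemoryPool[HashJoinInput[6]].try_grow(1024) returning Ok
    reservation.try_grow(1024)?;
    // Logs: [Task 42] MemoryPool[HashJoinInput[6]].shrink(512)
    reservation.shrink(512);
    Ok(())
}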
1 change: 1 addition & 0 deletions native/core/src/execution/memory_pools/mod.rs
@@ -17,6 +17,7 @@

mod config;
mod fair_pool;
pub mod logging_pool;
mod task_shared;
mod unified_pool;

1 change: 1 addition & 0 deletions native/core/src/execution/mod.rs
@@ -27,6 +27,7 @@ pub(crate) mod sort;
pub(crate) mod spark_plan;
pub use datafusion_comet_spark_expr::timezone;
mod memory_pools;
pub(crate) mod spark_config;
pub(crate) mod tracing;
pub(crate) mod utils;

Expand Down
Loading
Loading