From 3eb0718a63546b0d3141ef6c1e766fc8caf638aa Mon Sep 17 00:00:00 2001 From: Iskander14yo Date: Thu, 10 Jul 2025 22:09:20 +0300 Subject: [PATCH 01/19] add code from spark --- spark-comet/benchmark.sh | 27 ++++++++++++++++++++++++ spark-comet/queries.sql | 43 ++++++++++++++++++++++++++++++++++++++ spark-comet/query.py | 45 ++++++++++++++++++++++++++++++++++++++++ spark-comet/run.sh | 8 +++++++ 4 files changed, 123 insertions(+) create mode 100755 spark-comet/benchmark.sh create mode 100644 spark-comet/queries.sql create mode 100755 spark-comet/query.py create mode 100755 spark-comet/run.sh diff --git a/spark-comet/benchmark.sh b/spark-comet/benchmark.sh new file mode 100755 index 000000000..d8cf57c2c --- /dev/null +++ b/spark-comet/benchmark.sh @@ -0,0 +1,27 @@ +#!/bin/bash + +# Install + +sudo apt-get update -y +sudo apt-get install -y python3-pip python3-venv openjdk-17-jdk + +export JAVA_HOME="/usr/lib/jvm/java-17-openjdk-$(dpkg --print-architecture)/" +export PATH=$JAVA_HOME/bin:$PATH + +python3 -m venv myenv +source myenv/bin/activate +pip install pyspark==4.0.0 psutil + +# Load the data + +wget --continue --progress=dot:giga 'https://datasets.clickhouse.com/hits_compatible/hits.parquet' + +# Run the queries + +./run.sh 2>&1 | tee log.txt + +cat log.txt | grep -P '^Time:\s+([\d\.]+)|Failure!' | sed -r -e 's/Time: //; s/^Failure!$/null/' | + awk '{ if (i % 3 == 0) { printf "[" }; printf $1; if (i % 3 != 2) { printf "," } else { print "]," }; ++i; }' + +echo "Data size: $(du -b hits.parquet)" +echo "Load time: 0" diff --git a/spark-comet/queries.sql b/spark-comet/queries.sql new file mode 100644 index 000000000..31f65fc89 --- /dev/null +++ b/spark-comet/queries.sql @@ -0,0 +1,43 @@ +SELECT COUNT(*) FROM hits; +SELECT COUNT(*) FROM hits WHERE AdvEngineID <> 0; +SELECT SUM(AdvEngineID), COUNT(*), AVG(ResolutionWidth) FROM hits; +SELECT AVG(UserID) FROM hits; +SELECT COUNT(DISTINCT UserID) FROM hits; +SELECT COUNT(DISTINCT SearchPhrase) FROM hits; +SELECT MIN(EventDate), MAX(EventDate) FROM hits; +SELECT AdvEngineID, COUNT(*) FROM hits WHERE AdvEngineID <> 0 GROUP BY AdvEngineID ORDER BY COUNT(*) DESC; +SELECT RegionID, COUNT(DISTINCT UserID) AS u FROM hits GROUP BY RegionID ORDER BY u DESC LIMIT 10; +SELECT RegionID, SUM(AdvEngineID), COUNT(*) AS c, AVG(ResolutionWidth), COUNT(DISTINCT UserID) FROM hits GROUP BY RegionID ORDER BY c DESC LIMIT 10; +SELECT MobilePhoneModel, COUNT(DISTINCT UserID) AS u FROM hits WHERE MobilePhoneModel <> '' GROUP BY MobilePhoneModel ORDER BY u DESC LIMIT 10; +SELECT MobilePhone, MobilePhoneModel, COUNT(DISTINCT UserID) AS u FROM hits WHERE MobilePhoneModel <> '' GROUP BY MobilePhone, MobilePhoneModel ORDER BY u DESC LIMIT 10; +SELECT SearchPhrase, COUNT(*) AS c FROM hits WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10; +SELECT SearchPhrase, COUNT(DISTINCT UserID) AS u FROM hits WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY u DESC LIMIT 10; +SELECT SearchEngineID, SearchPhrase, COUNT(*) AS c FROM hits WHERE SearchPhrase <> '' GROUP BY SearchEngineID, SearchPhrase ORDER BY c DESC LIMIT 10; +SELECT UserID, COUNT(*) FROM hits GROUP BY UserID ORDER BY COUNT(*) DESC LIMIT 10; +SELECT UserID, SearchPhrase, COUNT(*) FROM hits GROUP BY UserID, SearchPhrase ORDER BY COUNT(*) DESC LIMIT 10; +SELECT UserID, SearchPhrase, COUNT(*) FROM hits GROUP BY UserID, SearchPhrase LIMIT 10; +SELECT UserID, extract(minute FROM EventTime) AS m, SearchPhrase, COUNT(*) FROM hits GROUP BY UserID, m, SearchPhrase ORDER BY COUNT(*) DESC LIMIT 10; +SELECT 
UserID FROM hits WHERE UserID = 435090932899640449; +SELECT COUNT(*) FROM hits WHERE URL LIKE '%google%'; +SELECT SearchPhrase, MIN(URL), COUNT(*) AS c FROM hits WHERE URL LIKE '%google%' AND SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10; +SELECT SearchPhrase, MIN(URL), MIN(Title), COUNT(*) AS c, COUNT(DISTINCT UserID) FROM hits WHERE Title LIKE '%Google%' AND URL NOT LIKE '%.google.%' AND SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10; +SELECT * FROM hits WHERE URL LIKE '%google%' ORDER BY EventTime LIMIT 10; +SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY EventTime LIMIT 10; +SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY SearchPhrase LIMIT 10; +SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY EventTime, SearchPhrase LIMIT 10; +SELECT CounterID, AVG(length(URL)) AS l, COUNT(*) AS c FROM hits WHERE URL <> '' GROUP BY CounterID HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25; +SELECT REGEXP_REPLACE(Referer, '^https?://(?:www\.)?([^/]+)/.*$', '\1') AS k, AVG(length(Referer)) AS l, COUNT(*) AS c, MIN(Referer) FROM hits WHERE Referer <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25; +SELECT SUM(ResolutionWidth), SUM(ResolutionWidth + 1), SUM(ResolutionWidth + 2), SUM(ResolutionWidth + 3), SUM(ResolutionWidth + 4), SUM(ResolutionWidth + 5), SUM(ResolutionWidth + 6), SUM(ResolutionWidth + 7), SUM(ResolutionWidth + 8), SUM(ResolutionWidth + 9), SUM(ResolutionWidth + 10), SUM(ResolutionWidth + 11), SUM(ResolutionWidth + 12), SUM(ResolutionWidth + 13), SUM(ResolutionWidth + 14), SUM(ResolutionWidth + 15), SUM(ResolutionWidth + 16), SUM(ResolutionWidth + 17), SUM(ResolutionWidth + 18), SUM(ResolutionWidth + 19), SUM(ResolutionWidth + 20), SUM(ResolutionWidth + 21), SUM(ResolutionWidth + 22), SUM(ResolutionWidth + 23), SUM(ResolutionWidth + 24), SUM(ResolutionWidth + 25), SUM(ResolutionWidth + 26), SUM(ResolutionWidth + 27), SUM(ResolutionWidth + 28), SUM(ResolutionWidth + 29), SUM(ResolutionWidth + 30), SUM(ResolutionWidth + 31), SUM(ResolutionWidth + 32), SUM(ResolutionWidth + 33), SUM(ResolutionWidth + 34), SUM(ResolutionWidth + 35), SUM(ResolutionWidth + 36), SUM(ResolutionWidth + 37), SUM(ResolutionWidth + 38), SUM(ResolutionWidth + 39), SUM(ResolutionWidth + 40), SUM(ResolutionWidth + 41), SUM(ResolutionWidth + 42), SUM(ResolutionWidth + 43), SUM(ResolutionWidth + 44), SUM(ResolutionWidth + 45), SUM(ResolutionWidth + 46), SUM(ResolutionWidth + 47), SUM(ResolutionWidth + 48), SUM(ResolutionWidth + 49), SUM(ResolutionWidth + 50), SUM(ResolutionWidth + 51), SUM(ResolutionWidth + 52), SUM(ResolutionWidth + 53), SUM(ResolutionWidth + 54), SUM(ResolutionWidth + 55), SUM(ResolutionWidth + 56), SUM(ResolutionWidth + 57), SUM(ResolutionWidth + 58), SUM(ResolutionWidth + 59), SUM(ResolutionWidth + 60), SUM(ResolutionWidth + 61), SUM(ResolutionWidth + 62), SUM(ResolutionWidth + 63), SUM(ResolutionWidth + 64), SUM(ResolutionWidth + 65), SUM(ResolutionWidth + 66), SUM(ResolutionWidth + 67), SUM(ResolutionWidth + 68), SUM(ResolutionWidth + 69), SUM(ResolutionWidth + 70), SUM(ResolutionWidth + 71), SUM(ResolutionWidth + 72), SUM(ResolutionWidth + 73), SUM(ResolutionWidth + 74), SUM(ResolutionWidth + 75), SUM(ResolutionWidth + 76), SUM(ResolutionWidth + 77), SUM(ResolutionWidth + 78), SUM(ResolutionWidth + 79), SUM(ResolutionWidth + 80), SUM(ResolutionWidth + 81), SUM(ResolutionWidth + 82), SUM(ResolutionWidth + 83), SUM(ResolutionWidth + 84), SUM(ResolutionWidth + 85), SUM(ResolutionWidth + 86), 
SUM(ResolutionWidth + 87), SUM(ResolutionWidth + 88), SUM(ResolutionWidth + 89) FROM hits; +SELECT SearchEngineID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits WHERE SearchPhrase <> '' GROUP BY SearchEngineID, ClientIP ORDER BY c DESC LIMIT 10; +SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits WHERE SearchPhrase <> '' GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10; +SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10; +SELECT URL, COUNT(*) AS c FROM hits GROUP BY URL ORDER BY c DESC LIMIT 10; +SELECT 1, URL, COUNT(*) AS c FROM hits GROUP BY 1, URL ORDER BY c DESC LIMIT 10; +SELECT ClientIP, ClientIP - 1, ClientIP - 2, ClientIP - 3, COUNT(*) AS c FROM hits GROUP BY ClientIP, ClientIP - 1, ClientIP - 2, ClientIP - 3 ORDER BY c DESC LIMIT 10; +SELECT URL, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND DontCountHits = 0 AND IsRefresh = 0 AND URL <> '' GROUP BY URL ORDER BY PageViews DESC LIMIT 10; +SELECT Title, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND DontCountHits = 0 AND IsRefresh = 0 AND Title <> '' GROUP BY Title ORDER BY PageViews DESC LIMIT 10; +SELECT URL, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND IsLink <> 0 AND IsDownload = 0 GROUP BY URL ORDER BY PageViews DESC LIMIT 10 OFFSET 1000; +SELECT TraficSourceID, SearchEngineID, AdvEngineID, CASE WHEN (SearchEngineID = 0 AND AdvEngineID = 0) THEN Referer ELSE '' END AS Src, URL AS Dst, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 GROUP BY TraficSourceID, SearchEngineID, AdvEngineID, Src, Dst ORDER BY PageViews DESC LIMIT 10 OFFSET 1000; +SELECT URLHash, EventDate, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND TraficSourceID IN (-1, 6) AND RefererHash = 3594120000172545465 GROUP BY URLHash, EventDate ORDER BY PageViews DESC LIMIT 10 OFFSET 100; +SELECT WindowClientWidth, WindowClientHeight, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND DontCountHits = 0 AND URLHash = 2868770270353813622 GROUP BY WindowClientWidth, WindowClientHeight ORDER BY PageViews DESC LIMIT 10 OFFSET 10000; +SELECT DATE_TRUNC('minute', EventTime) AS M, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-14' AND EventDate <= '2013-07-15' AND IsRefresh = 0 AND DontCountHits = 0 GROUP BY DATE_TRUNC('minute', EventTime) ORDER BY DATE_TRUNC('minute', EventTime) LIMIT 10 OFFSET 1000; diff --git a/spark-comet/query.py b/spark-comet/query.py new file mode 100755 index 000000000..9da8c7dbc --- /dev/null +++ b/spark-comet/query.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python3 + +from pyspark.sql import SparkSession +import pyspark.sql.functions as F + +import timeit +import psutil +import sys +import re + +query = sys.stdin.read() +# Replace \1 to $1 because spark recognizes only this pattern style (in query 28) +query = re.sub(r"""(REGEXP_REPLACE\(.*?,\s*('[^']*')\s*,\s*)('1')""", r"\1'$1'", query) +print(query) + +# Calculate available memory to configurate SparkSession +ram = 
int(round(psutil.virtual_memory().available / (1024 ** 3) * 0.7)) +print(f"SparkSession will use {ram} GB of memory") + +spark = ( + SparkSession + .builder + .appName("ClickBench") + .config("spark.driver", "local[*]") # To ensure using all cores + .config("spark.driver.memory", f"{ram}g") # Set amount of memory SparkSession can use + .config("spark.sql.parquet.binaryAsString", True) # Treat binary as string to get correct length calculations and text results + .getOrCreate() +) + +df = spark.read.parquet("hits.parquet") +# Do casting before creating the view so no need to change to unreadable integer dates in SQL +df = df.withColumn("EventTime", F.col("EventTime").cast("timestamp")) +df = df.withColumn("EventDate", F.date_add(F.lit("1970-01-01"), F.col("EventDate"))) +df.createOrReplaceTempView("hits") + +for try_num in range(3): + try: + start = timeit.default_timer() + result = spark.sql(query) + result.show(100) # some queries should return more than 20 rows which is the default show limit + end = timeit.default_timer() + print("Time: ", end - start) + except Exception as e: + print(e); + print("Failure!") diff --git a/spark-comet/run.sh b/spark-comet/run.sh new file mode 100755 index 000000000..64df8c608 --- /dev/null +++ b/spark-comet/run.sh @@ -0,0 +1,8 @@ +#!/bin/bash + +cat queries.sql | while read query; do + sync + echo 3 | sudo tee /proc/sys/vm/drop_caches >/dev/null + + ./query.py <<< "${query}" +done From 08db5805d23e8b89205ae08d76188c51e4a5f58f Mon Sep 17 00:00:00 2001 From: Iskander14yo Date: Fri, 11 Jul 2025 00:40:34 +0300 Subject: [PATCH 02/19] add base comet setup: installation and SparkSession --- spark-comet/benchmark.sh | 6 +++++- spark-comet/query.py | 9 +++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/spark-comet/benchmark.sh b/spark-comet/benchmark.sh index d8cf57c2c..8e1d9e336 100755 --- a/spark-comet/benchmark.sh +++ b/spark-comet/benchmark.sh @@ -10,12 +10,16 @@ export PATH=$JAVA_HOME/bin:$PATH python3 -m venv myenv source myenv/bin/activate -pip install pyspark==4.0.0 psutil +pip install pyspark==3.5.6 psutil # Load the data wget --continue --progress=dot:giga 'https://datasets.clickhouse.com/hits_compatible/hits.parquet' +# Install Comet + +wget --progress=dot:giga 'https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.5_2.12/0.9.0/comet-spark-spark3.5_2.12-0.9.0.jar' -O comet.jar + # Run the queries ./run.sh 2>&1 | tee log.txt diff --git a/spark-comet/query.py b/spark-comet/query.py index 9da8c7dbc..9229ce772 100755 --- a/spark-comet/query.py +++ b/spark-comet/query.py @@ -24,6 +24,15 @@ .config("spark.driver", "local[*]") # To ensure using all cores .config("spark.driver.memory", f"{ram}g") # Set amount of memory SparkSession can use .config("spark.sql.parquet.binaryAsString", True) # Treat binary as string to get correct length calculations and text results + + # Comet configuration + .config("spark.jars", "comet.jar") + .config("spark.driver.extraClassPath", "comet.jar") # Otherwise fails on some queries - see https://datafusion.apache.org/comet/user-guide/installation.html#additional-configuration + .config("spark.plugins", "org.apache.spark.CometPlugin") + .config("spark.shuffle.manager", "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") + .config("spark.memory.offHeap.enabled", "true") + .config("spark.memory.offHeap.size", "4g") + .getOrCreate() ) From d50055d0a9ffdee06bedba39b4ce220e1be6a342 Mon Sep 17 00:00:00 2001 From: Iskander14yo Date: Mon, 14 Jul 2025 00:30:00 +0300 Subject: [PATCH 
03/19] split memory: heap and off-heap --- spark-comet/query.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/spark-comet/query.py b/spark-comet/query.py index 9229ce772..2a9ab6d4a 100755 --- a/spark-comet/query.py +++ b/spark-comet/query.py @@ -15,23 +15,25 @@ # Calculate available memory to configurate SparkSession ram = int(round(psutil.virtual_memory().available / (1024 ** 3) * 0.7)) -print(f"SparkSession will use {ram} GB of memory") +heap = ram // 2 +off_heap = ram - heap +print(f"SparkSession will use {heap} GB of heap and {off_heap} GB of off-heap memory") spark = ( SparkSession .builder .appName("ClickBench") .config("spark.driver", "local[*]") # To ensure using all cores - .config("spark.driver.memory", f"{ram}g") # Set amount of memory SparkSession can use + .config("spark.driver.memory", f"{heap}g") # Set amount of memory SparkSession can use .config("spark.sql.parquet.binaryAsString", True) # Treat binary as string to get correct length calculations and text results # Comet configuration .config("spark.jars", "comet.jar") - .config("spark.driver.extraClassPath", "comet.jar") # Otherwise fails on some queries - see https://datafusion.apache.org/comet/user-guide/installation.html#additional-configuration + .config("spark.driver.extraClassPath", "comet.jar") # Otherwise fails on some queries (see https://datafusion.apache.org/comet/user-guide/installation.html#additional-configuration) .config("spark.plugins", "org.apache.spark.CometPlugin") .config("spark.shuffle.manager", "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") .config("spark.memory.offHeap.enabled", "true") - .config("spark.memory.offHeap.size", "4g") + .config("spark.memory.offHeap.size", f"{off_heap}g") .getOrCreate() ) From 898fe785c26587dbba921bad689efa4feaf9a92b Mon Sep 17 00:00:00 2001 From: Iskander14yo Date: Tue, 15 Jul 2025 23:49:49 +0300 Subject: [PATCH 04/19] auto-save results --- spark-comet/benchmark.sh | 11 +++++- spark-comet/process_results.py | 61 ++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 1 deletion(-) create mode 100755 spark-comet/process_results.py diff --git a/spark-comet/benchmark.sh b/spark-comet/benchmark.sh index 8e1d9e336..93193a543 100755 --- a/spark-comet/benchmark.sh +++ b/spark-comet/benchmark.sh @@ -18,7 +18,7 @@ wget --continue --progress=dot:giga 'https://datasets.clickhouse.com/hits_compat # Install Comet -wget --progress=dot:giga 'https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.5_2.12/0.9.0/comet-spark-spark3.5_2.12-0.9.0.jar' -O comet.jar +wget --continue --progress=dot:giga 'https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.5_2.12/0.9.0/comet-spark-spark3.5_2.12-0.9.0.jar' -O comet.jar # Run the queries @@ -27,5 +27,14 @@ wget --progress=dot:giga 'https://repo1.maven.org/maven2/org/apache/datafusion/c cat log.txt | grep -P '^Time:\s+([\d\.]+)|Failure!' 
| sed -r -e 's/Time: //; s/^Failure!$/null/' | awk '{ if (i % 3 == 0) { printf "[" }; printf $1; if (i % 3 != 2) { printf "," } else { print "]," }; ++i; }' +# Save results + +MACHINE="c6a.4xlarge" + +mkdir -p results +./process_results.py "$MACHINE" > "results/${MACHINE}.json" + +echo "Results have been saved to results/${MACHINE}.json" + echo "Data size: $(du -b hits.parquet)" echo "Load time: 0" diff --git a/spark-comet/process_results.py b/spark-comet/process_results.py new file mode 100755 index 000000000..4a17dc42f --- /dev/null +++ b/spark-comet/process_results.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python3 + +import json +import sys +from datetime import date +import subprocess +import re + +def get_data_size(): + # basically a duplicate of `benchmark.sh` + result = subprocess.run(['du', '-b', 'hits.parquet'], capture_output=True, text=True) + return int(result.stdout.split()[0]) + +def process_times(log_file): + results = [] + current_array = [] + + with open(log_file) as f: + for line in f: + if line.startswith('Time: '): + value = float(line.strip().replace('Time: ', '')) + current_array.append(value) + elif line.startswith('Failure!'): + current_array.append(None) + + if len(current_array) == 3: + results.append(current_array) + current_array = [] + + return results + +def get_comment(): + with open("benchmark.sh") as f: + for line in f: + if "comet.jar" in line: + comet_version = re.search(r"(.{5}).jar", line).group(1) + elif "pyspark" in line: + pyspark_version = re.search(r"pyspark==([^\s]+)", line).group(1) + return f"Using Comet {comet_version} with Spark {pyspark_version}" + +def main(): + machine = sys.argv[1] if len(sys.argv) > 1 else "unknown" + + data = { + "system": "Spark (Comet)", + "date": date.today().isoformat(), + "machine": machine, + "cluster_size": 1, + "proprietary": "no", + "tuned": "no", + "comment": get_comment(), + "tags": ["Java", "Rust", "column-oriented", "Spark derivative", "DataFusion", "Parquet"], + "load_time": 0, + "data_size": get_data_size(), + "result": process_times("log.txt") + } + + print(json.dumps(data, indent=4)) + +if __name__ == '__main__': + main() \ No newline at end of file From bb6961a1407ca696d93804e3396ab980b673ea41 Mon Sep 17 00:00:00 2001 From: Iskander14yo Date: Tue, 15 Jul 2025 23:50:22 +0300 Subject: [PATCH 05/19] add docs for spark accelerators and Comet --- spark-comet/README.md | 21 +++++++++++++++++++++ spark-comet/benchmark.sh | 5 +++++ spark-comet/query.py | 8 +++++++- spark/README-accelerators.md | 30 ++++++++++++++++++++++++++++++ 4 files changed, 63 insertions(+), 1 deletion(-) create mode 100644 spark-comet/README.md create mode 100644 spark/README-accelerators.md diff --git a/spark-comet/README.md b/spark-comet/README.md new file mode 100644 index 000000000..52176dc8d --- /dev/null +++ b/spark-comet/README.md @@ -0,0 +1,21 @@ +For basic information, check the [spark-accelerators README](../spark/README-accelerators.md). + +- To find all unsupported queries from log.txt (requires `spark.comet.explainFallback.enabled=True` and better set `spark.sql.debug.maxToStringFields` to arbitrary big number like `10000`): +```bash +>>> grep -P "\[COMET:" log.txt | sed -e 's/^[ \t]*//' | sort | uniq -c + + 78 +- GlobalLimit [COMET: GlobalLimit is not supported] + 18 +- HashAggregate [COMET: Unsupported aggregation mode PartialMerge] + 123 +- HashAggregate [COMET: distinct aggregates are not supported] + ... 
+``` +- Check [here](https://datafusion.apache.org/comet/user-guide/installation.html#supported-spark-versions) for _version compatibility_ between Spark and Comet. +- Check [here](https://datafusion.apache.org/comet/user-guide/installation.html#using-a-published-jar-file) for _links to Comet jar_. +- Check [here](https://datafusion.apache.org/comet/user-guide/installation.html#run-spark-shell-with-comet-enabled) for _basic Comet configuration_. + +### Configuration +- Comet requires dedicated memory allocation, which can be provided either through memoryOverhead or off-heap memory. The [documentation recommends](https://datafusion.apache.org/comet/user-guide/tuning.html#configuring-comet-memory-in-off-heap-mode) using off-heap memory, which is the approach used in ClickBench. +Therefore, we need to split memory between heap (for Spark) and off-heap (for Comet). For both TPC-H and TPC-DS benchmarks, Comet documentation suggests a 50/50 proportion +(see [here](https://datafusion.apache.org/comet/contributor-guide/benchmarking.html)). Seems that this allocation is appropriate to support fallback to Spark execution when Comet can't handle certain operations, which is also relevant for ClickBench. +- `spark.driver.extraClassPath` is set so Comet doesn't fail on some queries (check [here](https://datafusion.apache.org/comet/user-guide/installation.html#additional-configuration) for info). +- `spark.comet.regexp.allowIncompatible` is set to `True` to allow using incompatible regular expressions so Comet doesn't fall back to Spark (check [here](https://datafusion.apache.org/comet/user-guide/compatibility.html#regular-expressions) for info). diff --git a/spark-comet/benchmark.sh b/spark-comet/benchmark.sh index 93193a543..3cf0bfb93 100755 --- a/spark-comet/benchmark.sh +++ b/spark-comet/benchmark.sh @@ -1,5 +1,10 @@ #!/bin/bash +# Differences with Spark setup (see README.md for details): +# - pyspark==3.5.6 version is used (latest possible for Comet 0.9.0) +# - Comet installation is added +# - auto-save results + # Install sudo apt-get update -y diff --git a/spark-comet/query.py b/spark-comet/query.py index 2a9ab6d4a..c43a4b160 100755 --- a/spark-comet/query.py +++ b/spark-comet/query.py @@ -1,5 +1,11 @@ #!/usr/bin/env python3 +""" +Differences with Spark setup (see README.md for details): +- memory is split between heap (for Spark) and off-heap (for Comet) +- Comet configuration is added to `SparkSession` +""" + from pyspark.sql import SparkSession import pyspark.sql.functions as F @@ -29,7 +35,7 @@ # Comet configuration .config("spark.jars", "comet.jar") - .config("spark.driver.extraClassPath", "comet.jar") # Otherwise fails on some queries (see https://datafusion.apache.org/comet/user-guide/installation.html#additional-configuration) + .config("spark.driver.extraClassPath", "comet.jar") .config("spark.plugins", "org.apache.spark.CometPlugin") .config("spark.shuffle.manager", "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") .config("spark.memory.offHeap.enabled", "true") diff --git a/spark/README-accelerators.md b/spark/README-accelerators.md new file mode 100644 index 000000000..1572194fb --- /dev/null +++ b/spark/README-accelerators.md @@ -0,0 +1,30 @@ +Several Spark plugins and extensions claim to offer significantly better performance than vanilla Spark for analytical workloads. +They share common characteristics, which are documented here for convenience. 
+ +Currently implemented in ClickBench: +- [Apache Comet](https://datafusion.apache.org/comet/user-guide/overview.html) (`spark-comet` folder) + +There are some considerations when working with these accelerators. Several have already been discussed [here](https://github.com/ClickHouse/ClickBench/issues/517). This README provides general guidance and may be supplemented by READMEs in individual engine folders. + +### General + +Some of these backends set goal to provide performance improvements with no _code rewrites_. Therefore, an objective is to run the "same Spark" with minimal modifications (except for setup and configuration adjustments). In practice, this means changes should be limited to `benchmark.sh` and `query.py`. + +### Setting Up + +- Java installation is still required as Spark handles all operations except computation (and sometimes computation as well). +- Spark version has to be selected from a supported list (versions are likely restricted for internal API compatibility). This list is typically available in the documentation. +The current approach is to use the "latest stable Spark version" for each engine. +- These engines utilize Spark's plugin/extension system, therefore represent a `.jar` file. The appropriate `.jar` (engine) version usually depend on the Spark version, Scala version, and other factors. + +### Configuration + +- Resource allocation should follow existing Spark configuration to provide results' fairness. While `cores` allocation is straightforward, memory needs to be divided between heap (for Spark) and off-heap/overhead (for engines). It's safe to base configuration on the documentation recommendations/examples. +- `SparkSession` configuration typically requires settings like `spark.jars`, `spark.plugins`, and other parameters to enable engines. +- Remember to disable debug mode (most engines can explain fallbacks to Spark) to avoid performance overhead. + +### Queries + +These engines typically don't support the complete set of Spark operators, functions, or expressions. In such cases, they usually fall back gracefully to Spark execution or, rarely, fail due to semantic differences between Spark and engines. + +The standard approach is to use the default `queries.sql`. However, queries can be optionally rewritten for complete engine computation (see [here](https://github.com/ClickHouse/ClickBench/issues/517#issuecomment-3069121171) for discussion). 
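As a rough, illustrative starting point for the heap / off-heap division mentioned under Configuration above, the numbers can be derived in the shell. This mirrors what `spark-comet/query.py` computes with `psutil`, and assumes a `free` binary that reports an `available` column:

```bash
# Take ~70% of currently available memory and split it 50/50 between
# the Spark heap and the accelerator's off-heap pool (whole GB).
AVAILABLE_GB=$(free -g | awk '/^Mem:/ {print $7}')
USABLE_GB=$(( AVAILABLE_GB * 7 / 10 ))
HEAP_GB=$(( USABLE_GB / 2 ))
OFF_HEAP_GB=$(( USABLE_GB - HEAP_GB ))
echo "spark.driver.memory=${HEAP_GB}g spark.memory.offHeap.size=${OFF_HEAP_GB}g"
```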
From 282c85e1579bfa16d532946cc0b55ce1316f8cc2 Mon Sep 17 00:00:00 2001 From: Iskander14yo Date: Thu, 17 Jul 2025 13:52:34 +0300 Subject: [PATCH 06/19] deal with unsupported things: regexp --- spark-comet/query.py | 1 + 1 file changed, 1 insertion(+) diff --git a/spark-comet/query.py b/spark-comet/query.py index c43a4b160..664b41a20 100755 --- a/spark-comet/query.py +++ b/spark-comet/query.py @@ -40,6 +40,7 @@ .config("spark.shuffle.manager", "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") .config("spark.memory.offHeap.enabled", "true") .config("spark.memory.offHeap.size", f"{off_heap}g") + .config("spark.comet.regexp.allowIncompatible", True) .getOrCreate() ) From 970f3dbd15130234cdcae1b77d8565770358dbe1 Mon Sep 17 00:00:00 2001 From: Iskander14yo Date: Thu, 17 Jul 2025 21:00:49 +0300 Subject: [PATCH 07/19] add results (c6a.4xlarge) --- spark-comet/results/c6a.4xlarge.json | 236 +++++++++++++++++++++++++++ 1 file changed, 236 insertions(+) create mode 100644 spark-comet/results/c6a.4xlarge.json diff --git a/spark-comet/results/c6a.4xlarge.json b/spark-comet/results/c6a.4xlarge.json new file mode 100644 index 000000000..7dfb82516 --- /dev/null +++ b/spark-comet/results/c6a.4xlarge.json @@ -0,0 +1,236 @@ +{ + "system": "Spark (Comet)", + "date": "2025-07-17", + "machine": "c6a.4xlarge", + "cluster_size": 1, + "proprietary": "no", + "tuned": "no", + "comment": "Using Comet 0.9.0 with Spark 3.5.6", + "tags": [ + "Java", + "Rust", + "column-oriented", + "Spark derivative", + "DataFusion", + "Parquet" + ], + "load_time": 0, + "data_size": 14779976446, + "result": [ + [ + 4.472266616999946, + 2.1039232120000406, + 1.8430970299999672 + ], + [ + 3.578030586000068, + 1.0736136329999226, + 0.9344627739999396 + ], + [ + 6.64499863900005, + 1.4128532029999405, + 1.1755133540000315 + ], + [ + 4.696922097999959, + 2.0041085599999633, + 1.7866440060000741 + ], + [ + 5.631326217000037, + 2.6193342880000046, + 2.494008354000016 + ], + [ + 6.264381856999989, + 2.870651210999995, + 2.715818182000021 + ], + [ + 5.016372360999981, + 2.275984874999949, + 2.11629320499992 + ], + [ + 3.756535059999919, + 1.121937813000045, + 0.9281246310000597 + ], + [ + 6.773856178999949, + 2.9831256090000124, + 2.7141978240000526 + ], + [ + 13.568733273999896, + 4.030130065000094, + 3.534092967000106 + ], + [ + 5.836205211999982, + 2.355154799000047, + 2.1867549080000117 + ], + [ + 5.6480224209999506, + 1.5667226559999108, + 1.407957185999976 + ], + [ + 6.183444781000048, + 3.122852286000011, + 2.7804966820000345 + ], + [ + 10.864835242000026, + 5.143748008999978, + 4.993124574000035 + ], + [ + 5.930140444000017, + 2.934907199999998, + 2.7044927689998985 + ], + [ + 5.991562084000066, + 3.1449877329999936, + 2.7900950980000516 + ], + [ + 8.38386990999993, + 4.513304905000041, + 4.088832370999967 + ], + [ + 7.609503776999986, + 3.542259543, + 3.492194361999964 + ], + [ + 16.28575477000004, + 12.083004838999955, + 11.858153362000053 + ], + [ + 4.626656580000031, + 2.276965354999902, + 2.057529632000069 + ], + [ + 21.794626840999967, + 3.180863176999992, + 3.0697125969999206 + ], + [ + 25.802874348000046, + 3.6175597540000126, + 3.35841635099996 + ], + [ + 47.73140455199996, + 5.476737628000137, + 5.387913136999941 + ], + [ + 112.79471081299994, + 20.44997842099997, + 20.447969489999878 + ], + [ + 8.229423031999886, + 2.295753730000115, + 2.258644013999856 + ], + [ + 5.093002269999943, + 2.1934522190001644, + 2.009188600000016 + ], + [ + 8.189870422000013, + 2.316829217000077, + 2.3093284550000135 + ], + [ + 
22.221672595999962, + 3.8472844349998923, + 3.526627628999904 + ], + [ + 24.93743621999988, + 17.81829874799996, + 17.44794403999981 + ], + [ + 6.5120100659999025, + 3.486931069999855, + 3.336994612000126 + ], + [ + 11.685128793000104, + 2.5526158909999594, + 2.231654909999861 + ], + [ + 21.547016347999943, + 3.102731389999917, + 2.8722797610000725 + ], + [ + 18.465872553999816, + 8.19440798100004, + 7.955121861999942 + ], + [ + 24.087333387999934, + 8.077702382000098, + 7.767284494000023 + ], + [ + 24.110849956000038, + 8.102607654999929, + 7.702735385000096 + ], + [ + 6.394193387999849, + 3.394644223999876, + 3.071767507000004 + ], + [ + 32.970651810999925, + 4.560951911000075, + 4.3527405339998495 + ], + [ + 26.42749080500016, + 4.248164862999829, + 4.1698636720000195 + ], + [ + 37.74051866500008, + 4.908241721000195, + 4.685314071999983 + ], + [ + null, + null, + null + ], + [ + 13.638310121999893, + 1.8678283230001398, + 1.4909935100001803 + ], + [ + 25.261886403999824, + 2.0152608360001523, + 1.763202008999997 + ], + [ + 8.614786060000142, + 1.8395530959999178, + 1.526004198999999 + ] + ] +} \ No newline at end of file From 448bd3af0f65f514d56481d7169a2e9c91f60434 Mon Sep 17 00:00:00 2001 From: Iskander14yo Date: Wed, 23 Jul 2025 22:41:21 +0300 Subject: [PATCH 08/19] add debug mode --- spark-comet/query.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/spark-comet/query.py b/spark-comet/query.py index 664b41a20..3384647af 100755 --- a/spark-comet/query.py +++ b/spark-comet/query.py @@ -13,6 +13,7 @@ import psutil import sys import re +import os query = sys.stdin.read() # Replace \1 to $1 because spark recognizes only this pattern style (in query 28) @@ -25,7 +26,7 @@ off_heap = ram - heap print(f"SparkSession will use {heap} GB of heap and {off_heap} GB of off-heap memory") -spark = ( +builder = ( SparkSession .builder .appName("ClickBench") @@ -41,10 +42,14 @@ .config("spark.memory.offHeap.enabled", "true") .config("spark.memory.offHeap.size", f"{off_heap}g") .config("spark.comet.regexp.allowIncompatible", True) - - .getOrCreate() ) +if os.getenv("DEBUG") == "1": + builder.config("spark.comet.explainFallback.enabled", "true") + builder.config("spark.sql.debug.maxToStringFields", "10000") + +spark = builder.getOrCreate() + df = spark.read.parquet("hits.parquet") # Do casting before creating the view so no need to change to unreadable integer dates in SQL df = df.withColumn("EventTime", F.col("EventTime").cast("timestamp")) From e789a74c69a2e53c3c99cade5462d64a8d9574ca Mon Sep 17 00:00:00 2001 From: Iskander14yo Date: Wed, 23 Jul 2025 22:50:12 +0300 Subject: [PATCH 09/19] chores: formatter (ruff) edits --- spark-comet/process_results.py | 33 +++++++++++++++++++-------------- spark-comet/query.py | 16 ++++++++-------- 2 files changed, 27 insertions(+), 22 deletions(-) diff --git a/spark-comet/process_results.py b/spark-comet/process_results.py index 4a17dc42f..2f838e7b7 100755 --- a/spark-comet/process_results.py +++ b/spark-comet/process_results.py @@ -1,34 +1,37 @@ #!/usr/bin/env python3 import json +import re +import subprocess import sys from datetime import date -import subprocess -import re + def get_data_size(): # basically a duplicate of `benchmark.sh` - result = subprocess.run(['du', '-b', 'hits.parquet'], capture_output=True, text=True) + result = subprocess.run(["du", "-b", "hits.parquet"], capture_output=True, text=True) return int(result.stdout.split()[0]) + def process_times(log_file): results = [] current_array = [] - + with open(log_file) 
as f: for line in f: - if line.startswith('Time: '): - value = float(line.strip().replace('Time: ', '')) + if line.startswith("Time: "): + value = float(line.strip().replace("Time: ", "")) current_array.append(value) - elif line.startswith('Failure!'): + elif line.startswith("Failure!"): current_array.append(None) - + if len(current_array) == 3: results.append(current_array) current_array = [] - + return results + def get_comment(): with open("benchmark.sh") as f: for line in f: @@ -38,9 +41,10 @@ def get_comment(): pyspark_version = re.search(r"pyspark==([^\s]+)", line).group(1) return f"Using Comet {comet_version} with Spark {pyspark_version}" + def main(): machine = sys.argv[1] if len(sys.argv) > 1 else "unknown" - + data = { "system": "Spark (Comet)", "date": date.today().isoformat(), @@ -52,10 +56,11 @@ def main(): "tags": ["Java", "Rust", "column-oriented", "Spark derivative", "DataFusion", "Parquet"], "load_time": 0, "data_size": get_data_size(), - "result": process_times("log.txt") + "result": process_times("log.txt"), } - + print(json.dumps(data, indent=4)) -if __name__ == '__main__': - main() \ No newline at end of file + +if __name__ == "__main__": + main() diff --git a/spark-comet/query.py b/spark-comet/query.py index 3384647af..287148e01 100755 --- a/spark-comet/query.py +++ b/spark-comet/query.py @@ -6,14 +6,14 @@ - Comet configuration is added to `SparkSession` """ -from pyspark.sql import SparkSession -import pyspark.sql.functions as F - +import os +import re +import sys import timeit + import psutil -import sys -import re -import os +import pyspark.sql.functions as F +from pyspark.sql import SparkSession query = sys.stdin.read() # Replace \1 to $1 because spark recognizes only this pattern style (in query 28) @@ -33,7 +33,7 @@ .config("spark.driver", "local[*]") # To ensure using all cores .config("spark.driver.memory", f"{heap}g") # Set amount of memory SparkSession can use .config("spark.sql.parquet.binaryAsString", True) # Treat binary as string to get correct length calculations and text results - + # Comet configuration .config("spark.jars", "comet.jar") .config("spark.driver.extraClassPath", "comet.jar") @@ -64,5 +64,5 @@ end = timeit.default_timer() print("Time: ", end - start) except Exception as e: - print(e); + print(e) print("Failure!") From c2c0d6856026eec2d4006ae0bc1e82115d9e64b7 Mon Sep 17 00:00:00 2001 From: Iskander14yo Date: Sat, 26 Jul 2025 13:31:07 +0300 Subject: [PATCH 10/19] add note on syncing changes between engines --- spark-comet/benchmark.sh | 5 +++-- spark-comet/query.py | 4 +++- spark-comet/run.sh | 2 ++ spark/benchmark.sh | 2 ++ spark/query.py | 4 ++++ spark/run.sh | 2 ++ 6 files changed, 16 insertions(+), 3 deletions(-) diff --git a/spark-comet/benchmark.sh b/spark-comet/benchmark.sh index 3cf0bfb93..293a0da86 100755 --- a/spark-comet/benchmark.sh +++ b/spark-comet/benchmark.sh @@ -1,7 +1,8 @@ #!/bin/bash -# Differences with Spark setup (see README.md for details): -# - pyspark==3.5.6 version is used (latest possible for Comet 0.9.0) +# ⚠️ Derived from spark/benchmark.sh — keep in sync where possible (check README.md for the details). 
+# Current differences: +# - pyspark==3.5.6 version is used (latest stable for Comet 0.9.0) # - Comet installation is added # - auto-save results diff --git a/spark-comet/query.py b/spark-comet/query.py index 287148e01..84ac38af7 100755 --- a/spark-comet/query.py +++ b/spark-comet/query.py @@ -1,9 +1,11 @@ #!/usr/bin/env python3 """ -Differences with Spark setup (see README.md for details): +⚠️ Derived from spark/query.py — keep in sync where possible (check README.md for the details). +Current differences: - memory is split between heap (for Spark) and off-heap (for Comet) - Comet configuration is added to `SparkSession` +- debug mode is added """ import os diff --git a/spark-comet/run.sh b/spark-comet/run.sh index 64df8c608..d9b5db47b 100755 --- a/spark-comet/run.sh +++ b/spark-comet/run.sh @@ -1,5 +1,7 @@ #!/bin/bash +# ⚠️ Derived from spark/run.sh — keep in sync where possible (check README.md for the details). + cat queries.sql | while read query; do sync echo 3 | sudo tee /proc/sys/vm/drop_caches >/dev/null diff --git a/spark/benchmark.sh b/spark/benchmark.sh index d8cf57c2c..cbcb40eee 100755 --- a/spark/benchmark.sh +++ b/spark/benchmark.sh @@ -1,5 +1,7 @@ #!/bin/bash +# ⚠️ Used as a base for spark-*/benchmark.sh — keep in sync where possible (check README-accelerators.md for the details). + # Install sudo apt-get update -y diff --git a/spark/query.py b/spark/query.py index 9da8c7dbc..ca8f312e1 100755 --- a/spark/query.py +++ b/spark/query.py @@ -1,5 +1,9 @@ #!/usr/bin/env python3 +""" +⚠️ Used as a base for spark-*/query.py — keep in sync where possible (check README-accelerators.md for the details). +""" + from pyspark.sql import SparkSession import pyspark.sql.functions as F diff --git a/spark/run.sh b/spark/run.sh index 64df8c608..5076f9ca3 100755 --- a/spark/run.sh +++ b/spark/run.sh @@ -1,5 +1,7 @@ #!/bin/bash +# ⚠️ Used as a base for spark-*/run.sh — keep in sync where possible (check README-accelerators.md for the details). 
+ cat queries.sql | while read query; do sync echo 3 | sudo tee /proc/sys/vm/drop_caches >/dev/null From a9f3a3b4361c692498bab8a02978aafc8631edb5 Mon Sep 17 00:00:00 2001 From: Iskander14yo Date: Sat, 26 Jul 2025 13:41:17 +0300 Subject: [PATCH 11/19] fix: allow incompatible scan to avoid crashes --- spark-comet/query.py | 1 + 1 file changed, 1 insertion(+) diff --git a/spark-comet/query.py b/spark-comet/query.py index 84ac38af7..fab518f4b 100755 --- a/spark-comet/query.py +++ b/spark-comet/query.py @@ -44,6 +44,7 @@ .config("spark.memory.offHeap.enabled", "true") .config("spark.memory.offHeap.size", f"{off_heap}g") .config("spark.comet.regexp.allowIncompatible", True) + .config("spark.comet.scan.allowIncompatible", True) ) if os.getenv("DEBUG") == "1": From 6f1c79aab492adf6219f8feae71f4a78330e4c8b Mon Sep 17 00:00:00 2001 From: Iskander14yo Date: Sun, 27 Jul 2025 11:00:02 +0300 Subject: [PATCH 12/19] refactor: move JSON generating to benchmark.sh, fix JSON formatting --- spark-comet/benchmark.sh | 56 +++++++++++++++++++++++++---- spark-comet/process_results.py | 66 ---------------------------------- 2 files changed, 49 insertions(+), 73 deletions(-) delete mode 100755 spark-comet/process_results.py diff --git a/spark-comet/benchmark.sh b/spark-comet/benchmark.sh index 293a0da86..568cd445c 100755 --- a/spark-comet/benchmark.sh +++ b/spark-comet/benchmark.sh @@ -24,23 +24,65 @@ wget --continue --progress=dot:giga 'https://datasets.clickhouse.com/hits_compat # Install Comet -wget --continue --progress=dot:giga 'https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.5_2.12/0.9.0/comet-spark-spark3.5_2.12-0.9.0.jar' -O comet.jar +COMET_JAR_URL='https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.5_2.12/0.9.0/comet-spark-spark3.5_2.12-0.9.0.jar' + +wget --continue --progress=dot:giga $COMET_JAR_URL -O comet.jar # Run the queries ./run.sh 2>&1 | tee log.txt +# Print results to stdout as required cat log.txt | grep -P '^Time:\s+([\d\.]+)|Failure!' | sed -r -e 's/Time: //; s/^Failure!$/null/' | awk '{ if (i % 3 == 0) { printf "[" }; printf $1; if (i % 3 != 2) { printf "," } else { print "]," }; ++i; }' -# Save results +DATA_SIZE=$(du -b hits.parquet | cut -f1) + +echo "Data size: $DATA_SIZE" +echo "Load time: 0" -MACHINE="c6a.4xlarge" +# Save results as JSON + +MACHINE="${1:-c6a.4xlarge}" # Use first argument as machine name, default to c6a.4xlarge +COMET_VERSION=$(echo $COMET_JAR_URL | grep -Po ".{5}(?=.jar)") +SPARK_VERSION=$(pip freeze | grep '^pyspark==' | cut -d '=' -f3) mkdir -p results -./process_results.py "$MACHINE" > "results/${MACHINE}.json" -echo "Results have been saved to results/${MACHINE}.json" +( +cat << EOF +{ + "system": "Spark (Comet)", + "date": "$(date +%Y-%m-%d)", + "machine": "${MACHINE}", + "cluster_size": 1, + "proprietary": "no", + "tuned": "no", + "comment": "Using Comet ${COMET_VERSION} with Spark ${SPARK_VERSION}", + "tags": ["Java", "Rust", "column-oriented", "Spark derivative", "DataFusion", "Parquet"], + "load_time": 0, + "data_size": ${DATA_SIZE}, + "result": [ +EOF -echo "Data size: $(du -b hits.parquet)" -echo "Load time: 0" +cat log.txt | grep -P '^Time:\s+([\d\.]+)|Failure!' | sed -r -e 's/Time: //; s/^Failure!$/null/' | + awk -v total=$(grep -cP '^Time:\s+[\d\.]+|Failure!' 
log.txt) ' + { + if (i % 3 == 0) printf "\t\t["; + printf $1; + if (i % 3 != 2) printf ","; + else { + if (i < total - 1) printf "],\n"; + else printf "]"; + } + i++; + }' + +cat << EOF + + ] +} +EOF +) > "results/${MACHINE}.json" + +echo "Results have been saved to results/${MACHINE}.json" diff --git a/spark-comet/process_results.py b/spark-comet/process_results.py deleted file mode 100755 index 2f838e7b7..000000000 --- a/spark-comet/process_results.py +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/env python3 - -import json -import re -import subprocess -import sys -from datetime import date - - -def get_data_size(): - # basically a duplicate of `benchmark.sh` - result = subprocess.run(["du", "-b", "hits.parquet"], capture_output=True, text=True) - return int(result.stdout.split()[0]) - - -def process_times(log_file): - results = [] - current_array = [] - - with open(log_file) as f: - for line in f: - if line.startswith("Time: "): - value = float(line.strip().replace("Time: ", "")) - current_array.append(value) - elif line.startswith("Failure!"): - current_array.append(None) - - if len(current_array) == 3: - results.append(current_array) - current_array = [] - - return results - - -def get_comment(): - with open("benchmark.sh") as f: - for line in f: - if "comet.jar" in line: - comet_version = re.search(r"(.{5}).jar", line).group(1) - elif "pyspark" in line: - pyspark_version = re.search(r"pyspark==([^\s]+)", line).group(1) - return f"Using Comet {comet_version} with Spark {pyspark_version}" - - -def main(): - machine = sys.argv[1] if len(sys.argv) > 1 else "unknown" - - data = { - "system": "Spark (Comet)", - "date": date.today().isoformat(), - "machine": machine, - "cluster_size": 1, - "proprietary": "no", - "tuned": "no", - "comment": get_comment(), - "tags": ["Java", "Rust", "column-oriented", "Spark derivative", "DataFusion", "Parquet"], - "load_time": 0, - "data_size": get_data_size(), - "result": process_times("log.txt"), - } - - print(json.dumps(data, indent=4)) - - -if __name__ == "__main__": - main() From 44d799f2eaf2ceffa919eeafd2ac64239db31bab Mon Sep 17 00:00:00 2001 From: Iskander14yo Date: Sun, 27 Jul 2025 14:48:58 +0300 Subject: [PATCH 13/19] update docs --- spark-comet/README.md | 20 +++++++++++++------- spark/README-accelerators.md | 24 +++++++++++------------- 2 files changed, 24 insertions(+), 20 deletions(-) diff --git a/spark-comet/README.md b/spark-comet/README.md index 52176dc8d..be0d0d0d4 100644 --- a/spark-comet/README.md +++ b/spark-comet/README.md @@ -1,6 +1,11 @@ -For basic information, check the [spark-accelerators README](../spark/README-accelerators.md). +This README includes info on configuring Comet for ClickBench. For additional details, please refer to [Comet's docs](https://datafusion.apache.org/comet/user-guide/overview.html), [spark-accelerators README](../spark/README-accelerators.md) and [issue](https://github.com/apache/datafusion-comet/issues/2035) discussing the results. -- To find all unsupported queries from log.txt (requires `spark.comet.explainFallback.enabled=True` and better set `spark.sql.debug.maxToStringFields` to arbitrary big number like `10000`): +### Run + +As usual, benchmark can be run via `./benchmark.sh`. Additionally, users can provide machine spec like `./benchmark.sh c6a.8xlarge` so script saves it in relevant file. 
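A typical invocation might look like the sketch below; the optional machine argument is the one consumed by `benchmark.sh` when naming the results file, and `DEBUG` is the environment switch read by `query.py` that turns on the fallback logging used in the tips below:

```bash
# Full benchmark run; the argument ends up as the machine name in results/<machine>.json
./benchmark.sh c6a.8xlarge

# Re-run only the queries, with Comet fallback explanations enabled
DEBUG=1 ./run.sh 2>&1 | tee log.txt
```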
+ +### Tips +- To find all unsupported queries from `log.txt` (requires running bench in debug mode): ```bash >>> grep -P "\[COMET:" log.txt | sed -e 's/^[ \t]*//' | sort | uniq -c @@ -14,8 +19,9 @@ For basic information, check the [spark-accelerators README](../spark/README-acc - Check [here](https://datafusion.apache.org/comet/user-guide/installation.html#run-spark-shell-with-comet-enabled) for _basic Comet configuration_. ### Configuration -- Comet requires dedicated memory allocation, which can be provided either through memoryOverhead or off-heap memory. The [documentation recommends](https://datafusion.apache.org/comet/user-guide/tuning.html#configuring-comet-memory-in-off-heap-mode) using off-heap memory, which is the approach used in ClickBench. -Therefore, we need to split memory between heap (for Spark) and off-heap (for Comet). For both TPC-H and TPC-DS benchmarks, Comet documentation suggests a 50/50 proportion -(see [here](https://datafusion.apache.org/comet/contributor-guide/benchmarking.html)). Seems that this allocation is appropriate to support fallback to Spark execution when Comet can't handle certain operations, which is also relevant for ClickBench. -- `spark.driver.extraClassPath` is set so Comet doesn't fail on some queries (check [here](https://datafusion.apache.org/comet/user-guide/installation.html#additional-configuration) for info). -- `spark.comet.regexp.allowIncompatible` is set to `True` to allow using incompatible regular expressions so Comet doesn't fall back to Spark (check [here](https://datafusion.apache.org/comet/user-guide/compatibility.html#regular-expressions) for info). +- Comet requires a __dedicated memory pool__, which can be allocated either through memoryOverhead or off-heap memory. The [documentation recommends](https://datafusion.apache.org/comet/user-guide/tuning.html#configuring-comet-memory-in-off-heap-mode) using off-heap memory, which is the approach used in ClickBench. +Therefore, we need to split available memory between heap (for Spark) and off-heap (for Comet). For both TPC-H and TPC-DS benchmarks, Comet documentation [suggests](https://datafusion.apache.org/comet/contributor-guide/benchmarking.html) a 50/50 split. This allocation appears to be chosen to support Spark execution as well — it happens when Comet can't handle certain operators, which is also relevant for ClickBench. +- `spark.driver.extraClassPath` is set to prevent Comet from failing on certain queries (check [docs](https://datafusion.apache.org/comet/user-guide/installation.html#additional-configuration) for details). +- `spark.comet.regexp.allowIncompatible=True` allows Comet to use incompatible regular expressions instead of falling back to Spark (check [docs](https://datafusion.apache.org/comet/user-guide/compatibility.html#regular-expressions) for details). +- `spark.comet.scan.allowIncompatible=True` allows Comet to use different parquet readers to prevent query failures (check [issue](https://github.com/apache/datafusion-comet/issues/2035#issuecomment-3090666597) for details).
+What happens here: `hits.parquet` contains `short` type columns, so Comet uses the `native_comet` reader for compatibility. However, this reader [doesn't work](https://github.com/apache/datafusion-comet/issues/2038) for some queries (e.g., query 40), causing Comet to fail. As a workaround, we can allow Comet to produce results incompatible with Spark, enabling it to use [other readers](https://datafusion.apache.org/comet/user-guide/compatibility.html#parquet-scans) and successfully execute these queries. diff --git a/spark/README-accelerators.md b/spark/README-accelerators.md index 1572194fb..cbc097a17 100644 --- a/spark/README-accelerators.md +++ b/spark/README-accelerators.md @@ -1,30 +1,28 @@ -Several Spark plugins and extensions claim to offer significantly better performance than vanilla Spark for analytical workloads. -They share common characteristics, which are documented here for convenience. +Several Spark plugins and extensions claim to offer significantly better performance than vanilla Spark for analytical workloads. [Original PR.](https://github.com/ClickHouse/ClickBench/issues/517) Currently implemented in ClickBench: -- [Apache Comet](https://datafusion.apache.org/comet/user-guide/overview.html) (`spark-comet` folder) +- [Apache Comet](https://datafusion.apache.org/comet/user-guide/overview.html) (`spark-comet/` folder) -There are some considerations when working with these accelerators. Several have already been discussed [here](https://github.com/ClickHouse/ClickBench/issues/517). This README provides general guidance and may be supplemented by READMEs in individual engine folders. +There are some considerations when working with these accelerators. This README provides general guidance and may be supplemented by READMEs in individual engine folders. ### General -Some of these backends set goal to provide performance improvements with no _code rewrites_. Therefore, an objective is to run the "same Spark" with minimal modifications (except for setup and configuration adjustments). In practice, this means changes should be limited to `benchmark.sh` and `query.py`. +- Engines under `spark-*/` share a base structure derived from `spark/`. Where possible, improvements or edits should be __synced__ across all these engines — but __with caution__, as optimizations may not generalize. +- Some of these backends set goal to work with no _code rewrites_. Therefore, it'd be great to run the "same Spark" with __minimal modifications__ (except for setup and configuration adjustments). +- Although some accelerators aim to maintain __compatibility__ with Spark (e.g., Comet), this is __not__ critical in the context of ClickBench. Therefore, it is acceptable to _disable_ such settings if they hinder engine performance (usually due to causing fallbacks or opting for less efficient but compatible implementations). For examples of such settings, refer to [Comet's docs](https://datafusion.apache.org/comet/user-guide/compatibility.html) or [Comet's README](../spark-comet/README.md#configuration). ### Setting Up -- Java installation is still required as Spark handles all operations except computation (and sometimes computation as well). -- Spark version has to be selected from a supported list (versions are likely restricted for internal API compatibility). This list is typically available in the documentation. -The current approach is to use the "latest stable Spark version" for each engine. -- These engines utilize Spark's plugin/extension system, therefore represent a `.jar` file. 
The appropriate `.jar` (engine) version usually depend on the Spark version, Scala version, and other factors. +- Java installation is _still required_ as Spark handles all operations except computation (and sometimes computation as well). +- There is a strong dependency between Spark, Java, Scala, and accelerator __versions__ (likely for internal API compatibility). Version compatibility lists are typically available in each engine's documentation. +The current approach is to use the _"latest stable Spark version"_ for each engine. ### Configuration -- Resource allocation should follow existing Spark configuration to provide results' fairness. While `cores` allocation is straightforward, memory needs to be divided between heap (for Spark) and off-heap/overhead (for engines). It's safe to base configuration on the documentation recommendations/examples. -- `SparkSession` configuration typically requires settings like `spark.jars`, `spark.plugins`, and other parameters to enable engines. -- Remember to disable debug mode (most engines can explain fallbacks to Spark) to avoid performance overhead. +- __Resource allocation__ should follow existing Spark configuration to ensure fair results comparison. While `cores` allocation is rather straightforward, memory needs to be divided between heap (for Spark) and off-heap/memoryOverhead (for engines). It should be safe to base configuration on the engine's documentation recommendations/examples. This also allows to not overfit on the benchmark. ### Queries -These engines typically don't support the complete set of Spark operators, functions, or expressions. In such cases, they usually fall back gracefully to Spark execution or, rarely, fail due to semantic differences between Spark and engines. +These engines typically _don't_ support the complete set of Spark operators, functions, or expressions. In such cases, they usually _fall back_ gracefully to Spark execution or, rarely, _fail_ due to semantic differences between them and Spark. The standard approach is to use the default `queries.sql`. However, queries can be optionally rewritten for complete engine computation (see [here](https://github.com/ClickHouse/ClickBench/issues/517#issuecomment-3069121171) for discussion). From 412287a5e2557268982fc6020004a502cefbe3ae Mon Sep 17 00:00:00 2001 From: Robert Schulze Date: Mon, 28 Jul 2025 15:06:31 +0200 Subject: [PATCH 14/19] Update benchmark.sh --- spark-comet/benchmark.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark-comet/benchmark.sh b/spark-comet/benchmark.sh index 568cd445c..b386449bd 100755 --- a/spark-comet/benchmark.sh +++ b/spark-comet/benchmark.sh @@ -1,6 +1,6 @@ #!/bin/bash -# ⚠️ Derived from spark/benchmark.sh — keep in sync where possible (check README.md for the details). +# Note: Derived from spark/benchmark.sh — keep in sync where possible (check README.md for the details). 
# Current differences: # - pyspark==3.5.6 version is used (latest stable for Comet 0.9.0) # - Comet installation is added From 6bacaab3f527d71c8d945a06b951085507d70503 Mon Sep 17 00:00:00 2001 From: Robert Schulze Date: Mon, 28 Jul 2025 13:10:04 +0000 Subject: [PATCH 15/19] Cosmetics --- spark-comet/benchmark.sh | 3 ++- spark-comet/query.py | 3 ++- spark-comet/run.sh | 2 +- spark/benchmark.sh | 2 +- spark/query.py | 2 +- spark/run.sh | 2 +- 6 files changed, 8 insertions(+), 6 deletions(-) diff --git a/spark-comet/benchmark.sh b/spark-comet/benchmark.sh index b386449bd..e873d0399 100755 --- a/spark-comet/benchmark.sh +++ b/spark-comet/benchmark.sh @@ -1,6 +1,7 @@ #!/bin/bash -# Note: Derived from spark/benchmark.sh — keep in sync where possible (check README.md for the details). +# Note: Keep in sync with spark-*/benchmark.sh (see README-accelerators.md for details) +# # Current differences: # - pyspark==3.5.6 version is used (latest stable for Comet 0.9.0) # - Comet installation is added diff --git a/spark-comet/query.py b/spark-comet/query.py index fab518f4b..e70137cb1 100755 --- a/spark-comet/query.py +++ b/spark-comet/query.py @@ -1,7 +1,8 @@ #!/usr/bin/env python3 """ -⚠️ Derived from spark/query.py — keep in sync where possible (check README.md for the details). +Note: Keep in sync with spark-*/query.py (see README-accelerators.md for details) + Current differences: - memory is split between heap (for Spark) and off-heap (for Comet) - Comet configuration is added to `SparkSession` diff --git a/spark-comet/run.sh b/spark-comet/run.sh index d9b5db47b..8c9ca1289 100755 --- a/spark-comet/run.sh +++ b/spark-comet/run.sh @@ -1,6 +1,6 @@ #!/bin/bash -# ⚠️ Derived from spark/run.sh — keep in sync where possible (check README.md for the details). +# Note: Keep in sync with spark-*/run.sh (see README-accelerators.md for details) cat queries.sql | while read query; do sync diff --git a/spark/benchmark.sh b/spark/benchmark.sh index cbcb40eee..b2e9a5567 100755 --- a/spark/benchmark.sh +++ b/spark/benchmark.sh @@ -1,6 +1,6 @@ #!/bin/bash -# ⚠️ Used as a base for spark-*/benchmark.sh — keep in sync where possible (check README-accelerators.md for the details). +# Note: Keep in sync with spark-*/benchmark.sh (see README-accelerators.md for details) # Install diff --git a/spark/query.py b/spark/query.py index ca8f312e1..0b4861db6 100755 --- a/spark/query.py +++ b/spark/query.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 """ -⚠️ Used as a base for spark-*/query.py — keep in sync where possible (check README-accelerators.md for the details). +Note: Keep in sync with spark-*/query.sh (see README-accelerators.md for details) """ from pyspark.sql import SparkSession diff --git a/spark/run.sh b/spark/run.sh index 5076f9ca3..8c9ca1289 100755 --- a/spark/run.sh +++ b/spark/run.sh @@ -1,6 +1,6 @@ #!/bin/bash -# ⚠️ Used as a base for spark-*/run.sh — keep in sync where possible (check README-accelerators.md for the details). 
+# Note: Keep in sync with spark-*/run.sh (see README-accelerators.md for details)

 cat queries.sql | while read query; do
     sync

From 8d7336e72c87ecafde252e2f7357a058c45e90f7 Mon Sep 17 00:00:00 2001
From: Robert Schulze
Date: Mon, 28 Jul 2025 13:14:18 +0000
Subject: [PATCH 16/19] Update README

---
 spark/README-accelerators.md | 32 ++++++++++++++++++--------------
 1 file changed, 18 insertions(+), 14 deletions(-)

diff --git a/spark/README-accelerators.md b/spark/README-accelerators.md
index cbc097a17..1e3d82c74 100644
--- a/spark/README-accelerators.md
+++ b/spark/README-accelerators.md
@@ -1,28 +1,32 @@
-Several Spark plugins and extensions claim to offer significantly better performance than vanilla Spark for analytical workloads. [Original PR.](https://github.com/ClickHouse/ClickBench/issues/517)
+Several Spark plugins and extensions claim to offer significantly better performance than vanilla Spark for analytical workloads; see the [discussion here](https://github.com/ClickHouse/ClickBench/issues/517).

-Currently implemented in ClickBench:
-- [Apache Comet](https://datafusion.apache.org/comet/user-guide/overview.html) (`spark-comet/` folder)
+Currently available in ClickBench:
+- [Apache Comet](https://datafusion.apache.org/comet/user-guide/overview.html) (folder `spark-comet/`)

-There are some considerations when working with these accelerators. This README provides general guidance and may be supplemented by READMEs in individual engine folders.
+There are some considerations when working with these accelerators.
+This README provides general guidance and may be supplemented by READMEs in individual engine folders.

 ### General

-- Engines under `spark-*/` share a base structure derived from `spark/`. Where possible, improvements or edits should be __synced__ across all these engines — but __with caution__, as optimizations may not generalize.
-- Some of these backends set goal to work with no _code rewrites_. Therefore, it'd be great to run the "same Spark" with __minimal modifications__ (except for setup and configuration adjustments).
-- Although some accelerators aim to maintain __compatibility__ with Spark (e.g., Comet), this is __not__ critical in the context of ClickBench. Therefore, it is acceptable to _disable_ such settings if they hinder engine performance (usually due to causing fallbacks or opting for less efficient but compatible implementations). For examples of such settings, refer to [Comet's docs](https://datafusion.apache.org/comet/user-guide/compatibility.html) or [Comet's README](../spark-comet/README.md#configuration).
+- Engines under `spark-*/` share a base structure derived from `spark/`. Where possible, improvements or edits should be synced across all these engines - but with caution, as optimizations may not generalize.
+- Some of these backends aim to work without code rewrites. The goal is therefore to run the "same Spark" with minimal modifications (except for setup and configuration adjustments).
+- Although some accelerators aim to maintain compatibility with Spark (e.g., Comet), this is not critical in the context of ClickBench. Therefore, it is acceptable to disable such settings if they hinder engine performance (usually due to causing fallbacks or opting for less efficient but compatible implementations). For examples of such settings, refer to [Comet's docs](https://datafusion.apache.org/comet/user-guide/compatibility.html) or [Comet's README](../spark-comet/README.md#configuration).
-### Setting Up
+### Setup

-- Java installation is _still required_ as Spark handles all operations except computation (and sometimes computation as well).
-- There is a strong dependency between Spark, Java, Scala, and accelerator __versions__ (likely for internal API compatibility). Version compatibility lists are typically available in each engine's documentation.
-The current approach is to use the _"latest stable Spark version"_ for each engine.
+- A Java installation is still required, as Spark handles all operations except computation (and sometimes the computation as well).
+- There is a strong dependency between Spark, Java, Scala, and accelerator versions (likely for internal API compatibility). Version compatibility lists are typically available in each engine's documentation.
+
+The current approach is to use the latest stable Spark version for each engine.

 ### Configuration

-- Resource allocation should follow existing Spark configuration to ensure fair results comparison. While `cores` allocation is rather straightforward, memory needs to be divided between heap (for Spark) and off-heap/memoryOverhead (for engines). It should be safe to base configuration on the documentation recommendations/examples. This also allows to not overfit on the benchmark.
+- Resource allocation should follow the existing Spark configuration to ensure a fair comparison of results. While `cores` allocation is rather straightforward, memory needs to be divided between heap (for Spark) and off-heap/memoryOverhead (for engines). It should be safe to base the configuration on the recommendations and examples in the engine's documentation; this also avoids overfitting to the benchmark. A minimal configuration sketch is shown below for illustration.

 ### Queries

-These engines typically _don't_ support the complete set of Spark operators, functions, or expressions. In such cases, they usually _fall back_ gracefully to Spark execution or, rarely, _fail_ due to semantic differences between them and Spark.
+These engines typically don't support the complete set of Spark operators, functions, or expressions.
+In such cases, they usually fall back gracefully to Spark execution or, rarely, fail due to semantic differences between them and Spark.

-The standard approach is to use the default `queries.sql`. However, queries can be optionally rewritten for complete engine computation (see [here](https://github.com/ClickHouse/ClickBench/issues/517#issuecomment-3069121171) for discussion).
+The standard approach is to use the default `queries.sql`.
+However, queries can be optionally rewritten for complete engine computation (see [here](https://github.com/ClickHouse/ClickBench/issues/517#issuecomment-3069121171) for discussion).
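For illustration, a minimal sketch of how the heap / off-heap split and the plugin registration described above typically come together in a `SparkSession`. The memory values are placeholders and only a subset of options is shown; the configuration actually used by the benchmark lives in `spark-comet/query.py`:

```python
from pyspark.sql import SparkSession

# Placeholder memory split: heap for Spark itself, off-heap for the native engine.
# Real values depend on the instance type (c6a.4xlarge in the published results).
heap_gb = 8
offheap_gb = 16

builder = (
    SparkSession.builder.appName("ClickBench")
    # Heap memory for Spark itself (query.py sets this via spark.driver.memory)
    .config("spark.driver.memory", f"{heap_gb}g")
    # Off-heap memory handed to the accelerator
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", f"{offheap_gb}g")
    # Engine jar and plugin registration (values as used for Comet)
    .config("spark.jars", "comet.jar")
    .config("spark.plugins", "org.apache.spark.CometPlugin")
)

spark = builder.getOrCreate()
```

Keeping the engine-specific options together in one block like this makes it easier to keep the `spark/` and `spark-*/` variants in sync.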
From 3aa173c8267e9f45e9f180df2500e524a47b24e2 Mon Sep 17 00:00:00 2001 From: Robert Schulze Date: Mon, 28 Jul 2025 13:20:36 +0000 Subject: [PATCH 17/19] Fix results formatting --- spark-comet/results/c6a.4xlarge.json | 269 +++++---------------------- 1 file changed, 45 insertions(+), 224 deletions(-) diff --git a/spark-comet/results/c6a.4xlarge.json b/spark-comet/results/c6a.4xlarge.json index 7dfb82516..6596690dd 100644 --- a/spark-comet/results/c6a.4xlarge.json +++ b/spark-comet/results/c6a.4xlarge.json @@ -6,231 +6,52 @@ "proprietary": "no", "tuned": "no", "comment": "Using Comet 0.9.0 with Spark 3.5.6", - "tags": [ - "Java", - "Rust", - "column-oriented", - "Spark derivative", - "DataFusion", - "Parquet" - ], + "tags": ["Java", "Rust", "column-oriented", "Spark derivative", "DataFusion", "Parquet"], "load_time": 0, "data_size": 14779976446, "result": [ - [ - 4.472266616999946, - 2.1039232120000406, - 1.8430970299999672 - ], - [ - 3.578030586000068, - 1.0736136329999226, - 0.9344627739999396 - ], - [ - 6.64499863900005, - 1.4128532029999405, - 1.1755133540000315 - ], - [ - 4.696922097999959, - 2.0041085599999633, - 1.7866440060000741 - ], - [ - 5.631326217000037, - 2.6193342880000046, - 2.494008354000016 - ], - [ - 6.264381856999989, - 2.870651210999995, - 2.715818182000021 - ], - [ - 5.016372360999981, - 2.275984874999949, - 2.11629320499992 - ], - [ - 3.756535059999919, - 1.121937813000045, - 0.9281246310000597 - ], - [ - 6.773856178999949, - 2.9831256090000124, - 2.7141978240000526 - ], - [ - 13.568733273999896, - 4.030130065000094, - 3.534092967000106 - ], - [ - 5.836205211999982, - 2.355154799000047, - 2.1867549080000117 - ], - [ - 5.6480224209999506, - 1.5667226559999108, - 1.407957185999976 - ], - [ - 6.183444781000048, - 3.122852286000011, - 2.7804966820000345 - ], - [ - 10.864835242000026, - 5.143748008999978, - 4.993124574000035 - ], - [ - 5.930140444000017, - 2.934907199999998, - 2.7044927689998985 - ], - [ - 5.991562084000066, - 3.1449877329999936, - 2.7900950980000516 - ], - [ - 8.38386990999993, - 4.513304905000041, - 4.088832370999967 - ], - [ - 7.609503776999986, - 3.542259543, - 3.492194361999964 - ], - [ - 16.28575477000004, - 12.083004838999955, - 11.858153362000053 - ], - [ - 4.626656580000031, - 2.276965354999902, - 2.057529632000069 - ], - [ - 21.794626840999967, - 3.180863176999992, - 3.0697125969999206 - ], - [ - 25.802874348000046, - 3.6175597540000126, - 3.35841635099996 - ], - [ - 47.73140455199996, - 5.476737628000137, - 5.387913136999941 - ], - [ - 112.79471081299994, - 20.44997842099997, - 20.447969489999878 - ], - [ - 8.229423031999886, - 2.295753730000115, - 2.258644013999856 - ], - [ - 5.093002269999943, - 2.1934522190001644, - 2.009188600000016 - ], - [ - 8.189870422000013, - 2.316829217000077, - 2.3093284550000135 - ], - [ - 22.221672595999962, - 3.8472844349998923, - 3.526627628999904 - ], - [ - 24.93743621999988, - 17.81829874799996, - 17.44794403999981 - ], - [ - 6.5120100659999025, - 3.486931069999855, - 3.336994612000126 - ], - [ - 11.685128793000104, - 2.5526158909999594, - 2.231654909999861 - ], - [ - 21.547016347999943, - 3.102731389999917, - 2.8722797610000725 - ], - [ - 18.465872553999816, - 8.19440798100004, - 7.955121861999942 - ], - [ - 24.087333387999934, - 8.077702382000098, - 7.767284494000023 - ], - [ - 24.110849956000038, - 8.102607654999929, - 7.702735385000096 - ], - [ - 6.394193387999849, - 3.394644223999876, - 3.071767507000004 - ], - [ - 32.970651810999925, - 4.560951911000075, - 4.3527405339998495 - ], - [ - 26.42749080500016, - 
4.248164862999829, - 4.1698636720000195 - ], - [ - 37.74051866500008, - 4.908241721000195, - 4.685314071999983 - ], - [ - null, - null, - null - ], - [ - 13.638310121999893, - 1.8678283230001398, - 1.4909935100001803 - ], - [ - 25.261886403999824, - 2.0152608360001523, - 1.763202008999997 - ], - [ - 8.614786060000142, - 1.8395530959999178, - 1.526004198999999 - ] + [4.472, 2.103, 1.843], + [3.578, 1.073, 0.934], + [6.644, 1.412, 1.175], + [4.696, 2.004, 1.786], + [5.631, 2.619, 2.494], + [6.264, 2.870, 2.715], + [5.0163, 2.275, 2.116], + [3.756, 1.121, 0.928], + [6.773, 2.983, 2.714], + [13.568, 4.030, 3.534], + [5.836, 2.355, 2.186], + [5.648, 1.566, 1.407], + [6.183, 3.122, 2.780], + [10.864, 5.143, 4.993], + [5.930, 2.934, 2.704], + [5.991, 3.144, 2.790], + [8.383, 4.513, 4.088], + [7.609, 3.542, 3.492], + [16.285, 12.08, 11.858], + [4.626, 2.276, 2.057], + [21.794, 3.180, 3.069], + [25.802, 3.617, 3.358], + [47.731, 5.476, 5.387], + [112.794, 20.449, 20.447], + [8.229, 2.295, 2.258], + [5.093, 2.193, 2.009], + [8.189, 2.316, 2.309], + [22.221, 3.847, 3.526], + [24.937, 17.818, 17.447], + [6.512, 3.486, 3.336], + [11.685, 2.552, 2.231], + [21.547, 3.102, 2.872], + [18.465, 8.194, 7.955], + [24.087, 8.077, 7.767], + [24.110, 8.102, 7.702], + [6.394, 3.394, 3.071], + [32.970, 4.560, 4.352], + [26.427, 4.248, 4.169], + [37.740, 4.908, 4.685], + [null, null, null], + [13.638, 1.867, 1.490], + [25.261, 2.015, 1.763], + [8.614, 1.839, 1.526] ] -} \ No newline at end of file +} From b15d4151330bc9783441cf6fff1535e2a1f88332 Mon Sep 17 00:00:00 2001 From: Robert Schulze Date: Mon, 28 Jul 2025 13:28:04 +0000 Subject: [PATCH 18/19] Cosmetics --- spark-comet/benchmark.sh | 2 +- spark-comet/query.py | 10 ++++++---- spark/query.py | 4 ++-- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/spark-comet/benchmark.sh b/spark-comet/benchmark.sh index e873d0399..1ca1d8b6a 100755 --- a/spark-comet/benchmark.sh +++ b/spark-comet/benchmark.sh @@ -68,7 +68,7 @@ EOF cat log.txt | grep -P '^Time:\s+([\d\.]+)|Failure!' | sed -r -e 's/Time: //; s/^Failure!$/null/' | awk -v total=$(grep -cP '^Time:\s+[\d\.]+|Failure!' 
log.txt) ' - { + { if (i % 3 == 0) printf "\t\t["; printf $1; if (i % 3 != 2) printf ","; diff --git a/spark-comet/query.py b/spark-comet/query.py index e70137cb1..f35c4353e 100755 --- a/spark-comet/query.py +++ b/spark-comet/query.py @@ -9,14 +9,15 @@ - debug mode is added """ +from pyspark.sql import SparkSession +import pyspark.sql.functions as F + import os +import psutil import re import sys import timeit -import psutil -import pyspark.sql.functions as F -from pyspark.sql import SparkSession query = sys.stdin.read() # Replace \1 to $1 because spark recognizes only this pattern style (in query 28) @@ -37,7 +38,7 @@ .config("spark.driver.memory", f"{heap}g") # Set amount of memory SparkSession can use .config("spark.sql.parquet.binaryAsString", True) # Treat binary as string to get correct length calculations and text results - # Comet configuration + # Additional Comet configuration .config("spark.jars", "comet.jar") .config("spark.driver.extraClassPath", "comet.jar") .config("spark.plugins", "org.apache.spark.CometPlugin") @@ -48,6 +49,7 @@ .config("spark.comet.scan.allowIncompatible", True) ) +# Even more Comet configuration if os.getenv("DEBUG") == "1": builder.config("spark.comet.explainFallback.enabled", "true") builder.config("spark.sql.debug.maxToStringFields", "10000") diff --git a/spark/query.py b/spark/query.py index 0b4861db6..a58177dfd 100755 --- a/spark/query.py +++ b/spark/query.py @@ -7,10 +7,10 @@ from pyspark.sql import SparkSession import pyspark.sql.functions as F -import timeit import psutil -import sys import re +import sys +import timeit query = sys.stdin.read() # Replace \1 to $1 because spark recognizes only this pattern style (in query 28) From 4e19e8dcbbf20f998a9ef4d4afc5338ead0f1b47 Mon Sep 17 00:00:00 2001 From: Iskander Date: Thu, 31 Jul 2025 01:15:22 +0300 Subject: [PATCH 19/19] Fix results formatting: truncate to 3 digits --- spark-comet/benchmark.sh | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/spark-comet/benchmark.sh b/spark-comet/benchmark.sh index 1ca1d8b6a..94d34ffea 100755 --- a/spark-comet/benchmark.sh +++ b/spark-comet/benchmark.sh @@ -70,8 +70,9 @@ cat log.txt | grep -P '^Time:\s+([\d\.]+)|Failure!' | sed -r -e 's/Time: //; s/^ awk -v total=$(grep -cP '^Time:\s+[\d\.]+|Failure!' log.txt) ' { if (i % 3 == 0) printf "\t\t["; - printf $1; - if (i % 3 != 2) printf ","; + if ($1 == "null") printf "null"; + else printf "%.3f", $1; + if (i % 3 != 2) printf ", "; else { if (i < total - 1) printf "],\n"; else printf "]";