diff --git a/spark-comet/README.md b/spark-comet/README.md new file mode 100644 index 000000000..be0d0d0d4 --- /dev/null +++ b/spark-comet/README.md @@ -0,0 +1,27 @@ +This README includes info on configuring Comet for ClickBench. For additional details, please refer to [Comet's docs](https://datafusion.apache.org/comet/user-guide/overview.html), [spark-accelerators README](../spark/README-accelerators.md) and [issue](https://github.com/apache/datafusion-comet/issues/2035) discussing the results. + +### Run + +As usual, benchmark can be run via `./benchmark.sh`. Additionally, users can provide machine spec like `./benchmark.sh c6a.8xlarge` so script saves it in relevant file. + +### Tips +- To find all unsupported queries from `log.txt` (requires running bench in debug mode): +```bash +>>> grep -P "\[COMET:" log.txt | sed -e 's/^[ \t]*//' | sort | uniq -c + + 78 +- GlobalLimit [COMET: GlobalLimit is not supported] + 18 +- HashAggregate [COMET: Unsupported aggregation mode PartialMerge] + 123 +- HashAggregate [COMET: distinct aggregates are not supported] + ... +``` +- Check [here](https://datafusion.apache.org/comet/user-guide/installation.html#supported-spark-versions) for _version compatibility_ between Spark and Comet. +- Check [here](https://datafusion.apache.org/comet/user-guide/installation.html#using-a-published-jar-file) for _links to Comet jar_. +- Check [here](https://datafusion.apache.org/comet/user-guide/installation.html#run-spark-shell-with-comet-enabled) for _basic Comet configuration_. + +### Configuration +- Comet requires a __dedicated memory pool__, which can be allocated either through memoryOverhead or off-heap memory. The [documentation recommends](https://datafusion.apache.org/comet/user-guide/tuning.html#configuring-comet-memory-in-off-heap-mode) using off-heap memory, which is the approach used in ClickBench. +Therefore, we need to split available memory between heap (for Spark) and off-heap (for Comet). For both TPC-H and TPC-DS benchmarks, Comet documentation [suggests](https://datafusion.apache.org/comet/contributor-guide/benchmarking.html) a 50/50 split. This allocation appears to be chosen to support Spark execution as well — it happens when Comet can't handle certain operators, which is also relevant for ClickBench. +- `spark.driver.extraClassPath` is set to prevent Comet from failing on certain queries (check [docs](https://datafusion.apache.org/comet/user-guide/installation.html#additional-configuration) for details). +- `spark.comet.regexp.allowIncompatible=True` allows Comet to use incompatible regular expressions instead of falling back to Spark (check [docs](https://datafusion.apache.org/comet/user-guide/compatibility.html#regular-expressions) for details). +- `spark.comet.scan.allowIncompatible=True` allows Comet to use different parquet readers to prevent query failures (check [issue](https://github.com/apache/datafusion-comet/issues/2035#issuecomment-3090666597) for details).
+What happens here: `hits.parquet` contains `short` type columns, so Comet uses the `native_comet` reader for compatibility. However, this reader [doesn't work](https://github.com/apache/datafusion-comet/issues/2038) for some queries (e.g., query 40), causing Comet to fail. As a workaround, we can allow Comet to produce results incompatible with Spark, enabling it to use [other readers](https://datafusion.apache.org/comet/user-guide/compatibility.html#parquet-scans) and successfully execute these queries. diff --git a/spark-comet/benchmark.sh b/spark-comet/benchmark.sh new file mode 100755 index 000000000..94d34ffea --- /dev/null +++ b/spark-comet/benchmark.sh @@ -0,0 +1,90 @@ +#!/bin/bash + +# Note: Keep in sync with spark-*/benchmark.sh (see README-accelerators.md for details) +# +# Current differences: +# - pyspark==3.5.6 version is used (latest stable for Comet 0.9.0) +# - Comet installation is added +# - auto-save results + +# Install + +sudo apt-get update -y +sudo apt-get install -y python3-pip python3-venv openjdk-17-jdk + +export JAVA_HOME="/usr/lib/jvm/java-17-openjdk-$(dpkg --print-architecture)/" +export PATH=$JAVA_HOME/bin:$PATH + +python3 -m venv myenv +source myenv/bin/activate +pip install pyspark==3.5.6 psutil + +# Load the data + +wget --continue --progress=dot:giga 'https://datasets.clickhouse.com/hits_compatible/hits.parquet' + +# Install Comet + +COMET_JAR_URL='https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.5_2.12/0.9.0/comet-spark-spark3.5_2.12-0.9.0.jar' + +wget --continue --progress=dot:giga $COMET_JAR_URL -O comet.jar + +# Run the queries + +./run.sh 2>&1 | tee log.txt + +# Print results to stdout as required +cat log.txt | grep -P '^Time:\s+([\d\.]+)|Failure!' | sed -r -e 's/Time: //; s/^Failure!$/null/' | + awk '{ if (i % 3 == 0) { printf "[" }; printf $1; if (i % 3 != 2) { printf "," } else { print "]," }; ++i; }' + +DATA_SIZE=$(du -b hits.parquet | cut -f1) + +echo "Data size: $DATA_SIZE" +echo "Load time: 0" + +# Save results as JSON + +MACHINE="${1:-c6a.4xlarge}" # Use first argument as machine name, default to c6a.4xlarge +COMET_VERSION=$(echo $COMET_JAR_URL | grep -Po ".{5}(?=.jar)") +SPARK_VERSION=$(pip freeze | grep '^pyspark==' | cut -d '=' -f3) + +mkdir -p results + +( +cat << EOF +{ + "system": "Spark (Comet)", + "date": "$(date +%Y-%m-%d)", + "machine": "${MACHINE}", + "cluster_size": 1, + "proprietary": "no", + "tuned": "no", + "comment": "Using Comet ${COMET_VERSION} with Spark ${SPARK_VERSION}", + "tags": ["Java", "Rust", "column-oriented", "Spark derivative", "DataFusion", "Parquet"], + "load_time": 0, + "data_size": ${DATA_SIZE}, + "result": [ +EOF + +cat log.txt | grep -P '^Time:\s+([\d\.]+)|Failure!' | sed -r -e 's/Time: //; s/^Failure!$/null/' | + awk -v total=$(grep -cP '^Time:\s+[\d\.]+|Failure!' log.txt) ' + { + if (i % 3 == 0) printf "\t\t["; + if ($1 == "null") printf "null"; + else printf "%.3f", $1; + if (i % 3 != 2) printf ", "; + else { + if (i < total - 1) printf "],\n"; + else printf "]"; + } + i++; + }' + +cat << EOF + + ] +} +EOF +) > "results/${MACHINE}.json" + +echo "Results have been saved to results/${MACHINE}.json" diff --git a/spark-comet/queries.sql b/spark-comet/queries.sql new file mode 100644 index 000000000..31f65fc89 --- /dev/null +++ b/spark-comet/queries.sql @@ -0,0 +1,43 @@ +SELECT COUNT(*) FROM hits; +SELECT COUNT(*) FROM hits WHERE AdvEngineID <> 0; +SELECT SUM(AdvEngineID), COUNT(*), AVG(ResolutionWidth) FROM hits; +SELECT AVG(UserID) FROM hits; +SELECT COUNT(DISTINCT UserID) FROM hits; +SELECT COUNT(DISTINCT SearchPhrase) FROM hits; +SELECT MIN(EventDate), MAX(EventDate) FROM hits; +SELECT AdvEngineID, COUNT(*) FROM hits WHERE AdvEngineID <> 0 GROUP BY AdvEngineID ORDER BY COUNT(*) DESC; +SELECT RegionID, COUNT(DISTINCT UserID) AS u FROM hits GROUP BY RegionID ORDER BY u DESC LIMIT 10; +SELECT RegionID, SUM(AdvEngineID), COUNT(*) AS c, AVG(ResolutionWidth), COUNT(DISTINCT UserID) FROM hits GROUP BY RegionID ORDER BY c DESC LIMIT 10; +SELECT MobilePhoneModel, COUNT(DISTINCT UserID) AS u FROM hits WHERE MobilePhoneModel <> '' GROUP BY MobilePhoneModel ORDER BY u DESC LIMIT 10; +SELECT MobilePhone, MobilePhoneModel, COUNT(DISTINCT UserID) AS u FROM hits WHERE MobilePhoneModel <> '' GROUP BY MobilePhone, MobilePhoneModel ORDER BY u DESC LIMIT 10; +SELECT SearchPhrase, COUNT(*) AS c FROM hits WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10; +SELECT SearchPhrase, COUNT(DISTINCT UserID) AS u FROM hits WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY u DESC LIMIT 10; +SELECT SearchEngineID, SearchPhrase, COUNT(*) AS c FROM hits WHERE SearchPhrase <> '' GROUP BY SearchEngineID, SearchPhrase ORDER BY c DESC LIMIT 10; +SELECT UserID, COUNT(*) FROM hits GROUP BY UserID ORDER BY COUNT(*) DESC LIMIT 10; +SELECT UserID, SearchPhrase, COUNT(*) FROM hits GROUP BY UserID, SearchPhrase ORDER BY COUNT(*) DESC LIMIT 10; +SELECT UserID, SearchPhrase, COUNT(*) FROM hits GROUP BY UserID, SearchPhrase LIMIT 10; +SELECT UserID, extract(minute FROM EventTime) AS m, SearchPhrase, COUNT(*) FROM hits GROUP BY UserID, m, SearchPhrase ORDER BY COUNT(*) DESC LIMIT 10; +SELECT UserID FROM hits WHERE UserID = 435090932899640449; +SELECT COUNT(*) FROM hits WHERE URL LIKE '%google%'; +SELECT SearchPhrase, MIN(URL), COUNT(*) AS c FROM hits WHERE URL LIKE '%google%' AND SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10; +SELECT SearchPhrase, MIN(URL), MIN(Title), COUNT(*) AS c, COUNT(DISTINCT UserID) FROM hits WHERE Title LIKE '%Google%' AND URL NOT LIKE '%.google.%' AND SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10; +SELECT * FROM hits WHERE URL LIKE '%google%' ORDER BY EventTime LIMIT 10; +SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY EventTime LIMIT 10; +SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY SearchPhrase LIMIT 10; +SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY EventTime, SearchPhrase LIMIT 10; +SELECT CounterID, AVG(length(URL)) AS l, COUNT(*) AS c FROM hits WHERE URL <> '' GROUP BY CounterID HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25; +SELECT REGEXP_REPLACE(Referer, '^https?://(?:www\.)?([^/]+)/.*$', '\1') AS k, AVG(length(Referer)) AS l, COUNT(*) AS c, MIN(Referer) FROM hits WHERE Referer <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25; +SELECT SUM(ResolutionWidth), SUM(ResolutionWidth + 1), SUM(ResolutionWidth + 2), SUM(ResolutionWidth + 3), SUM(ResolutionWidth + 4), SUM(ResolutionWidth + 5), SUM(ResolutionWidth + 6), SUM(ResolutionWidth + 7), SUM(ResolutionWidth + 8), SUM(ResolutionWidth + 9), SUM(ResolutionWidth + 10), SUM(ResolutionWidth + 11), SUM(ResolutionWidth + 12), SUM(ResolutionWidth + 13), SUM(ResolutionWidth + 14), SUM(ResolutionWidth + 15), SUM(ResolutionWidth + 16), SUM(ResolutionWidth + 17), SUM(ResolutionWidth + 18), SUM(ResolutionWidth + 19), SUM(ResolutionWidth + 20), SUM(ResolutionWidth + 21), SUM(ResolutionWidth + 22), SUM(ResolutionWidth + 23), SUM(ResolutionWidth + 24), SUM(ResolutionWidth + 25), SUM(ResolutionWidth + 26), SUM(ResolutionWidth + 27), SUM(ResolutionWidth + 28), SUM(ResolutionWidth + 29), SUM(ResolutionWidth + 30), SUM(ResolutionWidth + 31), SUM(ResolutionWidth + 32), SUM(ResolutionWidth + 33), SUM(ResolutionWidth + 34), SUM(ResolutionWidth + 35), SUM(ResolutionWidth + 36), SUM(ResolutionWidth + 37), SUM(ResolutionWidth + 38), SUM(ResolutionWidth + 39), SUM(ResolutionWidth + 40), SUM(ResolutionWidth + 41), SUM(ResolutionWidth + 42), SUM(ResolutionWidth + 43), SUM(ResolutionWidth + 44), SUM(ResolutionWidth + 45), SUM(ResolutionWidth + 46), SUM(ResolutionWidth + 47), SUM(ResolutionWidth + 48), SUM(ResolutionWidth + 49), SUM(ResolutionWidth + 50), SUM(ResolutionWidth + 51), SUM(ResolutionWidth + 52), SUM(ResolutionWidth + 53), SUM(ResolutionWidth + 54), SUM(ResolutionWidth + 55), SUM(ResolutionWidth + 56), SUM(ResolutionWidth + 57), SUM(ResolutionWidth + 58), SUM(ResolutionWidth + 59), SUM(ResolutionWidth + 60), SUM(ResolutionWidth + 61), SUM(ResolutionWidth + 62), SUM(ResolutionWidth + 63), SUM(ResolutionWidth + 64), SUM(ResolutionWidth + 65), SUM(ResolutionWidth + 66), SUM(ResolutionWidth + 67), SUM(ResolutionWidth + 68), SUM(ResolutionWidth + 69), SUM(ResolutionWidth + 70), SUM(ResolutionWidth + 71), SUM(ResolutionWidth + 72), SUM(ResolutionWidth + 73), SUM(ResolutionWidth + 74), SUM(ResolutionWidth + 75), SUM(ResolutionWidth + 76), SUM(ResolutionWidth + 77), SUM(ResolutionWidth + 78), SUM(ResolutionWidth + 79), SUM(ResolutionWidth + 80), SUM(ResolutionWidth + 81), SUM(ResolutionWidth + 82), SUM(ResolutionWidth + 83), SUM(ResolutionWidth + 84), SUM(ResolutionWidth + 85), SUM(ResolutionWidth + 86), SUM(ResolutionWidth + 87), SUM(ResolutionWidth + 88), SUM(ResolutionWidth + 89) FROM hits; +SELECT SearchEngineID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits WHERE SearchPhrase <> '' GROUP BY SearchEngineID, ClientIP ORDER BY c DESC LIMIT 10; +SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits WHERE SearchPhrase <> '' GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10; +SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10; +SELECT URL, COUNT(*) AS c FROM hits GROUP BY URL ORDER BY c DESC LIMIT 10; +SELECT 1, URL, COUNT(*) AS c FROM hits GROUP BY 1, URL ORDER BY c DESC LIMIT 10; +SELECT ClientIP, ClientIP - 1, ClientIP - 2, ClientIP - 3, COUNT(*) AS c FROM hits GROUP BY ClientIP, ClientIP - 1, ClientIP - 2, ClientIP - 3 ORDER BY c DESC LIMIT 10; +SELECT URL, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND DontCountHits = 0 AND IsRefresh = 0 AND URL <> '' GROUP BY URL ORDER BY PageViews DESC LIMIT 10; +SELECT Title, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND DontCountHits = 0 AND IsRefresh = 0 AND Title <> '' GROUP BY Title ORDER BY PageViews DESC LIMIT 10; +SELECT URL, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND IsLink <> 0 AND IsDownload = 0 GROUP BY URL ORDER BY PageViews DESC LIMIT 10 OFFSET 1000; +SELECT TraficSourceID, SearchEngineID, AdvEngineID, CASE WHEN (SearchEngineID = 0 AND AdvEngineID = 0) THEN Referer ELSE '' END AS Src, URL AS Dst, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 GROUP BY TraficSourceID, SearchEngineID, AdvEngineID, Src, Dst ORDER BY PageViews DESC LIMIT 10 OFFSET 1000; +SELECT URLHash, EventDate, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND TraficSourceID IN (-1, 6) AND RefererHash = 3594120000172545465 GROUP BY URLHash, EventDate ORDER BY PageViews DESC LIMIT 10 OFFSET 100; +SELECT WindowClientWidth, WindowClientHeight, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND DontCountHits = 0 AND URLHash = 2868770270353813622 GROUP BY WindowClientWidth, WindowClientHeight ORDER BY PageViews DESC LIMIT 10 OFFSET 10000; +SELECT DATE_TRUNC('minute', EventTime) AS M, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-14' AND EventDate <= '2013-07-15' AND IsRefresh = 0 AND DontCountHits = 0 GROUP BY DATE_TRUNC('minute', EventTime) ORDER BY DATE_TRUNC('minute', EventTime) LIMIT 10 OFFSET 1000; diff --git a/spark-comet/query.py b/spark-comet/query.py new file mode 100755 index 000000000..f35c4353e --- /dev/null +++ b/spark-comet/query.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python3 + +""" +Note: Keep in sync with spark-*/query.py (see README-accelerators.md for details) + +Current differences: +- memory is split between heap (for Spark) and off-heap (for Comet) +- Comet configuration is added to `SparkSession` +- debug mode is added +""" + +from pyspark.sql import SparkSession +import pyspark.sql.functions as F + +import os +import psutil +import re +import sys +import timeit + + +query = sys.stdin.read() +# Replace \1 to $1 because spark recognizes only this pattern style (in query 28) +query = re.sub(r"""(REGEXP_REPLACE\(.*?,\s*('[^']*')\s*,\s*)('1')""", r"\1'$1'", query) +print(query) + +# Calculate available memory to configurate SparkSession +ram = int(round(psutil.virtual_memory().available / (1024 ** 3) * 0.7)) +heap = ram // 2 +off_heap = ram - heap +print(f"SparkSession will use {heap} GB of heap and {off_heap} GB of off-heap memory") + +builder = ( + SparkSession + .builder + .appName("ClickBench") + .config("spark.driver", "local[*]") # To ensure using all cores + .config("spark.driver.memory", f"{heap}g") # Set amount of memory SparkSession can use + .config("spark.sql.parquet.binaryAsString", True) # Treat binary as string to get correct length calculations and text results + + # Additional Comet configuration + .config("spark.jars", "comet.jar") + .config("spark.driver.extraClassPath", "comet.jar") + .config("spark.plugins", "org.apache.spark.CometPlugin") + .config("spark.shuffle.manager", "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") + .config("spark.memory.offHeap.enabled", "true") + .config("spark.memory.offHeap.size", f"{off_heap}g") + .config("spark.comet.regexp.allowIncompatible", True) + .config("spark.comet.scan.allowIncompatible", True) +) + +# Even more Comet configuration +if os.getenv("DEBUG") == "1": + builder.config("spark.comet.explainFallback.enabled", "true") + builder.config("spark.sql.debug.maxToStringFields", "10000") + +spark = builder.getOrCreate() + +df = spark.read.parquet("hits.parquet") +# Do casting before creating the view so no need to change to unreadable integer dates in SQL +df = df.withColumn("EventTime", F.col("EventTime").cast("timestamp")) +df = df.withColumn("EventDate", F.date_add(F.lit("1970-01-01"), F.col("EventDate"))) +df.createOrReplaceTempView("hits") + +for try_num in range(3): + try: + start = timeit.default_timer() + result = spark.sql(query) + result.show(100) # some queries should return more than 20 rows which is the default show limit + end = timeit.default_timer() + print("Time: ", end - start) + except Exception as e: + print(e) + print("Failure!") diff --git a/spark-comet/results/c6a.4xlarge.json b/spark-comet/results/c6a.4xlarge.json new file mode 100644 index 000000000..6596690dd --- /dev/null +++ b/spark-comet/results/c6a.4xlarge.json @@ -0,0 +1,57 @@ +{ + "system": "Spark (Comet)", + "date": "2025-07-17", + "machine": "c6a.4xlarge", + "cluster_size": 1, + "proprietary": "no", + "tuned": "no", + "comment": "Using Comet 0.9.0 with Spark 3.5.6", + "tags": ["Java", "Rust", "column-oriented", "Spark derivative", "DataFusion", "Parquet"], + "load_time": 0, + "data_size": 14779976446, + "result": [ + [4.472, 2.103, 1.843], + [3.578, 1.073, 0.934], + [6.644, 1.412, 1.175], + [4.696, 2.004, 1.786], + [5.631, 2.619, 2.494], + [6.264, 2.870, 2.715], + [5.0163, 2.275, 2.116], + [3.756, 1.121, 0.928], + [6.773, 2.983, 2.714], + [13.568, 4.030, 3.534], + [5.836, 2.355, 2.186], + [5.648, 1.566, 1.407], + [6.183, 3.122, 2.780], + [10.864, 5.143, 4.993], + [5.930, 2.934, 2.704], + [5.991, 3.144, 2.790], + [8.383, 4.513, 4.088], + [7.609, 3.542, 3.492], + [16.285, 12.08, 11.858], + [4.626, 2.276, 2.057], + [21.794, 3.180, 3.069], + [25.802, 3.617, 3.358], + [47.731, 5.476, 5.387], + [112.794, 20.449, 20.447], + [8.229, 2.295, 2.258], + [5.093, 2.193, 2.009], + [8.189, 2.316, 2.309], + [22.221, 3.847, 3.526], + [24.937, 17.818, 17.447], + [6.512, 3.486, 3.336], + [11.685, 2.552, 2.231], + [21.547, 3.102, 2.872], + [18.465, 8.194, 7.955], + [24.087, 8.077, 7.767], + [24.110, 8.102, 7.702], + [6.394, 3.394, 3.071], + [32.970, 4.560, 4.352], + [26.427, 4.248, 4.169], + [37.740, 4.908, 4.685], + [null, null, null], + [13.638, 1.867, 1.490], + [25.261, 2.015, 1.763], + [8.614, 1.839, 1.526] + ] +} diff --git a/spark-comet/run.sh b/spark-comet/run.sh new file mode 100755 index 000000000..8c9ca1289 --- /dev/null +++ b/spark-comet/run.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +# Note: Keep in sync with spark-*/run.sh (see README-accelerators.md for details) + +cat queries.sql | while read query; do + sync + echo 3 | sudo tee /proc/sys/vm/drop_caches >/dev/null + + ./query.py <<< "${query}" +done diff --git a/spark/README-accelerators.md b/spark/README-accelerators.md new file mode 100644 index 000000000..1e3d82c74 --- /dev/null +++ b/spark/README-accelerators.md @@ -0,0 +1,32 @@ +Several Spark plugins and extensions claim to offer significantly better performance than vanilla Spark for analytical workloads, see the [discussion here](https://github.com/ClickHouse/ClickBench/issues/517). + +Currently available in ClickBench: +- [Apache Comet](https://datafusion.apache.org/comet/user-guide/overview.html) (folder `spark-comet/`) + +There are some considerations when working with these accelerators. +This README provides general guidance and may be supplemented by READMEs in individual engine folders. + +### General + +- Engines under `spark-*/` share a base structure derived from `spark/`. Where possible, improvements or edits should be synced across all these engines - but with caution, as optimizations may not generalize. +- Some of these backends set goal to work without code rewrites. Therefore, it'd be great to run the "same Spark" with minimal modifications (except for setup and configuration adjustments). +- Although some accelerators aim to maintain compatibility with Spark (e.g., Comet), this is not critical in the context of ClickBench. Therefore, it is acceptable to disable such settings if they hinder engine performance (usually due to causing fallbacks or opting for less efficient but compatible implementations). For examples of such settings, refer to [Comet's docs](https://datafusion.apache.org/comet/user-guide/compatibility.html) or [Comet's README](../spark-comet/README.md#configuration). + +### Setup + +- Java installation is still required as Spark handles all operations except computation (and sometimes computation as well). +- There is a strong dependency between Spark, Java, Scala, and accelerator versions (likely for internal API compatibility). Version compatibility lists are typically available in each engine's documentation. + +The current approach is to use the latest stable Spark version for each engine. + +### Configuration + +- Resource allocation should follow existing Spark configuration to ensure fair results comparison. While `cores` allocation is rather straightforward, memory needs to be divided between heap (for Spark) and off-heap/memoryOverhead (for engines). It should be safe to base configuration on the engine's documentation recommendations / examples. This also allows to not overfit on the benchmark. + +### Queries + +These engines typically don't support the complete set of Spark operators, functions, or expressions. +In such cases, they usually fall back gracefully to Spark execution or, rarely, fail due to semantic differences between them and Spark. + +The standard approach is to use the default `queries.sql`. +However, queries can be optionally rewritten for complete engine computation (see [here](https://github.com/ClickHouse/ClickBench/issues/517#issuecomment-3069121171) for discussion). diff --git a/spark/benchmark.sh b/spark/benchmark.sh index d8cf57c2c..b2e9a5567 100755 --- a/spark/benchmark.sh +++ b/spark/benchmark.sh @@ -1,5 +1,7 @@ #!/bin/bash +# Note: Keep in sync with spark-*/benchmark.sh (see README-accelerators.md for details) + # Install sudo apt-get update -y diff --git a/spark/query.py b/spark/query.py index 9da8c7dbc..a58177dfd 100755 --- a/spark/query.py +++ b/spark/query.py @@ -1,12 +1,16 @@ #!/usr/bin/env python3 +""" +Note: Keep in sync with spark-*/query.sh (see README-accelerators.md for details) +""" + from pyspark.sql import SparkSession import pyspark.sql.functions as F -import timeit import psutil -import sys import re +import sys +import timeit query = sys.stdin.read() # Replace \1 to $1 because spark recognizes only this pattern style (in query 28) diff --git a/spark/run.sh b/spark/run.sh index 64df8c608..8c9ca1289 100755 --- a/spark/run.sh +++ b/spark/run.sh @@ -1,5 +1,7 @@ #!/bin/bash +# Note: Keep in sync with spark-*/run.sh (see README-accelerators.md for details) + cat queries.sql | while read query; do sync echo 3 | sudo tee /proc/sys/vm/drop_caches >/dev/null