add Apache Comet #557
**Merged**: rschu1ze merged 19 commits into ClickHouse:main from Iskander14yo:add-engine/apache-comet on Aug 11, 2025.
Commits (19):

- 3eb0718 add code from spark (Iskander14yo)
- 08db580 add base comet setup: installation and SparkSession (Iskander14yo)
- d50055d split memory: heap and off-heap (Iskander14yo)
- 898fe78 auto-save results (Iskander14yo)
- bb6961a add docs for spark accelerators and Comet (Iskander14yo)
- 282c85e deal with unsupported things: regexp (Iskander14yo)
- 970f3db add results (c6a.4xlarge) (Iskander14yo)
- 448bd3a add debug mode (Iskander14yo)
- e789a74 chores: formatter (ruff) edits (Iskander14yo)
- c2c0d68 add note on syncing changes between engines (Iskander14yo)
- a9f3a3b fix: allow incompatible scan to avoid crashes (Iskander14yo)
- 6f1c79a refactor: move JSON generating to benchmark.sh, fix JSON formatting (Iskander14yo)
- 44d799f update docs (Iskander14yo)
- 412287a Update benchmark.sh (rschu1ze)
- 6bacaab Cosmetics (rschu1ze)
- 8d7336e Update README (rschu1ze)
- 3aa173c Fix results formatting (rschu1ze)
- b15d415 Cosmetics (rschu1ze)
- 4e19e8d Fix results formatting: truncate to 3 digits (Iskander14yo)
**README** (new file, 21 lines)
For basic information, check the [spark-accelerators README](../spark/README-accelerators.md).

- To find all unsupported queries from log.txt (requires `spark.comet.explainFallback.enabled=True`; ideally also set `spark.sql.debug.maxToStringFields` to an arbitrarily large number such as `10000`):
  ```bash
  grep -P "\[COMET:" log.txt | sed -e 's/^[ \t]*//' | sort | uniq -c

   78 +- GlobalLimit [COMET: GlobalLimit is not supported]
   18 +- HashAggregate [COMET: Unsupported aggregation mode PartialMerge]
  123 +- HashAggregate [COMET: distinct aggregates are not supported]
  ...
  ```
- Check [here](https://datafusion.apache.org/comet/user-guide/installation.html#supported-spark-versions) for _version compatibility_ between Spark and Comet.
- Check [here](https://datafusion.apache.org/comet/user-guide/installation.html#using-a-published-jar-file) for _links to the Comet jar_.
- Check [here](https://datafusion.apache.org/comet/user-guide/installation.html#run-spark-shell-with-comet-enabled) for _basic Comet configuration_.

### Configuration

- Comet requires dedicated memory, provided either through `memoryOverhead` or off-heap memory. The [documentation recommends](https://datafusion.apache.org/comet/user-guide/tuning.html#configuring-comet-memory-in-off-heap-mode) using off-heap memory, which is the approach used in ClickBench. Memory is therefore split between heap (for Spark) and off-heap (for Comet). For both TPC-H and TPC-DS benchmarks, the Comet documentation suggests a 50/50 split (see [here](https://datafusion.apache.org/comet/contributor-guide/benchmarking.html)). This allocation appears appropriate for supporting fallback to Spark execution when Comet can't handle certain operations, which is also relevant for ClickBench.
- `spark.driver.extraClassPath` is set so Comet doesn't fail on some queries (see [here](https://datafusion.apache.org/comet/user-guide/installation.html#additional-configuration)).
- `spark.comet.regexp.allowIncompatible` is set to `True` to allow incompatible regular expressions so Comet doesn't fall back to Spark (see [here](https://datafusion.apache.org/comet/user-guide/compatibility.html#regular-expressions)).
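The 50/50 heap/off-heap split described above can be sketched in a few lines; this is an illustrative sketch, and the 16 GB total is a made-up example, not a value from this PR:

```python
def split_memory(total_gb: int) -> dict:
    """Split a memory budget 50/50 between Spark heap and Comet off-heap,
    mirroring the allocation the Comet docs recommend for benchmarks."""
    heap = total_gb // 2
    off_heap = total_gb - heap  # off-heap gets the remainder on odd totals
    return {
        "spark.driver.memory": f"{heap}g",
        "spark.memory.offHeap.enabled": "true",
        "spark.memory.offHeap.size": f"{off_heap}g",
    }

conf = split_memory(16)
# conf["spark.driver.memory"] == "8g", conf["spark.memory.offHeap.size"] == "8g"
```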
**benchmark.sh** (new file, 45 lines)
```bash
#!/bin/bash

# Differences with Spark setup (see README.md for details):
# - pyspark==3.5.6 version is used (latest possible for Comet 0.9.0)
# - Comet installation is added
# - auto-save results

# Install

sudo apt-get update -y
sudo apt-get install -y python3-pip python3-venv openjdk-17-jdk

export JAVA_HOME="/usr/lib/jvm/java-17-openjdk-$(dpkg --print-architecture)/"
export PATH=$JAVA_HOME/bin:$PATH

python3 -m venv myenv
source myenv/bin/activate
pip install pyspark==3.5.6 psutil

# Load the data

wget --continue --progress=dot:giga 'https://datasets.clickhouse.com/hits_compatible/hits.parquet'

# Install Comet

wget --continue --progress=dot:giga 'https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.5_2.12/0.9.0/comet-spark-spark3.5_2.12-0.9.0.jar' -O comet.jar

# Run the queries

./run.sh 2>&1 | tee log.txt

cat log.txt | grep -P '^Time:\s+([\d\.]+)|Failure!' | sed -r -e 's/Time: //; s/^Failure!$/null/' |
    awk '{ if (i % 3 == 0) { printf "[" }; printf $1; if (i % 3 != 2) { printf "," } else { print "]," }; ++i; }'

# Save results

MACHINE="c6a.4xlarge"

mkdir -p results
./process_results.py "$MACHINE" > "results/${MACHINE}.json"

echo "Results have been saved to results/${MACHINE}.json"

echo "Data size: $(du -b hits.parquet)"
echo "Load time: 0"
```
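The grep/sed/awk pipeline above collapses the run log into one triple of timings per query, with `null` for failed runs. A Python sketch of the same transformation (the log lines below are made up for illustration):

```python
log_lines = ["Time: 1.5", "Time: 1.2", "Failure!", "Time: 0.9", "Time: 0.8", "Time: 0.7"]

# "Time: X" -> float, "Failure!" -> None (rendered as null in JSON)
values = [None if line == "Failure!" else float(line.split()[1]) for line in log_lines]

# group into one triple per query (each query is run three times)
triples = [values[i:i + 3] for i in range(0, len(values), 3)]
# triples == [[1.5, 1.2, None], [0.9, 0.8, 0.7]]
```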
**process_results.py** (new file, 61 lines)
```python
#!/usr/bin/env python3

import json
import sys
from datetime import date
import subprocess
import re


def get_data_size():
    # basically a duplicate of `benchmark.sh`
    result = subprocess.run(['du', '-b', 'hits.parquet'], capture_output=True, text=True)
    return int(result.stdout.split()[0])


def process_times(log_file):
    results = []
    current_array = []

    with open(log_file) as f:
        for line in f:
            if line.startswith('Time: '):
                value = float(line.strip().replace('Time: ', ''))
                current_array.append(value)
            elif line.startswith('Failure!'):
                current_array.append(None)

            if len(current_array) == 3:
                results.append(current_array)
                current_array = []

    return results


def get_comment():
    # Extract the Comet and pyspark versions from the lines in benchmark.sh
    # that download the jar and install the package.
    with open("benchmark.sh") as f:
        for line in f:
            if "comet.jar" in line:
                comet_version = re.search(r"(.{5}).jar", line).group(1)
            elif "pyspark" in line:
                pyspark_version = re.search(r"pyspark==([^\s]+)", line).group(1)
    return f"Using Comet {comet_version} with Spark {pyspark_version}"


def main():
    machine = sys.argv[1] if len(sys.argv) > 1 else "unknown"

    data = {
        "system": "Spark (Comet)",
        "date": date.today().isoformat(),
        "machine": machine,
        "cluster_size": 1,
        "proprietary": "no",
        "tuned": "no",
        "comment": get_comment(),
        "tags": ["Java", "Rust", "column-oriented", "Spark derivative", "DataFusion", "Parquet"],
        "load_time": 0,
        "data_size": get_data_size(),
        "result": process_times("log.txt")
    }

    print(json.dumps(data, indent=4))


if __name__ == '__main__':
    main()
```
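The two regexes in `get_comment()` pull version strings out of `benchmark.sh`. Here is how they behave on lines shaped like the ones in that script (the lines below are abbreviated stand-ins, not the exact script contents):

```python
import re

jar_line = ("wget 'https://repo1.maven.org/maven2/org/apache/datafusion/"
            "comet-spark-spark3.5_2.12/0.9.0/comet-spark-spark3.5_2.12-0.9.0.jar' -O comet.jar")
pip_line = "pip install pyspark==3.5.6 psutil"

# "(.{5}).jar" captures the 5 characters immediately before ".jar", i.e. the version
comet_version = re.search(r"(.{5}).jar", jar_line).group(1)
pyspark_version = re.search(r"pyspark==([^\s]+)", pip_line).group(1)
# comet_version == "0.9.0", pyspark_version == "3.5.6"
```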
**Queries** (new file, 43 SQL queries)
```sql
SELECT COUNT(*) FROM hits;
SELECT COUNT(*) FROM hits WHERE AdvEngineID <> 0;
SELECT SUM(AdvEngineID), COUNT(*), AVG(ResolutionWidth) FROM hits;
SELECT AVG(UserID) FROM hits;
SELECT COUNT(DISTINCT UserID) FROM hits;
SELECT COUNT(DISTINCT SearchPhrase) FROM hits;
SELECT MIN(EventDate), MAX(EventDate) FROM hits;
SELECT AdvEngineID, COUNT(*) FROM hits WHERE AdvEngineID <> 0 GROUP BY AdvEngineID ORDER BY COUNT(*) DESC;
SELECT RegionID, COUNT(DISTINCT UserID) AS u FROM hits GROUP BY RegionID ORDER BY u DESC LIMIT 10;
SELECT RegionID, SUM(AdvEngineID), COUNT(*) AS c, AVG(ResolutionWidth), COUNT(DISTINCT UserID) FROM hits GROUP BY RegionID ORDER BY c DESC LIMIT 10;
SELECT MobilePhoneModel, COUNT(DISTINCT UserID) AS u FROM hits WHERE MobilePhoneModel <> '' GROUP BY MobilePhoneModel ORDER BY u DESC LIMIT 10;
SELECT MobilePhone, MobilePhoneModel, COUNT(DISTINCT UserID) AS u FROM hits WHERE MobilePhoneModel <> '' GROUP BY MobilePhone, MobilePhoneModel ORDER BY u DESC LIMIT 10;
SELECT SearchPhrase, COUNT(*) AS c FROM hits WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10;
SELECT SearchPhrase, COUNT(DISTINCT UserID) AS u FROM hits WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY u DESC LIMIT 10;
SELECT SearchEngineID, SearchPhrase, COUNT(*) AS c FROM hits WHERE SearchPhrase <> '' GROUP BY SearchEngineID, SearchPhrase ORDER BY c DESC LIMIT 10;
SELECT UserID, COUNT(*) FROM hits GROUP BY UserID ORDER BY COUNT(*) DESC LIMIT 10;
SELECT UserID, SearchPhrase, COUNT(*) FROM hits GROUP BY UserID, SearchPhrase ORDER BY COUNT(*) DESC LIMIT 10;
SELECT UserID, SearchPhrase, COUNT(*) FROM hits GROUP BY UserID, SearchPhrase LIMIT 10;
SELECT UserID, extract(minute FROM EventTime) AS m, SearchPhrase, COUNT(*) FROM hits GROUP BY UserID, m, SearchPhrase ORDER BY COUNT(*) DESC LIMIT 10;
SELECT UserID FROM hits WHERE UserID = 435090932899640449;
SELECT COUNT(*) FROM hits WHERE URL LIKE '%google%';
SELECT SearchPhrase, MIN(URL), COUNT(*) AS c FROM hits WHERE URL LIKE '%google%' AND SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10;
SELECT SearchPhrase, MIN(URL), MIN(Title), COUNT(*) AS c, COUNT(DISTINCT UserID) FROM hits WHERE Title LIKE '%Google%' AND URL NOT LIKE '%.google.%' AND SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10;
SELECT * FROM hits WHERE URL LIKE '%google%' ORDER BY EventTime LIMIT 10;
SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY EventTime LIMIT 10;
SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY SearchPhrase LIMIT 10;
SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY EventTime, SearchPhrase LIMIT 10;
SELECT CounterID, AVG(length(URL)) AS l, COUNT(*) AS c FROM hits WHERE URL <> '' GROUP BY CounterID HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;
SELECT REGEXP_REPLACE(Referer, '^https?://(?:www\.)?([^/]+)/.*$', '\1') AS k, AVG(length(Referer)) AS l, COUNT(*) AS c, MIN(Referer) FROM hits WHERE Referer <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;
SELECT SUM(ResolutionWidth), SUM(ResolutionWidth + 1), SUM(ResolutionWidth + 2), SUM(ResolutionWidth + 3), SUM(ResolutionWidth + 4), SUM(ResolutionWidth + 5), SUM(ResolutionWidth + 6), SUM(ResolutionWidth + 7), SUM(ResolutionWidth + 8), SUM(ResolutionWidth + 9), SUM(ResolutionWidth + 10), SUM(ResolutionWidth + 11), SUM(ResolutionWidth + 12), SUM(ResolutionWidth + 13), SUM(ResolutionWidth + 14), SUM(ResolutionWidth + 15), SUM(ResolutionWidth + 16), SUM(ResolutionWidth + 17), SUM(ResolutionWidth + 18), SUM(ResolutionWidth + 19), SUM(ResolutionWidth + 20), SUM(ResolutionWidth + 21), SUM(ResolutionWidth + 22), SUM(ResolutionWidth + 23), SUM(ResolutionWidth + 24), SUM(ResolutionWidth + 25), SUM(ResolutionWidth + 26), SUM(ResolutionWidth + 27), SUM(ResolutionWidth + 28), SUM(ResolutionWidth + 29), SUM(ResolutionWidth + 30), SUM(ResolutionWidth + 31), SUM(ResolutionWidth + 32), SUM(ResolutionWidth + 33), SUM(ResolutionWidth + 34), SUM(ResolutionWidth + 35), SUM(ResolutionWidth + 36), SUM(ResolutionWidth + 37), SUM(ResolutionWidth + 38), SUM(ResolutionWidth + 39), SUM(ResolutionWidth + 40), SUM(ResolutionWidth + 41), SUM(ResolutionWidth + 42), SUM(ResolutionWidth + 43), SUM(ResolutionWidth + 44), SUM(ResolutionWidth + 45), SUM(ResolutionWidth + 46), SUM(ResolutionWidth + 47), SUM(ResolutionWidth + 48), SUM(ResolutionWidth + 49), SUM(ResolutionWidth + 50), SUM(ResolutionWidth + 51), SUM(ResolutionWidth + 52), SUM(ResolutionWidth + 53), SUM(ResolutionWidth + 54), SUM(ResolutionWidth + 55), SUM(ResolutionWidth + 56), SUM(ResolutionWidth + 57), SUM(ResolutionWidth + 58), SUM(ResolutionWidth + 59), SUM(ResolutionWidth + 60), SUM(ResolutionWidth + 61), SUM(ResolutionWidth + 62), SUM(ResolutionWidth + 63), SUM(ResolutionWidth + 64), SUM(ResolutionWidth + 65), SUM(ResolutionWidth + 66), SUM(ResolutionWidth + 67), SUM(ResolutionWidth + 68), SUM(ResolutionWidth + 69), SUM(ResolutionWidth + 70), SUM(ResolutionWidth + 71), SUM(ResolutionWidth + 72), SUM(ResolutionWidth + 73), SUM(ResolutionWidth + 74), SUM(ResolutionWidth + 75), SUM(ResolutionWidth + 76), SUM(ResolutionWidth + 77), SUM(ResolutionWidth + 78), SUM(ResolutionWidth + 79), SUM(ResolutionWidth + 80), SUM(ResolutionWidth + 81), SUM(ResolutionWidth + 82), SUM(ResolutionWidth + 83), SUM(ResolutionWidth + 84), SUM(ResolutionWidth + 85), SUM(ResolutionWidth + 86), SUM(ResolutionWidth + 87), SUM(ResolutionWidth + 88), SUM(ResolutionWidth + 89) FROM hits;
SELECT SearchEngineID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits WHERE SearchPhrase <> '' GROUP BY SearchEngineID, ClientIP ORDER BY c DESC LIMIT 10;
SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits WHERE SearchPhrase <> '' GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10;
SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10;
SELECT URL, COUNT(*) AS c FROM hits GROUP BY URL ORDER BY c DESC LIMIT 10;
SELECT 1, URL, COUNT(*) AS c FROM hits GROUP BY 1, URL ORDER BY c DESC LIMIT 10;
SELECT ClientIP, ClientIP - 1, ClientIP - 2, ClientIP - 3, COUNT(*) AS c FROM hits GROUP BY ClientIP, ClientIP - 1, ClientIP - 2, ClientIP - 3 ORDER BY c DESC LIMIT 10;
SELECT URL, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND DontCountHits = 0 AND IsRefresh = 0 AND URL <> '' GROUP BY URL ORDER BY PageViews DESC LIMIT 10;
SELECT Title, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND DontCountHits = 0 AND IsRefresh = 0 AND Title <> '' GROUP BY Title ORDER BY PageViews DESC LIMIT 10;
SELECT URL, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND IsLink <> 0 AND IsDownload = 0 GROUP BY URL ORDER BY PageViews DESC LIMIT 10 OFFSET 1000;
SELECT TraficSourceID, SearchEngineID, AdvEngineID, CASE WHEN (SearchEngineID = 0 AND AdvEngineID = 0) THEN Referer ELSE '' END AS Src, URL AS Dst, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 GROUP BY TraficSourceID, SearchEngineID, AdvEngineID, Src, Dst ORDER BY PageViews DESC LIMIT 10 OFFSET 1000;
SELECT URLHash, EventDate, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND TraficSourceID IN (-1, 6) AND RefererHash = 3594120000172545465 GROUP BY URLHash, EventDate ORDER BY PageViews DESC LIMIT 10 OFFSET 100;
SELECT WindowClientWidth, WindowClientHeight, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND DontCountHits = 0 AND URLHash = 2868770270353813622 GROUP BY WindowClientWidth, WindowClientHeight ORDER BY PageViews DESC LIMIT 10 OFFSET 10000;
SELECT DATE_TRUNC('minute', EventTime) AS M, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-14' AND EventDate <= '2013-07-15' AND IsRefresh = 0 AND DontCountHits = 0 GROUP BY DATE_TRUNC('minute', EventTime) ORDER BY DATE_TRUNC('minute', EventTime) LIMIT 10 OFFSET 1000;
```
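The queries above assume `EventDate` is a real date, while the parquet file stores it as days since the Unix epoch; the query runner converts it with `F.date_add(F.lit("1970-01-01"), ...)` before creating the view. A plain-Python sketch of that conversion (the day count below is a made-up example):

```python
from datetime import date, timedelta

def days_to_date(days_since_epoch: int) -> date:
    """Convert a days-since-1970-01-01 integer to a calendar date."""
    return date(1970, 1, 1) + timedelta(days=days_since_epoch)

# e.g. a value falling inside the benchmark's 2013-07 date range
d = days_to_date(15892)
# d == date(2013, 7, 6)
```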
**Query runner** (new file, Python, 63 lines)
```python
#!/usr/bin/env python3

"""
Differences with Spark setup (see README.md for details):
- memory is split between heap (for Spark) and off-heap (for Comet)
- Comet configuration is added to `SparkSession`
"""

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

import timeit
import psutil
import sys
import re

query = sys.stdin.read()
# Replace \1 with $1 because Spark recognizes only this pattern style (in query 28)
query = re.sub(r"""(REGEXP_REPLACE\(.*?,\s*('[^']*')\s*,\s*)('\\1')""", r"\1'$1'", query)
print(query)

# Calculate available memory to configure the SparkSession
ram = int(round(psutil.virtual_memory().available / (1024 ** 3) * 0.7))
heap = ram // 2
off_heap = ram - heap
print(f"SparkSession will use {heap} GB of heap and {off_heap} GB of off-heap memory")

spark = (
    SparkSession
    .builder
    .appName("ClickBench")
    .config("spark.driver", "local[*]")  # To ensure using all cores
    .config("spark.driver.memory", f"{heap}g")  # Set amount of memory SparkSession can use
    .config("spark.sql.parquet.binaryAsString", True)  # Treat binary as string to get correct length calculations and text results

    # Comet configuration
    .config("spark.jars", "comet.jar")
    .config("spark.driver.extraClassPath", "comet.jar")
    .config("spark.plugins", "org.apache.spark.CometPlugin")
    .config("spark.shuffle.manager", "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", f"{off_heap}g")
    .config("spark.comet.regexp.allowIncompatible", True)

    .getOrCreate()
)

df = spark.read.parquet("hits.parquet")
# Do the casting before creating the view so the SQL doesn't need unreadable integer dates
df = df.withColumn("EventTime", F.col("EventTime").cast("timestamp"))
df = df.withColumn("EventDate", F.date_add(F.lit("1970-01-01"), F.col("EventDate")))
df.createOrReplaceTempView("hits")

for try_num in range(3):
    try:
        start = timeit.default_timer()
        result = spark.sql(query)
        result.show(100)  # some queries return more than 20 rows, which is the default show limit
        end = timeit.default_timer()
        print("Time: ", end - start)
    except Exception as e:
        print(e)
        print("Failure!")
```
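The `\1` → `$1` rewrite at the top of the runner can be checked in isolation; the query text below is a trimmed stand-in for query 28, not the full query:

```python
import re

q = r"SELECT REGEXP_REPLACE(Referer, '^https?://(?:www\.)?([^/]+)/.*$', '\1') AS k FROM hits"

# Spark's REGEXP_REPLACE expects Java-style group references, so the quoted
# backreference '\1' in the replacement argument is rewritten to '$1'.
q2 = re.sub(r"""(REGEXP_REPLACE\(.*?,\s*('[^']*')\s*,\s*)('\\1')""", r"\1'$1'", q)
# q2 contains '$1' and no longer contains '\1'
```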