
Commit b2fed62

Tom Schreiber authored and committed
Query parallelism performance guide
1 parent 815a28c commit b2fed62

File tree

8 files changed: +268 -0 lines changed

docs/guides/best-practices/index.md

Lines changed: 1 addition & 0 deletions
@@ -15,6 +15,7 @@ especially [Primary Indices](./sparse-primary-indexes.md).
 | Topic | Description |
 |-------|-------------|
 | [Query Optimization Guide](/optimize/query-optimization) | A good place to start for query optimization, this simple guide describes common scenarios of how to use different performance and optimization techniques to improve query performance. |
+| [Query Parallelism](/optimize/query-parallelism) | Explains how ClickHouse parallelizes query execution using processing lanes and the max_threads setting. Covers how data is distributed across lanes, how max_threads is applied, when it isn't fully used, and how to inspect execution with tools like EXPLAIN and trace logs. |
 | [Partitioning Key](/optimize/partitioning-key) | Delves into ClickHouse partition key optimization. Explains how choosing the right partition key can significantly improve query performance by allowing ClickHouse to quickly locate relevant data segments. Covers best practices for selecting efficient partition keys and potential pitfalls to avoid. |
 | [Data Skipping Indexes](/optimize/skipping-indexes) | Explains data skipping indexes as a way to optimize performance. |
 | [Bulk Inserts](/optimize/bulk-inserts) | Explains the benefits of using bulk inserts in ClickHouse. |
docs/guides/best-practices/query-parallelism.md

Lines changed: 266 additions & 0 deletions
@@ -0,0 +1,266 @@
---
slug: /optimize/query-parallelism
sidebar_label: 'Query Parallelism'
sidebar_position: 20
description: 'ClickHouse parallelizes query execution using processing lanes and the max_threads setting.'
title: 'How ClickHouse executes a query in parallel'
---

import visual01 from '@site/static/images/guides/best-practices/query-parallelism_01.gif';
import visual02 from '@site/static/images/guides/best-practices/query-parallelism_02.gif';
import visual03 from '@site/static/images/guides/best-practices/query-parallelism_03.gif';
import visual04 from '@site/static/images/guides/best-practices/query-parallelism_04.gif';
import visual05 from '@site/static/images/guides/best-practices/query-parallelism_05.png';

import Image from '@theme/IdealImage';

# How ClickHouse executes a query in parallel

ClickHouse is [built for speed](/concepts/why-clickhouse-is-so-fast). It executes queries in a highly parallel fashion, using all available CPU cores, distributing data across processing lanes, and often pushing hardware close to its limits.

This guide walks through how query parallelism works in ClickHouse and how you can tune or monitor it to improve performance on large workloads.

We use an aggregation query on the [uk_price_paid_simple](/parts) dataset to illustrate key concepts.

## Step-by-step: How ClickHouse parallelizes an aggregation query {#step-by-step-how-clickHouse-parallelizes-an-aggregation-query}

When ClickHouse ① runs an aggregation query with a filter on the table's primary key, it ② loads the primary index into memory to ③ identify which granules need to be processed, and which can be safely skipped:

<Image img={visual01} size="md" alt="Index analysis"/>

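For reference, the kind of query this walkthrough has in mind looks like the sketch below; the specific filter is an illustrative assumption (the table's primary key column is `town`, as the index analysis later in this guide shows):

```sql runnable=false
-- An aggregation with a filter on the table's primary key column:
-- index analysis can then skip granules whose key values cannot match.
SELECT max(price)
FROM uk.uk_price_paid_simple
WHERE town = 'LONDON';
```
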
### Distributing work across processing lanes {#distributing-work-across-processing-lanes}

The selected data is then [dynamically](#load-balancing-across-processing-lanes) distributed across `n` parallel processing lanes, which stream and process the data [block](/development/architecture#block) by block into the final result:

<Image img={visual02} size="md" alt="4 parallel processing lanes"/>

<br/><br/>
The number `n` of parallel processing lanes is controlled by the [max_threads](/operations/settings/settings#max_threads) setting, which by default matches the number of CPU cores available to ClickHouse on the server. In the example above, we assume `4` cores.

On a machine with `8` cores, query processing throughput would roughly double (but memory usage would also increase accordingly), as more lanes process data in parallel:

<Image img={visual03} size="md" alt="8 parallel processing lanes"/>

<br/><br/>
Efficient lane distribution is key to maximizing CPU utilization and reducing total query time.

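For a quick experiment (the value `4` here is arbitrary), `max_threads` can also be set per query to control the number of lanes directly:

```sql runnable=false
-- Cap this query at 4 parallel processing lanes, independent of the server default.
SELECT max(price)
FROM uk.uk_price_paid_simple
SETTINGS max_threads = 4;
```
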
### Processing queries on sharded tables {#processing-queries-on-sharded-tables}

When table data is distributed across multiple servers as [shards](/shards), each server processes its shard in parallel. Within each server, the local data is handled using parallel processing lanes, just as described above:

<Image img={visual04} size="md" alt="Distributed lanes"/>

<br/><br/>
The server that initially receives the query collects all sub-results from the shards and combines them into the final global result.

Distributing query load across shards allows horizontal scaling of parallelism, especially for high-throughput environments.

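As a minimal sketch of such a setup (the cluster name and the distributed table name below are hypothetical), queries are typically sent to a `Distributed` table that fans out to the shard-local tables:

```sql runnable=false
-- Hypothetical Distributed table over the shard-local uk.uk_price_paid_simple tables
-- of a cluster named 'my_cluster'; rand() spreads inserted rows across shards.
CREATE TABLE uk.uk_price_paid_simple_dist AS uk.uk_price_paid_simple
ENGINE = Distributed('my_cluster', 'uk', 'uk_price_paid_simple', rand());

-- Each shard computes its partial result in parallel; the initiating server merges them.
SELECT max(price) FROM uk.uk_price_paid_simple_dist;
```
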
:::note ClickHouse Cloud uses parallel replicas instead of shards
In ClickHouse Cloud, this same parallelism is achieved through [parallel replicas](https://clickhouse.com/docs/deployment-guides/parallel-replicas), which function similarly to shards in shared-nothing clusters. Each ClickHouse Cloud replica—a stateless compute node—processes a portion of the data in parallel and contributes to the final result, just like an independent shard would.
:::

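As a rough illustration only (not taken from this guide; setting availability and defaults vary by version, and ClickHouse Cloud typically manages this for you), the replica fan-out can be influenced with the `max_parallel_replicas` setting:

```sql runnable=false
-- Hypothetical example: allow up to 3 replicas to cooperate on this query.
-- Requires parallel replicas to be enabled for the cluster or service in use.
SELECT max(price)
FROM uk.uk_price_paid_simple
SETTINGS max_parallel_replicas = 3;
```
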
## Monitoring query parallelism {#monitoring-query-parallelism}

Use these tools to verify that your query fully utilizes available CPU resources and to diagnose when it doesn't.

We're running this on a test server with 59 CPU cores, which allows ClickHouse to fully showcase its intra-query parallelism.

To observe how the example query is executed, we can instruct the ClickHouse server to return all trace-level log entries during the aggregation query. For this demonstration, we removed the query's predicate—otherwise, only 3 granules would be processed, which isn't enough data for ClickHouse to make use of more than a few parallel processing lanes:
```sql runnable=false
SELECT
    max(price)
FROM
    uk.uk_price_paid_simple
SETTINGS send_logs_level='trace';
```

```txt
① <Debug> ...: 3609 marks to read from 3 ranges
② <Trace> ...: Spreading mark ranges among streams
② <Debug> ...: Reading approx. 29564928 rows with 59 streams
```

We can see that

* ① ClickHouse needs to read 3,609 granules (indicated as marks in the trace logs) across 3 data ranges.
* ② With 59 CPU cores, it distributes this work across 59 parallel processing streams—one per lane.

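Trace logs capture parallelism at execution time; after the fact, the query log records how many threads a query actually used. A small sketch (assumes the query log is enabled, which is the default):

```sql runnable=false
-- For recently finished queries, show how many threads were used and the memory consumed.
SELECT
    query,
    length(thread_ids) AS threads_used,
    formatReadableSize(memory_usage) AS memory
FROM system.query_log
WHERE type = 'QueryFinish'
ORDER BY event_time DESC
LIMIT 5;
```
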
Alternatively, we can use the [EXPLAIN](/sql-reference/statements/explain#explain-pipeline) statement to inspect the [physical operator plan](/academic_overview#4-2-multi-core-parallelization)—also known as the "query pipeline"—for the aggregation query:
```sql runnable=false
EXPLAIN PIPELINE
SELECT
    max(price)
FROM
    uk.uk_price_paid_simple;
```

```txt
    ┌─explain───────────────────────────────────────────────────────────────────────────┐
 1. │ (Expression)                                                                        │
 2. │ ExpressionTransform × 59                                                            │
 3. │   (Aggregating)                                                                     │
 4. │   Resize 59 → 59                                                                    │
 5. │     AggregatingTransform × 59                                                       │
 6. │       StrictResize 59 → 59                                                          │
 7. │         (Expression)                                                                │
 8. │         ExpressionTransform × 59                                                    │
 9. │           (ReadFromMergeTree)                                                       │
10. │           MergeTreeSelect(pool: PrefetchedReadPool, algorithm: Thread) × 59 0 → 1   │
    └─────────────────────────────────────────────────────────────────────────────────────┘
```

Note: Read the operator plan above from bottom to top. Each line represents a stage in the physical execution plan, starting with reading data from storage at the bottom and ending with the final processing steps at the top. Operators marked with `× 59` are executed concurrently on non-overlapping data regions across 59 parallel processing lanes. This reflects the value of `max_threads` and illustrates how each stage of the query is parallelized across CPU cores.

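If you want to render the pipeline outside the client, `EXPLAIN PIPELINE` can also emit it as a DOT graph (a sketch; the exact output depends on your ClickHouse version):

```sql runnable=false
-- graph = 1 prints the pipeline in DOT format (render it with Graphviz);
-- compact = 0 keeps one node per processor instead of grouping them.
EXPLAIN PIPELINE graph = 1, compact = 0
SELECT max(price)
FROM uk.uk_price_paid_simple;
```
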
ClickHouse's [embedded web UI](/interfaces/http) (available at the `/play` endpoint) can render the physical plan from above as a graphical visualization. In this example, we set `max_threads` to `4` to keep the visualization compact, showing just 4 parallel processing lanes:

<Image img={visual05} alt="Query pipeline"/>

Note: Read the visualization from left to right. Each row represents a parallel processing lane that streams data block by block, applying transformations such as filtering, aggregation, and final processing stages. In this example, you can see four parallel lanes corresponding to the `max_threads = 4` setting.

### Load balancing across processing lanes {#load-balancing-across-processing-lanes}

Note that the `Resize` operators in the physical plan above [repartition and redistribute](/academic_overview#4-2-multi-core-parallelization) data block streams across processing lanes to keep them evenly utilized. This rebalancing is essential when data ranges have varying predicate selectivities, which can otherwise overload some lanes while leaving others underutilized. By redistributing the work, faster lanes effectively help out slower ones—optimizing overall query runtime.

## Why max_threads isn't always respected {#why-max-threads-isnt-always-respected}

As mentioned above, the number `n` of parallel processing lanes is controlled by the `max_threads` setting, which by default matches the number of CPU cores available to ClickHouse on the server:
```sql runnable=false
SELECT getSetting('max_threads');
```

```txt
   ┌─getSetting('max_threads')─┐
1. │                        59 │
   └───────────────────────────┘
```

However, the `max_threads` value may be ignored depending on the amount of data selected for processing:
```sql runnable=false
EXPLAIN PIPELINE
SELECT
    max(price)
FROM
    uk.uk_price_paid_simple
WHERE town = 'LONDON';
```

```txt
...
(ReadFromMergeTree)
MergeTreeSelect(pool: PrefetchedReadPool, algorithm: Thread) × 30
```

As shown in the operator plan extract above, even though `max_threads` is set to `59`, ClickHouse uses only **30** concurrent streams to scan the data.

Now let's run the query:
```sql runnable=false
SELECT
    max(price)
FROM
    uk.uk_price_paid_simple
WHERE town = 'LONDON';
```

```txt
   ┌─max(price)─┐
1. │  594300000 │ -- 594.30 million
   └────────────┘

1 row in set. Elapsed: 0.013 sec. Processed 2.31 million rows, 13.66 MB (173.12 million rows/s., 1.02 GB/s.)
Peak memory usage: 27.24 MiB.
```

As shown in the output above, the query processed 2.31 million rows and read 13.66 MB of data. This is because, during the index analysis phase, ClickHouse selected **282 granules** for processing, each containing 8,192 rows, totaling approximately 2.31 million rows:

```sql runnable=false
EXPLAIN indexes = 1
SELECT
    max(price)
FROM
    uk.uk_price_paid_simple
WHERE town = 'LONDON';
```

```txt
    ┌─explain──────────────────────────────────────────────┐
 1. │ Expression ((Project names + Projection))             │
 2. │   Aggregating                                         │
 3. │     Expression (Before GROUP BY)                      │
 4. │       Expression                                      │
 5. │         ReadFromMergeTree (uk.uk_price_paid_simple)   │
 6. │         Indexes:                                      │
 7. │           PrimaryKey                                   │
 8. │             Keys:                                      │
 9. │               town                                     │
10. │             Condition: (town in ['LONDON', 'LONDON'])  │
11. │             Parts: 3/3                                 │
12. │             Granules: 282/3609                         │
    └────────────────────────────────────────────────────────┘
```

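As a quick sanity check on that figure (plain arithmetic, wrapped in a query only for convenience):

```sql runnable=false
-- 282 granules × 8,192 rows per granule = 2,310,144 rows, i.e. the ~2.31 million rows processed above.
SELECT 282 * 8192 AS rows_selected;
```
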
Regardless of the configured `max_threads` value, ClickHouse only allocates additional parallel processing lanes when there's enough data to justify them. The "max" in `max_threads` refers to an upper limit, not a guaranteed number of threads used.

What "enough data" means is primarily determined by two settings (one pair per storage type, listed below), which define the minimum number of rows (163,840 by default) and the minimum number of bytes (2,097,152 by default) that each processing lane should handle; a query for checking their current values follows the list:

For shared-nothing clusters:
* [merge_tree_min_rows_for_concurrent_read](https://clickhouse.com/docs/operations/settings/settings#merge_tree_min_rows_for_concurrent_read)
* [merge_tree_min_bytes_for_concurrent_read](https://clickhouse.com/docs/operations/settings/settings#merge_tree_min_bytes_for_concurrent_read)

For clusters with shared storage (e.g. ClickHouse Cloud):
* [merge_tree_min_rows_for_concurrent_read_for_remote_filesystem](https://clickhouse.com/docs/operations/settings/settings#merge_tree_min_rows_for_concurrent_read_for_remote_filesystem)
* [merge_tree_min_bytes_for_concurrent_read_for_remote_filesystem](https://clickhouse.com/docs/operations/settings/settings#merge_tree_min_bytes_for_concurrent_read_for_remote_filesystem)

Additionally, there's a hard lower limit for read task size, controlled by:
* [merge_tree_min_read_task_size](https://clickhouse.com/docs/operations/settings/settings#merge_tree_min_read_task_size) + [merge_tree_min_bytes_per_task_for_remote_reading](https://clickhouse.com/docs/operations/settings/settings#merge_tree_min_bytes_per_task_for_remote_reading)

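To see which values are in effect on your server, you can query the `system.settings` table (a read-only check; the filter below is just one way to pick these settings out):

```sql runnable=false
-- Current thresholds that decide when an extra parallel read stream is worthwhile.
SELECT name, value
FROM system.settings
WHERE name LIKE 'merge_tree_min_%_concurrent_read%'
   OR name IN ('merge_tree_min_read_task_size', 'merge_tree_min_bytes_per_task_for_remote_reading');
```
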
We don't recommend modifying these settings in production. They're shown here solely to illustrate why `max_threads` doesn't always determine the actual level of parallelism.

For demonstration purposes, let's inspect the physical plan with these settings overridden to force maximum concurrency:
```sql runnable=false
EXPLAIN PIPELINE
SELECT
    max(price)
FROM
    uk.uk_price_paid_simple
WHERE town = 'LONDON'
SETTINGS
    max_threads = 59,
    merge_tree_min_read_task_size = 0,
    merge_tree_min_rows_for_concurrent_read_for_remote_filesystem = 0,
    merge_tree_min_bytes_for_concurrent_read_for_remote_filesystem = 0;
```

```txt
...
(ReadFromMergeTree)
MergeTreeSelect(pool: PrefetchedReadPool, algorithm: Thread) × 59
```

Now ClickHouse uses 59 concurrent streams to scan the data, fully respecting the configured `max_threads`.

This demonstrates that for queries on small datasets, ClickHouse will intentionally limit concurrency. Use setting overrides only for testing—not in production—as they can lead to inefficient execution or resource contention.

## Key takeaways {#key-takeaways}

* ClickHouse parallelizes queries using processing lanes tied to `max_threads`.
* The actual number of lanes depends on the size of data selected for processing.
* Use `EXPLAIN PIPELINE` and trace logs to analyze lane usage.

## Where to find more information {#where-to-find-more-information}

If you'd like to dive deeper into how ClickHouse executes queries in parallel and how it achieves high performance at scale, explore the following resources:

* [Query Processing Layer – VLDB 2024 Paper (Web Edition)](https://clickhouse.com/docs/academic_overview#4-query-processing-layer) - A detailed breakdown of ClickHouse's internal execution model, including scheduling, pipelining, and operator design.

* [Partial aggregation states explained](https://clickhouse.com/blog/clickhouse_vs_elasticsearch_mechanics_of_count_aggregations#-multi-core-parallelization) - A technical deep dive into how partial aggregation states enable efficient parallel execution across processing lanes.

* A video tutorial walking in detail through all ClickHouse query processing steps:

<iframe width="1024" height="576" src="https://www.youtube.com/embed/hP6G2Nlz_cA?si=Imd_i427J_kZOXHe" title="YouTube video player" frameborder="0" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share" referrerpolicy="strict-origin-when-cross-origin" allowfullscreen></iframe>

sidebars.js

Lines changed: 1 addition & 0 deletions
@@ -1145,6 +1145,7 @@ const sidebars = {
       link: { type: "doc", id: "guides/best-practices/index" },
       items: [
         "guides/best-practices/query-optimization",
+        "guides/best-practices/query-parallelism",
         "guides/best-practices/partitioningkey",
         "guides/best-practices/skipping-indexes",
         "guides/best-practices/bulkinserts",
5 binary files added (the guide's images): 1.16 MB, 660 KB, 894 KB, 572 KB, 355 KB
