Commit 78d645c

feat: Queues for Kafka tutorial (#110)
1 parent 4cc2b0b commit 78d645c

File tree

20 files changed: +1077 -12 lines changed


build.gradle

Lines changed: 1 addition & 0 deletions
@@ -27,6 +27,7 @@ subprojects {
     }

     tasks.withType(Test) {
+        maxParallelForks = 1
         testLogging {
             outputs.upToDateWhen { false }
             events "PASSED", "SKIPPED", "FAILED", "STANDARD_OUT", "STANDARD_ERROR"

confluent-parallel-consumer-application/kafka/build.gradle

Lines changed: 3 additions & 3 deletions
@@ -29,9 +29,9 @@ repositories {


 dependencies {
-    implementation project(':common')
     implementation "org.slf4j:slf4j-simple:2.0.7"
-    implementation "io.confluent.parallelconsumer:parallel-consumer-core:0.5.2.4"
+    implementation "io.confluent.parallelconsumer:parallel-consumer-core:0.5.3.3"
+    implementation "org.apache.kafka:kafka-clients:3.9.1"
     implementation "org.apache.commons:commons-lang3:3.12.0"
     implementation "me.tongfei:progressbar:0.9.3"
     implementation 'org.awaitility:awaitility:4.2.0'

@@ -41,7 +41,7 @@ dependencies {
     testImplementation 'org.junit.jupiter:junit-jupiter-api:5.9.2'
     testImplementation 'org.hamcrest:hamcrest:2.2'
     testImplementation 'org.awaitility:awaitility:4.2.0'
-    testImplementation "io.confluent.parallelconsumer:parallel-consumer-core:0.5.2.4:tests" // for LongPollingMockConsumer
+    testImplementation "io.confluent.parallelconsumer:parallel-consumer-core:0.5.3.3:tests" // for LongPollingMockConsumer
     testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
     testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.9.2'
 }

confluent-parallel-consumer-application/kafka/settings.gradle

Lines changed: 1 addition & 3 deletions
@@ -6,6 +6,4 @@
  * This project uses @Incubating APIs which are subject to change.
  */

-rootProject.name = 'parallel-consumer'
-include ':common'
-project(':common').projectDir = file('../../common')
+rootProject.name = 'parallel-consumer'

joining-stream-table/kstreams/src/test/java/io/confluent/developer/MovieRatingJoinerTest.java

Lines changed: 3 additions & 3 deletions
@@ -12,9 +12,9 @@ class MovieRatingJoinerTest {
     void testApply() {
         RatedMovie actualRatedMovie;

-        Movie treeOfLife = new Movie(354, "Tree of Life", 2011);
-        Rating rating = new Rating(354, 9.8);
-        RatedMovie expectedRatedMovie = new RatedMovie(354, "Tree of Life", 2011, 9.8);
+        Movie treeOfLife = new Movie("354", "Tree of Life", 2011);
+        Rating rating = new Rating("354", 9.8);
+        RatedMovie expectedRatedMovie = new RatedMovie("354", "Tree of Life", 2011, 9.8);
         MovieRatingJoiner joiner = new MovieRatingJoiner();
         actualRatedMovie = joiner.apply(rating, treeOfLife);
joining-table-table/kstreams/src/test/java/io/confluent/developer/MovieRatingJoinerTest.java

Lines changed: 3 additions & 3 deletions
@@ -12,9 +12,9 @@ class MovieRatingJoinerTest {
     void testApply() {
         RatedMovie actualRatedMovie;

-        Movie treeOfLife = new Movie(354, "Tree of Life", 2011);
-        Rating rating = new Rating(354, 9.8);
-        RatedMovie expectedRatedMovie = new RatedMovie(354, "Tree of Life", 2011, 9.8);
+        Movie treeOfLife = new Movie("354", "Tree of Life", 2011);
+        Rating rating = new Rating("354", 9.8);
+        RatedMovie expectedRatedMovie = new RatedMovie("354", "Tree of Life", 2011, 9.8);
         MovieRatingJoiner joiner = new MovieRatingJoiner();
         actualRatedMovie = joiner.apply(rating, treeOfLife);
queues-for-kafka/.gitignore

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+gradle/
+gradlew
+gradlew.bat

queues-for-kafka/README.md

Lines changed: 329 additions & 0 deletions
@@ -0,0 +1,329 @@
<!-- title: How to scale Kafka consumption throughput with share consumers (KIP-932: Queues for Kafka) -->
<!-- description: In this tutorial, learn how to scale Kafka consumption throughput with share consumers (from KIP-932: Queues for Kafka), with step-by-step instructions and supporting code. -->

# How to scale Kafka consumption throughput with share consumers (KIP-932: Queues for Kafka)

This tutorial demonstrates how to produce a high volume of messages to Kafka and then compare consumption throughput using regular consumers versus share consumers. The steps below show how to set up a cluster for share consumers, run the provided producer and consumer applications, and compare performance between classic Kafka consumer instances and share consumers. For a deeper look at the application source code, refer to the `Code explanation` section at the bottom.

The following steps use Confluent Cloud. To run the tutorial locally with Docker, skip to the `Docker instructions` section at the bottom.

## Prerequisites

- A [Confluent Cloud](https://confluent.cloud/signup) account
- The [Confluent CLI](https://docs.confluent.io/confluent-cli/current/install.html) installed on your machine
- [Apache Kafka 4.1](https://kafka.apache.org/downloads) for its command-line tools
- Clone the `confluentinc/tutorials` repository and navigate into its top-level directory:
  ```shell
  git clone git@github.com:confluentinc/tutorials.git
  cd tutorials
  ```

## Create Confluent Cloud resources

First, create a Dedicated 1-CKU cluster in Confluent Cloud by following the instructions [here](https://docs.confluent.io/cloud/current/clusters/create-cluster.html#create-ak-clusters).

Since Queues for Kafka is currently a Closed Preview feature, you'll need to open a support request to enable the feature on your cluster. In the [Confluent Support Portal](https://support.confluent.io/), open a ticket requesting that Queues for Kafka be enabled for your cluster. Provide the cluster ID in your request, which you can find in the [Confluent Cloud Console](https://confluent.cloud/) by navigating to `Cluster Settings` from your Dedicated cluster overview page.

## Confluent CLI setup

Run the following series of commands to log in and set the active Confluent Cloud environment and cluster:

```shell
confluent login --prompt --save
confluent environment list
confluent environment use <ENVIRONMENT_ID>
confluent kafka cluster list
confluent kafka cluster use <CLUSTER_ID>
```

## Generate Confluent Cloud credentials

Generate a Kafka API key by substituting the cluster ID from the previous command:

```shell
confluent api-key create --resource <CLUSTER_ID>
```

Copy the API key into the file `queues-for-kafka/src/main/resources/cloud.properties` where you see the `<API_KEY>` placeholder, and copy the API secret where you see the `<API_SECRET>` placeholder.

Run this command to get your cluster's bootstrap servers endpoint:

```shell
confluent kafka cluster describe
```

Copy the endpoint (of the form `pkc-<ID>.<REGION>.<CLOUD>.confluent.cloud:9092`) into the same `cloud.properties` file where you see the `<BOOTSTRAP_SERVERS>` placeholder. Do not copy the leading `SASL_SSL://`.

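Once populated, `cloud.properties` contains standard Kafka client connection settings along the lines of the following sketch. The key names are standard Kafka client configuration; the placeholders and any additional settings in the repository's file may differ.

```properties
# Illustrative sketch of a populated cloud.properties; the actual file in the
# repository may contain additional or differently named settings.
bootstrap.servers=pkc-<ID>.<REGION>.<CLOUD>.confluent.cloud:9092
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username='<API_KEY>' password='<API_SECRET>';
```
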
## Create topic

Create a 6-partition topic called `strings` that we will use to test consumption throughput:

```shell
confluent kafka topic create strings --partitions 6
```

## Compile and run the producer application

Compile the application from the top-level `tutorials` repository directory:

```shell
./gradlew queues-for-kafka:shadowJar
```

Navigate into the application's home directory:

```shell
cd queues-for-kafka
```

Run the producer application, passing the `cloud.properties` Kafka client configuration file that you populated with your Dedicated cluster's bootstrap servers endpoint and credentials:

```shell
java -cp ./build/libs/kafka-consumer-comparison-app-0.0.1.jar \
  io.confluent.developer.ProducerApp \
  --properties-file ./src/main/resources/cloud.properties
```

## Run consumer applications

In a separate shell, run the regular `KafkaConsumer`-based application. This runs 16 concurrent consumers, but only 6 will actively consume because each partition can be assigned to only one consumer instance in the group. It simulates a 500-millisecond workload per event and reports throughput after consuming 1,000 events.

```shell
java -cp ./build/libs/kafka-consumer-comparison-app-0.0.1.jar \
  io.confluent.developer.ConsumerApp \
  --properties-file ./src/main/resources/cloud.properties \
  --consumer-type consumer \
  --num-consumers 16 \
  --wait-ms 500 \
  --total-events 1000
```

The app will exit once 1,000 events have been consumed, which should take around a minute and a half. You will see a log message like this reporting the duration:

```plaintext
Completed consuming 1000 messages in 89.61 seconds.
```

Next, run the consumer application using share consumers.

First, alter the `share-consumer-group` share group to begin consuming from the earliest offset:

```shell
<KAFKA_HOME>/bin/kafka-configs.sh --bootstrap-server <BOOTSTRAP_SERVER> \
  --group share-consumer-group --alter --add-config 'share.auto.offset.reset=earliest' \
  --command-config ./src/main/resources/cloud.properties
```

Run the consumer app again using the same number of threads and simulated event processing time, except this time pass the `share_consumer` consumer type:

```shell
java -cp ./build/libs/kafka-consumer-comparison-app-0.0.1.jar \
  io.confluent.developer.ConsumerApp \
  --properties-file ./src/main/resources/cloud.properties \
  --consumer-type share_consumer \
  --num-consumers 16 \
  --wait-ms 500 \
  --total-events 1000
```

This time, the app should take closer to 30 seconds to complete, since share consumers allow consumption to scale to all 16 threads. You will see a log message like this reporting the duration:

```plaintext
Completed consuming 1000 messages in 31.42 seconds.
```

## Other suggested experiments

Try different application configurations to see how consumption throughput is impacted. For example, vary `--num-consumers` and `--wait-ms` to see how throughput scales with more workers and different per-event wait times. Also try a different number of topic partitions. How does it impact consumption throughput?

## Clean up

When you are finished, delete the Confluent Cloud resources created for this tutorial. For example, if you are using an isolated environment, delete it by first getting the environment ID in the form `env-123456`:

```shell
confluent environment list
```

Delete the environment, including all resources created for this tutorial:

```shell
confluent environment delete <ENVIRONMENT_ID>
```

<details>
<summary>Docker instructions</summary>

## Prerequisites

* Docker running via [Docker Desktop](https://docs.docker.com/desktop/) or [Docker Engine](https://docs.docker.com/engine/install/)
* [Docker Compose](https://docs.docker.com/compose/install/). Ensure that the command `docker compose version` succeeds.
* Clone the `confluentinc/tutorials` repository and navigate into its top-level directory:
  ```shell
  git clone git@github.com:confluentinc/tutorials.git
  cd tutorials
  ```

## Start Kafka in Docker

Start Apache Kafka 4.1 with the following command:

```shell
docker compose -f ./queues-for-kafka/docker-compose.yml up -d
```

## Enable share consumption

Open a shell in the broker container:

```shell
docker exec --workdir /opt/kafka/bin/ -it broker /bin/bash
```

Enable share consumers:

```shell
./kafka-features.sh --bootstrap-server localhost:9092 upgrade --feature share.version=1
```

Alter the `share-consumer-group` share group to begin consuming from the earliest offset:

```shell
./kafka-configs.sh --bootstrap-server localhost:9092 \
  --group share-consumer-group --alter \
  --add-config 'share.auto.offset.reset=earliest'
```

## Create topics

In the broker container, create a topic called `strings` with 6 partitions:

```shell
./kafka-topics.sh --bootstrap-server localhost:9092 --create \
  --partitions 6 --topic strings
```

Enter `Ctrl+D` to exit the container shell.

## Compile and run the producer application

On your local machine, compile the app:

```shell
./gradlew queues-for-kafka:shadowJar
```

Navigate into the application's home directory:

```shell
cd queues-for-kafka
```

Run the producer application, passing the `local.properties` Kafka client configuration file that points to the broker's bootstrap servers endpoint at `localhost:9092`:

```shell
java -cp ./build/libs/kafka-consumer-comparison-app-0.0.1.jar \
  io.confluent.developer.ProducerApp \
  --properties-file ./src/main/resources/local.properties
```

## Run consumer applications

In a separate shell, run the regular `KafkaConsumer`-based application. This runs 16 concurrent consumers, but only 6 will actively consume because each partition can be assigned to only one consumer instance in the group. It simulates a 500-millisecond workload per event and reports throughput after consuming 1,000 events.

```shell
java -cp ./build/libs/kafka-consumer-comparison-app-0.0.1.jar \
  io.confluent.developer.ConsumerApp \
  --properties-file ./src/main/resources/local.properties \
  --consumer-type consumer \
  --num-consumers 16 \
  --wait-ms 500 \
  --total-events 1000
```

The app will exit once 1,000 events have been consumed, which should take around a minute and a half. You will see a log message like this reporting the duration:

```plaintext
Completed consuming 1000 messages in 89.61 seconds.
```

Next, run the consumer app again using the same number of threads and simulated event processing time, except this time pass the `share_consumer` consumer type:

```shell
java -cp ./build/libs/kafka-consumer-comparison-app-0.0.1.jar \
  io.confluent.developer.ConsumerApp \
  --properties-file ./src/main/resources/local.properties \
  --consumer-type share_consumer \
  --num-consumers 16 \
  --wait-ms 500 \
  --total-events 1000
```

This time, the app should take closer to 30 seconds to complete, since share consumers allow consumption to scale to all 16 threads. You will see a log message like this reporting the duration:

```plaintext
Completed consuming 1000 messages in 31.42 seconds.
```

## Other suggested experiments

Try different application configurations to see how consumption throughput is impacted. For example, vary `--num-consumers` and `--wait-ms` to see how throughput scales with more workers and different per-event wait times. Also try a different number of topic partitions. How does it impact consumption throughput?

## Clean up

From your local machine, stop the broker container:

```shell
docker compose -f ./queues-for-kafka/docker-compose.yml down
```

</details>

<details>
<summary>Code explanation</summary>

This section summarizes the key application source files under `src/main/java/io/confluent/developer`.

- **`ProducerApp.java`**: Standalone producer that sends a high volume of string messages to the `strings` topic (see the sketch after this list).

  - Parses CLI options via `ProducerAppArgParser` to locate the Kafka client properties file.
  - Builds a `KafkaProducer<String, String>` with `StringSerializer` for keys/values and `acks=all` for durability.
  - Produces 1,000,000 messages (key and value are the stringified index), logs progress every 10,000 messages, and throttles briefly to keep the producer running longer and avoid overwhelming the broker.
  - The producer flushes every event so that there aren't large batches. This ensures that multiple share consumers are able to actively consume from a given partition.

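As an illustration of the behavior described above, a minimal producer sketch might look like the following. This is not the repository's code; the class name, properties path, logging, and throttle interval are assumptions.

```java
// Hypothetical sketch of ProducerApp's core loop; illustrative, not the repository's exact code.
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumed path; ProducerApp reads it from the --properties-file option.
        props.load(Files.newInputStream(Path.of("src/main/resources/cloud.properties")));
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");  // wait for full acknowledgement for durability

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 1_000_000; i++) {
                String value = String.valueOf(i);
                producer.send(new ProducerRecord<>("strings", value, value));
                producer.flush();   // flush per event so records don't accumulate into large batches
                if (i % 10_000 == 0) {
                    System.out.println("Produced " + i + " messages");
                }
                Thread.sleep(1);    // assumed brief throttle; the actual pacing may differ
            }
        }
    }
}
```
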
- **`ConsumerApp.java`**: Orchestrates multi-threaded consumption to compare regular `KafkaConsumer` vs `KafkaShareConsumer`-based throughput (see the sketch after this list).

  - Parses CLI options via `ConsumerAppArgParser`:
    - `--consumer-type` selects `consumer` (regular) or `share_consumer` (share consumer).
    - `--num-consumers` controls the number of consumer worker threads.
    - `--wait-ms` simulates per-record processing time (sleep per event).
    - `--total-events` stops after consuming the specified number of events across all workers.
  - Builds consumer properties common to both implementations: `StringDeserializer` for keys/values and the appropriate consumer group.
    - For regular consumers: sets `group.id=consumer-group` and `auto.offset.reset=earliest`.
    - For share consumers: sets `group.id=share-consumer-group`, `share.acknowledgement.mode=explicit`, and `max.poll.records=100`.
  - Creates an `ExecutorService` and launches N `ConsumerThread` workers with an event handler that:
    - Sleeps for `--wait-ms` to simulate work.
    - Atomically counts records across all workers, logs progress periodically, and records total elapsed time when `--total-events` is reached.
  - Adds a shutdown hook to close all consumers and the executor service cleanly.

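A condensed sketch of how the two configurations might be assembled and the workers launched, assuming the settings listed above; the class and method names here are illustrative, not the repository's exact code.

```java
// Hypothetical sketch of ConsumerApp's setup; illustrative only.
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerAppSketch {

    // Build the properties for either consumer type, starting from the loaded properties file.
    static Properties consumerProps(Properties fromFile, boolean shareConsumer) {
        Properties props = new Properties();
        props.putAll(fromFile);  // bootstrap.servers plus any security settings
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        if (shareConsumer) {
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "share-consumer-group");
            props.put("share.acknowledgement.mode", "explicit");  // acknowledge each record explicitly
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
        } else {
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        }
        return props;
    }

    // Launch one worker per consumer; ConsumerApp submits ConsumerThread instances here.
    static ExecutorService launchWorkers(int numConsumers, Supplier<Runnable> workerFactory) {
        ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
        for (int i = 0; i < numConsumers; i++) {
            executor.submit(workerFactory.get());
        }
        return executor;
    }
}
```
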
- **`ConsumerThread.java`**: A runnable worker used by `ConsumerApp` that encapsulates the consumption loop for either consumer type (see the sketch after this list).

  - Two constructors: one for a `KafkaConsumer` and one for a `KafkaShareConsumer`. Each subscribes the consumer to the `strings` topic.
  - In `run()`, polls for events and invokes the provided `EventHandler` per record.
  - Share consumer path: after handling a record, explicitly acknowledges with `AcknowledgeType.ACCEPT`, or `REJECT` on error.
  - Regular consumer path: handles records without explicit acknowledgements (normal consumer semantics).
  - Cleanly closes the underlying consumers and exits once the desired number of events is consumed.

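A minimal sketch of the share consumer poll-and-acknowledge loop described above. The `KafkaShareConsumer` calls shown here follow KIP-932; the `EventHandler` type, stop condition, and error handling are simplified assumptions rather than the repository's code.

```java
// Hypothetical sketch of the share consumer path in ConsumerThread; simplified and illustrative.
import java.time.Duration;
import java.util.List;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;

public class ShareConsumerLoopSketch implements Runnable {

    // Placeholder for the tutorial's EventHandler; the real interface may differ.
    @FunctionalInterface
    interface EventHandler {
        void handle(ConsumerRecord<String, String> record) throws Exception;
    }

    private final KafkaShareConsumer<String, String> consumer;
    private final EventHandler handler;
    private volatile boolean done = false;

    ShareConsumerLoopSketch(KafkaShareConsumer<String, String> consumer, EventHandler handler) {
        this.consumer = consumer;
        this.handler = handler;
        consumer.subscribe(List.of("strings"));
    }

    void stop() {
        done = true;  // flipped by the orchestrator once enough events have been consumed
    }

    @Override
    public void run() {
        try {
            while (!done) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        handler.handle(record);                                // simulated --wait-ms work
                        consumer.acknowledge(record, AcknowledgeType.ACCEPT);  // explicit acknowledgement
                    } catch (Exception e) {
                        consumer.acknowledge(record, AcknowledgeType.REJECT);
                    }
                }
            }
        } finally {
            consumer.close();  // close the consumer and leave the share group cleanly
        }
    }
}
```
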
- **`ConsumerAppArgParser.java`**: Command-line parsing and validation for the consumer app using Apache Commons CLI (see the sketch after this list).

  - Options: `--properties-file`, `--consumer-type`, `--num-consumers`, `--wait-ms`, and `--total-events`.
  - Validates ranges (e.g., consumers 1–16, wait 1–5000 ms, events 1–1,000,000).

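Option parsing with Apache Commons CLI generally looks like the following sketch; the exact option definitions and validation logic in `ConsumerAppArgParser` may differ.

```java
// Hypothetical sketch of Commons CLI parsing similar to what ConsumerAppArgParser does.
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;

public class ArgParserSketch {
    public static CommandLine parse(String[] args) throws Exception {
        Options options = new Options();
        options.addOption(Option.builder().longOpt("properties-file").hasArg().required().build());
        options.addOption(Option.builder().longOpt("consumer-type").hasArg().required().build());
        options.addOption(Option.builder().longOpt("num-consumers").hasArg().build());
        options.addOption(Option.builder().longOpt("wait-ms").hasArg().build());
        options.addOption(Option.builder().longOpt("total-events").hasArg().build());

        CommandLine cmd = new DefaultParser().parse(options, args);

        int numConsumers = Integer.parseInt(cmd.getOptionValue("num-consumers", "1"));
        if (numConsumers < 1 || numConsumers > 16) {  // range check mirroring the description above
            throw new IllegalArgumentException("--num-consumers must be between 1 and 16");
        }
        return cmd;
    }
}
```
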
- **`ProducerAppArgParser.java`**: Minimal command-line parsing for the producer app.

  - Option: `--properties-file` to locate the Kafka client configuration.

### Notes

- All examples use the topic `strings`. Adjust the topic name in the source if needed.
- Kafka client configuration is provided via the properties files in `src/main/resources` (`cloud.properties` or `local.properties`).

</details>
