Skip to content

Commit 53109ff

Browse files
committed
refactor: update producer code to work with latest packages
* Added new Dockerfile and docker compose file to test locally Resolves: #0 Signed-off-by: Joel Hanson <[email protected]>
1 parent c0d8da2 commit 53109ff

File tree

5 files changed

+140
-59
lines changed

5 files changed

+140
-59
lines changed

Dockerfile

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# Build stage
2+
FROM maven:3.8.7-ibm-semeru-17-focal AS build
3+
4+
WORKDIR /app
5+
COPY pom.xml .
6+
COPY package.json .
7+
COPY src ./src
8+
COPY ui ./ui
9+
10+
# Build the JAR (this will run frontend-maven-plugin too)
11+
RUN mvn clean package -DskipTests
12+
13+
# Runtime stage
14+
FROM openjdk:17-jdk-slim
15+
16+
WORKDIR /app
17+
COPY kafka.properties kafka.properties
18+
COPY --from=build /app/target/demo-all.jar app.jar
19+
20+
EXPOSE 8080
21+
22+
ENTRYPOINT ["java", "-jar", "app.jar"]

docker-compose.yml

Lines changed: 45 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,52 @@
1-
version: '2'
21
services:
32

4-
zookeeper:
5-
image: strimzi/zookeeper
6-
command: [
7-
"sh", "-c",
8-
"bin/zookeeper-server-start.sh config/zookeeper.properties"
9-
]
3+
broker:
4+
image: apache/kafka:latest
5+
container_name: broker
6+
hostname: broker
107
ports:
11-
- "2181:2181"
8+
- "9092:9092"
129
environment:
13-
LOG_DIR: /tmp/logs
10+
KAFKA_NODE_ID: 1
11+
KAFKA_PROCESS_ROLES: broker,controller
12+
KAFKA_LISTENERS: PLAINTEXT://:9092,PLAINTEXT_HOST://:29092,CONTROLLER://:9093
13+
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
14+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
15+
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
16+
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:9093
17+
KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
18+
KAFKA_CONFIG_DIR: /var/lib/kafka-config
19+
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
20+
healthcheck:
21+
test: ["CMD", "/opt/kafka/bin/kafka-broker-api-versions.sh", "--bootstrap-server", "localhost:9092"]
22+
interval: 5s
23+
timeout: 10s
24+
retries: 3
25+
start_period: 3s
26+
27+
create-topics:
28+
image: apache/kafka:latest
29+
container_name: create-topics
30+
depends_on:
31+
broker:
32+
condition: service_healthy
33+
entrypoint: ["/bin/sh", "-c"]
34+
command: |
35+
"
36+
/opt/kafka/bin/kafka-topics.sh --create --if-not-exists --topic __consumer_offsets --bootstrap-server broker:9092 --partitions 1 --replication-factor 1
37+
/opt/kafka/bin/kafka-topics.sh --create --if-not-exists --topic demo --bootstrap-server broker:9092 --partitions 1 --replication-factor 1
38+
"
39+
restart: "no"
1440

15-
kafka:
16-
image: strimzi/kafka
17-
command: [
18-
"sh", "-c",
19-
"bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
20-
]
41+
42+
app:
43+
build: .
44+
container_name: demo-app
2145
depends_on:
22-
- zookeeper
46+
broker:
47+
condition: service_healthy
48+
create-topics:
49+
condition: service_completed_successfully
2350
ports:
24-
- "9092:9092"
25-
environment:
26-
LOG_DIR: "/tmp/logs"
27-
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
28-
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
29-
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
51+
- "8080:8080"
52+
restart: always

kafka.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
bootstrap.servers=localhost:9092
1+
bootstrap.servers=broker:9092
22
## Optional topic configuration - otherwise default value will be chosen
33
# topic=
44

src/main/java/kafka/vertx/demo/PeriodicProducer.java

Lines changed: 59 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import io.vertx.core.AbstractVerticle;
99
import io.vertx.core.Promise;
10-
import io.vertx.core.TimeoutStream;
1110
import io.vertx.core.eventbus.Message;
1211
import io.vertx.core.json.JsonObject;
1312
import io.vertx.kafka.client.producer.KafkaProducer;
@@ -16,54 +15,84 @@
1615
import org.slf4j.Logger;
1716
import org.slf4j.LoggerFactory;
1817

19-
import java.util.HashMap;
18+
import java.time.Duration;
19+
import java.util.Map;
20+
import java.util.stream.Collectors;
2021

2122
public class PeriodicProducer extends AbstractVerticle {
2223

2324
private static final Logger logger = LoggerFactory.getLogger(PeriodicProducer.class);
25+
private static final long PRODUCE_INTERVAL_MS = Duration.ofSeconds(2).toMillis();
26+
27+
private KafkaProducer<String, String> kafkaProducer;
28+
private long timerId = -1;
29+
private long TIMER_NOT_SET = -1;
2430
private String customMessage;
2531

2632
@Override
2733
public void start(Promise<Void> startPromise) {
28-
String propertiesPath = System.getProperty(Main.PROPERTIES_PATH_ENV_NAME, Main.DEFAULT_PROPERTIES_PATH);
34+
String propertiesPath = System.getProperty(
35+
Main.PROPERTIES_PATH_ENV_NAME,
36+
Main.DEFAULT_PROPERTIES_PATH
37+
);
38+
2939
Main.loadKafkaConfig(vertx, propertiesPath)
3040
.onSuccess(config -> {
31-
HashMap<String, String> props = config.mapTo(HashMap.class);
32-
setup(props);
41+
setup(config);
3342
startPromise.complete();
3443
})
3544
.onFailure(startPromise::fail);
3645
}
3746

38-
private void setup(HashMap<String, String> props) {
39-
// Don't retry and only wait 10 secs for partition info as this is a demo app
47+
private void setup(JsonObject config) {
48+
// Convert JsonObject config -> Map<String,String>
49+
Map<String, String> props = config.getMap()
50+
.entrySet()
51+
.stream()
52+
.collect(Collectors.toMap(
53+
Map.Entry::getKey,
54+
e -> String.valueOf(e.getValue())
55+
));
56+
4057
props.put(ProducerConfig.RETRIES_CONFIG, "0");
4158
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000");
42-
KafkaProducer<String, String> kafkaProducer = KafkaProducer.create(vertx, props);
4359

44-
kafkaProducer.exceptionHandler(err -> logger.debug("Kafka error: {}", err));
60+
kafkaProducer = KafkaProducer.create(vertx, props);
61+
kafkaProducer.exceptionHandler(err -> logger.error("Kafka producer error", err));
4562

46-
TimeoutStream timerStream = vertx.periodicStream(2000);
47-
timerStream.handler(tick -> produceKafkaRecord(kafkaProducer, props.get(Main.TOPIC_KEY)));
48-
timerStream.pause();
63+
vertx.eventBus()
64+
.<JsonObject>consumer(Main.PERIODIC_PRODUCER_ADDRESS,
65+
msg -> handleCommand(props, msg));
4966

50-
vertx.eventBus().<JsonObject>consumer(Main.PERIODIC_PRODUCER_ADDRESS, message -> handleCommand(timerStream, message));
5167
logger.info("🚀 PeriodicProducer started");
5268
}
5369

54-
private void handleCommand(TimeoutStream timerStream, Message<JsonObject> message) {
70+
private void handleCommand(Map<String, String> props, Message<JsonObject> message) {
5571
String command = message.body().getString(WebSocketServer.ACTION, "none");
56-
if (WebSocketServer.START_ACTION.equals(command)) {
57-
logger.info("Producing Kafka records");
58-
customMessage = message.body().getString("custom", "Hello World");
59-
timerStream.resume();
60-
} else if (WebSocketServer.STOP_ACTION.equals(command)) {
61-
logger.info("Stopping producing Kafka records");
62-
timerStream.pause();
72+
switch (command) {
73+
case WebSocketServer.START_ACTION:
74+
customMessage = message.body().getString("custom", "Hello World");
75+
if (timerId == TIMER_NOT_SET) {
76+
timerId = vertx.setPeriodic(PRODUCE_INTERVAL_MS,
77+
id -> produceKafkaRecord(props.get(Main.TOPIC_KEY)));
78+
logger.info("Producing Kafka records with message template: {}", customMessage);
79+
}
80+
break;
81+
82+
case WebSocketServer.STOP_ACTION:
83+
if (timerId != TIMER_NOT_SET) {
84+
vertx.cancelTimer(timerId);
85+
timerId = -1;
86+
logger.info("Stopped producing Kafka records");
87+
}
88+
break;
89+
90+
default:
91+
logger.warn("Unknown command received: {}", command);
6392
}
6493
}
6594

66-
private void produceKafkaRecord(KafkaProducer<String, String> kafkaProducer, String topic) {
95+
private void produceKafkaRecord(String topic) {
6796
String payload = customMessage;
6897
KafkaProducerRecord<String, String> record = KafkaProducerRecord.create(topic, payload);
6998
logger.debug("Producing record to topic {} with payload {}", topic, payload);
@@ -84,4 +113,12 @@ private void produceKafkaRecord(KafkaProducer<String, String> kafkaProducer, Str
84113
vertx.eventBus().send(Main.PERIODIC_PRODUCER_BROADCAST, new JsonObject().put("status", "ERROR"));
85114
});
86115
}
116+
117+
@Override
118+
public void stop() {
119+
if (kafkaProducer != null) {
120+
kafkaProducer.close()
121+
.onComplete(ar -> logger.info("KafkaProducer closed: {}", ar.succeeded() ? "success" : ar.cause()));
122+
}
123+
}
87124
}

src/main/java/kafka/vertx/demo/WebSocketServer.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,12 @@
1717
import io.vertx.ext.web.Router;
1818
import io.vertx.ext.web.handler.StaticHandler;
1919
import io.vertx.ext.web.templ.thymeleaf.ThymeleafTemplateEngine;
20-
import io.vertx.kafka.client.common.TopicPartition;
2120
import io.vertx.kafka.client.consumer.KafkaConsumer;
2221
import org.apache.kafka.clients.consumer.ConsumerConfig;
2322
import org.slf4j.Logger;
2423
import org.slf4j.LoggerFactory;
2524

2625
import java.util.HashMap;
27-
import java.util.Set;
2826

2927
public class WebSocketServer extends AbstractVerticle {
3028

@@ -70,7 +68,7 @@ private Future<HttpServer> createRouterAndStartServer(JsonObject config) {
7068
JsonObject props = new JsonObject();
7169

7270
String topic = config.getString("topic");
73-
71+
7472
props.put("topic", topic);
7573
props.put("producerPath", PRODUCE_PATH);
7674
props.put("consumerPath", CONSUME_PATH);
@@ -94,7 +92,7 @@ private Future<HttpServer> startWebSocket(Router router) {
9492
return vertx.createHttpServer(new HttpServerOptions().setRegisterWebSocketWriteHandlers(true))
9593
.requestHandler(router)
9694
.webSocketHandler(this::handleWebSocket)
97-
.listen(8080)
95+
.listen(8080, "0.0.0.0")
9896
.onSuccess(ok -> logger.info("🚀 WebSocketServer started"))
9997
.onFailure(err -> logger.error("❌ WebSocketServer failed to listen", err));
10098
}
@@ -140,11 +138,7 @@ private void handleProduceSocket(ServerWebSocket webSocket) {
140138

141139
private void handleConsumeSocket(ServerWebSocket webSocket) {
142140
KafkaConsumer<String, JsonObject> kafkaConsumer = KafkaConsumer.create(vertx, kafkaConfig);
143-
144-
kafkaConsumer.exceptionHandler(err -> logger.error("Kafka error", err));
145-
146141
String topic = kafkaConfig.get(Main.TOPIC_KEY);
147-
TopicPartition topicPartition = new TopicPartition().setTopic(topic);
148142

149143
kafkaConsumer.handler(record -> {
150144
JsonObject payload = new JsonObject()
@@ -157,16 +151,21 @@ private void handleConsumeSocket(ServerWebSocket webSocket) {
157151
vertx.eventBus().send(webSocket.textHandlerID(), payload.encode());
158152
});
159153

154+
kafkaConsumer.subscribe(topic)
155+
.onSuccess(v -> {
156+
logger.info("Subscribed to {}", topic);
157+
})
158+
.onFailure(err -> logger.error("Could not subscribe to {}", topic, err));
159+
160160
webSocket.handler(buffer -> {
161161
String action = buffer.toJsonObject().getString(ACTION, "none");
162+
162163
if (START_ACTION.equals(action)) {
163-
kafkaConsumer.subscription()
164-
.compose(sub -> (sub.size() > 0) ? kafkaConsumer.resume(topicPartition) : kafkaConsumer.subscribe(topic))
165-
.onSuccess(ok -> logger.info("Subscribed to {}", topic))
166-
.onFailure(err -> logger.error("Could not subscribe to {}", topic, err));
164+
kafkaConsumer.resume();
165+
logger.info("Consumer resumed");
167166
} else if (STOP_ACTION.equals(action)) {
168-
kafkaConsumer.pause(topicPartition)
169-
.onFailure(err -> logger.error("Cannot pause consumer", err));
167+
kafkaConsumer.pause();
168+
logger.info("Consumer paused");
170169
}
171170
});
172171

0 commit comments

Comments
 (0)