Skip to content

Commit 90ab120

Browse files
author
Konstantin
committed
Optimize and refactor project
1 parent fb2e1b5 commit 90ab120

File tree

13 files changed

+58
-131
lines changed

13 files changed

+58
-131
lines changed

springboot-kafka-real-world-project/kafka-consumer-database/pom.xml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
<modelVersion>4.0.0</modelVersion>
1111
<packaging>jar</packaging>
1212
<artifactId>kafka-consumer-database</artifactId>
13+
1314
<dependencies>
1415
<dependency>
1516
<groupId>org.springframework.boot</groupId>
@@ -22,9 +23,5 @@
2223
<scope>runtime</scope>
2324
</dependency>
2425
</dependencies>
25-
<properties>
26-
<maven.compiler.source>17</maven.compiler.source>
27-
<maven.compiler.target>17</maven.compiler.target>
28-
</properties>
2926

3027
</project>

springboot-kafka-real-world-project/kafka-consumer-database/src/main/java/net/javaguides/springboot/KafkaDatabaseConsumer.java

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,24 @@
11
package net.javaguides.springboot;
22

3+
import lombok.RequiredArgsConstructor;
4+
import lombok.extern.slf4j.Slf4j;
35
import net.javaguides.springboot.entity.WikimediaData;
46
import net.javaguides.springboot.repository.WikimediaDataRepository;
5-
import org.slf4j.Logger;
6-
import org.slf4j.LoggerFactory;
77
import org.springframework.kafka.annotation.KafkaListener;
88
import org.springframework.stereotype.Service;
99

1010
@Service
11+
@Slf4j
12+
@RequiredArgsConstructor
1113
public class KafkaDatabaseConsumer {
12-
13-
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaDatabaseConsumer.class);
14-
15-
private WikimediaDataRepository dataRepository;
16-
17-
public KafkaDatabaseConsumer(WikimediaDataRepository dataRepository) {
18-
this.dataRepository = dataRepository;
19-
}
14+
private final WikimediaDataRepository dataRepository;
2015

2116
@KafkaListener(
2217
topics = "${spring.kafka.topic.name}",
2318
groupId = "${spring.kafka.consumer.group-id}"
2419
)
25-
public void consume(String eventMessage){
26-
27-
LOGGER.info(String.format("Event message received -> %s", eventMessage));
20+
public void consume(String eventMessage) {
21+
log.info("Event message received -> {}", eventMessage);
2822

2923
WikimediaData wikimediaData = new WikimediaData();
3024
wikimediaData.setWikiEventData(eventMessage);

springboot-kafka-real-world-project/kafka-consumer-database/src/main/resources/application.properties

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
spring.kafka.consumer.boostrap-servers: localhost:9092
2-
spring.kafka.consumer.group-id: myGroup
3-
spring.kafka.consumer.auto-offset-reset: earliest
4-
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
5-
spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
1+
spring.kafka.consumer.boostrap-servers=localhost:9092
2+
spring.kafka.consumer.group-id=myGroup
3+
spring.kafka.consumer.auto-offset-reset=earliest
4+
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
5+
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
66

77
spring.datasource.url=jdbc:mysql://localhost:3306/wikimedia
88
spring.datasource.username=root

springboot-kafka-real-world-project/kafka-producer-wikimedia/pom.xml

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
<modelVersion>4.0.0</modelVersion>
1111
<packaging>jar</packaging>
1212
<artifactId>kafka-producer-wikimedia</artifactId>
13+
1314
<dependencies>
1415
<!-- https://mvnrepository.com/artifact/com.launchdarkly/okhttp-eventsource -->
1516
<dependency>
@@ -25,23 +26,10 @@
2526
<version>4.9.3</version>
2627
</dependency>
2728

28-
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
29-
<dependency>
30-
<groupId>com.fasterxml.jackson.core</groupId>
31-
<artifactId>jackson-core</artifactId>
32-
<version>2.13.2</version>
33-
</dependency>
34-
3529
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
3630
<dependency>
3731
<groupId>com.fasterxml.jackson.core</groupId>
3832
<artifactId>jackson-databind</artifactId>
39-
<version>2.13.2.2</version>
4033
</dependency>
4134
</dependencies>
42-
<properties>
43-
<maven.compiler.source>17</maven.compiler.source>
44-
<maven.compiler.target>17</maven.compiler.target>
45-
</properties>
46-
4735
</project>

springboot-kafka-real-world-project/kafka-producer-wikimedia/src/main/java/net/javaguides/springboot/KafkaTopicConfig.java

Lines changed: 0 additions & 20 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,21 @@
11
package net.javaguides.springboot;
22

3-
import org.springframework.beans.factory.annotation.Autowired;
4-
import org.springframework.boot.CommandLineRunner;
3+
import org.springframework.boot.ApplicationRunner;
54
import org.springframework.boot.SpringApplication;
65
import org.springframework.boot.autoconfigure.SpringBootApplication;
6+
import org.springframework.context.annotation.Bean;
77

88
@SpringBootApplication
9-
public class SpringBootProducerApplication implements CommandLineRunner {
9+
public class SpringBootProducerApplication {
1010

1111
public static void main(String[] args) {
1212
SpringApplication.run(SpringBootProducerApplication.class);
1313
}
1414

15-
@Autowired
16-
private WikimediaChangesProducer wikimediaChangesProducer;
17-
18-
@Override
19-
public void run(String... args) throws Exception {
20-
wikimediaChangesProducer.sendMessage();
15+
@Bean
16+
ApplicationRunner applicationRunner(WikimediaChangesProducer wikimediaChangesProducer) {
17+
return args -> {
18+
wikimediaChangesProducer.sendMessage();
19+
};
2120
}
2221
}

springboot-kafka-real-world-project/kafka-producer-wikimedia/src/main/java/net/javaguides/springboot/WikimediaChangesHandler.java

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,15 @@
22

33
import com.launchdarkly.eventsource.EventHandler;
44
import com.launchdarkly.eventsource.MessageEvent;
5-
import org.slf4j.Logger;
6-
import org.slf4j.LoggerFactory;
5+
import lombok.RequiredArgsConstructor;
6+
import lombok.extern.slf4j.Slf4j;
77
import org.springframework.kafka.core.KafkaTemplate;
88

9+
@Slf4j
10+
@RequiredArgsConstructor
911
public class WikimediaChangesHandler implements EventHandler {
10-
11-
private static final Logger LOGGER = LoggerFactory.getLogger(WikimediaChangesHandler.class);
12-
13-
private KafkaTemplate<String, String> kafkaTemplate;
14-
private String topic;
15-
16-
public WikimediaChangesHandler(KafkaTemplate<String, String> kafkaTemplate, String topic) {
17-
this.kafkaTemplate = kafkaTemplate;
18-
this.topic = topic;
19-
}
12+
private final KafkaTemplate<String, String> kafkaTemplate;
13+
private final String topic;
2014

2115
@Override
2216
public void onOpen() throws Exception {
@@ -29,14 +23,13 @@ public void onClosed() throws Exception {
2923
}
3024

3125
@Override
32-
public void onMessage(String s, MessageEvent messageEvent) throws Exception {
33-
LOGGER.info(String.format("event data -> %s", messageEvent.getData()));
34-
26+
public void onMessage(String event, MessageEvent messageEvent) throws Exception {
27+
log.info("event data -> {}", messageEvent.getData());
3528
kafkaTemplate.send(topic, messageEvent.getData());
3629
}
3730

3831
@Override
39-
public void onComment(String s) throws Exception {
32+
public void onComment(String event) throws Exception {
4033

4134
}
4235

springboot-kafka-real-world-project/kafka-producer-wikimedia/src/main/java/net/javaguides/springboot/WikimediaChangesProducer.java

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
import com.launchdarkly.eventsource.EventHandler;
44
import com.launchdarkly.eventsource.EventSource;
5-
import org.slf4j.Logger;
6-
import org.slf4j.LoggerFactory;
5+
import lombok.RequiredArgsConstructor;
6+
import lombok.extern.slf4j.Slf4j;
77
import org.springframework.beans.factory.annotation.Value;
88
import org.springframework.kafka.core.KafkaTemplate;
99
import org.springframework.stereotype.Service;
@@ -12,27 +12,21 @@
1212
import java.util.concurrent.TimeUnit;
1313

1414
@Service
15+
@Slf4j
16+
@RequiredArgsConstructor
1517
public class WikimediaChangesProducer {
16-
17-
@Value("${spring.kafka.topic.name}")
18-
private String topicName;
19-
20-
private static final Logger LOGGER = LoggerFactory.getLogger(WikimediaChangesProducer.class);
21-
22-
private KafkaTemplate<String, String> kafkaTemplate;
23-
24-
public WikimediaChangesProducer(KafkaTemplate<String, String> kafkaTemplate) {
25-
this.kafkaTemplate = kafkaTemplate;
26-
}
18+
@Value("${source.stream.url}")
19+
private String sourceStreamUrl;
20+
private final KafkaTemplate<String, String> kafkaTemplate;
2721

2822
public void sendMessage() throws InterruptedException {
2923
// to read real time stream data from wikimedia, we use event source
30-
EventHandler eventHandler = new WikimediaChangesHandler(kafkaTemplate, topicName);
31-
String url = "https://stream.wikimedia.org/v2/stream/recentchange";
32-
EventSource.Builder builder = new EventSource.Builder(eventHandler, URI.create(url));
33-
EventSource eventSource = builder.build();
34-
eventSource.start();
24+
log.info("Sending data from {} stream", sourceStreamUrl);
25+
EventHandler eventHandler = new WikimediaChangesHandler(kafkaTemplate, kafkaTemplate.getDefaultTopic());
3526

36-
TimeUnit.MINUTES.sleep(10);
27+
try(EventSource eventSource = new EventSource.Builder(eventHandler, URI.create(sourceStreamUrl)).build()) {
28+
eventSource.start();
29+
TimeUnit.MINUTES.sleep(10);
30+
}
3731
}
3832
}
Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
spring.kafka.producer.bootstrap-servers: localhost:9092
2-
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
3-
spring.kafka.producer.value-serializer: org.apache.kafka.common.serialization.StringSerializer
1+
spring.kafka.producer.bootstrap-servers=localhost:9092
2+
spring.kafka.template.default-topic=wikimedia_recentchange
3+
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
4+
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
45

5-
spring.kafka.topic.name=wikimedia_recentchange
6+
source.stream.url=https://stream.wikimedia.org/v2/stream/recentchange

springboot-kafka-real-world-project/pom.xml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,13 @@
1818
<name>springboot-kafka-real-world-project</name>
1919
<description>Demo project for Spring Boot and Kafka</description>
2020
<packaging>pom</packaging>
21+
2122
<properties>
22-
<java.version>11</java.version>
23+
<java.version>17</java.version>
24+
<maven.compiler.source>17</maven.compiler.source>
25+
<maven.compiler.target>17</maven.compiler.target>
2326
</properties>
27+
2428
<dependencies>
2529
<dependency>
2630
<groupId>org.springframework.boot</groupId>
@@ -30,6 +34,10 @@
3034
<groupId>org.springframework.kafka</groupId>
3135
<artifactId>spring-kafka</artifactId>
3236
</dependency>
37+
<dependency>
38+
<groupId>com.fasterxml.jackson.core</groupId>
39+
<artifactId>jackson-databind</artifactId>
40+
</dependency>
3341

3442
<dependency>
3543
<groupId>org.projectlombok</groupId>

0 commit comments

Comments
 (0)