77
88import io .vertx .core .AbstractVerticle ;
99import io .vertx .core .Promise ;
10- import io .vertx .core .TimeoutStream ;
1110import io .vertx .core .eventbus .Message ;
1211import io .vertx .core .json .JsonObject ;
1312import io .vertx .kafka .client .producer .KafkaProducer ;
1615import org .slf4j .Logger ;
1716import org .slf4j .LoggerFactory ;
1817
19- import java .util .HashMap ;
18+ import java .time .Duration ;
19+ import java .util .Map ;
20+ import java .util .stream .Collectors ;
2021
2122public class PeriodicProducer extends AbstractVerticle {
2223
2324 private static final Logger logger = LoggerFactory .getLogger (PeriodicProducer .class );
24- private String customMessage ;
25+ private static final long PRODUCE_INTERVAL_MS = Duration .ofSeconds (2 ).toMillis ();
26+
27+ private KafkaProducer <String , String > kafkaProducer ;
28+ private static final long TIMER_NOT_SET = -1L ;
29+ private long timerId = TIMER_NOT_SET ;
30+ private String customMessage = "Hello World" ;
2531
2632 @ Override
2733 public void start (Promise <Void > startPromise ) {
28- String propertiesPath = System .getProperty (Main .PROPERTIES_PATH_ENV_NAME , Main .DEFAULT_PROPERTIES_PATH );
34+ String propertiesPath = System .getProperty (
35+ Main .PROPERTIES_PATH_ENV_NAME ,
36+ Main .DEFAULT_PROPERTIES_PATH
37+ );
38+
2939 Main .loadKafkaConfig (vertx , propertiesPath )
3040 .onSuccess (config -> {
31- HashMap <String , String > props = config .mapTo (HashMap .class );
32- setup (props );
41+ setup (config );
3342 startPromise .complete ();
3443 })
3544 .onFailure (startPromise ::fail );
3645 }
3746
38- private void setup (HashMap <String , String > props ) {
39- // Don't retry and only wait 10 secs for partition info as this is a demo app
47+ private void setup (JsonObject config ) {
48+ // Convert JsonObject config -> Map<String,String>
49+ Map <String , String > props = config .getMap ()
50+ .entrySet ()
51+ .stream ()
52+ .collect (Collectors .toMap (
53+ Map .Entry ::getKey ,
54+ e -> String .valueOf (e .getValue ())
55+ ));
56+
4057 props .put (ProducerConfig .RETRIES_CONFIG , "0" );
4158 props .put (ProducerConfig .MAX_BLOCK_MS_CONFIG , "10000" );
42- KafkaProducer <String , String > kafkaProducer = KafkaProducer .create (vertx , props );
4359
44- kafkaProducer .exceptionHandler (err -> logger .debug ("Kafka error: {}" , err ));
60+ kafkaProducer = KafkaProducer .create (vertx , props );
61+ kafkaProducer .exceptionHandler (err -> logger .error ("Kafka producer error" , err ));
4562
46- TimeoutStream timerStream = vertx .periodicStream ( 2000 );
47- timerStream . handler ( tick -> produceKafkaRecord ( kafkaProducer , props . get (Main .TOPIC_KEY )));
48- timerStream . pause ( );
63+ vertx .eventBus ()
64+ .< JsonObject > consumer (Main .PERIODIC_PRODUCER_ADDRESS ,
65+ msg -> handleCommand ( props , msg ) );
4966
50- vertx .eventBus ().<JsonObject >consumer (Main .PERIODIC_PRODUCER_ADDRESS , message -> handleCommand (timerStream , message ));
5167 logger .info ("🚀 PeriodicProducer started" );
5268 }
5369
54- private void handleCommand (TimeoutStream timerStream , Message <JsonObject > message ) {
70+ private void handleCommand (Map < String , String > props , Message <JsonObject > message ) {
5571 String command = message .body ().getString (WebSocketServer .ACTION , "none" );
56- if (WebSocketServer .START_ACTION .equals (command )) {
57- logger .info ("Producing Kafka records" );
58- customMessage = message .body ().getString ("custom" , "Hello World" );
59- timerStream .resume ();
60- } else if (WebSocketServer .STOP_ACTION .equals (command )) {
61- logger .info ("Stopping producing Kafka records" );
62- timerStream .pause ();
72+ switch (command ) {
73+ case WebSocketServer .START_ACTION :
74+ customMessage = message .body ().getString ("custom" , "Hello World" );
75+ if (timerId == TIMER_NOT_SET ) {
76+ timerId = vertx .setPeriodic (PRODUCE_INTERVAL_MS ,
77+ id -> produceKafkaRecord (props .get (Main .TOPIC_KEY )));
78+ logger .info ("Producing Kafka records with message template: {}" , customMessage );
79+ }
80+ break ;
81+
82+ case WebSocketServer .STOP_ACTION :
83+ if (timerId != TIMER_NOT_SET ) {
84+ vertx .cancelTimer (timerId );
85+ timerId = TIMER_NOT_SET ;
86+ logger .info ("Stopped producing Kafka records" );
87+ }
88+ break ;
89+
90+ default :
91+ logger .warn ("Unknown command received: {}" , command );
6392 }
6493 }
6594
66- private void produceKafkaRecord (KafkaProducer < String , String > kafkaProducer , String topic ) {
95+ private void produceKafkaRecord (String topic ) {
6796 String payload = customMessage ;
6897 KafkaProducerRecord <String , String > record = KafkaProducerRecord .create (topic , payload );
6998 logger .debug ("Producing record to topic {} with payload {}" , topic , payload );
@@ -84,4 +113,12 @@ private void produceKafkaRecord(KafkaProducer<String, String> kafkaProducer, Str
84113 vertx .eventBus ().send (Main .PERIODIC_PRODUCER_BROADCAST , new JsonObject ().put ("status" , "ERROR" ));
85114 });
86115 }
116+
117+ @ Override
118+ public void stop () {
119+ if (kafkaProducer != null ) {
120+ kafkaProducer .close ()
121+ .onComplete (ar -> logger .info ("KafkaProducer closed: {}" , ar .succeeded () ? "success" : ar .cause ()));
122+ }
123+ }
87124}
0 commit comments