 
 import io.vertx.core.AbstractVerticle;
 import io.vertx.core.Promise;
-import io.vertx.core.TimeoutStream;
 import io.vertx.core.eventbus.Message;
 import io.vertx.core.json.JsonObject;
 import io.vertx.kafka.client.producer.KafkaProducer;
 import io.vertx.kafka.client.producer.KafkaProducerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
+import java.time.Duration;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 public class PeriodicProducer extends AbstractVerticle {
 
     private static final Logger logger = LoggerFactory.getLogger(PeriodicProducer.class);
+    private static final long PRODUCE_INTERVAL_MS = Duration.ofSeconds(2).toMillis();
+
+    private KafkaProducer<String, String> kafkaProducer;
+    private long timerId = -1; // -1 means the periodic timer is not currently running
     private String customMessage;
 
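+    /**
+     * Loads the Kafka config from the properties file path and, once it is
+     * available, sets up the producer and the event-bus consumer.
+     */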
     @Override
     public void start(Promise<Void> startPromise) {
-        String propertiesPath = System.getProperty(Main.PROPERTIES_PATH_ENV_NAME, Main.DEFAULT_PROPERTIES_PATH);
+        String propertiesPath = System.getProperty(
+                Main.PROPERTIES_PATH_ENV_NAME,
+                Main.DEFAULT_PROPERTIES_PATH
+        );
+
         Main.loadKafkaConfig(vertx, propertiesPath)
             .onSuccess(config -> {
-                HashMap<String, String> props = config.mapTo(HashMap.class);
-                setup(props);
+                setup(config);
                 startPromise.complete();
             })
             .onFailure(startPromise::fail);
     }
 
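+    /**
+     * Builds the producer configuration from the loaded JSON config, creates the
+     * shared KafkaProducer and registers the event-bus consumer that drives it.
+     */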
-    private void setup(HashMap<String, String> props) {
-        // Don't retry and only wait 10 secs for partition info as this is a demo app
+    private void setup(JsonObject config) {
+        // Convert JsonObject config -> Map<String, String>
+        Map<String, String> props = config.getMap()
+            .entrySet()
+            .stream()
+            .collect(Collectors.toMap(
+                Map.Entry::getKey,
+                e -> String.valueOf(e.getValue())
+            ));
+
         props.put(ProducerConfig.RETRIES_CONFIG, "0");
         props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000");
-        KafkaProducer<String, String> kafkaProducer = KafkaProducer.create(vertx, props);
 
-        kafkaProducer.exceptionHandler(err -> logger.debug("Kafka error: {}", err));
+        kafkaProducer = KafkaProducer.create(vertx, props, String.class, String.class);
+        kafkaProducer.exceptionHandler(err -> logger.error("Kafka producer error", err));
 
-        TimeoutStream timerStream = vertx.periodicStream(2000);
-        timerStream.handler(tick -> produceKafkaRecord(kafkaProducer, props.get(Main.TOPIC_KEY)));
-        timerStream.pause();
+        vertx.eventBus()
+            .<JsonObject>consumer(Main.PERIODIC_PRODUCER_ADDRESS,
+                msg -> handleCommand(props, msg));
 
-        vertx.eventBus().<JsonObject>consumer(Main.PERIODIC_PRODUCER_ADDRESS, message -> handleCommand(timerStream, message));
         logger.info("🚀 PeriodicProducer started");
     }
 
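+    /**
+     * Handles start/stop commands received on the event bus: START begins
+     * producing on a periodic timer, STOP cancels the timer.
+     */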
-    private void handleCommand(TimeoutStream timerStream, Message<JsonObject> message) {
+    private void handleCommand(Map<String, String> props, Message<JsonObject> message) {
         String command = message.body().getString(WebSocketServer.ACTION, "none");
-        if (WebSocketServer.START_ACTION.equals(command)) {
-            logger.info("Producing Kafka records");
-            customMessage = message.body().getString("custom", "Hello World");
-            timerStream.resume();
-        } else if (WebSocketServer.STOP_ACTION.equals(command)) {
-            logger.info("Stopping producing Kafka records");
-            timerStream.pause();
+        switch (command) {
+            case WebSocketServer.START_ACTION:
+                customMessage = message.body().getString("custom", "Hello World");
+                if (timerId == -1) {
+                    timerId = vertx.setPeriodic(PRODUCE_INTERVAL_MS,
+                        id -> produceKafkaRecord(props.get(Main.TOPIC_KEY)));
+                    logger.info("Producing Kafka records with message template: {}", customMessage);
+                }
+                break;
+
+            case WebSocketServer.STOP_ACTION:
+                if (timerId != -1) {
+                    vertx.cancelTimer(timerId);
+                    timerId = -1;
+                    logger.info("Stopped producing Kafka records");
+                }
+                break;
+
+            default:
+                logger.warn("Unknown command received: {}", command);
         }
     }
 
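+    /**
+     * Produces the current custom message to the given topic and reports the
+     * outcome on the Main.PERIODIC_PRODUCER_BROADCAST event-bus address.
+     */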
-    private void produceKafkaRecord(KafkaProducer<String, String> kafkaProducer, String topic) {
+    private void produceKafkaRecord(String topic) {
         String payload = customMessage;
         KafkaProducerRecord<String, String> record = KafkaProducerRecord.create(topic, payload);
         logger.debug("Producing record to topic {} with payload {}", topic, payload);
@@ -84,4 +112,12 @@ private void produceKafkaRecord(KafkaProducer<String, String> kafkaProducer, Str
             vertx.eventBus().send(Main.PERIODIC_PRODUCER_BROADCAST, new JsonObject().put("status", "ERROR"));
         });
     }
+
+    @Override
+    public void stop() {
+        if (kafkaProducer != null) {
+            kafkaProducer.close()
+                .onComplete(ar -> logger.info("KafkaProducer closed: {}", ar.succeeded() ? "success" : ar.cause()));
+        }
+    }
 }