import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
-import io.vertx.core.TimeoutStream;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonObject;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-import java.util.HashMap;
+import java.time.Duration;
+import java.util.Map;
+import java.util.stream.Collectors;

public class PeriodicProducer extends AbstractVerticle {

    private static final Logger logger = LoggerFactory.getLogger(PeriodicProducer.class);
+    private static final long PRODUCE_INTERVAL_MS = Duration.ofSeconds(2).toMillis();
+
+    private KafkaProducer<String, String> kafkaProducer;
+    private long timerId = -1;
    private String customMessage;

    @Override
    public void start(Promise<Void> startPromise) {
-        String propertiesPath = System.getProperty(Main.PROPERTIES_PATH_ENV_NAME, Main.DEFAULT_PROPERTIES_PATH);
+        String propertiesPath = System.getProperty(
+            Main.PROPERTIES_PATH_ENV_NAME,
+            Main.DEFAULT_PROPERTIES_PATH
+        );
+
        Main.loadKafkaConfig(vertx, propertiesPath)
            .onSuccess(config -> {
-                HashMap<String, String> props = config.mapTo(HashMap.class);
-                setup(props);
+                setup(config); // pass JsonObject directly
                startPromise.complete();
            })
            .onFailure(startPromise::fail);
    }

-    private void setup(HashMap<String, String> props) {
-        // Don't retry and only wait 10 secs for partition info as this is a demo app
+    private void setup(JsonObject config) {
+        // Convert JsonObject config -> Map<String, String>
+        Map<String, String> props = config.getMap()
+            .entrySet()
+            .stream()
+            .collect(Collectors.toMap(
+                Map.Entry::getKey,
+                e -> String.valueOf(e.getValue()) // force everything to String
+            ));
+
+        // Demo-friendly settings
        props.put(ProducerConfig.RETRIES_CONFIG, "0");
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000");
-        KafkaProducer<String, String> kafkaProducer = KafkaProducer.create(vertx, props);

-        kafkaProducer.exceptionHandler(err -> logger.debug("Kafka error: {}", err));
+        // Explicit serializers
+        kafkaProducer = KafkaProducer.create(vertx, props, String.class, String.class);
+        kafkaProducer.exceptionHandler(err -> logger.error("Kafka producer error", err));

-        TimeoutStream timerStream = vertx.periodicStream(2000);
-        timerStream.handler(tick -> produceKafkaRecord(kafkaProducer, props.get(Main.TOPIC_KEY)));
-        timerStream.pause();
+        vertx.eventBus()
+            .<JsonObject>consumer(Main.PERIODIC_PRODUCER_ADDRESS,
+                msg -> handleCommand(props, msg));

-        vertx.eventBus().<JsonObject>consumer(Main.PERIODIC_PRODUCER_ADDRESS, message -> handleCommand(timerStream, message));
        logger.info("🚀 PeriodicProducer started");
    }

-    private void handleCommand(TimeoutStream timerStream, Message<JsonObject> message) {
+    private void handleCommand(Map<String, String> props, Message<JsonObject> message) {
        String command = message.body().getString(WebSocketServer.ACTION, "none");
-        if (WebSocketServer.START_ACTION.equals(command)) {
-            logger.info("Producing Kafka records");
-            customMessage = message.body().getString("custom", "Hello World");
-            timerStream.resume();
-        } else if (WebSocketServer.STOP_ACTION.equals(command)) {
-            logger.info("Stopping producing Kafka records");
-            timerStream.pause();
+        switch (command) {
+            case WebSocketServer.START_ACTION:
+                customMessage = message.body().getString("custom", "Hello World");
+                if (timerId == -1) {
+                    timerId = vertx.setPeriodic(PRODUCE_INTERVAL_MS,
+                        id -> produceKafkaRecord(props.get(Main.TOPIC_KEY)));
+                    logger.info("Producing Kafka records with message template: {}", customMessage);
+                }
+                break;
+
+            case WebSocketServer.STOP_ACTION:
+                if (timerId != -1) {
+                    vertx.cancelTimer(timerId);
+                    timerId = -1;
+                    logger.info("Stopped producing Kafka records");
+                }
+                break;
+
+            default:
+                logger.warn("Unknown command received: {}", command);
        }
    }

-    private void produceKafkaRecord(KafkaProducer<String, String> kafkaProducer, String topic) {
+    private void produceKafkaRecord(String topic) {
        String payload = customMessage;
        KafkaProducerRecord<String, String> record = KafkaProducerRecord.create(topic, payload);
        logger.debug("Producing record to topic {} with payload {}", topic, payload);
@@ -84,4 +114,12 @@ private void produceKafkaRecord(KafkaProducer<String, String> kafkaProducer, Str
            vertx.eventBus().send(Main.PERIODIC_PRODUCER_BROADCAST, new JsonObject().put("status", "ERROR"));
        });
    }
+
+    @Override
+    public void stop() {
+        if (kafkaProducer != null) {
+            kafkaProducer.close()
+                .onComplete(ar -> logger.info("KafkaProducer closed: {}", ar.succeeded() ? "success" : ar.cause()));
+        }
+    }
}
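
For context, a minimal usage sketch of how a caller (for example the WebSocketServer verticle) might drive this producer over the event bus. The helper class ProducerControlExample is hypothetical and not part of this commit; it only relies on the constants already referenced in the diff above (Main.PERIODIC_PRODUCER_ADDRESS, WebSocketServer.ACTION, WebSocketServer.START_ACTION, WebSocketServer.STOP_ACTION).

import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;

// Hypothetical helper: sends the JSON commands that handleCommand() above expects.
public class ProducerControlExample {

    // Ask the PeriodicProducer to start emitting a record every 2 seconds.
    static void startProducing(Vertx vertx, String customMessage) {
        JsonObject command = new JsonObject()
            .put(WebSocketServer.ACTION, WebSocketServer.START_ACTION)
            .put("custom", customMessage);
        vertx.eventBus().send(Main.PERIODIC_PRODUCER_ADDRESS, command);
    }

    // Ask the PeriodicProducer to cancel its periodic timer.
    static void stopProducing(Vertx vertx) {
        JsonObject command = new JsonObject()
            .put(WebSocketServer.ACTION, WebSocketServer.STOP_ACTION);
        vertx.eventBus().send(Main.PERIODIC_PRODUCER_ADDRESS, command);
    }
}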