@@ -188,27 +188,43 @@ short getEpoch() {
188188 */
189189 private void flushNewPartitions () {
190190 LOG .info ("Flushing new partitions" );
191- Object transactionManager = getValue (kafkaProducer , "transactionManager" );
192- Set <TopicPartition > newPartitionsInTransaction =
193- (Set <TopicPartition >) getValue (transactionManager , "newPartitionsInTransaction" );
194- if (!newPartitionsInTransaction .isEmpty ()) {
195- TransactionalRequestResult result = enqueueNewPartitions ();
196- Object sender = getValue (kafkaProducer , "sender" );
197- invoke (sender , "wakeup" );
198- result .await ();
199- }
191+ TransactionalRequestResult result = enqueueNewPartitions ();
192+ Object sender = getValue (kafkaProducer , "sender" );
193+ invoke (sender , "wakeup" );
194+ result .await ();
200195 }
201196
202197 private synchronized TransactionalRequestResult enqueueNewPartitions () {
203198 Object transactionManager = getValue (kafkaProducer , "transactionManager" );
204- Object txnRequestHandler = invoke (transactionManager , "addPartitionsToTransactionHandler" );
205- invoke (transactionManager ,
206- "enqueueRequest" ,
207- new Class [] {txnRequestHandler .getClass ().getSuperclass ()},
208- new Object [] {txnRequestHandler });
209- return (TransactionalRequestResult ) getValue (txnRequestHandler ,
210- txnRequestHandler .getClass ().getSuperclass (),
211- "result" );
199+ synchronized (transactionManager ) {
200+ Object newPartitionsInTransaction =
201+ getValue (transactionManager , "newPartitionsInTransaction" );
202+ Object newPartitionsInTransactionIsEmpty =
203+ invoke (newPartitionsInTransaction , "isEmpty" );
204+ TransactionalRequestResult result ;
205+ if (newPartitionsInTransactionIsEmpty instanceof Boolean
206+ && !((Boolean ) newPartitionsInTransactionIsEmpty )) {
207+ Object txnRequestHandler =
208+ invoke (transactionManager , "addPartitionsToTransactionHandler" );
209+ invoke (
210+ transactionManager ,
211+ "enqueueRequest" ,
212+ new Class []{txnRequestHandler .getClass ().getSuperclass ()},
213+ new Object []{txnRequestHandler });
214+
215+ result = (TransactionalRequestResult )
216+ getValue (
217+ txnRequestHandler ,
218+ txnRequestHandler .getClass ().getSuperclass (),
219+ "result" );
220+ } else {
221+ // we don't have an operation but this operation string is also used in
222+ // addPartitionsToTransactionHandler.
223+ result = new TransactionalRequestResult ("AddPartitionsToTxn" );
224+ result .done ();
225+ }
226+ return result ;
227+ }
212228 }
213229
214230 @ SuppressWarnings ("unchecked" ) private static Enum <?> getEnum (String enumFullName ) {
0 commit comments