|
45 | 45 | import java.util.List; |
46 | 46 | import java.util.Map; |
47 | 47 | import java.util.Properties; |
48 | | -import java.util.Set; |
49 | 48 | import java.util.concurrent.Future; |
50 | 49 |
|
51 | 50 | /** |
@@ -188,27 +187,43 @@ short getEpoch() { |
188 | 187 | */ |
189 | 188 | private void flushNewPartitions() { |
190 | 189 | LOG.info("Flushing new partitions"); |
191 | | - Object transactionManager = getValue(kafkaProducer, "transactionManager"); |
192 | | - Set<TopicPartition> newPartitionsInTransaction = |
193 | | - (Set<TopicPartition>) getValue(transactionManager, "newPartitionsInTransaction"); |
194 | | - if (!newPartitionsInTransaction.isEmpty()) { |
195 | | - TransactionalRequestResult result = enqueueNewPartitions(); |
196 | | - Object sender = getValue(kafkaProducer, "sender"); |
197 | | - invoke(sender, "wakeup"); |
198 | | - result.await(); |
199 | | - } |
| 190 | + TransactionalRequestResult result = enqueueNewPartitions(); |
| 191 | + Object sender = getValue(kafkaProducer, "sender"); |
| 192 | + invoke(sender, "wakeup"); |
| 193 | + result.await(); |
200 | 194 | } |
201 | 195 |
|
202 | 196 | private synchronized TransactionalRequestResult enqueueNewPartitions() { |
203 | 197 | Object transactionManager = getValue(kafkaProducer, "transactionManager"); |
204 | | - Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler"); |
205 | | - invoke(transactionManager, |
206 | | - "enqueueRequest", |
207 | | - new Class[] {txnRequestHandler.getClass().getSuperclass()}, |
208 | | - new Object[] {txnRequestHandler}); |
209 | | - return (TransactionalRequestResult) getValue(txnRequestHandler, |
210 | | - txnRequestHandler.getClass().getSuperclass(), |
211 | | - "result"); |
| 198 | + synchronized (transactionManager) { |
| 199 | + Object newPartitionsInTransaction = |
| 200 | + getValue(transactionManager, "newPartitionsInTransaction"); |
| 201 | + Object newPartitionsInTransactionIsEmpty = |
| 202 | + invoke(newPartitionsInTransaction, "isEmpty"); |
| 203 | + TransactionalRequestResult result; |
| 204 | + if (newPartitionsInTransactionIsEmpty instanceof Boolean |
| 205 | + && !((Boolean) newPartitionsInTransactionIsEmpty)) { |
| 206 | + Object txnRequestHandler = |
| 207 | + invoke(transactionManager, "addPartitionsToTransactionHandler"); |
| 208 | + invoke( |
| 209 | + transactionManager, |
| 210 | + "enqueueRequest", |
| 211 | + new Class[]{txnRequestHandler.getClass().getSuperclass()}, |
| 212 | + new Object[]{txnRequestHandler}); |
| 213 | + |
| 214 | + result = (TransactionalRequestResult) |
| 215 | + getValue( |
| 216 | + txnRequestHandler, |
| 217 | + txnRequestHandler.getClass().getSuperclass(), |
| 218 | + "result"); |
| 219 | + } else { |
| 220 | + // we don't have an operation but this operation string is also used in |
| 221 | + // addPartitionsToTransactionHandler. |
| 222 | + result = new TransactionalRequestResult("AddPartitionsToTxn"); |
| 223 | + result.done(); |
| 224 | + } |
| 225 | + return result; |
| 226 | + } |
212 | 227 | } |
213 | 228 |
|
214 | 229 | @SuppressWarnings("unchecked") private static Enum<?> getEnum(String enumFullName) { |
|
0 commit comments