6161import org .springframework .kafka .transaction .KafkaAwareTransactionManager ;
6262import org .springframework .scheduling .concurrent .ThreadPoolTaskExecutor ;
6363import org .springframework .transaction .support .TransactionSynchronizationManager ;
64+ import org .springframework .util .CollectionUtils ;
6465
6566import static org .assertj .core .api .Assertions .assertThat ;
6667import static org .mockito .ArgumentMatchers .any ;
@@ -1289,15 +1290,16 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsCoopAssignor() throws Int
12891290 @ SuppressWarnings ({ "unchecked" , "rawtypes" })
12901291 @ Test
12911292 void removeOffsetsInBatchForRetryRecords () throws InterruptedException {
1292- TopicPartition topicPartition = new TopicPartition ("foo" , 0 );
1293+ String retryTopic = "retry-offsets-topic" ;
1294+ TopicPartition topicPartition = new TopicPartition (retryTopic , 0 );
12931295 Map <TopicPartition , List <ConsumerRecord <String , String >>> recordMap = new LinkedHashMap <>();
12941296 recordMap .put (topicPartition ,
1295- List .of (new ConsumerRecord ( "foo" , 0 , 0 , null , "bar -0" ),
1296- new ConsumerRecord ( "foo" , 0 , 1 , null , "bar -1" )));
1297- ConsumerRecords polledRecords = new ConsumerRecords <>(recordMap , Map .of ());
1297+ List .of (new ConsumerRecord <>( retryTopic , 0 , 0 , null , "failed-record -0" ),
1298+ new ConsumerRecord <>( retryTopic , 0 , 1 , null , "failed-record -1" )));
1299+ ConsumerRecords < String , String > polledRecords = new ConsumerRecords <>(recordMap , Map .of ());
12981300 AtomicInteger pollCount = new AtomicInteger ();
12991301
1300- Consumer consumer = mock (Consumer . class );
1302+ Consumer < String , String > consumer = mock ();
13011303 AtomicReference <ConsumerRebalanceListener > rebal = new AtomicReference <>();
13021304 CountDownLatch subscribeLatch = new CountDownLatch (1 );
13031305 willAnswer (invocation -> {
@@ -1310,21 +1312,21 @@ void removeOffsetsInBatchForRetryRecords() throws InterruptedException {
13101312 rebal .get ().onPartitionsAssigned (List .of (topicPartition ));
13111313 return polledRecords ;
13121314 }
1313- Thread .sleep (50 );
13141315 return ConsumerRecords .empty ();
13151316 }).given (consumer ).poll (any ());
1316- ConsumerFactory cf = mock (ConsumerFactory . class );
1317+ ConsumerFactory < String , String > cf = mock ();
13171318 given (cf .createConsumer (any (), any (), any (), any ())).willReturn (consumer );
13181319 given (cf .getConfigurationProperties ())
13191320 .willReturn (Collections .singletonMap (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG , "earliest" ));
1320- ContainerProperties containerProperties = new ContainerProperties ("foo" );
1321+ ContainerProperties containerProperties = new ContainerProperties (retryTopic );
13211322 containerProperties .setGroupId ("grp" );
13221323 containerProperties .setAckMode (AckMode .MANUAL );
13231324 containerProperties .setAsyncAcks (true );
13241325 containerProperties .setMessageListener ((MessageListener ) rec -> {
13251326 throw new RuntimeException ("test" );
13261327 });
1327- ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer (cf , containerProperties );
1328+ ConcurrentMessageListenerContainer <String , String > container = new ConcurrentMessageListenerContainer <>(cf ,
1329+ containerProperties );
13281330 CountDownLatch handleRemainingLatch = new CountDownLatch (1 );
13291331 container .setCommonErrorHandler (new CommonErrorHandler () {
13301332
@@ -1346,14 +1348,14 @@ public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>
13461348 try {
13471349 assertThat (subscribeLatch .await (10 , TimeUnit .SECONDS )).isTrue ();
13481350 assertThat (handleRemainingLatch .await (10 , TimeUnit .SECONDS )).isTrue ();
1349- KafkaMessageListenerContainer child = (KafkaMessageListenerContainer ) KafkaTestUtils
1351+ KafkaMessageListenerContainer < String , String > child = (KafkaMessageListenerContainer < String , String > ) KafkaTestUtils
13501352 .getPropertyValue (container , "containers" , List .class ).get (0 );
1351- Map offsets = null ;
1352- Map deferred = null ;
1353+ Map <?, ?> offsets = null ;
1354+ Map <?, ?> deferred = null ;
13531355 for (int i = 0 ; i < 20 ; i ++) {
13541356 offsets = KafkaTestUtils .getPropertyValue (child , "listenerConsumer.offsetsInThisBatch" , Map .class );
13551357 deferred = KafkaTestUtils .getPropertyValue (child , "listenerConsumer.deferredOffsets" , Map .class );
1356- if (( offsets == null || offsets .isEmpty ()) && ( deferred == null || deferred .isEmpty () )) {
1358+ if (CollectionUtils .isEmpty (offsets ) && CollectionUtils .isEmpty (deferred )) {
13571359 break ;
13581360 }
13591361 Thread .sleep (50 );
0 commit comments