|
22 | 22 | import java.util.Collection;
|
23 | 23 | import java.util.Collections;
|
24 | 24 | import java.util.HashMap;
|
25 |
| -import java.util.Iterator; |
26 | 25 | import java.util.LinkedList;
|
27 | 26 | import java.util.List;
|
28 | 27 | import java.util.Map;
|
29 |
| -import java.util.Map.Entry; |
30 | 28 | import java.util.Optional;
|
| 29 | +import java.util.Set; |
31 | 30 | import java.util.concurrent.ExecutionException;
|
32 | 31 | import java.util.concurrent.TimeUnit;
|
33 | 32 | import java.util.concurrent.TimeoutException;
|
@@ -323,41 +322,27 @@ private void updateClusterId(Admin adminClient) throws InterruptedException, Exe
|
323 | 322 | */
|
324 | 323 | protected Collection<NewTopic> newTopics() {
|
325 | 324 | Assert.state(this.applicationContext != null, "'applicationContext' cannot be null");
|
326 |
| - Map<String, NewTopic> newTopicsMap = new HashMap<>( |
327 |
| - this.applicationContext.getBeansOfType(NewTopic.class, false, false)); |
328 |
| - Map<String, NewTopics> wrappers = this.applicationContext.getBeansOfType(NewTopics.class, false, false); |
329 |
| - AtomicInteger count = new AtomicInteger(); |
330 |
| - wrappers.forEach((name, newTopics) -> { |
331 |
| - newTopics.getNewTopics().forEach(nt -> newTopicsMap.put(name + "#" + count.getAndIncrement(), nt)); |
332 |
| - }); |
333 |
| - Map<String, NewTopic> topicsForRetry = newTopicsMap.entrySet().stream() |
334 |
| - .filter(entry -> entry.getValue() instanceof TopicForRetryable) |
335 |
| - .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); |
336 |
| - for (Entry<String, NewTopic> entry : topicsForRetry.entrySet()) { |
337 |
| - Iterator<Entry<String, NewTopic>> iterator = newTopicsMap.entrySet().iterator(); |
338 |
| - boolean remove = false; |
339 |
| - while (iterator.hasNext()) { |
340 |
| - Entry<String, NewTopic> nt = iterator.next(); |
341 |
| - // if we have a NewTopic and TopicForRetry with the same name, remove the latter |
342 |
| - if (nt.getValue().name().equals(entry.getValue().name()) |
343 |
| - && !(nt.getValue() instanceof TopicForRetryable)) { |
344 |
| - |
345 |
| - remove = true; |
346 |
| - break; |
347 |
| - } |
348 |
| - } |
349 |
| - if (remove) { |
350 |
| - newTopicsMap.remove(entry.getKey()); |
351 |
| - } |
352 |
| - } |
353 |
| - Iterator<Entry<String, NewTopic>> iterator = newTopicsMap.entrySet().iterator(); |
354 |
| - while (iterator.hasNext()) { |
355 |
| - Entry<String, NewTopic> next = iterator.next(); |
356 |
| - if (!this.createOrModifyTopic.test(next.getValue())) { |
357 |
| - iterator.remove(); |
358 |
| - } |
359 |
| - } |
360 |
| - return new ArrayList<>(newTopicsMap.values()); |
| 325 | + |
| 326 | + // Deal with List<NewTopic> directly instead of Map (no need for bean names) |
| 327 | + List<NewTopic> newTopicsList = new ArrayList<>( |
| 328 | + this.applicationContext.getBeansOfType(NewTopic.class, false, false).values()); |
| 329 | + |
| 330 | + // Add topics from NewTopics wrappers (no need for bean names either) |
| 331 | + this.applicationContext.getBeansOfType(NewTopics.class, false, false).values() |
| 332 | + .forEach(wrapper -> newTopicsList.addAll(wrapper.getNewTopics())); |
| 333 | + |
| 334 | + // Collect regular topic names to check against TopicForRetryable |
| 335 | + Set<String> regularTopicNames = newTopicsList.stream() |
| 336 | + .filter(nt -> !(nt instanceof TopicForRetryable)) |
| 337 | + .map(NewTopic::name) |
| 338 | + .collect(Collectors.toSet()); |
| 339 | + |
| 340 | + // Apply combined filter: remove TopicForRetryable with same name as regular topic OR topics that don't pass predicate |
| 341 | + newTopicsList.removeIf(nt -> |
| 342 | + (nt instanceof TopicForRetryable && regularTopicNames.contains(nt.name())) || |
| 343 | + !this.createOrModifyTopic.test(nt)); |
| 344 | + |
| 345 | + return newTopicsList; |
361 | 346 | }
|
362 | 347 |
|
363 | 348 | @Override
|
|
0 commit comments