Skip to content

Commit ec384d7

Browse files
committed
Merge thread and concurrency utils in one class
1 parent dc241e3 commit ec384d7

File tree

9 files changed

+78
-124
lines changed

9 files changed

+78
-124
lines changed

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ public void initChannel(SocketChannel ch) {
357357
});
358358
this.dispatchingExecutorService =
359359
Executors.newSingleThreadExecutor(
360-
threadFactory("dispatching-" + clientConnectionName + "-"));
360+
threadFactory("rabbitmq-stream-dispatching-" + clientConnectionName + "-"));
361361
} else {
362362
this.closeDispatchingExecutorService =
363363
Utils.makeIdempotent(

src/main/java/com/rabbitmq/stream/impl/ConcurrencyUtils.java

Lines changed: 0 additions & 68 deletions
This file was deleted.

src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ final class DynamicBatch<T> implements AutoCloseable {
3535
private final int configuredBatchSize, minBatchSize, maxBatchSize;
3636
private final Thread thread;
3737

38-
DynamicBatch(BatchConsumer<T> consumer, int batchSize, int maxUnconfirmed) {
38+
DynamicBatch(BatchConsumer<T> consumer, int batchSize, int maxUnconfirmed, String id) {
3939
this.consumer = consumer;
4040
if (batchSize < maxUnconfirmed) {
4141
this.minBatchSize = min(MIN_BATCH_SIZE, batchSize / 2);
@@ -44,7 +44,7 @@ final class DynamicBatch<T> implements AutoCloseable {
4444
}
4545
this.configuredBatchSize = batchSize;
4646
this.maxBatchSize = batchSize * 2;
47-
this.thread = ConcurrencyUtils.defaultThreadFactory().newThread(this::loop);
47+
this.thread = ThreadUtils.newInternalThread(id, this::loop);
4848
this.thread.start();
4949
}
5050

src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator {
4848
CompressionCodec compressionCodec,
4949
ByteBufAllocator byteBufAllocator,
5050
ObservationCollector<?> observationCollector,
51-
StreamProducer producer) {
51+
StreamProducer producer,
52+
long producerId) {
5253
this.helper =
5354
new ProducerUtils.MessageAccumulatorHelper(
5455
codec,
@@ -61,6 +62,7 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator {
6162
this.producer = producer;
6263
this.observationCollector = (ObservationCollector<Object>) observationCollector;
6364
boolean shouldObserve = !this.observationCollector.isNoop();
65+
String batchId = "rabbitmq-stream-dynamic-batch-producer-" + producerId;
6466
if (subEntrySize <= 1) {
6567
this.dynamicBatch =
6668
new DynamicBatch<>(
@@ -77,7 +79,8 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator {
7779
return result;
7880
},
7981
batchSize,
80-
maxUnconfirmedMessages);
82+
maxUnconfirmedMessages,
83+
batchId);
8184
} else {
8285
byte compressionCode =
8386
compressionCodec == null ? Compression.NONE.code() : compressionCodec.code();
@@ -127,7 +130,8 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator {
127130
return result;
128131
},
129132
batchSize * subEntrySize,
130-
maxUnconfirmedMessages);
133+
maxUnconfirmedMessages,
134+
batchId);
131135
}
132136
}
133137

src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ static MessageAccumulator createMessageAccumulator(
4040
Clock clock,
4141
String stream,
4242
ObservationCollector<?> observationCollector,
43-
StreamProducer producer) {
43+
StreamProducer producer,
44+
long producerId) {
4445
if (dynamicBatch) {
4546
return new DynamicBatchMessageAccumulator(
4647
subEntrySize,
@@ -55,7 +56,8 @@ static MessageAccumulator createMessageAccumulator(
5556
compressionCodec,
5657
byteBufAllocator,
5758
observationCollector,
58-
producer);
59+
producer,
60+
producerId);
5961
} else {
6062
if (subEntrySize <= 1) {
6163
return new SimpleMessageAccumulator(

src/main/java/com/rabbitmq/stream/impl/StreamProducer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,8 @@ public int fragmentLength(Object entity) {
190190
environment.clock(),
191191
stream,
192192
environment.observationCollector(),
193-
this);
193+
this,
194+
this.id);
194195

195196
boolean backgroundBatchPublishingTaskRequired =
196197
!dynamicBatch && batchPublishingDelay.toMillis() > 0;

src/main/java/com/rabbitmq/stream/impl/ThreadUtils.java

Lines changed: 49 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,9 @@
1717
import java.lang.reflect.InvocationTargetException;
1818
import java.lang.reflect.Method;
1919
import java.util.Arrays;
20-
import java.util.concurrent.ExecutorService;
2120
import java.util.concurrent.Executors;
2221
import java.util.concurrent.ThreadFactory;
2322
import java.util.concurrent.atomic.AtomicLong;
24-
import java.util.function.Function;
2523
import java.util.function.Predicate;
2624
import org.slf4j.Logger;
2725
import org.slf4j.LoggerFactory;
@@ -31,48 +29,30 @@ final class ThreadUtils {
3129
private static final Logger LOGGER = LoggerFactory.getLogger(ThreadUtils.class);
3230

3331
private static final ThreadFactory THREAD_FACTORY;
34-
private static final Function<String, ExecutorService> EXECUTOR_SERVICE_FACTORY;
3532
private static final Predicate<Thread> IS_VIRTUAL;
3633

3734
static {
35+
ThreadFactory tf;
3836
if (isJava21OrMore()) {
3937
LOGGER.debug("Running Java 21 or more, using virtual threads");
40-
Class<?> builderClass =
41-
Arrays.stream(Thread.class.getDeclaredClasses())
42-
.filter(c -> "Builder".equals(c.getSimpleName()))
43-
.findFirst()
44-
.get();
45-
// Reflection code is the same as:
46-
// Thread.ofVirtual().factory();
4738
try {
39+
Class<?> builderClass =
40+
Arrays.stream(Thread.class.getDeclaredClasses())
41+
.filter(c -> "Builder".equals(c.getSimpleName()))
42+
.findFirst()
43+
.get();
44+
// Reflection code is the same as:
45+
// Thread.ofVirtual().factory();
4846
Object builder = Thread.class.getDeclaredMethod("ofVirtual").invoke(null);
49-
THREAD_FACTORY = (ThreadFactory) builderClass.getDeclaredMethod("factory").invoke(builder);
50-
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
51-
throw new RuntimeException(e);
47+
tf = (ThreadFactory) builderClass.getDeclaredMethod("factory").invoke(builder);
48+
} catch (IllegalAccessException
49+
| InvocationTargetException
50+
| NoSuchMethodException
51+
| RuntimeException e) {
52+
LOGGER.debug("Error when creating virtual thread factory on Java 21+: {}", e.getMessage());
53+
LOGGER.debug("Falling back to default thread factory");
54+
tf = Executors.defaultThreadFactory();
5255
}
53-
EXECUTOR_SERVICE_FACTORY =
54-
prefix -> {
55-
try {
56-
// Reflection code is the same as the 2 following lines:
57-
// ThreadFactory factory = Thread.ofVirtual().name(prefix, 0).factory();
58-
// Executors.newThreadPerTaskExecutor(factory);
59-
Object builder = Thread.class.getDeclaredMethod("ofVirtual").invoke(null);
60-
if (prefix != null) {
61-
builder =
62-
builderClass
63-
.getDeclaredMethod("name", String.class, Long.TYPE)
64-
.invoke(builder, prefix, 0L);
65-
}
66-
ThreadFactory factory =
67-
(ThreadFactory) builderClass.getDeclaredMethod("factory").invoke(builder);
68-
return (ExecutorService)
69-
Executors.class
70-
.getDeclaredMethod("newThreadPerTaskExecutor", ThreadFactory.class)
71-
.invoke(null, factory);
72-
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
73-
throw new RuntimeException(e);
74-
}
75-
};
7656
IS_VIRTUAL =
7757
thread -> {
7858
Method method = null;
@@ -85,14 +65,21 @@ final class ThreadUtils {
8565
}
8666
};
8767
} else {
88-
THREAD_FACTORY = Executors.defaultThreadFactory();
89-
EXECUTOR_SERVICE_FACTORY = prefix -> Executors.newCachedThreadPool(threadFactory(prefix));
68+
tf = Executors.defaultThreadFactory();
9069
IS_VIRTUAL = ignored -> false;
9170
}
71+
THREAD_FACTORY = tf;
9272
}
9373

9474
private ThreadUtils() {}
9575

76+
/**
77+
* Create a thread factory that prefixes thread names. Based on {@link
78+
* java.util.concurrent.Executors#defaultThreadFactory()}, so always creates platform threads.
79+
*
80+
* @param prefix used to prefix thread names
81+
* @return thread factory
82+
*/
9683
static ThreadFactory threadFactory(String prefix) {
9784
if (prefix == null) {
9885
return Executors.defaultThreadFactory();
@@ -101,8 +88,31 @@ static ThreadFactory threadFactory(String prefix) {
10188
}
10289
}
10390

91+
/**
92+
* Returns a thread factory that creates virtual threads if available.
93+
*
94+
* @param prefix
95+
* @return
96+
*/
10497
static ThreadFactory internalThreadFactory(String prefix) {
105-
return new NamedThreadFactory(THREAD_FACTORY, prefix);
98+
if (prefix == null) {
99+
return THREAD_FACTORY;
100+
} else {
101+
return new NamedThreadFactory(THREAD_FACTORY, prefix);
102+
}
103+
}
104+
105+
/**
106+
* Creates a virtual thread if available.
107+
*
108+
* @param name
109+
* @param task
110+
* @return
111+
*/
112+
static Thread newInternalThread(String name, Runnable task) {
113+
Thread t = THREAD_FACTORY.newThread(task);
114+
t.setName(name);
115+
return t;
106116
}
107117

108118
static boolean isVirtual(Thread thread) {

src/test/java/com/rabbitmq/stream/impl/AsyncRetryTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import static com.rabbitmq.stream.BackOffDelayPolicy.fixedWithInitialDelay;
1818
import static com.rabbitmq.stream.impl.Assertions.assertThat;
1919
import static com.rabbitmq.stream.impl.TestUtils.sync;
20+
import static com.rabbitmq.stream.impl.ThreadUtils.internalThreadFactory;
2021
import static java.time.Duration.*;
2122
import static org.assertj.core.api.Assertions.assertThat;
2223
import static org.mockito.Mockito.*;
@@ -204,7 +205,6 @@ void completeExceptionally(ScheduledExecutorService scheduler) throws Exception
204205
static List<ScheduledExecutorService> schedulers() {
205206
return List.of(
206207
Executors.newSingleThreadScheduledExecutor(),
207-
Executors.newScheduledThreadPool(
208-
0, ThreadUtils.internalThreadFactory("async-retry-test-")));
208+
Executors.newScheduledThreadPool(0, internalThreadFactory("async-retry-test-")));
209209
}
210210
}

src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.concurrent.atomic.AtomicLong;
3131
import java.util.stream.IntStream;
3232
import org.junit.jupiter.api.Test;
33+
import org.junit.jupiter.api.TestInfo;
3334

3435
public class DynamicBatchTest {
3536

@@ -58,7 +59,7 @@ private static void printHistogram(Histogram histogram) {
5859
}
5960

6061
@Test
61-
void itemAreProcessed() {
62+
void itemAreProcessed(TestInfo info) {
6263
MetricRegistry metrics = new MetricRegistry();
6364
Histogram batchSizeMetrics = metrics.histogram("batch-size");
6465
int itemCount = 3000;
@@ -71,7 +72,7 @@ void itemAreProcessed() {
7172
sync.down(items.size());
7273
return true;
7374
};
74-
try (DynamicBatch<String> batch = new DynamicBatch<>(action, 100, 10_000)) {
75+
try (DynamicBatch<String> batch = new DynamicBatch<>(action, 100, 10_000, batchId(info))) {
7576
RateLimiter rateLimiter = RateLimiter.create(10000);
7677
IntStream.range(0, itemCount)
7778
.forEach(
@@ -85,7 +86,7 @@ void itemAreProcessed() {
8586
}
8687

8788
@Test
88-
void failedProcessingIsReplayed() throws Exception {
89+
void failedProcessingIsReplayed(TestInfo info) throws Exception {
8990
int itemCount = 10000;
9091
AtomicInteger collected = new AtomicInteger(0);
9192
AtomicInteger processed = new AtomicInteger(0);
@@ -102,7 +103,7 @@ void failedProcessingIsReplayed() throws Exception {
102103
}
103104
return result;
104105
};
105-
try (DynamicBatch<String> batch = new DynamicBatch<>(action, 100, 10_000)) {
106+
try (DynamicBatch<String> batch = new DynamicBatch<>(action, 100, 10_000, batchId(info))) {
106107
int firstRoundCount = itemCount / 5;
107108
IntStream.range(0, firstRoundCount)
108109
.forEach(
@@ -123,7 +124,7 @@ void failedProcessingIsReplayed() throws Exception {
123124
}
124125

125126
@Test
126-
void lowThrottlingValueShouldStillHighPublishingRate() throws Exception {
127+
void lowThrottlingValueShouldStillHighPublishingRate(TestInfo info) throws Exception {
127128
int batchSize = 10;
128129
Semaphore semaphore = new Semaphore(batchSize);
129130
DynamicBatch.BatchConsumer<Long> action =
@@ -132,7 +133,7 @@ void lowThrottlingValueShouldStillHighPublishingRate() throws Exception {
132133
return true;
133134
};
134135

135-
try (DynamicBatch<Long> batch = new DynamicBatch<>(action, batchSize, 10_000)) {
136+
try (DynamicBatch<Long> batch = new DynamicBatch<>(action, batchSize, 10_000, batchId(info))) {
136137
MetricRegistry metrics = new MetricRegistry();
137138
Meter rate = metrics.meter("publishing-rate");
138139
AtomicBoolean keepGoing = new AtomicBoolean(true);
@@ -154,4 +155,8 @@ void lowThrottlingValueShouldStillHighPublishingRate() throws Exception {
154155
System.nanoTime() - start > TimeUnit.SECONDS.toNanos(1) && rate.getMeanRate() > 1000);
155156
}
156157
}
158+
159+
private static String batchId(TestInfo info) {
160+
return info.getTestMethod().get().getName();
161+
}
157162
}

0 commit comments

Comments
 (0)