In JmsSource.java, at line 81, we have the following:
source = Multi.createFrom().publisher(publisher)
        .emitOn(context::runOnContext)
        .<IncomingJmsMessage<?>> map(m -> new IncomingJmsMessage<>(m, executor, jsonMapping))
By default, emitOn(x) with no second argument will, through a series of calls, request 256 messages from whatever is upstream (in older versions of Mutiny, it was 16).
This can be observed if one adds a log.info to the enqueue method:
private void enqueue(long n) {
    log.info("enqueing " + n + " messages");
    for (int i = 0; i < n; i++) {
        executor.execute(() -> {
            try {
                Message message = null;
                while (message == null && downstream.get() != null) {
                    log.info("pulling a message");
                    message = consumerHolder.getClient().receive();
                    ...
                }
            } catch (JMSRuntimeException e) {
                ...
            }
        });
    }
}
Then the logs will look like this:
2026-01-06 18:52:37,712 INFO [io.smallrye.reactive.messaging.jms] (Quarkus Main Thread) enqueing 256 messages
2026-01-06 18:52:37,713 INFO [io.smallrye.reactive.messaging.jms] (pool-22-thread-1) pulling a message
...
2026-01-06 18:52:38,003 INFO [io.smallrye.reactive.messaging.jms] (pool-22-thread-1) pulling a message
2026-01-06 18:52:38,007 INFO [io.smallrye.reactive.messaging.jms] (pool-22-thread-1) pulling a message
2026-01-06 18:52:38,009 INFO [io.smallrye.reactive.messaging.jms] (pool-22-thread-1) pulling a message
2026-01-06 18:52:39,719 INFO [org.acme.Ingestor] (vert.x-eventloop-thread-3) done msg 7
2026-01-06 18:52:39,723 INFO [org.acme.Ingestor] (vert.x-eventloop-thread-3) <<<< msg 8
2026-01-06 18:52:41,722 INFO [org.acme.Ingestor] (vert.x-eventloop-thread-3) done msg 8
2026-01-06 18:52:41,727 INFO [org.acme.Ingestor] (vert.x-eventloop-thread-3) <<<< msg 9
2026-01-06 18:52:43,728 INFO [org.acme.Ingestor] (vert.x-eventloop-thread-3) done msg 9
2026-01-06 18:52:43,731 INFO [org.acme.Ingestor] (vert.x-eventloop-thread-3) <<<< msg 10
2026-01-06 18:52:45,732 INFO [org.acme.Ingestor] (vert.x-eventloop-thread-3) done msg 10
2026-01-06 18:52:45,736 INFO [org.acme.Ingestor] (vert.x-eventloop-thread-3) <<<< msg 11
This creates 1) a dangerous situation and 2) a weird side effect.
1. The source pulls 256 messages regardless of how fast they can be processed. Because JMS acknowledgement applies to the whole session (in CLIENT_ACKNOWLEDGE mode, acknowledging one message acknowledges every message the session has consumed so far), calling .ack() on any of them acknowledge()s all of them, and they are removed from the broker. If the application fails or restarts for any reason, the unprocessed messages still held in the internal buffer are lost, which some people may find undesirable.
2. In some situations, despite having pulled every remaining message from a queue, it waits until the buffer is full again before processing the messages it is holding. I saw this with the combination of session mode CLIENT_ACKNOWLEDGE + @Acknowledgment(Acknowledgment.Strategy.MANUAL) + no @Blocking annotation.
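The message-loss scenario in the first point can be sketched with a small, self-contained model. This is not SmallRye or JMS code: the class PrefetchLossDemo and its method lostOnCrash are hypothetical names, and the model assumes a session-wide acknowledge and a buffer that is topped up after every processed message, which simplifies how Mutiny actually replenishes demand.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

/** Toy model of a prefetching consumer on a CLIENT_ACKNOWLEDGE-style session (hypothetical, not library code). */
public class PrefetchLossDemo {

    /**
     * Counts the messages that are gone from the broker but were never processed
     * when the application stops after handling processedBeforeCrash messages.
     */
    public static int lostOnCrash(int bufferSize, int queued, int processedBeforeCrash) {
        Deque<Integer> broker = new ArrayDeque<>();   // messages still on the broker
        for (int i = 0; i < queued; i++) {
            broker.add(i);
        }
        Deque<Integer> buffer = new ArrayDeque<>();   // pulled but not yet processed
        Set<Integer> pendingAck = new HashSet<>();    // pulled but not yet acknowledged

        refill(broker, buffer, pendingAck, bufferSize);
        for (int i = 0; i < processedBeforeCrash && !buffer.isEmpty(); i++) {
            buffer.poll();        // the application handles one message
            pendingAck.clear();   // ack(): covers everything the session has received so far
            refill(broker, buffer, pendingAck, bufferSize);
        }

        // On a crash, unacknowledged messages would be redelivered; acknowledged ones are gone.
        int lost = 0;
        for (int m : buffer) {
            if (!pendingAck.contains(m)) {
                lost++;
            }
        }
        return lost;
    }

    /** Pulls messages from the broker until the buffer holds bufferSize messages. */
    private static void refill(Deque<Integer> broker, Deque<Integer> buffer,
                               Set<Integer> pendingAck, int bufferSize) {
        while (buffer.size() < bufferSize && !broker.isEmpty()) {
            int m = broker.poll();
            buffer.add(m);
            pendingAck.add(m);
        }
    }

    public static void main(String[] args) {
        System.out.println("buffer 256: lost " + lostOnCrash(256, 300, 4));
        System.out.println("buffer 1:   lost " + lostOnCrash(1, 300, 4));
    }
}
```

In this model, with 300 queued messages and a stop after 4 processed messages, a buffer of 256 loses 255 messages while a buffer of 1 loses none, because the only buffered message has not been acknowledged yet.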
To control the buffer size, we can do what was already done for the SQS connector ( ea548e1 ).
First, we add the following attribute to JmsConnector.java:
@ConnectorAttribute(name = "max-number-of-messages", type = "int", direction = INCOMING, description = "The maximum number of messages that can be retrieved at once", defaultValue = "16")
Then we change that .emitOn call to
.emitOn(context::runOnContext, config.getMaxNumberOfMessages())
Like this:
main...Alfrederson:smallrye-reactive-messaging:jms_emit_max_number_of_messages
Then we can set max-number-of-messages to 1, or 256, or whatever we want. With a buffer size of 1 we can control concurrency explicitly with a @Blocking annotation and a specific worker pool size.
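As a rough sketch of how this could be configured, assuming the change above is applied: the channel name orders and the worker pool name jms-workers are hypothetical, and max-number-of-messages exists only with the proposed attribute. The consumer method would be annotated with @Blocking("jms-workers") for the pool setting to take effect.

```properties
# Hypothetical channel "orders" using the JMS connector
mp.messaging.incoming.orders.connector=smallrye-jms
mp.messaging.incoming.orders.session-mode=CLIENT_ACKNOWLEDGE
# Proposed attribute: pull one message at a time instead of 256
mp.messaging.incoming.orders.max-number-of-messages=1
# Size of the hypothetical worker pool referenced by @Blocking("jms-workers")
smallrye.messaging.worker.jms-workers.max-concurrency=4
```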