Skip to content

Commit bcc27b3

Browse files
author
Radek Kraus
committed
AMQ-9813 - fix wrong QueueSize for non-persistent message with TTL
- add "missing" invocation of discardExpiredMessage() method into tryAddMessageLast(), addMessageFirst(), probably caused in context of AMQ-5785 - use same "postponed" strategy (outside of synchronization) like was already done in original commit (see onUsageChanged() method)
1 parent 35ffb9b commit bcc27b3

File tree

2 files changed

+429
-8
lines changed

2 files changed

+429
-8
lines changed

activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,19 @@ public synchronized LinkedList<MessageReference> pageInList(int maxItems) {
206206
* @throws Exception
207207
*/
208208
@Override
209-
public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
209+
public boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
210+
// Discarding expired message should be done outside of synchronized section (deadlock, see AMQ-5785)
211+
List<MessageReference> expiredMessages = new ArrayList<>();
212+
boolean isExpiration = tryAddMessageLastInternal(node, maxWaitTime, expiredMessages);
213+
for (MessageReference expiredMessage : expiredMessages) {
214+
discardExpiredMessage(expiredMessage);
215+
}
216+
return isExpiration;
217+
}
218+
219+
private synchronized boolean tryAddMessageLastInternal(
220+
MessageReference node, long maxWaitTime, List<MessageReference> expiredMessages
221+
) {
210222
if (!node.isExpired()) {
211223
try {
212224
regionDestination = (Destination) node.getMessage().getRegionDestination();
@@ -220,7 +232,7 @@ public synchronized boolean tryAddMessageLast(MessageReference node, long maxWai
220232
}
221233
if (!hasSpace()) {
222234
if (isDiskListEmpty()) {
223-
expireOldMessages();
235+
expiredMessages.addAll(expireOldMessages());
224236
if (hasSpace()) {
225237
memoryList.addMessageLast(node);
226238
node.incrementReferenceCount();
@@ -242,7 +254,7 @@ public synchronized boolean tryAddMessageLast(MessageReference node, long maxWai
242254
throw new RuntimeException(e);
243255
}
244256
} else {
245-
discardExpiredMessage(node);
257+
expiredMessages.add(node);
246258
}
247259
//message expired
248260
return true;
@@ -254,7 +266,16 @@ public synchronized boolean tryAddMessageLast(MessageReference node, long maxWai
254266
* @param node
255267
*/
256268
@Override
257-
public synchronized void addMessageFirst(MessageReference node) {
269+
public void addMessageFirst(MessageReference node) {
270+
// Discarding expired message should be done outside of synchronized section (deadlock, see AMQ-5785)
271+
List<MessageReference> expiredMessages = addMessageFirstInternal(node);
272+
for (MessageReference expiredMessage : expiredMessages) {
273+
discardExpiredMessage(expiredMessage);
274+
}
275+
}
276+
277+
private synchronized List<MessageReference> addMessageFirstInternal(MessageReference node) {
278+
List<MessageReference> expiredMessages = new ArrayList<>();
258279
if (!node.isExpired()) {
259280
try {
260281
regionDestination = (Destination) node.getMessage().getRegionDestination();
@@ -263,16 +284,16 @@ public synchronized void addMessageFirst(MessageReference node) {
263284
memoryList.addMessageFirst(node);
264285
node.incrementReferenceCount();
265286
setCacheEnabled(true);
266-
return;
287+
return expiredMessages;
267288
}
268289
}
269290
if (!hasSpace()) {
270291
if (isDiskListEmpty()) {
271-
expireOldMessages();
292+
expiredMessages = expireOldMessages();
272293
if (hasSpace()) {
273294
memoryList.addMessageFirst(node);
274295
node.incrementReferenceCount();
275-
return;
296+
return expiredMessages;
276297
} else {
277298
flushToDisk();
278299
}
@@ -289,8 +310,9 @@ public synchronized void addMessageFirst(MessageReference node) {
289310
throw new RuntimeException(e);
290311
}
291312
} else {
292-
discardExpiredMessage(node);
313+
expiredMessages.add(node);
293314
}
315+
return expiredMessages;
294316
}
295317

296318
/**

0 commit comments

Comments
 (0)