From 396e65aa7d3de82093de35e4ecb381d871e964f6 Mon Sep 17 00:00:00 2001
From: Jean-Louis Monteiro
Date: Fri, 28 Nov 2025 17:43:00 +0100
Subject: [PATCH] AMQ-??: add a guard to the record length read from a corrupted file

---
 .../kahadb/disk/journal/DataFileAccessor.java |  7 +++
 ...JournalCorruptionEofIndexRecoveryTest.java |  8 ++-
 ...disk/journal/DataFileAccessorCorruptionTest.java | 56 +++++++++++++++++++
 3 files changed, 68 insertions(+), 3 deletions(-)
 create mode 100644 activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessorCorruptionTest.java

diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
index 80bf636499b..ab5492c6a52 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
@@ -34,6 +34,7 @@ final class DataFileAccessor {
     private final DataFile dataFile;
     private final Map<Journal.WriteKey, Journal.WriteCommand> inflightWrites;
     private final RecoverableRandomAccessFile file;
+    private final int maxAllowedRecordSize;
     private boolean disposed;
 
     /**
@@ -45,6 +46,8 @@ public DataFileAccessor(Journal dataManager, DataFile dataFile) throws IOException {
         this.dataFile = dataFile;
         this.inflightWrites = dataManager.getInflightWrites();
         this.file = dataFile.openRandomAccessFile();
+        // Avoid allocating absurd buffers for corrupted records; use the configured max file length as a cap.
+        this.maxAllowedRecordSize = dataManager.getMaxFileLength();
     }
 
     public DataFile getDataFile() {
@@ -83,6 +86,10 @@ public ByteSequence readRecord(Location location) throws IOException {
             file.seek(location.getOffset() + Journal.RECORD_HEAD_SPACE);
         }
         validateFileLength(location);
+        if (location.getSize() <= Journal.RECORD_HEAD_SPACE
+                || location.getSize() > maxAllowedRecordSize) {
+            throw new IOException("Invalid location size: " + location + ", size: " + location.getSize());
+        }
         byte[] data = new byte[location.getSize() - Journal.RECORD_HEAD_SPACE];
         file.readFully(data);
         return new ByteSequence(data, 0, data.length);
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
index cebb182be76..613310c3f66 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
@@ -251,8 +251,8 @@ public void append(LogEvent event) {
                 if (event.getLevel().equals(Level.WARN)
                         && event.getMessage().getFormattedMessage() != null
                         && event.getMessage().getFormattedMessage().contains("Cannot recover message audit")
                         && event.getThrown() != null
-                        && event.getThrown() instanceof EOFException
-                        && event.getThrown().getMessage() == null) {
+                        && (event.getThrown() instanceof EOFException
+                                || event.getThrown() instanceof IOException)) {
                     trappedExpectedLogMessage.set(true);
                 }
@@ -419,7 +419,9 @@ private void corruptBatchEndEof(int id) throws Exception{
         int pos = batchPositions.get(batchPositions.size() - 3);
         LOG.info("corrupting checksum and size (to push it past eof) of batch record at:" + id + "-" + pos);
         randomAccessFile.seek(pos + Journal.BATCH_CONTROL_RECORD_HEADER.length);
-        randomAccessFile.writeInt(31 * 1024 * 1024);
+        // use a large but bounded bad size (just over the default max file length) to trigger the guard without Integer.MAX_VALUE
+        int badSize = Journal.DEFAULT_MAX_FILE_LENGTH + 1024;
+        randomAccessFile.writeInt(badSize);
         randomAccessFile.writeLong(0l);
         randomAccessFile.getChannel().force(true);
     }
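Why a bounded bad size is enough for the test above: the new check in DataFileAccessor.readRecord() rejects any size claim over the cap before the buffer is allocated. The following is a self-contained sketch of the failure mode being closed, not ActiveMQ code; the MAX_RECORD_SIZE constant and the bare length-prefix layout are assumptions of the sketch.

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

public class LengthPrefixGuardSketch {

    // Illustrative cap only; the patch derives its cap from the journal's configured max file length.
    static final int MAX_RECORD_SIZE = 32 * 1024 * 1024;

    static byte[] readRecord(DataInputStream in) throws IOException {
        int size = in.readInt(); // the length prefix comes straight from disk, so corruption controls it
        if (size <= 0 || size > MAX_RECORD_SIZE) {
            // Without this check, new byte[size] below would try to allocate whatever
            // the corrupted prefix claims (up to 2GB) before readFully ever fails.
            throw new IOException("Invalid record size: " + size);
        }
        byte[] data = new byte[size];
        in.readFully(data);
        return data;
    }

    public static void main(String[] args) throws IOException {
        // A "record" whose 4-byte length prefix claims Integer.MAX_VALUE bytes.
        byte[] corrupted = {0x7F, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF};
        try {
            readRecord(new DataInputStream(new ByteArrayInputStream(corrupted)));
        } catch (IOException expected) {
            System.out.println("rejected: " + expected.getMessage());
        }
    }
}

The patch applies the same idea but takes the cap from Journal.getMaxFileLength(), since no valid record can be larger than the journal file that contains it.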
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessorCorruptionTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessorCorruptionTest.java
new file mode 100644
index 00000000000..3032bdb3f19
--- /dev/null
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessorCorruptionTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.journal;
+
+import static org.junit.Assert.assertThrows;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.activemq.util.IOHelper;
+import org.junit.Test;
+
+/**
+ * Verify corrupted sizes are rejected without allocating huge buffers.
+ */
+public class DataFileAccessorCorruptionTest {
+
+    @Test
+    public void testRejectsOversizedRecord() throws Exception {
+        // minimal journal setup with a single empty data file
+        final File dir = new File(IOHelper.getDefaultDataDirectory(), "DataFileAccessorCorruptionTest");
+        dir.mkdirs();
+        final Journal journal = new Journal();
+        journal.setDirectory(dir);
+        journal.setMaxFileLength(1024 * 1024); // 1MB max file length
+        journal.start();
+
+        try {
+            // fabricate a location claiming a size larger than maxFileLength
+            Location bogus = new Location();
+            bogus.setOffset(0);
+            bogus.setSize(journal.getMaxFileLength() + 1024);
+            bogus.setDataFileId(0);
+
+            DataFile dataFile = new DataFile(new File(dir, "db-0.log"), 0);
+            DataFileAccessor accessor = new DataFileAccessor(journal, dataFile);
+            assertThrows(IOException.class, () -> accessor.readRecord(bogus));
+        } finally {
+            journal.close();
+        }
+    }
+}
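Since the cap tracks the existing journal configuration, well-formed records are unaffected and no new tuning knob is introduced. A minimal usage sketch against the public Journal API follows; the class name, scratch directory, and 1 KB payload are arbitrary choices for illustration, and it assumes the patched activemq-kahadb-store is on the classpath.

import java.io.File;

import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.util.ByteSequence;

public class JournalGuardDemo {
    public static void main(String[] args) throws Exception {
        Journal journal = new Journal();
        journal.setDirectory(new File("target/journal-guard-demo")); // arbitrary scratch directory
        // The new guard caps record sizes at this value; size claims above it are treated as corruption.
        journal.setMaxFileLength(1024 * 1024);
        journal.start();
        try {
            // Well-formed records are unaffected: a normal write/read round trip stays under the cap.
            Location loc = journal.write(new ByteSequence(new byte[1024]), true);
            ByteSequence read = journal.read(loc);
            System.out.println("read back " + read.getLength() + " bytes");
        } finally {
            journal.close();
        }
    }
}

With the guard in place, a record whose stored length prefix claims more than the configured max file length surfaces as an IOException during recovery instead of an attempt to allocate whatever the corrupted header claims.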