Skip to content

Commit 7c3d486

Browse files
authored
GH-48105: [C++][Parquet][IPC] Cap allocated memory when fuzzing (#48108)
### Rationale for this change

OSS-Fuzz will trigger an out-of-memory crash if the allocated memory goes beyond a predefined limit (usually 2560 MB, though that can be configured). For Parquet and IPC, however, it is legitimate to allocate a lot of memory when decompressing data, so this can happen on both valid and invalid input files.

Unfortunately, OSS-Fuzz enforces this memory limit not by instrumenting malloc and having it return NULL when the limit is reached, but by checking allocated memory periodically from a separate thread.

This can be solved by implementing our own custom allocator with an upper limit, exactly as the mupdf project did in google/oss-fuzz#1830.

### What changes are included in this PR?

1. Implement a `CappedMemoryPool`
2. Use the `CappedMemoryPool` with a hardcoded limit in the Parquet and IPC fuzz targets

### Are these changes tested?

Yes, by additional unit tests.

### Are there any user-facing changes?

No.

* GitHub Issue: #48105

Authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
1 parent de02131 commit 7c3d486

File tree

10 files changed

+258
-8
lines changed

10 files changed

+258
-8
lines changed

cpp/src/arrow/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,7 @@ set(ARROW_UTIL_SRCS
511511
util/float16.cc
512512
util/formatting.cc
513513
util/future.cc
514+
util/fuzz_internal.cc
514515
util/hashing.cc
515516
util/int_util.cc
516517
util/io_util.cc

cpp/src/arrow/ipc/file_fuzz.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@
1919

2020
#include "arrow/ipc/reader.h"
2121
#include "arrow/status.h"
22+
#include "arrow/util/fuzz_internal.h"
2223
#include "arrow/util/macros.h"
2324

2425
extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
2526
auto status = arrow::ipc::internal::FuzzIpcFile(data, static_cast<int64_t>(size));
26-
ARROW_UNUSED(status);
27+
arrow::internal::LogFuzzStatus(status, data, static_cast<int64_t>(size));
2728
return 0;
2829
}

cpp/src/arrow/ipc/reader.cc

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
#include "arrow/util/checked_cast.h"
5454
#include "arrow/util/compression.h"
5555
#include "arrow/util/endian.h"
56+
#include "arrow/util/fuzz_internal.h"
5657
#include "arrow/util/key_value_metadata.h"
5758
#include "arrow/util/logging_internal.h"
5859
#include "arrow/util/parallel.h"
@@ -2618,14 +2619,21 @@ Status ValidateFuzzBatch(const RecordBatch& batch) {
26182619
return st;
26192620
}
26202621

2622+
// Build the IPC read options used by the fuzz entry points: a
// default-constructed IpcReadOptions whose memory pool is replaced by the
// fuzzing memory pool.
IpcReadOptions FuzzingOptions() {
  IpcReadOptions fuzz_options;
  fuzz_options.memory_pool = ::arrow::internal::fuzzing_memory_pool();
  return fuzz_options;
}
2627+
26212628
} // namespace
26222629

26232630
Status FuzzIpcStream(const uint8_t* data, int64_t size) {
26242631
auto buffer = std::make_shared<Buffer>(data, size);
26252632
io::BufferReader buffer_reader(buffer);
26262633

26272634
std::shared_ptr<RecordBatchReader> batch_reader;
2628-
ARROW_ASSIGN_OR_RAISE(batch_reader, RecordBatchStreamReader::Open(&buffer_reader));
2635+
ARROW_ASSIGN_OR_RAISE(batch_reader,
2636+
RecordBatchStreamReader::Open(&buffer_reader, FuzzingOptions()));
26292637
Status st;
26302638

26312639
while (true) {
@@ -2645,7 +2653,8 @@ Status FuzzIpcFile(const uint8_t* data, int64_t size) {
26452653
io::BufferReader buffer_reader(buffer);
26462654

26472655
std::shared_ptr<RecordBatchFileReader> batch_reader;
2648-
ARROW_ASSIGN_OR_RAISE(batch_reader, RecordBatchFileReader::Open(&buffer_reader));
2656+
ARROW_ASSIGN_OR_RAISE(batch_reader,
2657+
RecordBatchFileReader::Open(&buffer_reader, FuzzingOptions()));
26492658
Status st;
26502659

26512660
const int n_batches = batch_reader->num_record_batches();

cpp/src/arrow/ipc/stream_fuzz.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@
1919

2020
#include "arrow/ipc/reader.h"
2121
#include "arrow/status.h"
22+
#include "arrow/util/fuzz_internal.h"
2223
#include "arrow/util/macros.h"
2324

2425
extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
2526
auto status = arrow::ipc::internal::FuzzIpcStream(data, static_cast<int64_t>(size));
26-
ARROW_UNUSED(status);
27+
arrow::internal::LogFuzzStatus(status, data, static_cast<int64_t>(size));
2728
return 0;
2829
}

cpp/src/arrow/memory_pool.h

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "arrow/result.h"
2727
#include "arrow/status.h"
2828
#include "arrow/type_fwd.h"
29+
#include "arrow/util/macros.h"
2930
#include "arrow/util/visibility.h"
3031

3132
namespace arrow {
@@ -245,6 +246,73 @@ class ARROW_EXPORT ProxyMemoryPool : public MemoryPool {
245246
std::unique_ptr<ProxyMemoryPoolImpl> impl_;
246247
};
247248

249+
/// EXPERIMENTAL MemoryPool wrapper with an upper limit
250+
///
251+
/// Checking for limits is not done in a fully thread-safe way, therefore
252+
/// multi-threaded allocations might be able to go successfully above the
253+
/// configured limit.
254+
class ARROW_EXPORT CappedMemoryPool : public MemoryPool {
255+
public:
256+
CappedMemoryPool(MemoryPool* wrapped_pool, int64_t bytes_allocated_limit)
257+
: wrapped_(wrapped_pool), bytes_allocated_limit_(bytes_allocated_limit) {}
258+
259+
using MemoryPool::Allocate;
260+
using MemoryPool::Reallocate;
261+
262+
Status Allocate(int64_t size, int64_t alignment, uint8_t** out) override {
263+
// XXX Another thread may allocate memory between the limit check and
264+
// the `Allocate` call. It is possible for the two allocations to be successful
265+
// while going above the limit.
266+
// Solving this issue would require refactoring the `MemoryPool` implementation
267+
// to delegate the limit check to `MemoryPoolStats`.
268+
const auto attempted = size + wrapped_->bytes_allocated();
269+
if (ARROW_PREDICT_FALSE(attempted > bytes_allocated_limit_)) {
270+
return OutOfMemory(attempted);
271+
}
272+
return wrapped_->Allocate(size, alignment, out);
273+
}
274+
275+
Status Reallocate(int64_t old_size, int64_t new_size, int64_t alignment,
276+
uint8_t** ptr) override {
277+
const auto attempted = new_size - old_size + wrapped_->bytes_allocated();
278+
if (ARROW_PREDICT_FALSE(attempted > bytes_allocated_limit_)) {
279+
return OutOfMemory(attempted);
280+
}
281+
return wrapped_->Reallocate(old_size, new_size, alignment, ptr);
282+
}
283+
284+
void Free(uint8_t* buffer, int64_t size, int64_t alignment) override {
285+
return wrapped_->Free(buffer, size, alignment);
286+
}
287+
288+
void ReleaseUnused() override { wrapped_->ReleaseUnused(); }
289+
290+
void PrintStats() override { wrapped_->PrintStats(); }
291+
292+
int64_t bytes_allocated() const override { return wrapped_->bytes_allocated(); }
293+
294+
int64_t max_memory() const override { return wrapped_->max_memory(); }
295+
296+
int64_t total_bytes_allocated() const override {
297+
return wrapped_->total_bytes_allocated();
298+
}
299+
300+
int64_t num_allocations() const override { return wrapped_->num_allocations(); }
301+
302+
std::string backend_name() const override { return wrapped_->backend_name(); }
303+
304+
private:
305+
Status OutOfMemory(int64_t value) {
306+
return Status::OutOfMemory(
307+
"MemoryPool bytes_allocated cap exceeded: "
308+
"limit=",
309+
bytes_allocated_limit_, ", attempted=", value);
310+
}
311+
312+
MemoryPool* wrapped_;
313+
const int64_t bytes_allocated_limit_;
314+
};
315+
248316
/// \brief Return a process-wide memory pool based on the system allocator.
249317
ARROW_EXPORT MemoryPool* system_memory_pool();
250318

cpp/src/arrow/memory_pool_test.cc

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include <algorithm>
1919
#include <cstdint>
20+
#include <memory>
2021

2122
#include <gtest/gtest.h>
2223

@@ -290,4 +291,93 @@ TEST(Jemalloc, GetAllocationStats) {
290291
#endif
291292
}
292293

294+
class TestCappedMemoryPool : public ::arrow::TestMemoryPoolBase {
295+
public:
296+
MemoryPool* memory_pool() override { return InitPool(/*limit=*/1'000'000'000LL); }
297+
298+
MemoryPool* InitPool(int64_t limit) {
299+
proxy_memory_pool_ = std::make_shared<ProxyMemoryPool>(default_memory_pool());
300+
capped_memory_pool_ =
301+
std::make_shared<CappedMemoryPool>(proxy_memory_pool_.get(), limit);
302+
return capped_memory_pool_.get();
303+
}
304+
305+
protected:
306+
std::shared_ptr<MemoryPool> proxy_memory_pool_;
307+
std::shared_ptr<CappedMemoryPool> capped_memory_pool_;
308+
};
309+
310+
// Run the shared memory-tracking check against the capped pool.
TEST_F(TestCappedMemoryPool, MemoryTracking) {
  TestMemoryTracking();
}
311+
312+
// Run the shared out-of-memory check against the capped pool.
TEST_F(TestCappedMemoryPool, OOM) {
  // CappedMemoryPool rejects the huge allocation without hitting the underlying
  // allocator, so this should work even under Address Sanitizer.
  TestOOM();
}
317+
318+
// Run the shared reallocation check against the capped pool.
TEST_F(TestCappedMemoryPool, Reallocate) {
  TestReallocate();
}
319+
320+
// Run the shared alignment check against the capped pool.
TEST_F(TestCappedMemoryPool, Alignment) {
  TestAlignment();
}
321+
322+
// Check that Allocate() succeeds up to exactly the limit, fails one byte above
// it, and that a rejected allocation leaves the statistics untouched.
TEST_F(TestCappedMemoryPool, AllocateLimit) {
  MemoryPool* capped = InitPool(/*limit=*/1000);

  // Long-lived allocation: 600 of the 1000 allowed bytes.
  uint8_t* base_block;
  ASSERT_OK(capped->Allocate(600, &base_block));
  ASSERT_EQ(600, capped->bytes_allocated());
  ASSERT_EQ(600, capped->total_bytes_allocated());
  ASSERT_EQ(600, capped->max_memory());

  // Reaching the limit exactly is allowed.
  uint8_t* extra_block;
  ASSERT_OK(capped->Allocate(400, &extra_block));
  ASSERT_EQ(1000, capped->bytes_allocated());
  ASSERT_EQ(1000, capped->total_bytes_allocated());
  ASSERT_EQ(1000, capped->max_memory());
  capped->Free(extra_block, 400);
  ASSERT_EQ(600, capped->bytes_allocated());
  ASSERT_EQ(1000, capped->total_bytes_allocated());
  ASSERT_EQ(1000, capped->max_memory());

  // Freed memory can be allocated again; only the cumulative total grows.
  ASSERT_OK(capped->Allocate(300, &extra_block));
  ASSERT_EQ(900, capped->bytes_allocated());
  ASSERT_EQ(1300, capped->total_bytes_allocated());
  ASSERT_EQ(1000, capped->max_memory());
  capped->Free(extra_block, 300);
  ASSERT_EQ(600, capped->bytes_allocated());
  ASSERT_EQ(1300, capped->total_bytes_allocated());
  ASSERT_EQ(1000, capped->max_memory());

  // 600 + 401 is one byte above the limit: rejected, statistics unchanged.
  ASSERT_RAISES(OutOfMemory, capped->Allocate(401, &extra_block));
  ASSERT_EQ(600, capped->bytes_allocated());
  ASSERT_EQ(1300, capped->total_bytes_allocated());
  ASSERT_EQ(1000, capped->max_memory());

  capped->Free(base_block, 600);
}
357+
358+
// Check that Reallocate() may shrink an allocation while the pool is at its
// limit, and that growing above the limit fails and leaves the statistics
// untouched.
TEST_F(TestCappedMemoryPool, ReallocateLimit) {
  MemoryPool* capped = InitPool(/*limit=*/1000);

  // Fill the pool exactly up to its limit with two allocations.
  uint8_t* fixed_block;
  uint8_t* resized_block;
  ASSERT_OK(capped->Allocate(600, &fixed_block));
  ASSERT_OK(capped->Allocate(400, &resized_block));
  ASSERT_EQ(1000, capped->bytes_allocated());
  ASSERT_EQ(1000, capped->total_bytes_allocated());
  ASSERT_EQ(1000, capped->max_memory());

  // Shrinking 400 -> 300 succeeds and frees up 100 bytes.
  ASSERT_OK(capped->Reallocate(400, 300, &resized_block));
  ASSERT_EQ(900, capped->bytes_allocated());
  ASSERT_EQ(1000, capped->total_bytes_allocated());
  ASSERT_EQ(1000, capped->max_memory());

  // Growing 300 -> 401 would reach 1001 bytes: rejected, statistics unchanged.
  ASSERT_RAISES(OutOfMemory, capped->Reallocate(300, 401, &resized_block));
  ASSERT_EQ(900, capped->bytes_allocated());
  ASSERT_EQ(1000, capped->total_bytes_allocated());
  ASSERT_EQ(1000, capped->max_memory());

  capped->Free(fixed_block, 600);
  capped->Free(resized_block, 300);
}
382+
293383
} // namespace arrow
cpp/src/arrow/util/fuzz_internal.cc

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "arrow/util/fuzz_internal.h"
19+
20+
#include "arrow/memory_pool.h"
21+
#include "arrow/status.h"
22+
#include "arrow/util/logging_internal.h"
23+
24+
namespace arrow::internal {
25+
26+
// Return the memory pool to use in fuzz targets: a CappedMemoryPool wrapping
// the default memory pool, with kFuzzingMemoryLimit as its limit.
//
// The pool is a function-local static, so it is created on the first call and
// every call returns the same pointer. No shared ownership is needed, hence a
// plain object rather than a heap-allocated shared_ptr.
MemoryPool* fuzzing_memory_pool() {
  static ::arrow::CappedMemoryPool pool(
      ::arrow::default_memory_pool(), /*bytes_allocated_limit=*/kFuzzingMemoryLimit);
  return &pool;
}
31+
32+
void LogFuzzStatus(const Status& st, const uint8_t* data, int64_t size) {
33+
// Most fuzz inputs will be invalid and generate errors, only log potential OOMs
34+
if (st.IsOutOfMemory()) {
35+
ARROW_LOG(WARNING) << "Fuzzing input with size=" << size
36+
<< " hit allocation failure: " << st.ToString();
37+
}
38+
}
39+
40+
} // namespace arrow::internal

cpp/src/arrow/util/fuzz_internal.h

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#pragma once
19+
20+
#include <cstdint>
21+
22+
#include "arrow/type_fwd.h"
23+
#include "arrow/util/macros.h"
24+
25+
namespace arrow::internal {
26+
27+
// The default rss_limit_mb on OSS-Fuzz is 2560 MB and we want to fail allocations
// before that limit is reached, otherwise the fuzz target gets killed (GH-48105).
//
// Declared `inline` so that all translation units including this header share
// a single definition of the constant.
inline constexpr int64_t kFuzzingMemoryLimit = 2200LL * 1000 * 1000;
30+
31+
/// Return a memory pool that will not allocate more than kFuzzingMemoryLimit bytes.
32+
ARROW_EXPORT MemoryPool* fuzzing_memory_pool();
33+
34+
// Optionally log the outcome of fuzzing an input
35+
ARROW_EXPORT void LogFuzzStatus(const Status&, const uint8_t* data, int64_t size);
36+
37+
} // namespace arrow::internal

cpp/src/parquet/arrow/fuzz.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@
1616
// under the License.
1717

1818
#include "arrow/status.h"
19+
#include "arrow/util/fuzz_internal.h"
1920
#include "parquet/arrow/reader.h"
2021

2122
extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
2223
auto status = parquet::arrow::internal::FuzzReader(data, static_cast<int64_t>(size));
23-
ARROW_UNUSED(status);
24+
arrow::internal::LogFuzzStatus(status, data, static_cast<int64_t>(size));
2425
return 0;
2526
}

cpp/src/parquet/arrow/reader.cc

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,15 @@
2929
#include "arrow/buffer.h"
3030
#include "arrow/extension_type.h"
3131
#include "arrow/io/memory.h"
32+
#include "arrow/memory_pool.h"
3233
#include "arrow/record_batch.h"
3334
#include "arrow/table.h"
3435
#include "arrow/type.h"
3536
#include "arrow/type_traits.h"
3637
#include "arrow/util/async_generator.h"
3738
#include "arrow/util/bit_util.h"
3839
#include "arrow/util/future.h"
40+
#include "arrow/util/fuzz_internal.h"
3941
#include "arrow/util/iterator.h"
4042
#include "arrow/util/logging_internal.h"
4143
#include "arrow/util/parallel.h"
@@ -1403,7 +1405,7 @@ namespace internal {
14031405

14041406
namespace {
14051407

1406-
Status FuzzReader(std::unique_ptr<FileReader> reader) {
1408+
Status FuzzReadData(std::unique_ptr<FileReader> reader) {
14071409
auto st = Status::OK();
14081410
for (int i = 0; i < reader->num_row_groups(); ++i) {
14091411
std::shared_ptr<Table> table;
@@ -1490,7 +1492,7 @@ Status FuzzReader(const uint8_t* data, int64_t size) {
14901492

14911493
auto buffer = std::make_shared<::arrow::Buffer>(data, size);
14921494
auto file = std::make_shared<::arrow::io::BufferReader>(buffer);
1493-
auto pool = ::arrow::default_memory_pool();
1495+
auto pool = ::arrow::internal::fuzzing_memory_pool();
14941496
auto reader_properties = default_reader_properties();
14951497
std::default_random_engine rng(/*seed*/ 42);
14961498

@@ -1562,7 +1564,7 @@ Status FuzzReader(const uint8_t* data, int64_t size) {
15621564

15631565
std::unique_ptr<FileReader> reader;
15641566
RETURN_NOT_OK(FileReader::Make(pool, std::move(pq_file_reader), properties, &reader));
1565-
st &= FuzzReader(std::move(reader));
1567+
st &= FuzzReadData(std::move(reader));
15661568
}
15671569
return st;
15681570
}

0 commit comments

Comments
 (0)