-
Notifications
You must be signed in to change notification settings - Fork 5.5k
perf: Change shuffle read API to return a row batch instead if io buffer #26322
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
Reviewer's GuideThis PR refactors the shuffle read API to return parsed row batches (ReadBatch) instead of raw IO buffers, introducing a ReadBatch type, updating ShuffleReader.next signatures and implementations in both LocalPersistentShuffleReader and CompactRowExchangeSource, renaming UnsafeRowExchangeSource to CompactRowExchangeSource (with a CompactRowBatch wrapper), and updating tests, ExchangeSource registration, and CMake build files accordingly. Class diagram for updated ShuffleReader and ReadBatchclassDiagram
class ShuffleReader {
<<interface>>
+next() : SemiFuture<ReadBatch>
+noMoreData(success: bool)
}
class ReadBatch {
+rows: vector<string_view>
+data: BufferPtr
+ReadBatch(rows, data)
}
ShuffleReader --> ReadBatch
Class diagram for CompactRowExchangeSource and CompactRowBatchclassDiagram
class CompactRowBatch {
+CompactRowBatch(rowBatch: ReadBatch)
+rows() : vector<string_view>
-rowBatch_: unique_ptr<ReadBatch>
}
class CompactRowExchangeSource {
+CompactRowExchangeSource(taskId, destination, queue, shuffleReader, pool)
+request(maxBytes, maxWait) : SemiFuture<Response>
+requestDataSizes(maxWait) : SemiFuture<Response>
+stats() : F14FastMap<string, int64_t>
+createExchangeSource(url, destination, queue, pool) : shared_ptr<ExchangeSource>
-shuffleReader_: shared_ptr<ShuffleReader>
}
CompactRowExchangeSource --> CompactRowBatch
Class diagram for LocalPersistentShuffleReader changesclassDiagram
class LocalPersistentShuffleReader {
+LocalPersistentShuffleReader(rootPath, queryId, partitionIds, pool)
+next() : SemiFuture<ReadBatch>
+noMoreData(success: bool)
-pool_: MemoryPool*
-readPartitionFileIndex_: size_t
-readPartitionFiles_: vector<string>
}
LocalPersistentShuffleReader --> ReadBatch
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey there - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.cpp:185-203` </location>
<code_context>
+ size_t offset = 0;
+ const size_t totalSize = buffer->size();
+
+ while (offset + sizeof(TRowSize) <= totalSize) {
+ // Read row size (stored in big endian).
+ TRowSize rowSize = folly::Endian::big(*(TRowSize*)(data + offset));
+ offset += sizeof(TRowSize);
+
+ if (offset + rowSize > totalSize) {
+ VELOX_FAIL(
+ "Invalid row data: row size {} exceeds remaining buffer size {}",
+ rowSize,
+ totalSize - offset);
+ }
+
+ // Create a Row with empty key and the row data as value.
+ rows.emplace_back(std::string_view{data + offset, rowSize});
+ offset += rowSize;
+ }
+
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Consider validating rowSize for zero or excessive values.
The code should also check for rowSize == 0 and enforce a reasonable maximum to prevent processing invalid or corrupted data.
```suggestion
size_t offset = 0;
const size_t totalSize = buffer->size();
constexpr size_t kMaxRowSize = 16 * 1024 * 1024; // 16MB max row size
while (offset + sizeof(TRowSize) <= totalSize) {
// Read row size (stored in big endian).
TRowSize rowSize = folly::Endian::big(*(TRowSize*)(data + offset));
offset += sizeof(TRowSize);
if (rowSize == 0) {
VELOX_FAIL("Invalid row data: row size is zero at offset {}", offset - sizeof(TRowSize));
}
if (rowSize > kMaxRowSize) {
VELOX_FAIL(
"Invalid row data: row size {} exceeds maximum allowed row size {}",
rowSize,
kMaxRowSize);
}
if (offset + rowSize > totalSize) {
VELOX_FAIL(
"Invalid row data: row size {} exceeds remaining buffer size {}",
rowSize,
totalSize - offset);
}
// Create a Row with empty key and the row data as value.
rows.emplace_back(std::string_view{data + offset, rowSize});
offset += rowSize;
}
```
</issue_to_address>
### Comment 2
<location> `presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.cpp:190` </location>
<code_context>
+
+ while (offset + sizeof(TRowSize) <= totalSize) {
+ // Read row size (stored in big endian).
+ TRowSize rowSize = folly::Endian::big(*(TRowSize*)(data + offset));
+ offset += sizeof(TRowSize);
+
</code_context>
<issue_to_address>
**issue (bug_risk):** Direct pointer casting for rowSize extraction may be unsafe on platforms with strict alignment requirements.
Using memcpy to extract the row size avoids potential undefined behavior from misaligned pointer casts.
</issue_to_address>
### Comment 3
<location> `presto-native-execution/presto_cpp/main/operators/tests/UnsafeRowShuffleTest.cpp:306` </location>
<code_context>
}
} // namespace
-class UnsafeRowShuffleTest : public exec::test::OperatorTestBase {
+class CompactRowShuffleTest : public exec::test::OperatorTestBase {
public:
std::string testShuffleInfo(
</code_context>
<issue_to_address>
**suggestion (testing):** Test class and test names updated, but no new tests for row batch semantics.
Please add or update tests to cover the new row batch behavior, including edge cases like empty batches, single-row batches, and invalid row sizes.
Suggested implementation:
```cpp
class CompactRowShuffleTest : public exec::test::OperatorTestBase {
public:
std::string testShuffleInfo(
uint32_t numPartitions,
}
// Test: Empty batch should produce no output.
TEST_F(CompactRowShuffleTest, emptyBatch) {
auto emptyData = makeRowVector({
makeFlatVector<int32_t>({}),
makeFlatVector<int64_t>({})
});
TestShuffleWriter::reset();
// Assuming runShuffle is the method to execute shuffle
auto result = runShuffle(emptyData, /*numPartitions=*/4);
ASSERT_TRUE(result->size() == 0) << "Empty batch should produce no output";
}
// Test: Single-row batch should produce correct output.
TEST_F(CompactRowShuffleTest, singleRowBatch) {
auto singleData = makeRowVector({
makeFlatVector<int32_t>({42}),
makeFlatVector<int64_t>({420})
});
TestShuffleWriter::reset();
auto result = runShuffle(singleData, /*numPartitions=*/4);
ASSERT_EQ(result->size(), 1) << "Single-row batch should produce one output row";
ASSERT_EQ(result->childAt(0)->as<FlatVector<int32_t>>()->getValue(0), 42);
ASSERT_EQ(result->childAt(1)->as<FlatVector<int64_t>>()->getValue(0), 420);
}
// Test: Invalid row size (zero columns).
TEST_F(CompactRowShuffleTest, invalidRowSizeZeroColumns) {
auto invalidData = makeRowVector({});
TestShuffleWriter::reset();
EXPECT_THROW(runShuffle(invalidData, /*numPartitions=*/4), VeloxException);
}
// Test: Invalid row size (exceeding max allowed).
TEST_F(CompactRowShuffleTest, invalidRowSizeTooLarge) {
// Assuming max allowed columns is 100
std::vector<VectorPtr> columns;
for (int i = 0; i < 101; ++i) {
columns.push_back(makeFlatVector<int32_t>({i}));
}
auto invalidData = makeRowVector(columns);
TestShuffleWriter::reset();
EXPECT_THROW(runShuffle(invalidData, /*numPartitions=*/4), VeloxException);
}
};
```
- You may need to adjust the `runShuffle` function name and signature to match your actual shuffle execution method.
- If your test framework uses a different exception type than `VeloxException`, replace it accordingly.
- If your row vector creation utility differs, adapt the test data creation to your codebase.
- Ensure the test class is registered with your test runner (e.g., GoogleTest).
</issue_to_address>
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
size_t offset = 0; | ||
const size_t totalSize = buffer->size(); | ||
|
||
while (offset + sizeof(TRowSize) <= totalSize) { | ||
// Read row size (stored in big endian). | ||
TRowSize rowSize = folly::Endian::big(*(TRowSize*)(data + offset)); | ||
offset += sizeof(TRowSize); | ||
|
||
if (offset + rowSize > totalSize) { | ||
VELOX_FAIL( | ||
"Invalid row data: row size {} exceeds remaining buffer size {}", | ||
rowSize, | ||
totalSize - offset); | ||
} | ||
|
||
// Create a Row with empty key and the row data as value. | ||
rows.emplace_back(std::string_view{data + offset, rowSize}); | ||
offset += rowSize; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion (bug_risk): Consider validating rowSize for zero or excessive values.
The code should also check for rowSize == 0 and enforce a reasonable maximum to prevent processing invalid or corrupted data.
size_t offset = 0; | |
const size_t totalSize = buffer->size(); | |
while (offset + sizeof(TRowSize) <= totalSize) { | |
// Read row size (stored in big endian). | |
TRowSize rowSize = folly::Endian::big(*(TRowSize*)(data + offset)); | |
offset += sizeof(TRowSize); | |
if (offset + rowSize > totalSize) { | |
VELOX_FAIL( | |
"Invalid row data: row size {} exceeds remaining buffer size {}", | |
rowSize, | |
totalSize - offset); | |
} | |
// Create a Row with empty key and the row data as value. | |
rows.emplace_back(std::string_view{data + offset, rowSize}); | |
offset += rowSize; | |
} | |
size_t offset = 0; | |
const size_t totalSize = buffer->size(); | |
constexpr size_t kMaxRowSize = 16 * 1024 * 1024; // 16MB max row size | |
while (offset + sizeof(TRowSize) <= totalSize) { | |
// Read row size (stored in big endian). | |
TRowSize rowSize = folly::Endian::big(*(TRowSize*)(data + offset)); | |
offset += sizeof(TRowSize); | |
if (rowSize == 0) { | |
VELOX_FAIL("Invalid row data: row size is zero at offset {}", offset - sizeof(TRowSize)); | |
} | |
if (rowSize > kMaxRowSize) { | |
VELOX_FAIL( | |
"Invalid row data: row size {} exceeds maximum allowed row size {}", | |
rowSize, | |
kMaxRowSize); | |
} | |
if (offset + rowSize > totalSize) { | |
VELOX_FAIL( | |
"Invalid row data: row size {} exceeds remaining buffer size {}", | |
rowSize, | |
totalSize - offset); | |
} | |
// Create a Row with empty key and the row data as value. | |
rows.emplace_back(std::string_view{data + offset, rowSize}); | |
offset += rowSize; | |
} |
|
||
while (offset + sizeof(TRowSize) <= totalSize) { | ||
// Read row size (stored in big endian). | ||
TRowSize rowSize = folly::Endian::big(*(TRowSize*)(data + offset)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (bug_risk): Direct pointer casting for rowSize extraction may be unsafe on platforms with strict alignment requirements.
Using memcpy to extract the row size avoids potential undefined behavior from misaligned pointer casts.
…if io buffer (prestodb#26322) Summary: Extend shuffle read API to return a row batch instead of a iobuf so that we can avoid redundant parsing Differential Revision: D84737440
c34745e
to
0c74da0
Compare
…if io buffer (prestodb#26322) Summary: Extend shuffle read API to return a row batch instead of a iobuf so that we can avoid redundant parsing Differential Revision: D84737440
0c74da0
to
95e4451
Compare
…if io buffer (prestodb#26322) Summary: Extend shuffle read API to return a row batch instead of a iobuf so that we can avoid redundant parsing Differential Revision: D84737440
95e4451
to
120bd7a
Compare
…if io buffer (prestodb#26322) Summary: Extend shuffle read API to return a row batch instead of a iobuf so that we can avoid redundant parsing Differential Revision: D84737440
120bd7a
to
b2b0eed
Compare
…if io buffer (prestodb#26322) Summary: Extend shuffle read API to return a row batch instead of a iobuf so that we can avoid redundant parsing Differential Revision: D84737440
b2b0eed
to
1825011
Compare
…if io buffer (prestodb#26322) Summary: Extend shuffle read API to return a row batch instead of a iobuf so that we can avoid redundant parsing Differential Revision: D84737440
1825011
to
7c2b7b3
Compare
…if io buffer (prestodb#26322) Summary: Extend shuffle read API to return a row batch instead of a iobuf so that we can avoid redundant parsing Differential Revision: D84737440
7c2b7b3
to
e4979df
Compare
…if io buffer (prestodb#26322) Summary: Extend shuffle read API to return a row batch instead of a iobuf so that we can avoid redundant parsing Differential Revision: D84737440
e4979df
to
6b31d83
Compare
Summary: Extend shuffle read API to return a row batch instead of a iobuf so that we can avoid redundant parsing