diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.cpp b/presto-native-execution/presto_cpp/main/PrestoServer.cpp index d0d26d7b81b21..8b195c6c833e7 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoServer.cpp @@ -39,7 +39,7 @@ #include "presto_cpp/main/operators/LocalPersistentShuffle.h" #include "presto_cpp/main/operators/PartitionAndSerialize.h" #include "presto_cpp/main/operators/ShuffleRead.h" -#include "presto_cpp/main/operators/UnsafeRowExchangeSource.h" +#include "presto_cpp/main/operators/CompactRowExchangeSource.h" #include "presto_cpp/main/types/PrestoToVeloxQueryPlan.h" #include "presto_cpp/main/types/VeloxPlanConversion.h" #include "velox/common/base/Counters.h" @@ -451,7 +451,7 @@ void PrestoServer::run() { }); velox::exec::ExchangeSource::registerFactory( - operators::UnsafeRowExchangeSource::createExchangeSource); + operators::CompactRowExchangeSource::createExchangeSource); // Batch broadcast exchange source. velox::exec::ExchangeSource::registerFactory( diff --git a/presto-native-execution/presto_cpp/main/operators/CMakeLists.txt b/presto-native-execution/presto_cpp/main/operators/CMakeLists.txt index d42cdaae26558..e2862c86f38fb 100644 --- a/presto-native-execution/presto_cpp/main/operators/CMakeLists.txt +++ b/presto-native-execution/presto_cpp/main/operators/CMakeLists.txt @@ -16,7 +16,7 @@ add_library( PartitionAndSerialize.cpp ShuffleRead.cpp ShuffleWrite.cpp - UnsafeRowExchangeSource.cpp + CompactRowExchangeSource.cpp LocalPersistentShuffle.cpp BroadcastWrite.cpp BroadcastFactory.cpp diff --git a/presto-native-execution/presto_cpp/main/operators/UnsafeRowExchangeSource.cpp b/presto-native-execution/presto_cpp/main/operators/CompactRowExchangeSource.cpp similarity index 77% rename from presto-native-execution/presto_cpp/main/operators/UnsafeRowExchangeSource.cpp rename to presto-native-execution/presto_cpp/main/operators/CompactRowExchangeSource.cpp index 1f806ab919f1d..092a1631b186f 100644 --- a/presto-native-execution/presto_cpp/main/operators/UnsafeRowExchangeSource.cpp +++ b/presto-native-execution/presto_cpp/main/operators/CompactRowExchangeSource.cpp @@ -15,7 +15,7 @@ #include #include "presto_cpp/main/common/Configs.h" -#include "presto_cpp/main/operators/UnsafeRowExchangeSource.h" +#include "presto_cpp/main/operators/CompactRowExchangeSource.h" #include "velox/serializers/RowSerializer.h" namespace facebook::presto::operators { @@ -29,31 +29,27 @@ namespace facebook::presto::operators { VELOX_FAIL("ShuffleReader::{} failed: {}", methodName, e.what()); \ } -folly::SemiFuture -UnsafeRowExchangeSource::request( +folly::SemiFuture +CompactRowExchangeSource::request( uint32_t /*maxBytes*/, std::chrono::microseconds /*maxWait*/) { auto nextBatch = [this]() { return std::move(shuffleReader_->next()) - .deferValue([this](velox::BufferPtr buffer) { + .deferValue([this](std::unique_ptr batch) { std::vector promises; int64_t totalBytes{0}; - { std::lock_guard l(queue_->mutex()); - if (buffer == nullptr) { + if (batch == nullptr) { atEnd_ = true; queue_->enqueueLocked(nullptr, promises); } else { - totalBytes = buffer->size(); + totalBytes = batch->data->size(); VELOX_CHECK_LE(totalBytes, std::numeric_limits::max()); ++numBatches_; - auto ioBuf = folly::IOBuf::wrapBuffer( - buffer->as(), buffer->size()); queue_->enqueueLocked( - std::make_unique( - std::move(ioBuf), - [buffer](auto& /*unused*/) {}), + std::make_unique( + std::move(batch)), promises); } } @@ -61,12 +57,11 @@ UnsafeRowExchangeSource::request( for (auto& promise : promises) { promise.setValue(); } - return folly::makeFuture(Response{totalBytes, atEnd_}); }) .deferError( [](folly::exception_wrapper e) mutable - -> UnsafeRowExchangeSource::Response { + -> CompactRowExchangeSource::Response { VELOX_FAIL("ShuffleReader::{} failed: {}", "next", e.what()); }); }; @@ -74,8 +69,8 @@ UnsafeRowExchangeSource::request( CALL_SHUFFLE(return nextBatch(), "next"); } -folly::SemiFuture -UnsafeRowExchangeSource::requestDataSizes( +folly::SemiFuture +CompactRowExchangeSource::requestDataSizes( std::chrono::microseconds /*maxWait*/) { std::vector remainingBytes; if (!atEnd_) { @@ -87,7 +82,7 @@ UnsafeRowExchangeSource::requestDataSizes( return folly::makeSemiFuture(Response{0, atEnd_, std::move(remainingBytes)}); } -folly::F14FastMap UnsafeRowExchangeSource::stats() const { +folly::F14FastMap CompactRowExchangeSource::stats() const { return shuffleReader_->stats(); } @@ -106,29 +101,29 @@ std::optional getSerializedShuffleInfo(folly::Uri& uri) { // static std::shared_ptr -UnsafeRowExchangeSource::createExchangeSource( +CompactRowExchangeSource::createExchangeSource( const std::string& url, int32_t destination, const std::shared_ptr& queue, - velox::memory::MemoryPool* FOLLY_NONNULL pool) { + velox::memory::MemoryPool* pool) { if (::strncmp(url.c_str(), "batch://", 8) != 0) { return nullptr; } auto uri = folly::Uri(url); - auto serializedShuffleInfo = getSerializedShuffleInfo(uri); + const auto serializedShuffleInfo = getSerializedShuffleInfo(uri); // Not shuffle exchange source. if (!serializedShuffleInfo.has_value()) { return nullptr; } - auto shuffleName = SystemConfig::instance()->shuffleName(); + const auto shuffleName = SystemConfig::instance()->shuffleName(); VELOX_CHECK( !shuffleName.empty(), "shuffle.name is not provided in config.properties to create a shuffle " "interface."); auto shuffleFactory = ShuffleInterfaceFactory::factory(shuffleName); - return std::make_shared( + return std::make_shared( uri.host(), destination, queue, diff --git a/presto-native-execution/presto_cpp/main/operators/UnsafeRowExchangeSource.h b/presto-native-execution/presto_cpp/main/operators/CompactRowExchangeSource.h similarity index 72% rename from presto-native-execution/presto_cpp/main/operators/UnsafeRowExchangeSource.h rename to presto-native-execution/presto_cpp/main/operators/CompactRowExchangeSource.h index c9c4033a0f9e5..de55ea12cda58 100644 --- a/presto-native-execution/presto_cpp/main/operators/UnsafeRowExchangeSource.h +++ b/presto-native-execution/presto_cpp/main/operators/CompactRowExchangeSource.h @@ -13,6 +13,7 @@ */ #pragma once +#include "presto_cpp/main/operators/ShuffleInterface.h" #include "presto_cpp/main/operators/ShuffleWrite.h" #include "velox/core/PlanNode.h" #include "velox/exec/Exchange.h" @@ -20,14 +21,31 @@ namespace facebook::presto::operators { -class UnsafeRowExchangeSource : public velox::exec::ExchangeSource { +class CompactRowBatch : public velox::exec::SerializedPage { public: - UnsafeRowExchangeSource( + explicit CompactRowBatch( + std::unique_ptr rowBatch) + : velox::exec:: + SerializedPage{folly::IOBuf::wrapBuffer( + rowBatch->data->as(), rowBatch->data->size()), nullptr, rowBatch->rows.size()}, + rowBatch_{std::move(rowBatch)} {} + + const std::vector& rows() const { + return rowBatch_->rows; + } + + private: + const std::unique_ptr rowBatch_; +}; + +class CompactRowExchangeSource : public velox::exec::ExchangeSource { + public: + CompactRowExchangeSource( const std::string& taskId, int destination, const std::shared_ptr& queue, const std::shared_ptr& shuffleReader, - velox::memory::MemoryPool* FOLLY_NONNULL pool) + velox::memory::MemoryPool* pool) : ExchangeSource(taskId, destination, queue, pool), shuffleReader_(shuffleReader) {} @@ -54,7 +72,7 @@ class UnsafeRowExchangeSource : public velox::exec::ExchangeSource { const std::string& url, int32_t destination, const std::shared_ptr& queue, - velox::memory::MemoryPool* FOLLY_NONNULL pool); + velox::memory::MemoryPool* pool); private: const std::shared_ptr shuffleReader_; diff --git a/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.cpp b/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.cpp index f55cde51be86b..c5d6f5886be29 100644 --- a/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.cpp +++ b/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.cpp @@ -51,7 +51,7 @@ LocalPersistentShuffleWriter::LocalPersistentShuffleWriter( uint32_t shuffleId, uint32_t numPartitions, uint64_t maxBytesPerPartition, - velox::memory::MemoryPool* FOLLY_NONNULL pool) + velox::memory::MemoryPool* pool) : threadId_(std::this_thread::get_id()), pool_(pool), numPartitions_(numPartitions), @@ -153,7 +153,7 @@ LocalPersistentShuffleReader::LocalPersistentShuffleReader( const std::string& rootPath, const std::string& queryId, std::vector partitionIds, - velox::memory::MemoryPool* FOLLY_NONNULL pool) + velox::memory::MemoryPool* pool) : rootPath_(rootPath), queryId_(queryId), partitionIds_(std::move(partitionIds)), @@ -161,13 +161,15 @@ LocalPersistentShuffleReader::LocalPersistentShuffleReader( fileSystem_ = velox::filesystems::getFileSystem(rootPath_, nullptr); } -folly::SemiFuture LocalPersistentShuffleReader::next() { +folly::SemiFuture> LocalPersistentShuffleReader::next() { + using TRowSize = uint32_t; + if (readPartitionFiles_.empty()) { readPartitionFiles_ = getReadPartitionFiles(); } if (readPartitionFileIndex_ >= readPartitionFiles_.size()) { - return folly::makeSemiFuture(BufferPtr{}); + return folly::makeSemiFuture(std::unique_ptr{}); } const auto filename = readPartitionFiles_[readPartitionFileIndex_]; @@ -175,7 +177,27 @@ folly::SemiFuture LocalPersistentShuffleReader::next() { auto buffer = AlignedBuffer::allocate(file->size(), pool_, 0); file->pread(0, file->size(), buffer->asMutable()); ++readPartitionFileIndex_; - return folly::makeSemiFuture(std::move(buffer)); + + // Parse the buffer to extract individual rows. + // Each row is stored as: | row-size (4 bytes) | row-data (row-size bytes) | + std::vector rows; + const char* data = buffer->as(); + size_t offset = 0; + const size_t totalSize = buffer->size(); + + while (offset + sizeof(TRowSize) <= totalSize) { + // Read row size (stored in big endian). + const TRowSize rowSize = folly::Endian::big(*(TRowSize*)(data + offset)); + offset += sizeof(TRowSize); + + VELOX_CHECK_LE(offset + rowSize, totalSize, "Invalid row data: row size"); + // Create a Row with empty key and the row data as value. + rows.emplace_back(std::string_view{data + offset, rowSize}); + offset += rowSize; + } + + return folly::makeSemiFuture>( + std::make_unique(std::move(rows), std::move(buffer))); } void LocalPersistentShuffleReader::noMoreData(bool success) { diff --git a/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.h b/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.h index 678d5a2cb0002..1dc2492a73ad4 100644 --- a/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.h +++ b/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.h @@ -77,7 +77,7 @@ class LocalPersistentShuffleWriter : public ShuffleWriter { uint32_t shuffleId, uint32_t numPartitions, uint64_t maxBytesPerPartition, - velox::memory::MemoryPool* FOLLY_NONNULL pool); + velox::memory::MemoryPool* pool); void collect( int32_t partition, @@ -109,7 +109,7 @@ class LocalPersistentShuffleWriter : public ShuffleWriter { // Used to make sure files created by this thread have unique names. const std::thread::id threadId_; - velox::memory::MemoryPool* FOLLY_NONNULL pool_; + velox::memory::MemoryPool* pool_; const uint32_t numPartitions_; const uint64_t maxBytesPerPartition_; // The top directory of the shuffle files and its file system. @@ -129,9 +129,9 @@ class LocalPersistentShuffleReader : public ShuffleReader { const std::string& rootPath, const std::string& queryId, std::vector partitionIds, - velox::memory::MemoryPool* FOLLY_NONNULL pool); + velox::memory::MemoryPool* pool); - folly::SemiFuture next() override; + folly::SemiFuture> next() override; void noMoreData(bool success) override; @@ -147,7 +147,7 @@ class LocalPersistentShuffleReader : public ShuffleReader { const std::string rootPath_; const std::string queryId_; const std::vector partitionIds_; - velox::memory::MemoryPool* FOLLY_NONNULL pool_; + velox::memory::MemoryPool* pool_; // Latest read block (file) index in 'readPartitionFiles_' for 'partition_'. size_t readPartitionFileIndex_{0}; @@ -165,11 +165,11 @@ class LocalPersistentShuffleFactory : public ShuffleInterfaceFactory { std::shared_ptr createReader( const std::string& serializedStr, const int32_t partition, - velox::memory::MemoryPool* FOLLY_NONNULL pool) override; + velox::memory::MemoryPool* pool) override; std::shared_ptr createWriter( const std::string& serializedStr, - velox::memory::MemoryPool* FOLLY_NONNULL pool) override; + velox::memory::MemoryPool* pool) override; }; } // namespace facebook::presto::operators diff --git a/presto-native-execution/presto_cpp/main/operators/ShuffleInterface.h b/presto-native-execution/presto_cpp/main/operators/ShuffleInterface.h index 2cc28c0199b7f..79b4081b8f5ca 100644 --- a/presto-native-execution/presto_cpp/main/operators/ShuffleInterface.h +++ b/presto-native-execution/presto_cpp/main/operators/ShuffleInterface.h @@ -34,13 +34,21 @@ class ShuffleWriter { virtual folly::F14FastMap stats() const = 0; }; +struct ReadBatch { + std::vector rows; + velox::BufferPtr data; + + ReadBatch(std::vector&& _rows, velox::BufferPtr&& _data) + : rows{std::move(_rows)}, data{std::move(_data)} {} +}; + class ShuffleReader { public: virtual ~ShuffleReader() = default; /// Reads the next block of data. The function returns null if it has read all /// the data. The function throws if run into any error. - virtual folly::SemiFuture next() = 0; + virtual folly::SemiFuture> next() = 0; /// Tell the shuffle system the reader is done. May be called with 'success' /// true before reading all the data. This happens when a query has a LIMIT or @@ -58,7 +66,7 @@ class ShuffleInterfaceFactory { virtual std::shared_ptr createReader( const std::string& serializedShuffleInfo, - const int32_t partition, + int32_t partition, velox::memory::MemoryPool* pool) = 0; virtual std::shared_ptr createWriter( diff --git a/presto-native-execution/presto_cpp/main/operators/tests/CMakeLists.txt b/presto-native-execution/presto_cpp/main/operators/tests/CMakeLists.txt index f17600022bc3b..13095c885a03d 100644 --- a/presto-native-execution/presto_cpp/main/operators/tests/CMakeLists.txt +++ b/presto-native-execution/presto_cpp/main/operators/tests/CMakeLists.txt @@ -15,7 +15,7 @@ target_link_libraries(presto_operators_plan_builder velox_core) add_executable( presto_operators_test - PlanNodeSerdeTest.cpp UnsafeRowShuffleTest.cpp BroadcastTest.cpp + PlanNodeSerdeTest.cpp CompactRowShuffleTest.cpp BroadcastTest.cpp BinarySortableSerializerTest.cpp PlanNodeBuilderTest.cpp) add_test(presto_operators_test presto_operators_test) diff --git a/presto-native-execution/presto_cpp/main/operators/tests/UnsafeRowShuffleTest.cpp b/presto-native-execution/presto_cpp/main/operators/tests/CompactRowShuffleTest.cpp similarity index 91% rename from presto-native-execution/presto_cpp/main/operators/tests/UnsafeRowShuffleTest.cpp rename to presto-native-execution/presto_cpp/main/operators/tests/CompactRowShuffleTest.cpp index fc5e5157eade7..1e0ce9657e475 100644 --- a/presto-native-execution/presto_cpp/main/operators/tests/UnsafeRowShuffleTest.cpp +++ b/presto-native-execution/presto_cpp/main/operators/tests/CompactRowShuffleTest.cpp @@ -14,11 +14,11 @@ #include #include "folly/init/Init.h" #include "presto_cpp/external/json/nlohmann/json.hpp" +#include "presto_cpp/main/operators/CompactRowExchangeSource.h" #include "presto_cpp/main/operators/LocalPersistentShuffle.h" #include "presto_cpp/main/operators/PartitionAndSerialize.h" #include "presto_cpp/main/operators/ShuffleRead.h" #include "presto_cpp/main/operators/ShuffleWrite.h" -#include "presto_cpp/main/operators/UnsafeRowExchangeSource.h" #include "presto_cpp/main/operators/tests/PlanBuilder.h" #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/testutil/TestValue.h" @@ -94,7 +94,8 @@ class TestShuffleWriter : public ShuffleWriter { maxKeyBytes_(maxKeyBytes), inProgressSizes_(numPartitions, 0), readyPartitions_( - std::make_shared>>()), + std::make_shared< + std::vector>>>()), serializedSortKeys_( std::make_shared>>()) { inProgressPartitions_.resize(numPartitions_); @@ -102,12 +103,6 @@ class TestShuffleWriter : public ShuffleWriter { serializedSortKeys_->resize(numPartitions_); } - void initialize(velox::memory::MemoryPool* pool) { - if (pool_ == nullptr) { - pool_ = pool; - } - } - void collect(int32_t partition, std::string_view key, std::string_view data) override { using TRowSize = uint32_t; @@ -115,32 +110,35 @@ class TestShuffleWriter : public ShuffleWriter { TestValue::adjust( "facebook::presto::operators::test::TestShuffleWriter::collect", this); - auto& buffer = inProgressPartitions_[partition]; + auto& readBatch = inProgressPartitions_[partition]; TRowSize rowSize = data.size(); auto size = sizeof(TRowSize) + rowSize; // Check if there is enough space in the buffer. - if (buffer && inProgressSizes_[partition] + size >= maxBytesPerPartition_) { - buffer->setSize(inProgressSizes_[partition]); - (*readyPartitions_)[partition].emplace_back(std::move(buffer)); + if (readBatch && + inProgressSizes_[partition] + size >= maxBytesPerPartition_) { + readBatch->data->setSize(inProgressSizes_[partition]); + (*readyPartitions_)[partition].emplace_back(std::move(readBatch)); inProgressPartitions_[partition].reset(); } // Allocate buffer if needed. - if (buffer == nullptr) { - buffer = AlignedBuffer::allocate(maxBytesPerPartition_, pool_); - assert(buffer != nullptr); - inProgressPartitions_[partition] = buffer; + if (readBatch == nullptr) { + auto buffer = AlignedBuffer::allocate(maxBytesPerPartition_, pool_); + VELOX_CHECK_NOT_NULL(buffer); + readBatch = std::make_unique( + std::vector{}, std::move(buffer)); + inProgressPartitions_[partition] = std::move(readBatch); inProgressSizes_[partition] = 0; } // Copy data. auto offset = inProgressSizes_[partition]; - auto rawBuffer = buffer->asMutable() + offset; + auto* rawBuffer = readBatch->data->asMutable() + offset; *(TRowSize*)(rawBuffer) = folly::Endian::big(rowSize); ::memcpy(rawBuffer + sizeof(TRowSize), data.data(), rowSize); - + readBatch->rows.push_back(data); inProgressSizes_[partition] += size; if (!key.empty()) { @@ -153,9 +151,9 @@ class TestShuffleWriter : public ShuffleWriter { // Flush in-progress buffers. for (auto i = 0; i < numPartitions_; ++i) { if (inProgressSizes_[i] > 0) { - auto& buffer = inProgressPartitions_[i]; - buffer->setSize(inProgressSizes_[i]); - (*readyPartitions_)[i].emplace_back(std::move(buffer)); + auto& readBatch = inProgressPartitions_[i]; + readBatch->data->setSize(inProgressSizes_[i]); + (*readyPartitions_)[i].emplace_back(std::move(readBatch)); inProgressPartitions_[i].reset(); } } @@ -167,7 +165,8 @@ class TestShuffleWriter : public ShuffleWriter { {exec::ExchangeClient::kBackgroundCpuTimeMs, kFakeBackgroundCpuTimeMs}}; } - std::shared_ptr>>& readyPartitions() { + std::shared_ptr>>>& + readyPartitions() { return readyPartitions_; } @@ -176,7 +175,9 @@ class TestShuffleWriter : public ShuffleWriter { } static void reset() { + LOG(ERROR) << "instance " << getInstance().use_count(); getInstance().reset(); + LOG(ERROR) << getInstance(); } /// Maintains a single shuffle write interface for testing purpose. @@ -187,9 +188,9 @@ class TestShuffleWriter : public ShuffleWriter { static std::shared_ptr createWriter( const std::string& serializedShuffleInfo, - velox::memory::MemoryPool* FOLLY_NONNULL pool) { + velox::memory::MemoryPool* pool) { std::shared_ptr& instance = getInstance(); - if (instance) { + if (instance != nullptr) { return instance; } TestShuffleInfo writeInfo = @@ -209,12 +210,13 @@ class TestShuffleWriter : public ShuffleWriter { /// Indexed by partition number. Each element represents currently being /// accumulated buffer by shuffler for a certain partition. Internal layout: /// | row-size | ..row-payload.. | row-size | ..row-payload.. | .. - std::vector inProgressPartitions_; + std::vector> inProgressPartitions_; /// Tracks the total size of each in-progress partition in /// inProgressPartitions_ std::vector inProgressSizes_; - std::shared_ptr>> readyPartitions_; + std::shared_ptr>>> + readyPartitions_; std::shared_ptr>> serializedSortKeys_; }; @@ -222,21 +224,22 @@ class TestShuffleReader : public ShuffleReader { public: TestShuffleReader( const int32_t partition, - const std::shared_ptr>>& + const std::shared_ptr< + std::vector>>>& readyPartitions) : partition_(partition), readyPartitions_(readyPartitions) {} - folly::SemiFuture next() override { + folly::SemiFuture> next() override { TestValue::adjust( "facebook::presto::operators::test::TestShuffleReader::next", this); if ((*readyPartitions_)[partition_].empty()) { - BufferPtr buffer = nullptr; - return folly::makeSemiFuture(std::move(buffer)); + return folly::makeSemiFuture(std::unique_ptr(nullptr)); } - auto buffer = (*readyPartitions_)[partition_].back(); + auto readBatch = std::move((*readyPartitions_)[partition_].back()); (*readyPartitions_)[partition_].pop_back(); - return folly::makeSemiFuture(std::move(buffer)); + return folly::makeSemiFuture>( + std::move(readBatch)); } void noMoreData(bool success) override { @@ -250,8 +253,9 @@ class TestShuffleReader : public ShuffleReader { } private: - int32_t partition_; - const std::shared_ptr>>& readyPartitions_; + const int32_t partition_; + const std::shared_ptr>>>& + readyPartitions_; }; class TestShuffleFactory : public ShuffleInterfaceFactory { @@ -261,14 +265,14 @@ class TestShuffleFactory : public ShuffleInterfaceFactory { std::shared_ptr createReader( const std::string& /* serializedShuffleInfo */, const int partition, - velox::memory::MemoryPool* FOLLY_NONNULL pool) override { + velox::memory::MemoryPool* pool) override { return std::make_shared( partition, TestShuffleWriter::getInstance()->readyPartitions()); } std::shared_ptr createWriter( const std::string& serializedShuffleInfo, - velox::memory::MemoryPool* FOLLY_NONNULL pool) override { + velox::memory::MemoryPool* pool) override { return TestShuffleWriter::createWriter(serializedShuffleInfo, pool); } }; @@ -280,13 +284,12 @@ void registerExchangeSource(const std::string& shuffleName) { const std::string& taskId, int destination, const std::shared_ptr& queue, - memory::MemoryPool* FOLLY_NONNULL pool) - -> std::shared_ptr { + memory::MemoryPool* pool) -> std::shared_ptr { if (strncmp(taskId.c_str(), "batch://", 8) == 0) { auto uri = folly::Uri(taskId); for (auto& pair : uri.getQueryParams()) { if (pair.first == "shuffleInfo") { - return std::make_shared( + return std::make_shared( taskId, destination, queue, @@ -303,7 +306,7 @@ void registerExchangeSource(const std::string& shuffleName) { } } // namespace -class UnsafeRowShuffleTest : public exec::test::OperatorTestBase { +class CompactRowShuffleTest : public exec::test::OperatorTestBase { public: std::string testShuffleInfo( uint32_t numPartitions, @@ -345,6 +348,8 @@ class UnsafeRowShuffleTest : public exec::test::OperatorTestBase { protected: void SetUp() override { exec::test::OperatorTestBase::SetUp(); + // Make sure all previously registered exchange factory are gone. + velox::exec::ExchangeSource::factories().clear(); ShuffleInterfaceFactory::registerFactory( std::string(TestShuffleFactory::kShuffleName), std::make_unique()); @@ -821,7 +826,7 @@ class UnsafeRowShuffleTest : public exec::test::OperatorTestBase { } }; -TEST_F(UnsafeRowShuffleTest, operators) { +TEST_F(CompactRowShuffleTest, operators) { auto data = makeRowVector({ makeFlatVector({1, 2, 3, 4}), makeFlatVector({10, 20, 30, 40}), @@ -844,7 +849,7 @@ TEST_F(UnsafeRowShuffleTest, operators) { TestShuffleWriter::reset(); } -DEBUG_ONLY_TEST_F(UnsafeRowShuffleTest, shuffleWriterExceptions) { +DEBUG_ONLY_TEST_F(CompactRowShuffleTest, shuffleWriterExceptions) { auto data = makeRowVector({ makeFlatVector({1, 2, 3, 4}), makeFlatVector({10, 20, 30, 40}), @@ -876,7 +881,7 @@ DEBUG_ONLY_TEST_F(UnsafeRowShuffleTest, shuffleWriterExceptions) { exec::test::waitForAllTasksToBeDeleted(); } -DEBUG_ONLY_TEST_F(UnsafeRowShuffleTest, shuffleReaderExceptions) { +DEBUG_ONLY_TEST_F(CompactRowShuffleTest, shuffleReaderExceptions) { auto data = makeRowVector({ makeFlatVector({1, 2, 3, 4}), makeFlatVector({10, 20, 30, 40}), @@ -922,7 +927,7 @@ DEBUG_ONLY_TEST_F(UnsafeRowShuffleTest, shuffleReaderExceptions) { exec::test::waitForAllTasksToBeDeleted(); } -TEST_F(UnsafeRowShuffleTest, endToEnd) { +TEST_F(CompactRowShuffleTest, endToEnd) { size_t numPartitions = 5; size_t numMapDrivers = 2; @@ -931,9 +936,7 @@ TEST_F(UnsafeRowShuffleTest, endToEnd) { makeFlatVector({10, 20, 30, 40, 50, 60}), }); - // Make sure all previously registered exchange factory are gone. - velox::exec::ExchangeSource::factories().clear(); - auto shuffleInfo = testShuffleInfo(numPartitions, 1 << 20); + const auto shuffleInfo = testShuffleInfo(numPartitions, 1 << 20); TestShuffleWriter::createWriter(shuffleInfo, pool()); registerExchangeSource(std::string(TestShuffleFactory::kShuffleName)); runShuffleTest( @@ -946,9 +949,10 @@ TEST_F(UnsafeRowShuffleTest, endToEnd) { {data}, kFakeBackgroundCpuTimeMs * Timestamp::kNanosecondsInMillisecond); TestShuffleWriter::reset(); + EXPECT_FALSE(true); } -TEST_F(UnsafeRowShuffleTest, endToEndWithSortedShuffle) { +TEST_F(CompactRowShuffleTest, endToEndWithSortedShuffle) { size_t numPartitions = 2; size_t numMapDrivers = 1; @@ -992,7 +996,7 @@ TEST_F(UnsafeRowShuffleTest, endToEndWithSortedShuffle) { TestShuffleWriter::reset(); } -TEST_F(UnsafeRowShuffleTest, endToEndWithSortedShuffleRowLimit) { +TEST_F(CompactRowShuffleTest, endToEndWithSortedShuffleRowLimit) { size_t numPartitions = 3; size_t numMapDrivers = 1; @@ -1050,7 +1054,7 @@ TEST_F(UnsafeRowShuffleTest, endToEndWithSortedShuffleRowLimit) { TestShuffleWriter::reset(); } -TEST_F(UnsafeRowShuffleTest, endToEndWithReplicateNullAndAny) { +TEST_F(CompactRowShuffleTest, endToEndWithReplicateNullAndAny) { size_t numPartitions = 9; size_t numMapDrivers = 2; @@ -1076,7 +1080,7 @@ TEST_F(UnsafeRowShuffleTest, endToEndWithReplicateNullAndAny) { TestShuffleWriter::reset(); } -TEST_F(UnsafeRowShuffleTest, replicateNullsAndAny) { +TEST_F(CompactRowShuffleTest, replicateNullsAndAny) { // No nulls. Expect to replicate first row. auto data = makeRowVector({ makeFlatVector({1, 2, 3, 4}), @@ -1106,7 +1110,7 @@ TEST_F(UnsafeRowShuffleTest, replicateNullsAndAny) { data, makeFlatVector({true, false, true, true, false})); } -TEST_F(UnsafeRowShuffleTest, persistentShuffleDeser) { +TEST_F(CompactRowShuffleTest, persistentShuffleDeser) { std::string serializedWriteInfo = "{\n" " \"rootPath\": \"abc\",\n" @@ -1168,7 +1172,7 @@ TEST_F(UnsafeRowShuffleTest, persistentShuffleDeser) { nlohmann::detail::type_error); } -TEST_F(UnsafeRowShuffleTest, persistentShuffle) { +TEST_F(CompactRowShuffleTest, persistentShuffle) { uint32_t numPartitions = 1; uint32_t numMapDrivers = 1; @@ -1201,43 +1205,43 @@ TEST_F(UnsafeRowShuffleTest, persistentShuffle) { cleanupDirectory(rootPath); } -TEST_F(UnsafeRowShuffleTest, persistentShuffleFuzz) { +TEST_F(CompactRowShuffleTest, persistentShuffleFuzz) { fuzzerTest(false, 1); fuzzerTest(false, 3); fuzzerTest(false, 7); } -TEST_F(UnsafeRowShuffleTest, persistentShuffleFuzzWithReplicateNullsAndAny) { +TEST_F(CompactRowShuffleTest, persistentShuffleFuzzWithReplicateNullsAndAny) { fuzzerTest(true, 1); fuzzerTest(true, 3); fuzzerTest(true, 7); } -TEST_F(UnsafeRowShuffleTest, partitionAndSerializeOutputByteLimit) { +TEST_F(CompactRowShuffleTest, partitionAndSerializeOutputByteLimit) { partitionAndSerializeWithThresholds(10'000, 1, 10, 10); } -TEST_F(UnsafeRowShuffleTest, partitionAndSerializeOutputRowLimit) { +TEST_F(CompactRowShuffleTest, partitionAndSerializeOutputRowLimit) { partitionAndSerializeWithThresholds(5, 1'000'000'000, 10, 2); } -TEST_F(UnsafeRowShuffleTest, partitionAndSerializeOutputRowLimitWithSort) { +TEST_F(CompactRowShuffleTest, partitionAndSerializeOutputRowLimitWithSort) { partitionAndSerializeWithThresholds(5, 1'000'000'000, 10, 2, true); } -TEST_F(UnsafeRowShuffleTest, partitionAndSerializeOutputByteLimitWithSort) { +TEST_F(CompactRowShuffleTest, partitionAndSerializeOutputByteLimitWithSort) { partitionAndSerializeWithThresholds(10'000, 100, 10, 10, true); } -TEST_F(UnsafeRowShuffleTest, partitionAndSerializeNoLimit) { +TEST_F(CompactRowShuffleTest, partitionAndSerializeNoLimit) { partitionAndSerializeWithThresholds(1'000, 1'000'000'000, 5, 1); } -TEST_F(UnsafeRowShuffleTest, partitionAndSerializeBothLimited) { +TEST_F(CompactRowShuffleTest, partitionAndSerializeBothLimited) { partitionAndSerializeWithThresholds(1, 1'000'000, 5, 5); } -TEST_F(UnsafeRowShuffleTest, partitionAndSerializeOperator) { +TEST_F(CompactRowShuffleTest, partitionAndSerializeOperator) { auto data = makeRowVector({ makeFlatVector(1'000, [](auto row) { return row; }), makeFlatVector(1'000, [](auto row) { return row * 10; }), @@ -1251,7 +1255,7 @@ TEST_F(UnsafeRowShuffleTest, partitionAndSerializeOperator) { testPartitionAndSerialize(plan, data); } -TEST_F(UnsafeRowShuffleTest, partitionAndSerializeWithLargeInput) { +TEST_F(CompactRowShuffleTest, partitionAndSerializeWithLargeInput) { auto data = makeRowVector( {makeFlatVector(20'000, [](auto row) { return row; })}); @@ -1263,7 +1267,7 @@ TEST_F(UnsafeRowShuffleTest, partitionAndSerializeWithLargeInput) { testPartitionAndSerialize(plan, data); } -TEST_F(UnsafeRowShuffleTest, partitionAndSerializeWithDifferentColumnOrder) { +TEST_F(CompactRowShuffleTest, partitionAndSerializeWithDifferentColumnOrder) { auto data = makeRowVector({ makeFlatVector(1'000, [](auto row) { return row; }), makeFlatVector(1'000, [](auto row) { return row * 10; }), @@ -1298,7 +1302,9 @@ TEST_F(UnsafeRowShuffleTest, partitionAndSerializeWithDifferentColumnOrder) { testPartitionAndSerialize(plan, expected); } -TEST_F(UnsafeRowShuffleTest, partitionAndSerializeOperatorWhenSinglePartition) { +TEST_F( + CompactRowShuffleTest, + partitionAndSerializeOperatorWhenSinglePartition) { auto data = makeRowVector({ makeFlatVector(1'000, [](auto row) { return row; }), makeFlatVector(1'000, [](auto row) { return row * 10; }), @@ -1312,7 +1318,7 @@ TEST_F(UnsafeRowShuffleTest, partitionAndSerializeOperatorWhenSinglePartition) { testPartitionAndSerialize(plan, data); } -TEST_F(UnsafeRowShuffleTest, shuffleWriterToString) { +TEST_F(CompactRowShuffleTest, shuffleWriterToString) { auto data = makeRowVector({ makeFlatVector(1'000, [](auto row) { return row; }), makeFlatVector(1'000, [](auto row) { return row * 10; }), @@ -1335,7 +1341,7 @@ TEST_F(UnsafeRowShuffleTest, shuffleWriterToString) { " -> partition:INTEGER, key:VARBINARY, data:VARBINARY\n"); } -TEST_F(UnsafeRowShuffleTest, partitionAndSerializeToString) { +TEST_F(CompactRowShuffleTest, partitionAndSerializeToString) { auto data = makeRowVector({ makeFlatVector(1'000, [](auto row) { return row; }), makeFlatVector(1'000, [](auto row) { return row * 10; }), @@ -1379,7 +1385,7 @@ class DummyShuffleInterfaceFactory : public ShuffleInterfaceFactory { } }; -TEST_F(UnsafeRowShuffleTest, shuffleInterfaceRegistration) { +TEST_F(CompactRowShuffleTest, shuffleInterfaceRegistration) { const std::string kShuffleName = "dummy-shuffle"; EXPECT_TRUE(ShuffleInterfaceFactory::registerFactory( kShuffleName, std::make_unique()));