Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
#include "presto_cpp/main/operators/LocalPersistentShuffle.h"
#include "presto_cpp/main/operators/PartitionAndSerialize.h"
#include "presto_cpp/main/operators/ShuffleRead.h"
#include "presto_cpp/main/operators/UnsafeRowExchangeSource.h"
#include "presto_cpp/main/operators/CompactRowExchangeSource.h"
#include "presto_cpp/main/types/PrestoToVeloxQueryPlan.h"
#include "presto_cpp/main/types/VeloxPlanConversion.h"
#include "velox/common/base/Counters.h"
Expand Down Expand Up @@ -451,7 +451,7 @@ void PrestoServer::run() {
});

velox::exec::ExchangeSource::registerFactory(
operators::UnsafeRowExchangeSource::createExchangeSource);
operators::CompactRowExchangeSource::createExchangeSource);

// Batch broadcast exchange source.
velox::exec::ExchangeSource::registerFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ add_library(
PartitionAndSerialize.cpp
ShuffleRead.cpp
ShuffleWrite.cpp
UnsafeRowExchangeSource.cpp
CompactRowExchangeSource.cpp
LocalPersistentShuffle.cpp
BroadcastWrite.cpp
BroadcastFactory.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include <folly/Uri.h>

#include "presto_cpp/main/common/Configs.h"
#include "presto_cpp/main/operators/UnsafeRowExchangeSource.h"
#include "presto_cpp/main/operators/CompactRowExchangeSource.h"
#include "velox/serializers/RowSerializer.h"

namespace facebook::presto::operators {
Expand All @@ -29,53 +29,48 @@ namespace facebook::presto::operators {
VELOX_FAIL("ShuffleReader::{} failed: {}", methodName, e.what()); \
}

folly::SemiFuture<UnsafeRowExchangeSource::Response>
UnsafeRowExchangeSource::request(
folly::SemiFuture<CompactRowExchangeSource::Response>
CompactRowExchangeSource::request(
uint32_t /*maxBytes*/,
std::chrono::microseconds /*maxWait*/) {
auto nextBatch = [this]() {
return std::move(shuffleReader_->next())
.deferValue([this](velox::BufferPtr buffer) {
.deferValue([this](std::unique_ptr<ReadBatch> batch) {
std::vector<velox::ContinuePromise> promises;
int64_t totalBytes{0};

{
std::lock_guard<std::mutex> l(queue_->mutex());
if (buffer == nullptr) {
if (batch == nullptr) {
atEnd_ = true;
queue_->enqueueLocked(nullptr, promises);
} else {
totalBytes = buffer->size();
totalBytes = batch->data->size();
VELOX_CHECK_LE(totalBytes, std::numeric_limits<int32_t>::max());
++numBatches_;
auto ioBuf = folly::IOBuf::wrapBuffer(
buffer->as<char>(), buffer->size());
queue_->enqueueLocked(
std::make_unique<velox::exec::SerializedPage>(
std::move(ioBuf),
[buffer](auto& /*unused*/) {}),
std::make_unique<CompactRowBatch>(
std::move(batch)),
promises);
}
}

for (auto& promise : promises) {
promise.setValue();
}

return folly::makeFuture(Response{totalBytes, atEnd_});
})
.deferError(
[](folly::exception_wrapper e) mutable
-> UnsafeRowExchangeSource::Response {
-> CompactRowExchangeSource::Response {
VELOX_FAIL("ShuffleReader::{} failed: {}", "next", e.what());
});
};

CALL_SHUFFLE(return nextBatch(), "next");
}

folly::SemiFuture<UnsafeRowExchangeSource::Response>
UnsafeRowExchangeSource::requestDataSizes(
folly::SemiFuture<CompactRowExchangeSource::Response>
CompactRowExchangeSource::requestDataSizes(
std::chrono::microseconds /*maxWait*/) {
std::vector<int64_t> remainingBytes;
if (!atEnd_) {
Expand All @@ -87,7 +82,7 @@ UnsafeRowExchangeSource::requestDataSizes(
return folly::makeSemiFuture(Response{0, atEnd_, std::move(remainingBytes)});
}

folly::F14FastMap<std::string, int64_t> UnsafeRowExchangeSource::stats() const {
folly::F14FastMap<std::string, int64_t> CompactRowExchangeSource::stats() const {
return shuffleReader_->stats();
}

Expand All @@ -106,29 +101,29 @@ std::optional<std::string> getSerializedShuffleInfo(folly::Uri& uri) {

// static
std::shared_ptr<velox::exec::ExchangeSource>
UnsafeRowExchangeSource::createExchangeSource(
CompactRowExchangeSource::createExchangeSource(
const std::string& url,
int32_t destination,
const std::shared_ptr<velox::exec::ExchangeQueue>& queue,
velox::memory::MemoryPool* FOLLY_NONNULL pool) {
velox::memory::MemoryPool* pool) {
if (::strncmp(url.c_str(), "batch://", 8) != 0) {
return nullptr;
}

auto uri = folly::Uri(url);
auto serializedShuffleInfo = getSerializedShuffleInfo(uri);
const auto serializedShuffleInfo = getSerializedShuffleInfo(uri);
// Not shuffle exchange source.
if (!serializedShuffleInfo.has_value()) {
return nullptr;
}

auto shuffleName = SystemConfig::instance()->shuffleName();
const auto shuffleName = SystemConfig::instance()->shuffleName();
VELOX_CHECK(
!shuffleName.empty(),
"shuffle.name is not provided in config.properties to create a shuffle "
"interface.");
auto shuffleFactory = ShuffleInterfaceFactory::factory(shuffleName);
return std::make_shared<UnsafeRowExchangeSource>(
return std::make_shared<CompactRowExchangeSource>(
uri.host(),
destination,
queue,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,39 @@
*/
#pragma once

#include "presto_cpp/main/operators/ShuffleInterface.h"
#include "presto_cpp/main/operators/ShuffleWrite.h"
#include "velox/core/PlanNode.h"
#include "velox/exec/Exchange.h"
#include "velox/exec/Operator.h"

namespace facebook::presto::operators {

class UnsafeRowExchangeSource : public velox::exec::ExchangeSource {
class CompactRowBatch : public velox::exec::SerializedPage {
public:
UnsafeRowExchangeSource(
explicit CompactRowBatch(
std::unique_ptr<ReadBatch> rowBatch)
: velox::exec::
SerializedPage{folly::IOBuf::wrapBuffer(
rowBatch->data->as<char>(), rowBatch->data->size()), nullptr, rowBatch->rows.size()},
rowBatch_{std::move(rowBatch)} {}

const std::vector<std::string_view>& rows() const {
return rowBatch_->rows;
}

private:
const std::unique_ptr<ReadBatch> rowBatch_;
};

class CompactRowExchangeSource : public velox::exec::ExchangeSource {
public:
CompactRowExchangeSource(
const std::string& taskId,
int destination,
const std::shared_ptr<velox::exec::ExchangeQueue>& queue,
const std::shared_ptr<ShuffleReader>& shuffleReader,
velox::memory::MemoryPool* FOLLY_NONNULL pool)
velox::memory::MemoryPool* pool)
: ExchangeSource(taskId, destination, queue, pool),
shuffleReader_(shuffleReader) {}

Expand All @@ -54,7 +72,7 @@ class UnsafeRowExchangeSource : public velox::exec::ExchangeSource {
const std::string& url,
int32_t destination,
const std::shared_ptr<velox::exec::ExchangeQueue>& queue,
velox::memory::MemoryPool* FOLLY_NONNULL pool);
velox::memory::MemoryPool* pool);

private:
const std::shared_ptr<ShuffleReader> shuffleReader_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ LocalPersistentShuffleWriter::LocalPersistentShuffleWriter(
uint32_t shuffleId,
uint32_t numPartitions,
uint64_t maxBytesPerPartition,
velox::memory::MemoryPool* FOLLY_NONNULL pool)
velox::memory::MemoryPool* pool)
: threadId_(std::this_thread::get_id()),
pool_(pool),
numPartitions_(numPartitions),
Expand Down Expand Up @@ -153,29 +153,51 @@ LocalPersistentShuffleReader::LocalPersistentShuffleReader(
const std::string& rootPath,
const std::string& queryId,
std::vector<std::string> partitionIds,
velox::memory::MemoryPool* FOLLY_NONNULL pool)
velox::memory::MemoryPool* pool)
: rootPath_(rootPath),
queryId_(queryId),
partitionIds_(std::move(partitionIds)),
pool_(pool) {
fileSystem_ = velox::filesystems::getFileSystem(rootPath_, nullptr);
}

folly::SemiFuture<BufferPtr> LocalPersistentShuffleReader::next() {
folly::SemiFuture<std::unique_ptr<ReadBatch>> LocalPersistentShuffleReader::next() {
using TRowSize = uint32_t;

if (readPartitionFiles_.empty()) {
readPartitionFiles_ = getReadPartitionFiles();
}

if (readPartitionFileIndex_ >= readPartitionFiles_.size()) {
return folly::makeSemiFuture<BufferPtr>(BufferPtr{});
return folly::makeSemiFuture(std::unique_ptr<ReadBatch>{});
}

const auto filename = readPartitionFiles_[readPartitionFileIndex_];
auto file = fileSystem_->openFileForRead(filename);
auto buffer = AlignedBuffer::allocate<char>(file->size(), pool_, 0);
file->pread(0, file->size(), buffer->asMutable<void>());
++readPartitionFileIndex_;
return folly::makeSemiFuture<BufferPtr>(std::move(buffer));

// Parse the buffer to extract individual rows.
// Each row is stored as: | row-size (4 bytes) | row-data (row-size bytes) |
std::vector<std::string_view> rows;
const char* data = buffer->as<char>();
size_t offset = 0;
const size_t totalSize = buffer->size();

while (offset + sizeof(TRowSize) <= totalSize) {
// Read row size (stored in big endian).
const TRowSize rowSize = folly::Endian::big(*(TRowSize*)(data + offset));
offset += sizeof(TRowSize);

VELOX_CHECK_LE(offset + rowSize, totalSize, "Invalid row data: row size");
// Create a Row with empty key and the row data as value.
rows.emplace_back(std::string_view{data + offset, rowSize});
offset += rowSize;
}

return folly::makeSemiFuture<std::unique_ptr<ReadBatch>>(
std::make_unique<ReadBatch>(std::move(rows), std::move(buffer)));
}

void LocalPersistentShuffleReader::noMoreData(bool success) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class LocalPersistentShuffleWriter : public ShuffleWriter {
uint32_t shuffleId,
uint32_t numPartitions,
uint64_t maxBytesPerPartition,
velox::memory::MemoryPool* FOLLY_NONNULL pool);
velox::memory::MemoryPool* pool);

void collect(
int32_t partition,
Expand Down Expand Up @@ -109,7 +109,7 @@ class LocalPersistentShuffleWriter : public ShuffleWriter {

// Used to make sure files created by this thread have unique names.
const std::thread::id threadId_;
velox::memory::MemoryPool* FOLLY_NONNULL pool_;
velox::memory::MemoryPool* pool_;
const uint32_t numPartitions_;
const uint64_t maxBytesPerPartition_;
// The top directory of the shuffle files and its file system.
Expand All @@ -129,9 +129,9 @@ class LocalPersistentShuffleReader : public ShuffleReader {
const std::string& rootPath,
const std::string& queryId,
std::vector<std::string> partitionIds,
velox::memory::MemoryPool* FOLLY_NONNULL pool);
velox::memory::MemoryPool* pool);

folly::SemiFuture<velox::BufferPtr> next() override;
folly::SemiFuture<std::unique_ptr<ReadBatch>> next() override;

void noMoreData(bool success) override;

Expand All @@ -147,7 +147,7 @@ class LocalPersistentShuffleReader : public ShuffleReader {
const std::string rootPath_;
const std::string queryId_;
const std::vector<std::string> partitionIds_;
velox::memory::MemoryPool* FOLLY_NONNULL pool_;
velox::memory::MemoryPool* pool_;

// Latest read block (file) index in 'readPartitionFiles_' for 'partition_'.
size_t readPartitionFileIndex_{0};
Expand All @@ -165,11 +165,11 @@ class LocalPersistentShuffleFactory : public ShuffleInterfaceFactory {
std::shared_ptr<ShuffleReader> createReader(
const std::string& serializedStr,
const int32_t partition,
velox::memory::MemoryPool* FOLLY_NONNULL pool) override;
velox::memory::MemoryPool* pool) override;

std::shared_ptr<ShuffleWriter> createWriter(
const std::string& serializedStr,
velox::memory::MemoryPool* FOLLY_NONNULL pool) override;
velox::memory::MemoryPool* pool) override;
};

} // namespace facebook::presto::operators
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,21 @@ class ShuffleWriter {
virtual folly::F14FastMap<std::string, int64_t> stats() const = 0;
};

struct ReadBatch {
std::vector<std::string_view> rows;
velox::BufferPtr data;

ReadBatch(std::vector<std::string_view>&& _rows, velox::BufferPtr&& _data)
: rows{std::move(_rows)}, data{std::move(_data)} {}
};

class ShuffleReader {
public:
virtual ~ShuffleReader() = default;

/// Reads the next block of data. The function returns null if it has read all
/// the data. The function throws if run into any error.
virtual folly::SemiFuture<velox::BufferPtr> next() = 0;
virtual folly::SemiFuture<std::unique_ptr<ReadBatch>> next() = 0;

/// Tell the shuffle system the reader is done. May be called with 'success'
/// true before reading all the data. This happens when a query has a LIMIT or
Expand All @@ -58,7 +66,7 @@ class ShuffleInterfaceFactory {

virtual std::shared_ptr<ShuffleReader> createReader(
const std::string& serializedShuffleInfo,
const int32_t partition,
int32_t partition,
velox::memory::MemoryPool* pool) = 0;

virtual std::shared_ptr<ShuffleWriter> createWriter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ target_link_libraries(presto_operators_plan_builder velox_core)

add_executable(
presto_operators_test
PlanNodeSerdeTest.cpp UnsafeRowShuffleTest.cpp BroadcastTest.cpp
PlanNodeSerdeTest.cpp CompactRowShuffleTest.cpp BroadcastTest.cpp
BinarySortableSerializerTest.cpp PlanNodeBuilderTest.cpp)

add_test(presto_operators_test presto_operators_test)
Expand Down
Loading
Loading