Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.h"

#include "presto_cpp/presto_protocol/connector/iceberg/IcebergConnectorProtocol.h"
#include "velox/connectors/hive/iceberg/IcebergDataSink.h"
#include "velox/connectors/hive/iceberg/IcebergSplit.h"
#include "velox/type/fbhive/HiveTypeParser.h"

Expand Down Expand Up @@ -274,4 +275,74 @@ IcebergPrestoToVeloxConnector::createConnectorProtocol() const {
return std::make_unique<protocol::iceberg::IcebergConnectorProtocol>();
}

std::unique_ptr<velox::connector::ConnectorInsertTableHandle>
IcebergPrestoToVeloxConnector::toVeloxInsertTableHandle(
const protocol::CreateHandle* createHandle,
const TypeParser& typeParser) const {
auto icebergOutputTableHandle =
std::dynamic_pointer_cast<protocol::iceberg::IcebergOutputTableHandle>(
createHandle->handle.connectorHandle);

VELOX_CHECK_NOT_NULL(
icebergOutputTableHandle,
"Unexpected output table handle type {}",
createHandle->handle.connectorHandle->_type);

const auto inputColumns =
toHiveColumns(icebergOutputTableHandle->inputColumns, typeParser);

return std::make_unique<
velox::connector::hive::iceberg::IcebergInsertTableHandle>(
inputColumns,
std::make_shared<velox::connector::hive::LocationHandle>(
fmt::format("{}/data", icebergOutputTableHandle->outputPath),
fmt::format("{}/data", icebergOutputTableHandle->outputPath),
velox::connector::hive::LocationHandle::TableType::kNew),
toVeloxFileFormat(icebergOutputTableHandle->fileFormat),
std::optional(
toFileCompressionKind(icebergOutputTableHandle->compressionCodec)));
}

std::unique_ptr<velox::connector::ConnectorInsertTableHandle>
IcebergPrestoToVeloxConnector::toVeloxInsertTableHandle(
const protocol::InsertHandle* insertHandle,
const TypeParser& typeParser) const {
auto icebergInsertTableHandle =
std::dynamic_pointer_cast<protocol::iceberg::IcebergInsertTableHandle>(
insertHandle->handle.connectorHandle);

VELOX_CHECK_NOT_NULL(
icebergInsertTableHandle,
"Unexpected insert table handle type {}",
insertHandle->handle.connectorHandle->_type);

const auto inputColumns =
toHiveColumns(icebergInsertTableHandle->inputColumns, typeParser);

return std::make_unique<
velox::connector::hive::iceberg::IcebergInsertTableHandle>(
inputColumns,
std::make_shared<velox::connector::hive::LocationHandle>(
fmt::format("{}/data", icebergInsertTableHandle->outputPath),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fmt::format("{}/data", icebergInsertTableHandle->outputPath) seems repeated at multiple places in the code. Abstract an inline function for this and reuse.

fmt::format("{}/data", icebergInsertTableHandle->outputPath),
velox::connector::hive::LocationHandle::TableType::kExisting),
toVeloxFileFormat(icebergInsertTableHandle->fileFormat),
std::optional(
toFileCompressionKind(icebergInsertTableHandle->compressionCodec)));
}

std::vector<velox::connector::hive::HiveColumnHandlePtr>
IcebergPrestoToVeloxConnector::toHiveColumns(
const protocol::List<protocol::iceberg::IcebergColumnHandle>& inputColumns,
const TypeParser& typeParser) const {
std::vector<velox::connector::hive::HiveColumnHandlePtr> hiveColumns;
hiveColumns.reserve(inputColumns.size());
for (const auto& columnHandle : inputColumns) {
hiveColumns.emplace_back(
std::dynamic_pointer_cast<velox::connector::hive::HiveColumnHandle>(
std::shared_ptr(toVeloxColumnHandle(&columnHandle, typeParser))));
}
return hiveColumns;
}

} // namespace facebook::presto
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#pragma once

#include "presto_cpp/main/connectors/PrestoToVeloxConnector.h"
#include "presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h"

namespace facebook::presto {

Expand All @@ -40,6 +41,22 @@ class IcebergPrestoToVeloxConnector final : public PrestoToVeloxConnector {

std::unique_ptr<protocol::ConnectorProtocol> createConnectorProtocol()
const final;

std::unique_ptr<velox::connector::ConnectorInsertTableHandle>
toVeloxInsertTableHandle(
const protocol::CreateHandle* createHandle,
const TypeParser& typeParser) const final;

std::unique_ptr<velox::connector::ConnectorInsertTableHandle>
toVeloxInsertTableHandle(
const protocol::InsertHandle* insertHandle,
const TypeParser& typeParser) const final;

private:
std::vector<velox::connector::hive::HiveColumnHandlePtr> toHiveColumns(
const protocol::List<protocol::iceberg::IcebergColumnHandle>&
inputColumns,
const TypeParser& typeParser) const;
};

} // namespace facebook::presto
Original file line number Diff line number Diff line change
Expand Up @@ -150,26 +150,6 @@ dwio::common::FileFormat toFileFormat(
}
}

velox::common::CompressionKind toFileCompressionKind(
const protocol::hive::HiveCompressionCodec& hiveCompressionCodec) {
switch (hiveCompressionCodec) {
case protocol::hive::HiveCompressionCodec::SNAPPY:
return velox::common::CompressionKind::CompressionKind_SNAPPY;
case protocol::hive::HiveCompressionCodec::GZIP:
return velox::common::CompressionKind::CompressionKind_GZIP;
case protocol::hive::HiveCompressionCodec::LZ4:
return velox::common::CompressionKind::CompressionKind_LZ4;
case protocol::hive::HiveCompressionCodec::ZSTD:
return velox::common::CompressionKind::CompressionKind_ZSTD;
case protocol::hive::HiveCompressionCodec::NONE:
return velox::common::CompressionKind::CompressionKind_NONE;
default:
VELOX_UNSUPPORTED(
"Unsupported file compression format: {}.",
toJsonString(hiveCompressionCodec));
}
}

velox::connector::hive::HiveBucketProperty::Kind toHiveBucketPropertyKind(
protocol::hive::BucketFunctionType bucketFuncType) {
switch (bucketFuncType) {
Expand Down Expand Up @@ -422,6 +402,26 @@ std::unique_ptr<connector::ConnectorTableHandle> toHiveTableHandle(
finalTableParameters);
}

velox::common::CompressionKind toFileCompressionKind(
const protocol::hive::HiveCompressionCodec& hiveCompressionCodec) {
switch (hiveCompressionCodec) {
case protocol::hive::HiveCompressionCodec::SNAPPY:
return velox::common::CompressionKind::CompressionKind_SNAPPY;
case protocol::hive::HiveCompressionCodec::GZIP:
return velox::common::CompressionKind::CompressionKind_GZIP;
case protocol::hive::HiveCompressionCodec::LZ4:
return velox::common::CompressionKind::CompressionKind_LZ4;
case protocol::hive::HiveCompressionCodec::ZSTD:
return velox::common::CompressionKind::CompressionKind_ZSTD;
case protocol::hive::HiveCompressionCodec::NONE:
return velox::common::CompressionKind::CompressionKind_NONE;
default:
VELOX_UNSUPPORTED(
"Unsupported file compression format: {}.",
toJsonString(hiveCompressionCodec));
}
}

std::unique_ptr<velox::connector::ConnectorSplit>
HivePrestoToVeloxConnector::toVeloxSplit(
const protocol::ConnectorId& catalogId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ std::unique_ptr<velox::connector::ConnectorTableHandle> toHiveTableHandle(
const VeloxExprConverter& exprConverter,
const TypeParser& typeParser);

velox::common::CompressionKind toFileCompressionKind(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you are separating a HiveToPrestoVeloxConnector file then this can be moved there as well.

const protocol::hive::HiveCompressionCodec& hiveCompressionCodec);

class PrestoToVeloxConnector {
public:
virtual ~PrestoToVeloxConnector() = default;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public enum QueryRunnerType
public static final String REMOTE_FUNCTION_CATALOG_NAME = "remote";
public static final String HIVE_DATA = "hive_data";

protected static final String ICEBERG_DEFAULT_STORAGE_FORMAT = "PARQUET";
public static final String ICEBERG_DEFAULT_STORAGE_FORMAT = "PARQUET";

private static final Logger log = Logger.get(PrestoNativeQueryRunnerUtils.class);
private static final String DEFAULT_STORAGE_FORMAT = "DWRF";
Expand Down
Loading
Loading