Skip to content

Commit 40112f8

Browse files
PingLiuPingLakehouse Engine Bot
authored andcommitted
Support insert data into iceberg table. Co-authored-by [email protected]
Alchemy-item: [feat(iceberg): Add support for writing iceberg tables](#13 (comment)) commit 1/1 - b406bf0
1 parent e00643b commit 40112f8

20 files changed

+1241
-91
lines changed

velox/connectors/hive/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ velox_add_library(
3434

3535
velox_link_libraries(
3636
velox_hive_connector
37-
PUBLIC velox_hive_iceberg_splitreader
37+
PUBLIC velox_hive_iceberg_connector
3838
PRIVATE velox_common_io velox_connector velox_dwio_catalog_fbhive
3939
velox_hive_partition_function)
4040

velox/connectors/hive/HiveConnector.cpp

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include "velox/connectors/hive/HiveDataSink.h"
2121
#include "velox/connectors/hive/HiveDataSource.h"
2222
#include "velox/connectors/hive/HivePartitionFunction.h"
23+
#include "velox/connectors/hive/iceberg/IcebergDataSink.h"
2324

2425
#include <boost/lexical_cast.hpp>
2526
#include <memory>
@@ -87,17 +88,29 @@ std::unique_ptr<DataSink> HiveConnector::createDataSink(
8788
ConnectorInsertTableHandlePtr connectorInsertTableHandle,
8889
ConnectorQueryCtx* connectorQueryCtx,
8990
CommitStrategy commitStrategy) {
90-
auto hiveInsertHandle =
91-
std::dynamic_pointer_cast<const HiveInsertTableHandle>(
92-
connectorInsertTableHandle);
93-
VELOX_CHECK_NOT_NULL(
94-
hiveInsertHandle, "Hive connector expecting hive write handle!");
95-
return std::make_unique<HiveDataSink>(
96-
inputType,
97-
hiveInsertHandle,
98-
connectorQueryCtx,
99-
commitStrategy,
100-
hiveConfig_);
91+
if (auto icebergInsertHandle =
92+
std::dynamic_pointer_cast<const iceberg::IcebergInsertTableHandle>(
93+
connectorInsertTableHandle)) {
94+
return std::make_unique<iceberg::IcebergDataSink>(
95+
inputType,
96+
icebergInsertHandle,
97+
connectorQueryCtx,
98+
commitStrategy,
99+
hiveConfig_);
100+
} else {
101+
auto hiveInsertHandle =
102+
std::dynamic_pointer_cast<const HiveInsertTableHandle>(
103+
connectorInsertTableHandle);
104+
105+
VELOX_CHECK_NOT_NULL(
106+
hiveInsertHandle, "Hive connector expecting hive write handle!");
107+
return std::make_unique<HiveDataSink>(
108+
inputType,
109+
hiveInsertHandle,
110+
connectorQueryCtx,
111+
commitStrategy,
112+
hiveConfig_);
113+
}
101114
}
102115

103116
std::unique_ptr<core::PartitionFunction> HivePartitionFunctionSpec::create(

velox/connectors/hive/HiveConnectorUtil.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@
2323
#include "velox/expression/Expr.h"
2424
#include "velox/expression/ExprToSubfieldFilter.h"
2525

26+
#include <boost/lexical_cast.hpp>
27+
#include <boost/uuid/uuid_generators.hpp>
28+
#include <boost/uuid/uuid_io.hpp>
29+
2630
namespace facebook::velox::connector::hive {
2731
namespace {
2832

@@ -925,4 +929,9 @@ core::TypedExprPtr extractFiltersFromRemainingFilter(
925929
}
926930
return expr;
927931
}
932+
933+
std::string makeUuid() {
934+
return boost::lexical_cast<std::string>(boost::uuids::random_generator()());
935+
}
936+
928937
} // namespace facebook::velox::connector::hive

velox/connectors/hive/HiveConnectorUtil.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,4 +114,6 @@ core::TypedExprPtr extractFiltersFromRemainingFilter(
114114
common::SubfieldFilters& filters,
115115
double& sampleRate);
116116

117+
std::string makeUuid();
118+
117119
} // namespace facebook::velox::connector::hive

velox/connectors/hive/HiveDataSink.cpp

Lines changed: 80 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,6 @@
3030
#include "velox/exec/OperatorUtils.h"
3131
#include "velox/exec/SortBuffer.h"
3232

33-
#include <boost/lexical_cast.hpp>
34-
#include <boost/uuid/uuid_generators.hpp>
35-
#include <boost/uuid/uuid_io.hpp>
36-
3733
using facebook::velox::common::testutil::TestValue;
3834

3935
namespace facebook::velox::connector::hive {
@@ -95,14 +91,12 @@ std::vector<column_index_t> getPartitionChannels(
9591

9692
// Returns the column indices of non-partition data columns.
9793
std::vector<column_index_t> getNonPartitionChannels(
98-
const std::vector<column_index_t>& partitionChannels,
99-
const column_index_t childrenSize) {
94+
const std::shared_ptr<const HiveInsertTableHandle>& insertTableHandle) {
10095
std::vector<column_index_t> dataChannels;
101-
dataChannels.reserve(childrenSize - partitionChannels.size());
10296

103-
for (column_index_t i = 0; i < childrenSize; i++) {
104-
if (std::find(partitionChannels.cbegin(), partitionChannels.cend(), i) ==
105-
partitionChannels.cend()) {
97+
for (column_index_t i = 0; i < insertTableHandle->inputColumns().size();
98+
i++) {
99+
if (!insertTableHandle->inputColumns()[i]->isPartitionKey()) {
106100
dataChannels.push_back(i);
107101
}
108102
}
@@ -119,10 +113,6 @@ std::string makePartitionDirectory(
119113
return tableDirectory;
120114
}
121115

122-
std::string makeUuid() {
123-
return boost::lexical_cast<std::string>(boost::uuids::random_generator()());
124-
}
125-
126116
std::unordered_map<LocationHandle::TableType, std::string> tableTypeNames() {
127117
return {
128118
{LocationHandle::TableType::kNew, "kNew"},
@@ -383,7 +373,8 @@ HiveDataSink::HiveDataSink(
383373
? createBucketFunction(
384374
*insertTableHandle->bucketProperty(),
385375
inputType)
386-
: nullptr) {}
376+
: nullptr,
377+
getNonPartitionChannels(insertTableHandle)) {}
387378

388379
HiveDataSink::HiveDataSink(
389380
RowTypePtr inputType,
@@ -392,7 +383,8 @@ HiveDataSink::HiveDataSink(
392383
CommitStrategy commitStrategy,
393384
const std::shared_ptr<const HiveConfig>& hiveConfig,
394385
uint32_t bucketCount,
395-
std::unique_ptr<core::PartitionFunction> bucketFunction)
386+
std::unique_ptr<core::PartitionFunction> bucketFunction,
387+
const std::vector<column_index_t>& dataChannels)
396388
: inputType_(std::move(inputType)),
397389
insertTableHandle_(std::move(insertTableHandle)),
398390
connectorQueryCtx_(connectorQueryCtx),
@@ -412,8 +404,7 @@ HiveDataSink::HiveDataSink(
412404
hiveConfig_->isPartitionPathAsLowerCase(
413405
connectorQueryCtx->sessionProperties()))
414406
: nullptr),
415-
dataChannels_(
416-
getNonPartitionChannels(partitionChannels_, inputType_->size())),
407+
dataChannels_(dataChannels),
417408
bucketCount_(static_cast<int32_t>(bucketCount)),
418409
bucketFunction_(std::move(bucketFunction)),
419410
writerFactory_(
@@ -489,6 +480,8 @@ void HiveDataSink::appendData(RowVectorPtr input) {
489480
input->childAt(i)->loadedVector();
490481
}
491482

483+
splitInputRowsAndEnsureWriters(input);
484+
492485
// All inputs belong to a single non-bucketed partition. The partition id
493486
// must be zero.
494487
if (!isBucketed() && partitionIdGenerator_->numPartitions() == 1) {
@@ -497,8 +490,6 @@ void HiveDataSink::appendData(RowVectorPtr input) {
497490
return;
498491
}
499492

500-
splitInputRowsAndEnsureWriters();
501-
502493
for (auto index = 0; index < writers_.size(); ++index) {
503494
const vector_size_t partitionSize = partitionSizes_[index];
504495
if (partitionSize == 0) {
@@ -670,30 +661,33 @@ bool HiveDataSink::finish() {
670661
std::vector<std::string> HiveDataSink::close() {
671662
setState(State::kClosed);
672663
closeInternal();
664+
return commitMessage();
665+
}
673666

667+
std::vector<std::string> HiveDataSink::commitMessage() const {
674668
std::vector<std::string> partitionUpdates;
675669
partitionUpdates.reserve(writerInfo_.size());
676670
for (int i = 0; i < writerInfo_.size(); ++i) {
677671
const auto& info = writerInfo_.at(i);
678672
VELOX_CHECK_NOT_NULL(info);
679673
// clang-format off
680-
auto partitionUpdateJson = folly::toJson(
681-
folly::dynamic::object
682-
("name", info->writerParameters.partitionName().value_or(""))
683-
("updateMode",
684-
HiveWriterParameters::updateModeToString(
685-
info->writerParameters.updateMode()))
686-
("writePath", info->writerParameters.writeDirectory())
687-
("targetPath", info->writerParameters.targetDirectory())
688-
("fileWriteInfos", folly::dynamic::array(
689-
folly::dynamic::object
690-
("writeFileName", info->writerParameters.writeFileName())
691-
("targetFileName", info->writerParameters.targetFileName())
692-
("fileSize", ioStats_.at(i)->rawBytesWritten())))
693-
("rowCount", info->numWrittenRows)
694-
("inMemoryDataSizeInBytes", info->inputSizeInBytes)
695-
("onDiskDataSizeInBytes", ioStats_.at(i)->rawBytesWritten())
696-
("containsNumberedFileNames", true));
674+
auto partitionUpdateJson = folly::toJson(
675+
folly::dynamic::object
676+
("name", info->writerParameters.partitionName().value_or(""))
677+
("updateMode",
678+
HiveWriterParameters::updateModeToString(
679+
info->writerParameters.updateMode()))
680+
("writePath", info->writerParameters.writeDirectory())
681+
("targetPath", info->writerParameters.targetDirectory())
682+
("fileWriteInfos", folly::dynamic::array(
683+
folly::dynamic::object
684+
("writeFileName", info->writerParameters.writeFileName())
685+
("targetFileName", info->writerParameters.targetFileName())
686+
("fileSize", ioStats_.at(i)->rawBytesWritten())))
687+
("rowCount", info->numWrittenRows)
688+
("inMemoryDataSizeInBytes", info->inputSizeInBytes)
689+
("onDiskDataSizeInBytes", ioStats_.at(i)->rawBytesWritten())
690+
("containsNumberedFileNames", true));
697691
// clang-format on
698692
partitionUpdates.push_back(partitionUpdateJson);
699693
}
@@ -740,11 +734,7 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
740734
VELOX_CHECK_EQ(writers_.size(), writerInfo_.size());
741735
VELOX_CHECK_EQ(writerIndexMap_.size(), writerInfo_.size());
742736

743-
std::optional<std::string> partitionName;
744-
if (isPartitioned()) {
745-
partitionName =
746-
partitionIdGenerator_->partitionName(id.partitionId.value());
747-
}
737+
std::optional<std::string> partitionName = getPartitionName(id);
748738

749739
// Without explicitly setting flush policy, the default memory based flush
750740
// policy is used.
@@ -831,15 +821,23 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
831821
options);
832822
writer = maybeCreateBucketSortWriter(std::move(writer));
833823
writers_.emplace_back(std::move(writer));
834-
// Extends the buffer used for partition rows calculations.
835-
partitionSizes_.emplace_back(0);
836-
partitionRows_.emplace_back(nullptr);
837-
rawPartitionRows_.emplace_back(nullptr);
824+
825+
extendBuffersForPartitionedTables();
838826

839827
writerIndexMap_.emplace(id, writers_.size() - 1);
840828
return writerIndexMap_[id];
841829
}
842830

831+
std::optional<std::string> HiveDataSink::getPartitionName(
832+
const HiveWriterId& id) const {
833+
std::optional<std::string> partitionName;
834+
if (isPartitioned()) {
835+
partitionName =
836+
partitionIdGenerator_->partitionName(id.partitionId.value());
837+
}
838+
return partitionName;
839+
}
840+
843841
std::unique_ptr<facebook::velox::dwio::common::Writer>
844842
HiveDataSink::maybeCreateBucketSortWriter(
845843
std::unique_ptr<facebook::velox::dwio::common::Writer> writer) {
@@ -867,6 +865,13 @@ HiveDataSink::maybeCreateBucketSortWriter(
867865
sortWriterFinishTimeSliceLimitMs_);
868866
}
869867

868+
void HiveDataSink::extendBuffersForPartitionedTables() {
869+
// Extends the buffer used for partition rows calculations.
870+
partitionSizes_.emplace_back(0);
871+
partitionRows_.emplace_back(nullptr);
872+
rawPartitionRows_.emplace_back(nullptr);
873+
}
874+
870875
HiveWriterId HiveDataSink::getWriterId(size_t row) const {
871876
std::optional<int32_t> partitionId;
872877
if (isPartitioned()) {
@@ -881,7 +886,25 @@ HiveWriterId HiveDataSink::getWriterId(size_t row) const {
881886
return HiveWriterId{partitionId, bucketId};
882887
}
883888

884-
void HiveDataSink::splitInputRowsAndEnsureWriters() {
889+
void HiveDataSink::updatePartitionRows(
890+
uint32_t index,
891+
vector_size_t numRows,
892+
vector_size_t row) {
893+
VELOX_DCHECK_LT(index, partitionSizes_.size());
894+
VELOX_DCHECK_EQ(partitionSizes_.size(), partitionRows_.size());
895+
VELOX_DCHECK_EQ(partitionRows_.size(), rawPartitionRows_.size());
896+
if (FOLLY_UNLIKELY(partitionRows_[index] == nullptr) ||
897+
(partitionRows_[index]->capacity() < numRows * sizeof(vector_size_t))) {
898+
partitionRows_[index] =
899+
allocateIndices(numRows, connectorQueryCtx_->memoryPool());
900+
rawPartitionRows_[index] =
901+
partitionRows_[index]->asMutable<vector_size_t>();
902+
}
903+
rawPartitionRows_[index][partitionSizes_[index]] = row;
904+
++partitionSizes_[index];
905+
}
906+
907+
void HiveDataSink::splitInputRowsAndEnsureWriters(RowVectorPtr /* input */) {
885908
VELOX_CHECK(isPartitioned() || isBucketed());
886909
if (isBucketed() && isPartitioned()) {
887910
VELOX_CHECK_EQ(bucketIds_.size(), partitionIds_.size());
@@ -895,18 +918,7 @@ void HiveDataSink::splitInputRowsAndEnsureWriters() {
895918
const auto id = getWriterId(row);
896919
const uint32_t index = ensureWriter(id);
897920

898-
VELOX_DCHECK_LT(index, partitionSizes_.size());
899-
VELOX_DCHECK_EQ(partitionSizes_.size(), partitionRows_.size());
900-
VELOX_DCHECK_EQ(partitionRows_.size(), rawPartitionRows_.size());
901-
if (FOLLY_UNLIKELY(partitionRows_[index] == nullptr) ||
902-
(partitionRows_[index]->capacity() < numRows * sizeof(vector_size_t))) {
903-
partitionRows_[index] =
904-
allocateIndices(numRows, connectorQueryCtx_->memoryPool());
905-
rawPartitionRows_[index] =
906-
partitionRows_[index]->asMutable<vector_size_t>();
907-
}
908-
rawPartitionRows_[index][partitionSizes_[index]] = row;
909-
++partitionSizes_[index];
921+
updatePartitionRows(index, numRows, row);
910922
}
911923

912924
for (uint32_t i = 0; i < partitionSizes_.size(); ++i) {
@@ -917,6 +929,15 @@ void HiveDataSink::splitInputRowsAndEnsureWriters() {
917929
}
918930
}
919931

932+
std::string HiveDataSink::makePartitionDirectory(
933+
const std::string& tableDirectory,
934+
const std::optional<std::string>& partitionSubdirectory) const {
935+
if (partitionSubdirectory.has_value()) {
936+
return fs::path(tableDirectory) / partitionSubdirectory.value();
937+
}
938+
return tableDirectory;
939+
}
940+
920941
HiveWriterParameters HiveDataSink::getWriterParameters(
921942
const std::optional<std::string>& partition,
922943
std::optional<uint32_t> bucketId) const {

0 commit comments

Comments
 (0)