30
30
#include " velox/exec/OperatorUtils.h"
31
31
#include " velox/exec/SortBuffer.h"
32
32
33
- #include < boost/lexical_cast.hpp>
34
- #include < boost/uuid/uuid_generators.hpp>
35
- #include < boost/uuid/uuid_io.hpp>
36
-
37
33
using facebook::velox::common::testutil::TestValue;
38
34
39
35
namespace facebook ::velox::connector::hive {
@@ -95,14 +91,12 @@ std::vector<column_index_t> getPartitionChannels(
95
91
96
92
// Returns the column indices of non-partition data columns.
97
93
std::vector<column_index_t > getNonPartitionChannels (
98
- const std::vector<column_index_t >& partitionChannels,
99
- const column_index_t childrenSize) {
94
+ const std::shared_ptr<const HiveInsertTableHandle>& insertTableHandle) {
100
95
std::vector<column_index_t > dataChannels;
101
- dataChannels.reserve (childrenSize - partitionChannels.size ());
102
96
103
- for (column_index_t i = 0 ; i < childrenSize; i++) {
104
- if ( std::find (partitionChannels. cbegin (), partitionChannels. cend (), i) ==
105
- partitionChannels. cend ()) {
97
+ for (column_index_t i = 0 ; i < insertTableHandle-> inputColumns (). size ();
98
+ i++) {
99
+ if (!insertTableHandle-> inputColumns ()[i]-> isPartitionKey ()) {
106
100
dataChannels.push_back (i);
107
101
}
108
102
}
@@ -119,10 +113,6 @@ std::string makePartitionDirectory(
119
113
return tableDirectory;
120
114
}
121
115
122
- std::string makeUuid () {
123
- return boost::lexical_cast<std::string>(boost::uuids::random_generator ()());
124
- }
125
-
126
116
std::unordered_map<LocationHandle::TableType, std::string> tableTypeNames () {
127
117
return {
128
118
{LocationHandle::TableType::kNew , " kNew" },
@@ -383,7 +373,8 @@ HiveDataSink::HiveDataSink(
383
373
? createBucketFunction(
384
374
*insertTableHandle->bucketProperty (),
385
375
inputType)
386
- : nullptr) {}
376
+ : nullptr,
377
+ getNonPartitionChannels(insertTableHandle)) {}
387
378
388
379
HiveDataSink::HiveDataSink (
389
380
RowTypePtr inputType,
@@ -392,7 +383,8 @@ HiveDataSink::HiveDataSink(
392
383
CommitStrategy commitStrategy,
393
384
const std::shared_ptr<const HiveConfig>& hiveConfig,
394
385
uint32_t bucketCount,
395
- std::unique_ptr<core::PartitionFunction> bucketFunction)
386
+ std::unique_ptr<core::PartitionFunction> bucketFunction,
387
+ const std::vector<column_index_t >& dataChannels)
396
388
: inputType_(std::move(inputType)),
397
389
insertTableHandle_(std::move(insertTableHandle)),
398
390
connectorQueryCtx_(connectorQueryCtx),
@@ -412,8 +404,7 @@ HiveDataSink::HiveDataSink(
412
404
hiveConfig_->isPartitionPathAsLowerCase(
413
405
connectorQueryCtx->sessionProperties ()))
414
406
: nullptr),
415
- dataChannels_(
416
- getNonPartitionChannels (partitionChannels_, inputType_->size ())),
407
+ dataChannels_(dataChannels),
417
408
bucketCount_(static_cast <int32_t >(bucketCount)),
418
409
bucketFunction_(std::move(bucketFunction)),
419
410
writerFactory_(
@@ -489,6 +480,8 @@ void HiveDataSink::appendData(RowVectorPtr input) {
489
480
input->childAt (i)->loadedVector ();
490
481
}
491
482
483
+ splitInputRowsAndEnsureWriters (input);
484
+
492
485
// All inputs belong to a single non-bucketed partition. The partition id
493
486
// must be zero.
494
487
if (!isBucketed () && partitionIdGenerator_->numPartitions () == 1 ) {
@@ -497,8 +490,6 @@ void HiveDataSink::appendData(RowVectorPtr input) {
497
490
return ;
498
491
}
499
492
500
- splitInputRowsAndEnsureWriters ();
501
-
502
493
for (auto index = 0 ; index < writers_.size (); ++index) {
503
494
const vector_size_t partitionSize = partitionSizes_[index];
504
495
if (partitionSize == 0 ) {
@@ -670,30 +661,33 @@ bool HiveDataSink::finish() {
670
661
std::vector<std::string> HiveDataSink::close () {
671
662
setState (State::kClosed );
672
663
closeInternal ();
664
+ return commitMessage ();
665
+ }
673
666
667
+ std::vector<std::string> HiveDataSink::commitMessage () const {
674
668
std::vector<std::string> partitionUpdates;
675
669
partitionUpdates.reserve (writerInfo_.size ());
676
670
for (int i = 0 ; i < writerInfo_.size (); ++i) {
677
671
const auto & info = writerInfo_.at (i);
678
672
VELOX_CHECK_NOT_NULL (info);
679
673
// clang-format off
680
- auto partitionUpdateJson = folly::toJson (
681
- folly::dynamic::object
682
- (" name" , info->writerParameters .partitionName ().value_or (" " ))
683
- (" updateMode" ,
684
- HiveWriterParameters::updateModeToString (
685
- info->writerParameters .updateMode ()))
686
- (" writePath" , info->writerParameters .writeDirectory ())
687
- (" targetPath" , info->writerParameters .targetDirectory ())
688
- (" fileWriteInfos" , folly::dynamic::array (
689
- folly::dynamic::object
690
- (" writeFileName" , info->writerParameters .writeFileName ())
691
- (" targetFileName" , info->writerParameters .targetFileName ())
692
- (" fileSize" , ioStats_.at (i)->rawBytesWritten ())))
693
- (" rowCount" , info->numWrittenRows )
694
- (" inMemoryDataSizeInBytes" , info->inputSizeInBytes )
695
- (" onDiskDataSizeInBytes" , ioStats_.at (i)->rawBytesWritten ())
696
- (" containsNumberedFileNames" , true ));
674
+ auto partitionUpdateJson = folly::toJson (
675
+ folly::dynamic::object
676
+ (" name" , info->writerParameters .partitionName ().value_or (" " ))
677
+ (" updateMode" ,
678
+ HiveWriterParameters::updateModeToString (
679
+ info->writerParameters .updateMode ()))
680
+ (" writePath" , info->writerParameters .writeDirectory ())
681
+ (" targetPath" , info->writerParameters .targetDirectory ())
682
+ (" fileWriteInfos" , folly::dynamic::array (
683
+ folly::dynamic::object
684
+ (" writeFileName" , info->writerParameters .writeFileName ())
685
+ (" targetFileName" , info->writerParameters .targetFileName ())
686
+ (" fileSize" , ioStats_.at (i)->rawBytesWritten ())))
687
+ (" rowCount" , info->numWrittenRows )
688
+ (" inMemoryDataSizeInBytes" , info->inputSizeInBytes )
689
+ (" onDiskDataSizeInBytes" , ioStats_.at (i)->rawBytesWritten ())
690
+ (" containsNumberedFileNames" , true ));
697
691
// clang-format on
698
692
partitionUpdates.push_back (partitionUpdateJson);
699
693
}
@@ -740,11 +734,7 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
740
734
VELOX_CHECK_EQ (writers_.size (), writerInfo_.size ());
741
735
VELOX_CHECK_EQ (writerIndexMap_.size (), writerInfo_.size ());
742
736
743
- std::optional<std::string> partitionName;
744
- if (isPartitioned ()) {
745
- partitionName =
746
- partitionIdGenerator_->partitionName (id.partitionId .value ());
747
- }
737
+ std::optional<std::string> partitionName = getPartitionName (id);
748
738
749
739
// Without explicitly setting flush policy, the default memory based flush
750
740
// policy is used.
@@ -831,15 +821,23 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
831
821
options);
832
822
writer = maybeCreateBucketSortWriter (std::move (writer));
833
823
writers_.emplace_back (std::move (writer));
834
- // Extends the buffer used for partition rows calculations.
835
- partitionSizes_.emplace_back (0 );
836
- partitionRows_.emplace_back (nullptr );
837
- rawPartitionRows_.emplace_back (nullptr );
824
+
825
+ extendBuffersForPartitionedTables ();
838
826
839
827
writerIndexMap_.emplace (id, writers_.size () - 1 );
840
828
return writerIndexMap_[id];
841
829
}
842
830
831
+ std::optional<std::string> HiveDataSink::getPartitionName (
832
+ const HiveWriterId& id) const {
833
+ std::optional<std::string> partitionName;
834
+ if (isPartitioned ()) {
835
+ partitionName =
836
+ partitionIdGenerator_->partitionName (id.partitionId .value ());
837
+ }
838
+ return partitionName;
839
+ }
840
+
843
841
std::unique_ptr<facebook::velox::dwio::common::Writer>
844
842
HiveDataSink::maybeCreateBucketSortWriter (
845
843
std::unique_ptr<facebook::velox::dwio::common::Writer> writer) {
@@ -867,6 +865,13 @@ HiveDataSink::maybeCreateBucketSortWriter(
867
865
sortWriterFinishTimeSliceLimitMs_);
868
866
}
869
867
868
+ void HiveDataSink::extendBuffersForPartitionedTables () {
869
+ // Extends the buffer used for partition rows calculations.
870
+ partitionSizes_.emplace_back (0 );
871
+ partitionRows_.emplace_back (nullptr );
872
+ rawPartitionRows_.emplace_back (nullptr );
873
+ }
874
+
870
875
HiveWriterId HiveDataSink::getWriterId (size_t row) const {
871
876
std::optional<int32_t > partitionId;
872
877
if (isPartitioned ()) {
@@ -881,7 +886,25 @@ HiveWriterId HiveDataSink::getWriterId(size_t row) const {
881
886
return HiveWriterId{partitionId, bucketId};
882
887
}
883
888
884
- void HiveDataSink::splitInputRowsAndEnsureWriters () {
889
+ void HiveDataSink::updatePartitionRows (
890
+ uint32_t index,
891
+ vector_size_t numRows,
892
+ vector_size_t row) {
893
+ VELOX_DCHECK_LT (index, partitionSizes_.size ());
894
+ VELOX_DCHECK_EQ (partitionSizes_.size (), partitionRows_.size ());
895
+ VELOX_DCHECK_EQ (partitionRows_.size (), rawPartitionRows_.size ());
896
+ if (FOLLY_UNLIKELY (partitionRows_[index] == nullptr ) ||
897
+ (partitionRows_[index]->capacity () < numRows * sizeof (vector_size_t ))) {
898
+ partitionRows_[index] =
899
+ allocateIndices (numRows, connectorQueryCtx_->memoryPool ());
900
+ rawPartitionRows_[index] =
901
+ partitionRows_[index]->asMutable <vector_size_t >();
902
+ }
903
+ rawPartitionRows_[index][partitionSizes_[index]] = row;
904
+ ++partitionSizes_[index];
905
+ }
906
+
907
+ void HiveDataSink::splitInputRowsAndEnsureWriters (RowVectorPtr /* input */ ) {
885
908
VELOX_CHECK (isPartitioned () || isBucketed ());
886
909
if (isBucketed () && isPartitioned ()) {
887
910
VELOX_CHECK_EQ (bucketIds_.size (), partitionIds_.size ());
@@ -895,18 +918,7 @@ void HiveDataSink::splitInputRowsAndEnsureWriters() {
895
918
const auto id = getWriterId (row);
896
919
const uint32_t index = ensureWriter (id);
897
920
898
- VELOX_DCHECK_LT (index, partitionSizes_.size ());
899
- VELOX_DCHECK_EQ (partitionSizes_.size (), partitionRows_.size ());
900
- VELOX_DCHECK_EQ (partitionRows_.size (), rawPartitionRows_.size ());
901
- if (FOLLY_UNLIKELY (partitionRows_[index] == nullptr ) ||
902
- (partitionRows_[index]->capacity () < numRows * sizeof (vector_size_t ))) {
903
- partitionRows_[index] =
904
- allocateIndices (numRows, connectorQueryCtx_->memoryPool ());
905
- rawPartitionRows_[index] =
906
- partitionRows_[index]->asMutable <vector_size_t >();
907
- }
908
- rawPartitionRows_[index][partitionSizes_[index]] = row;
909
- ++partitionSizes_[index];
921
+ updatePartitionRows (index, numRows, row);
910
922
}
911
923
912
924
for (uint32_t i = 0 ; i < partitionSizes_.size (); ++i) {
@@ -917,6 +929,15 @@ void HiveDataSink::splitInputRowsAndEnsureWriters() {
917
929
}
918
930
}
919
931
932
+ std::string HiveDataSink::makePartitionDirectory (
933
+ const std::string& tableDirectory,
934
+ const std::optional<std::string>& partitionSubdirectory) const {
935
+ if (partitionSubdirectory.has_value ()) {
936
+ return fs::path (tableDirectory) / partitionSubdirectory.value ();
937
+ }
938
+ return tableDirectory;
939
+ }
940
+
920
941
HiveWriterParameters HiveDataSink::getWriterParameters (
921
942
const std::optional<std::string>& partition,
922
943
std::optional<uint32_t > bucketId) const {
0 commit comments