Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions axiom/connectors/ConnectorMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class Column {
return latestStats_;
}

ColumnStatistics* mutableStats() {
ColumnStatistics* mutableStats() const {
std::lock_guard<std::mutex> l(mutex_);
if (!latestStats_) {
allStats_.push_back(std::make_unique<ColumnStatistics>());
Expand All @@ -145,7 +145,7 @@ class Column {
}

/// Sets statistics. May be called multiple times if table contents change.
void setStats(std::unique_ptr<ColumnStatistics> stats) {
void setStats(std::unique_ptr<ColumnStatistics> stats) const {
std::lock_guard<std::mutex> l(mutex_);
allStats_.push_back(std::move(stats));
latestStats_ = allStats_.back().get();
Expand Down Expand Up @@ -183,16 +183,20 @@ class Column {
const bool hidden_;
const velox::Variant defaultValue_;

// The latest element added to 'allStats_'.
velox::tsan_atomic<ColumnStatistics*> latestStats_{nullptr};
// The latest element added to 'allStats_'. Mutable because statistics are
// logically separate from the column's identity (name, type) and may be
// updated even on a const Column.
mutable velox::tsan_atomic<ColumnStatistics*> latestStats_{nullptr};

// All statistics recorded for this column. Old values can be purged when the
// containing Schema is not in use.
std::vector<std::unique_ptr<ColumnStatistics>> allStats_;
// containing Schema is not in use. Mutable for the same reason as
// latestStats_.
mutable std::vector<std::unique_ptr<ColumnStatistics>> allStats_;

private:
// Serializes changes to statistics.
std::mutex mutex_;
// Serializes changes to statistics. Mutable for the same reason as
// latestStats_.
mutable std::mutex mutex_;
};

class Table;
Expand Down
70 changes: 31 additions & 39 deletions axiom/connectors/hive/LocalHiveConnectorMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,14 @@ std::vector<SplitSource::SplitAndGroup> LocalHiveSplitSource::getSplits(

LocalHiveConnectorMetadata::LocalHiveConnectorMetadata(
velox::connector::hive::HiveConnector* hiveConnector)
: HiveConnectorMetadata(hiveConnector), splitManager_(this) {}
: HiveConnectorMetadata(hiveConnector), splitManager_(this) {
initialize();
}

void LocalHiveConnectorMetadata::reinitialize() {
std::lock_guard<std::mutex> l(mutex_);
tables_.clear();
initialize();
initialized_ = true;
}

void LocalHiveConnectorMetadata::initialize() {
Expand All @@ -198,22 +199,14 @@ void LocalHiveConnectorMetadata::initialize() {
readTables(path);
}

void LocalHiveConnectorMetadata::ensureInitialized() const {
std::lock_guard<std::mutex> l(mutex_);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this function was removed, do we still need the mutex?

I thought this was to guard against the case where multiple threads invoke ensureInitialized(). Could you double check

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hdikeman Thanks for the comment.
I noticed that mutex_ is not only used here, it is used in many other places too such as findTable, dropTable etc.

if (initialized_) {
return;
}
const_cast<LocalHiveConnectorMetadata*>(this)->initialize();
initialized_ = true;
}

std::shared_ptr<velox::core::QueryCtx> LocalHiveConnectorMetadata::makeQueryCtx(
const std::string& queryId) {
const std::string& queryId) const {
std::unordered_map<std::string, std::string> config;
std::unordered_map<std::string, std::shared_ptr<velox::config::ConfigBase>>
connectorConfigs;
connectorConfigs[hiveConnector_->connectorId()] =
std::const_pointer_cast<velox::config::ConfigBase>(hiveConfig_->config());
std::make_shared<velox::config::ConfigBase>(
hiveConfig_->config()->rawConfigsCopy());

return velox::core::QueryCtx::create(
hiveConnector_->executor(),
Expand Down Expand Up @@ -269,8 +262,13 @@ std::pair<int64_t, int64_t> LocalHiveTableLayout::sample(
std::vector<ColumnStatistics>* statistics) const {
VELOX_CHECK(extraFilters.empty());

auto connectorQueryCtx = reinterpret_cast<LocalHiveConnectorMetadata*>(
ConnectorMetadata::metadata(connector()))
->connectorQueryCtx();

std::vector<std::unique_ptr<StatisticsBuilder>> builders;
auto result = sample(handle, pct, scanType, fields, allocator, &builders);
auto result = sample(
handle, pct, scanType, fields, allocator, &builders, connectorQueryCtx);
if (!statistics) {
return result;
}
Expand All @@ -292,7 +290,9 @@ std::pair<int64_t, int64_t> LocalHiveTableLayout::sample(
velox::RowTypePtr scanType,
const std::vector<velox::common::Subfield>& fields,
velox::HashStringAllocator* allocator,
std::vector<std::unique_ptr<StatisticsBuilder>>* statsBuilders) const {
std::vector<std::unique_ptr<StatisticsBuilder>>* statsBuilders,
const std::shared_ptr<velox::connector::ConnectorQueryCtx>&
connectorQueryCtx) const {
StatisticsBuilderOptions options = {
.maxStringLength = 100, .countDistincts = true, .allocator = allocator};

Expand All @@ -318,10 +318,6 @@ std::pair<int64_t, int64_t> LocalHiveTableLayout::sample(

const auto outputType = ROW(std::move(names), std::move(types));

auto connectorQueryCtx = reinterpret_cast<LocalHiveConnectorMetadata*>(
ConnectorMetadata::metadata(connector()))
->connectorQueryCtx();

const auto maxRowsToScan = table().numRows() * (pct / 100);

int64_t passingRows = 0;
Expand Down Expand Up @@ -364,7 +360,7 @@ std::pair<int64_t, int64_t> LocalHiveTableLayout::sample(

void LocalTable::makeDefaultLayout(
std::vector<std::unique_ptr<const FileInfo>> files,
LocalHiveConnectorMetadata& metadata) {
const LocalHiveConnectorMetadata& metadata) {
if (!layouts_.empty()) {
// The table already has a layout made from a schema file.
reinterpret_cast<LocalHiveTableLayout*>(layouts_[0].get())
Expand Down Expand Up @@ -904,7 +900,7 @@ void LocalHiveConnectorMetadata::loadTable(
VELOX_CHECK_NOT_NULL(column, "Column not found: {}", name);

if (auto readerStats = reader->columnStatistics(i)) {
auto* stats = const_cast<Column*>(column)->mutableStats();
auto* stats = column->mutableStats();
stats->numValues += readerStats->getNumberOfValues().value_or(0);

const auto numValues = readerStats->getNumberOfValues();
Expand All @@ -923,7 +919,7 @@ void LocalHiveConnectorMetadata::loadTable(
// Set pct to sample ~100K rows.
pct = 100 * 100'000 / table->numRows();
}
table->sampleNumDistincts(pct, schemaPool_.get());
table->sampleNumDistincts(pct, schemaPool_.get(), *this);
}

namespace {
Expand Down Expand Up @@ -980,7 +976,8 @@ LocalTable::LocalTable(

void LocalTable::sampleNumDistincts(
float samplePct,
velox::memory::MemoryPool* pool) {
velox::memory::MemoryPool* pool,
const LocalHiveConnectorMetadata& metadata) {
std::vector<velox::common::Subfield> fields;
fields.reserve(type()->size());
for (auto i = 0; i < type()->size(); ++i) {
Expand All @@ -991,19 +988,14 @@ void LocalTable::sampleNumDistincts(
auto allocator = std::make_unique<velox::HashStringAllocator>(pool);
auto* layout = layouts_[0].get();

auto* metadata = ConnectorMetadata::metadata(layout->connector());

std::vector<velox::connector::ColumnHandlePtr> columns;
columns.reserve(type()->size());
for (auto i = 0; i < type()->size(); ++i) {
columns.push_back(layout->createColumnHandle(
/*session=*/nullptr, type()->nameOf(i)));
}

auto* localHiveMetadata =
dynamic_cast<const LocalHiveConnectorMetadata*>(metadata);
auto& evaluator =
*localHiveMetadata->connectorQueryCtx()->expressionEvaluator();
auto& evaluator = *metadata.connectorQueryCtx()->expressionEvaluator();

std::vector<velox::core::TypedExprPtr> ignore;
auto handle = layout->createTableHandle(
Expand All @@ -1014,13 +1006,19 @@ void LocalTable::sampleNumDistincts(

std::vector<std::unique_ptr<StatisticsBuilder>> statsBuilders;
auto [sampled, passed] = localLayout->sample(
handle, samplePct, type(), fields, allocator.get(), &statsBuilders);
handle,
samplePct,
type(),
fields,
allocator.get(),
&statsBuilders,
metadata.connectorQueryCtx());

numSampledRows_ = sampled;
for (auto i = 0; i < statsBuilders.size(); ++i) {
if (statsBuilders[i]) {
auto* column = findColumn(type()->nameOf(i));
ColumnStatistics& stats = *const_cast<Column*>(column)->mutableStats();
const auto* column = findColumn(type()->nameOf(i));
ColumnStatistics& stats = *column->mutableStats();
statsBuilders[i]->build(stats);
auto estimate = stats.numDistinct;
int64_t approxNumDistinct =
Expand Down Expand Up @@ -1053,16 +1051,13 @@ void LocalTable::sampleNumDistincts(
}
}

const_cast<Column*>(findColumn(type()->nameOf(i)))
->mutableStats()
->numDistinct = approxNumDistinct;
column->mutableStats()->numDistinct = approxNumDistinct;
}
}
}
}

TablePtr LocalHiveConnectorMetadata::findTable(std::string_view name) {
ensureInitialized();
std::lock_guard<std::mutex> l(mutex_);
return findTableLocked(name);
}
Expand Down Expand Up @@ -1168,7 +1163,6 @@ TablePtr LocalHiveConnectorMetadata::createTable(
const velox::RowTypePtr& rowType,
const folly::F14FastMap<std::string, velox::Variant>& options) {
validateOptions(options);
ensureInitialized();
auto path = tablePath(tableName);
if (dirExists(path)) {
VELOX_USER_FAIL("Table {} already exists", tableName);
Expand Down Expand Up @@ -1269,8 +1263,6 @@ bool LocalHiveConnectorMetadata::dropTable(
const ConnectorSessionPtr& /* session */,
std::string_view tableName,
bool ifExists) {
ensureInitialized();

std::lock_guard<std::mutex> l(mutex_);
if (!tables_.contains(tableName)) {
if (ifExists) {
Expand Down
20 changes: 10 additions & 10 deletions axiom/connectors/hive/LocalHiveConnectorMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ class LocalHiveTableLayout : public HiveTableLayout {
velox::RowTypePtr scanType,
const std::vector<velox::common::Subfield>& fields,
velox::HashStringAllocator* allocator,
std::vector<std::unique_ptr<StatisticsBuilder>>* statsBuilders) const;
std::vector<std::unique_ptr<StatisticsBuilder>>* statsBuilders,
const std::shared_ptr<velox::connector::ConnectorQueryCtx>&
connectorQueryCtx) const;

private:
std::vector<std::unique_ptr<const FileInfo>> files_;
Expand All @@ -165,7 +167,7 @@ class LocalTable : public HiveTable {

void makeDefaultLayout(
std::vector<std::unique_ptr<const FileInfo>> files,
LocalHiveConnectorMetadata& metadata);
const LocalHiveConnectorMetadata& metadata);

uint64_t numRows() const override {
return numRows_;
Expand All @@ -177,7 +179,10 @@ class LocalTable : public HiveTable {

/// Samples 'samplePct' % rows of the table and sets the num distincts
/// estimate for the columns. uses 'pool' for temporary data.
void sampleNumDistincts(float samplePct, velox::memory::MemoryPool* pool);
void sampleNumDistincts(
float samplePct,
velox::memory::MemoryPool* pool,
const LocalHiveConnectorMetadata& metadata);

private:
// Serializes initialization, e.g. exportedColumns_.
Expand All @@ -202,7 +207,6 @@ class LocalHiveConnectorMetadata : public HiveConnectorMetadata {
TablePtr findTable(std::string_view name) override;

ConnectorSplitManager* splitManager() override {
ensureInitialized();
return &splitManager_;
}

Expand All @@ -229,12 +233,11 @@ class LocalHiveConnectorMetadata : public HiveConnectorMetadata {
/// tables for name resolution.
const folly::F14FastMap<std::string, std::shared_ptr<LocalTable>>& tables()
const {
ensureInitialized();
return tables_;
}

std::shared_ptr<velox::core::QueryCtx> makeQueryCtx(
const std::string& queryId);
const std::string& queryId) const;

TablePtr createTable(
const ConnectorSessionPtr& session,
Expand Down Expand Up @@ -273,10 +276,8 @@ class LocalHiveConnectorMetadata : public HiveConnectorMetadata {
void reloadTableFromPath(std::string_view tableName);

private:
// Used to lazy initialize this in ensureInitialized() and to implement
// reinitialize().
// Used to initialize this in constructor and to implement reinitialize().
void initialize();
void ensureInitialized() const override;
void makeQueryCtx();
void makeConnectorQueryCtx();
void readTables(std::string_view path);
Expand All @@ -285,7 +286,6 @@ class LocalHiveConnectorMetadata : public HiveConnectorMetadata {
std::shared_ptr<LocalTable> findTableLocked(std::string_view name) const;

mutable std::mutex mutex_;
mutable bool initialized_{false};
std::shared_ptr<velox::memory::MemoryPool> rootPool_{
velox::memory::memoryManager()->addRootPool()};
std::shared_ptr<velox::memory::MemoryPool> schemaPool_;
Expand Down