Skip to content
5 changes: 5 additions & 0 deletions axiom/cli/Console.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ DEFINE_string(

// Copied into the runner options as 'debugMode' in Console::runNoThrow.
DEFINE_bool(debug, false, "Enable debug mode");

// Sampling switches, both off by default. Console::runNoThrow copies them into
// the runner options as 'sampleJoins' and 'sampleFilters'.
DEFINE_bool(sample_joins, false, "Enable join sampling");
DEFINE_bool(sample_filters, false, "Enable filter sampling");

using namespace facebook::velox;

namespace axiom::sql {
Expand Down Expand Up @@ -222,6 +225,8 @@ void Console::runNoThrow(std::string_view sql, bool isInteractive) {
.splitTargetBytes = FLAGS_split_target_bytes,
.optimizerTraceFlags = FLAGS_optimizer_trace,
.debugMode = FLAGS_debug,
.sampleJoins = FLAGS_sample_joins,
.sampleFilters = FLAGS_sample_filters,
};

decltype(runner_.parseMultiple(sql, options)) statements;
Expand Down
9 changes: 8 additions & 1 deletion axiom/cli/SqlQueryRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ std::string SqlQueryRunner::runExplain(
case presto::ExplainStatement::Type::kExecutable:
return optimize(logicalPlan, newQuery(options), options).toString();
}
VELOX_UNREACHABLE();
}

namespace {
Expand Down Expand Up @@ -317,14 +318,20 @@ optimizer::PlanAndStats SqlQueryRunner::optimize(

auto session = std::make_shared<Session>(queryCtx->queryId());

optimizer::OptimizerOptions optimizerOptions{
.sampleJoins = options.sampleJoins,
.sampleFilters = options.sampleFilters,
.traceFlags = options.optimizerTraceFlags,
};

optimizer::Optimization optimization(
session,
*logicalPlan,
*schema_,
*history_,
queryCtx,
evaluator,
{.traceFlags = options.optimizerTraceFlags},
optimizerOptions,
opts);

if (checkDerivedTable && !checkDerivedTable(*optimization.rootDt())) {
Expand Down
6 changes: 6 additions & 0 deletions axiom/cli/SqlQueryRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ class SqlQueryRunner {

/// If true, EXPLAIN ANALYZE output includes custom operator stats.
bool debugMode{false};

/// If true, enable join sampling.
bool sampleJoins{false};

/// If true, enable filter sampling.
bool sampleFilters{false};
};

/// Runs a single SQL statement and returns the result.
Expand Down
62 changes: 62 additions & 0 deletions axiom/connectors/ConnectorMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

#include "axiom/connectors/ConnectorMetadata.h"

#include <fmt/format.h>
#include <folly/String.h>
#include <sstream>

namespace facebook::axiom::connector {
namespace {

Expand Down Expand Up @@ -226,4 +230,62 @@ void ConnectorMetadata::unregisterMetadata(std::string_view connectorId) {
metadataRegistry().erase(connectorId);
}

/// Renders the statistics as "<k1=v1, k2=v2, ...>". Only members that carry
/// information are listed: a non-empty name, nonNull when true, non-zero
/// nullPct and numValues, every optional that is set, and the number of
/// children when there are any.
std::string ColumnStatistics::toString() const {
  std::vector<std::string> fields;

  // Appends "<label>=<value>" using fmt's default formatting of 'value'.
  const auto add = [&fields](const char* label, const auto& value) {
    fields.push_back(fmt::format("{}={}", label, value));
  };

  // Same as 'add', but only when 'opt' holds a value.
  const auto addIfSet = [&add](const char* label, const auto& opt) {
    if (opt.has_value()) {
      add(label, opt.value());
    }
  };

  // min/max are rendered through operator<< instead of fmt.
  const auto addStreamedIfSet = [&fields](
                                    const char* label, const auto& opt) {
    if (opt.has_value()) {
      std::ostringstream out;
      out << opt.value();
      fields.push_back(std::string(label) + "=" + out.str());
    }
  };

  if (!name.empty()) {
    add("name", name);
  }
  if (nonNull) {
    fields.emplace_back("nonNull=true");
  }
  if (nullPct != 0) {
    add("nullPct", nullPct);
  }

  addStreamedIfSet("min", min);
  addStreamedIfSet("max", max);

  addIfSet("maxLength", maxLength);
  addIfSet("ascendingPct", ascendingPct);
  addIfSet("descendingPct", descendingPct);
  addIfSet("avgLength", avgLength);
  addIfSet("numDistinct", numDistinct);

  if (numValues != 0) {
    add("numValues", numValues);
  }
  if (!children.empty()) {
    // Children are summarized by count only; they are not expanded.
    add("children", children.size());
  }

  return fmt::format("<{}>", folly::join(", ", fields));
}

} // namespace facebook::axiom::connector
16 changes: 16 additions & 0 deletions axiom/connectors/ConnectorMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,22 @@ struct ColumnStatistics {
/// map, may have one element for each key. In all cases, stats may be
/// missing.
std::vector<ColumnStatistics> children;

/// Returns a string representation of the statistics in the form
/// <field1=value1, field2=value2, ...>, containing only fields that have
/// values.
std::string toString() const;
};

/// Abstract representation of statistics per data covered by a PartitionHandle.
struct PartitionStatistics {
  /// Row count. Kept 64-bit, like 'numFiles', so that counts above 2^31 - 1 do
  /// not overflow.
  int64_t numRows{0};

  /// File count.
  int64_t numFiles{0};

  /// Column names.
  std::vector<std::string> columns;

  /// Column statistics, 1:1 to column names.
  std::vector<ColumnStatistics> columnStatistics;
};

/// Base class for column. The column's name and type are immutable but the
Expand Down
25 changes: 25 additions & 0 deletions axiom/connectors/ConnectorSplitManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,25 @@ struct SplitOptions {
/// Polymorphic base for partition handles.
class PartitionHandle {
 public:
  virtual ~PartitionHandle() = default;

  /// Returns a description of the handle. The base implementation returns the
  /// fixed text "PartitionHandle".
  virtual std::string toString() const {
    return std::string{"PartitionHandle"};
  }

  /// Returns the partition path string in Hive format (e.g.,
  /// "ds=2023-01-01/product=p1"). The base implementation always returns a
  /// reference to one shared empty string, which is the value used for
  /// unpartitioned tables.
  virtual const std::string& partition() const {
    static const std::string kNoPartition;
    return kNoPartition;
  }
};

/// Shared pointer to an immutable PartitionHandle.
using PartitionHandlePtr = std::shared_ptr<const PartitionHandle>;

// Forward declaration; only the pointer alias below is needed here.
struct PartitionStatistics;
/// Shared pointer to PartitionStatistics. Unlike PartitionHandlePtr, the
/// pointee is not const.
using PartitionStatisticsPtr = std::shared_ptr<PartitionStatistics>;

class ConnectorSplitManager {
public:
virtual ~ConnectorSplitManager() = default;
Expand All @@ -73,6 +88,16 @@ class ConnectorSplitManager {
const ConnectorSessionPtr& session,
const velox::connector::ConnectorTableHandlePtr& tableHandle) = 0;

/// Returns per-partition statistics for 'partitions' and 'columns'. This is
/// a separate function because split enumeration for reading files
/// transfers less data. Typically, callers pass only a subset of the
/// partitions to get stats.
///
/// The default implementation ignores both arguments and returns an empty
/// vector.
virtual std::vector<PartitionStatisticsPtr> getPartitionStatistics(
std::span<const PartitionHandlePtr> partitions,
const std::vector<std::string>& columns) {
return {};
}

/// Returns a SplitSource that covers the contents of 'partitions'. The set of
/// partitions is exposed separately so that the caller may process the
/// partitions in a specific order or distribute them to specific nodes in a
Expand Down
41 changes: 41 additions & 0 deletions axiom/connectors/hive/HiveConnectorMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,35 @@
#include "velox/exec/TableWriter.h"
#include "velox/expression/ExprConstants.h"

#include <fmt/format.h>
#include <folly/String.h>
#include <gflags/gflags.h>
#include <algorithm>

// Per-file byte cap for sampling reads. The default, 64 << 20, is 64 MiB.
DEFINE_int64(
hive_max_sample_size,
64 << 20,
"Maximum bytes to read from any one file during sampling");

namespace facebook::axiom::connector::hive {

/// Builds the Hive-style partition string for 'keys': "key=value" entries
/// joined with '/'. A key that has no value is written as
/// "key=__HIVE_DEFAULT_PARTITION__". Returns an empty string when 'keys' is
/// empty.
///
/// Entries are emitted in ascending key order. 'keys' is a hash map whose
/// iteration order is unspecified, so emitting entries in iteration order
/// could produce different strings for the same partition.
/// NOTE(review): the partition-column declaration order of the table is not
/// available here; confirm that no caller needs the string in that order.
std::string HivePartitionHandle::makePartitionString(
    const folly::F14FastMap<std::string, std::optional<std::string>>& keys) {
  if (keys.empty()) {
    return "";
  }

  using KeyMap = folly::F14FastMap<std::string, std::optional<std::string>>;

  // Sort pointers to the entries rather than copying keys and values.
  std::vector<const KeyMap::value_type*> ordered;
  ordered.reserve(keys.size());
  for (const auto& entry : keys) {
    ordered.push_back(&entry);
  }
  std::sort(
      ordered.begin(), ordered.end(), [](const auto* lhs, const auto* rhs) {
        return lhs->first < rhs->first;
      });

  std::vector<std::string> parts;
  parts.reserve(ordered.size());
  for (const auto* entry : ordered) {
    const auto& [key, value] = *entry;
    if (value.has_value()) {
      parts.push_back(fmt::format("{}={}", key, value.value()));
    } else {
      parts.push_back(fmt::format("{}=__HIVE_DEFAULT_PARTITION__", key));
    }
  }
  return folly::join("/", parts);
}

const PartitionType* HivePartitionType::copartition(
const PartitionType& other) const {
if (const auto* otherPartitionType = other.as<HivePartitionType>()) {
Expand Down Expand Up @@ -380,4 +407,18 @@ void HiveConnectorMetadata::validateOptions(
}
}

/// Describes the handle as "<hive partition: ...>", where the body is the
/// partition string followed, when a bucket number is set, by
/// "buckets=<number>". The two parts are separated by ", " when both are
/// present.
std::string HivePartitionHandle::toString() const {
  std::string details = partition();

  if (tableBucketNumber.has_value()) {
    // A separator is needed only when a partition string precedes the bucket.
    const char* separator = details.empty() ? "" : ", ";
    details += separator;
    details += fmt::format("buckets={}", tableBucketNumber.value());
  }

  return fmt::format("<hive partition: {}>", details);
}

} // namespace facebook::axiom::connector::hive
25 changes: 24 additions & 1 deletion axiom/connectors/hive/HiveConnectorMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#pragma once

#include <fmt/format.h>
#include <folly/String.h>
#include "axiom/connectors/ConnectorMetadata.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveDataSink.h"
Expand All @@ -33,11 +35,24 @@ struct HivePartitionHandle : public PartitionHandle {
folly::F14FastMap<std::string, std::optional<std::string>> partitionKeys,
std::optional<int32_t> tableBucketNumber)
: partitionKeys(std::move(partitionKeys)),
tableBucketNumber(tableBucketNumber) {}
tableBucketNumber(tableBucketNumber),
partition_(makePartitionString(this->partitionKeys)) {}

std::string toString() const override;

const folly::F14FastMap<std::string, std::optional<std::string>>
partitionKeys;
const std::optional<int32_t> tableBucketNumber;

/// Returns the partition string that was computed from 'partitionKeys' when
/// this handle was constructed.
const std::string& partition() const override {
return partition_;
}

private:
/// Builds the partition string for 'keys': "key=value" entries joined with
/// '/'. A key without a value is written as
/// "key=__HIVE_DEFAULT_PARTITION__". Returns an empty string for empty 'keys'.
static std::string makePartitionString(
const folly::F14FastMap<std::string, std::optional<std::string>>& keys);

/// Result of makePartitionString(partitionKeys), computed once in the
/// constructor and returned by partition().
const std::string partition_;
};

/// For Hive, 'partition' means 'bucket'.
Expand All @@ -61,6 +76,10 @@ class HivePartitionType : public connector::PartitionType {

std::string toString() const override;

/// Returns the number of partitions of this partition type. For Hive,
/// 'partition' means 'bucket'.
int32_t numPartitions() const {
return numPartitions_;
}

private:
const int32_t numPartitions_;
const std::vector<velox::TypePtr> partitionKeyTypes_;
Expand Down Expand Up @@ -125,6 +144,10 @@ class HiveTableLayout : public TableLayout {
return partitionType_.has_value() ? &partitionType_.value() : nullptr;
}

/// Reports the Hive partition columns of this layout as its discrete
/// predicate columns. The returned span is a non-owning view over
/// 'hivePartitionColumns_'.
std::span<const Column* const> discretePredicateColumns() const override {
return hivePartitionColumns_;
}

/// Returns SerDe parameters for this layout. Default implementation returns
/// empty map. Derived classes can override to provide actual parameters.
virtual const std::unordered_map<std::string, std::string>& serdeParameters()
Expand Down
Loading
Loading