Skip to content

Commit 3705024

Browse files
authored
feat: Support connector factory registeration in Pre… (#26312)
``` == NO RELEASE NOTE == ```
1 parent e3fa9d8 commit 3705024

File tree

5 files changed

+307
-56
lines changed

5 files changed

+307
-56
lines changed

presto-native-execution/presto_cpp/main/connectors/Registration.cpp

Lines changed: 40 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -33,65 +33,21 @@ namespace {
3333
constexpr char const* kHiveHadoop2ConnectorName = "hive-hadoop2";
3434
constexpr char const* kIcebergConnectorName = "iceberg";
3535

36-
const std::unordered_map<
37-
std::string,
38-
const std::shared_ptr<velox::connector::ConnectorFactory>>&
39-
connectorFactories() {
40-
static const std::unordered_map<
41-
std::string,
42-
const std::shared_ptr<velox::connector::ConnectorFactory>>
43-
factories = {
44-
{velox::connector::hive::HiveConnectorFactory::kHiveConnectorName,
45-
std::make_shared<velox::connector::hive::HiveConnectorFactory>()},
46-
{kHiveHadoop2ConnectorName,
47-
std::make_shared<velox::connector::hive::HiveConnectorFactory>(
48-
kHiveHadoop2ConnectorName)},
49-
{velox::connector::tpch::TpchConnectorFactory::kTpchConnectorName,
50-
std::make_shared<velox::connector::tpch::TpchConnectorFactory>()},
51-
{kIcebergConnectorName,
52-
std::make_shared<velox::connector::hive::HiveConnectorFactory>(
53-
kIcebergConnectorName)},
54-
#ifdef PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR
55-
{ArrowFlightConnectorFactory::kArrowFlightConnectorName,
56-
std::make_shared<ArrowFlightConnectorFactory>()},
57-
#endif
58-
};
59-
return factories;
60-
}
61-
6236
} // namespace
6337

64-
velox::connector::ConnectorFactory* getConnectorFactory(
65-
const std::string& connectorName) {
66-
{
67-
#ifdef PRESTO_ENABLE_CUDF
68-
if (velox::cudf_velox::CudfConfig::getInstance().enabled) {
69-
if (connectorName ==
70-
velox::connector::hive::HiveConnectorFactory::kHiveConnectorName) {
71-
static const auto factory = std::make_shared<
72-
velox::cudf_velox::connector::hive::CudfHiveConnectorFactory>();
73-
return factory.get();
74-
}
75-
if (connectorName == kHiveHadoop2ConnectorName) {
76-
static const auto factory = std::make_shared<
77-
velox::cudf_velox::connector::hive::CudfHiveConnectorFactory>(
78-
kHiveHadoop2ConnectorName);
79-
return factory.get();
80-
}
81-
}
82-
#endif
83-
auto it = connectorFactories().find(connectorName);
84-
if (it != connectorFactories().end()) {
85-
return it->second.get();
86-
}
87-
}
88-
if (!velox::connector::hasConnectorFactory(connectorName)) {
89-
VELOX_FAIL("ConnectorFactory with name '{}' not registered", connectorName);
38+
std::vector<std::string> listConnectorFactories() {
39+
std::vector<std::string> names;
40+
const auto& factories = detail::connectorFactories();
41+
names.reserve(factories.size());
42+
for (const auto& [name, _] : factories) {
43+
names.push_back(name);
9044
}
91-
return velox::connector::getConnectorFactory(connectorName).get();
45+
return names;
9246
}
9347

9448
void registerConnectors() {
49+
registerConnectorFactories();
50+
9551
registerPrestoToVeloxConnector(std::make_unique<HivePrestoToVeloxConnector>(
9652
velox::connector::hive::HiveConnectorFactory::kHiveConnectorName));
9753
registerPrestoToVeloxConnector(
@@ -118,4 +74,35 @@ void registerConnectors() {
11874
ArrowFlightConnectorFactory::kArrowFlightConnectorName));
11975
#endif
12076
}
77+
78+
void registerConnectorFactories() {
79+
// Register all connector factories using the facebook::presto namespace
80+
// factory registry
81+
82+
// Register Hive connector factory
83+
facebook::presto::registerConnectorFactory(
84+
std::make_shared<facebook::velox::connector::hive::HiveConnectorFactory>());
85+
86+
// Register Hive Hadoop2 connector factory
87+
facebook::presto::registerConnectorFactory(
88+
std::make_shared<facebook::velox::connector::hive::HiveConnectorFactory>(
89+
kHiveHadoop2ConnectorName));
90+
91+
// Register TPCH connector factory
92+
facebook::presto::registerConnectorFactory(
93+
std::make_shared<facebook::velox::connector::tpch::TpchConnectorFactory>());
94+
95+
// Register Iceberg connector factory (using Hive implementation)
96+
facebook::presto::registerConnectorFactory(
97+
std::make_shared<facebook::velox::connector::hive::HiveConnectorFactory>(
98+
kIcebergConnectorName));
99+
100+
#ifdef PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR
101+
// Note: ArrowFlightConnectorFactory would need to be implemented in Presto
102+
// namespace For now, keep the Velox version
103+
facebook::presto::registerConnectorFactory(
104+
std::make_shared<ArrowFlightConnectorFactory>());
105+
#endif
106+
}
107+
121108
} // namespace facebook::presto

presto-native-execution/presto_cpp/main/connectors/Registration.h

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,79 @@
1313
*/
1414
#pragma once
1515

16+
#include <memory>
1617
#include <string>
18+
#include <unordered_map>
19+
#include <vector>
20+
21+
#include "folly/Executor.h"
22+
#include "velox/connectors/Connector.h"
1723

1824
// Forward declaration for ConnectorFactory.
1925
namespace facebook::velox::connector {
2026
class ConnectorFactory;
2127
} // namespace facebook::velox::connector
2228

2329
namespace facebook::presto {
30+
using facebook::velox::connector::ConnectorFactory;
31+
32+
namespace detail {
33+
inline std::unordered_map<std::string, std::shared_ptr<ConnectorFactory>>&
34+
connectorFactories() {
35+
static std::unordered_map<std::string, std::shared_ptr<ConnectorFactory>>
36+
factories;
37+
return factories;
38+
}
39+
} // namespace detail
40+
41+
/// Adds a factory for creating connectors to the registry using connector
42+
/// name as the key. Throws if factory with the same name is already present.
43+
/// Always returns true. The return value makes it easy to use with
44+
/// FB_ANONYMOUS_VARIABLE.
45+
inline bool registerConnectorFactory(
46+
std::shared_ptr<ConnectorFactory> factory) {
47+
const bool ok = detail::connectorFactories()
48+
.insert({factory->connectorName(), factory})
49+
.second;
50+
VELOX_CHECK(
51+
ok,
52+
"ConnectorFactory with name '{}' is already registered",
53+
factory->connectorName());
54+
return true;
55+
}
56+
57+
/// Returns true if a connector with the specified name has been registered,
58+
/// false otherwise.
59+
inline bool hasConnectorFactory(const std::string& connectorName) {
60+
return detail::connectorFactories().count(connectorName) == 1;
61+
}
62+
63+
/// Unregister a connector factory by name.
64+
/// Returns true if a connector with the specified name has been
65+
/// unregistered, false otherwise.
66+
inline bool unregisterConnectorFactory(const std::string& connectorName) {
67+
const auto count = detail::connectorFactories().erase(connectorName);
68+
return count == 1;
69+
}
70+
71+
/// Returns a factory for creating connectors with the specified name.
72+
/// Throws if factory doesn't exist.
73+
inline std::shared_ptr<ConnectorFactory> getConnectorFactory(
74+
const std::string& connectorName) {
75+
auto it = detail::connectorFactories().find(connectorName);
76+
VELOX_CHECK(
77+
it != detail::connectorFactories().end(),
78+
"ConnectorFactory with name '{}' not registered",
79+
connectorName);
80+
return it->second;
81+
}
82+
83+
/// Returns a list of all registered connector factory names.
84+
std::vector<std::string> listConnectorFactories();
2485

25-
velox::connector::ConnectorFactory* getConnectorFactory(
26-
const std::string& connectorName);
86+
/// Registers all connector factories using the facebook::presto namespace
87+
/// factory registry.
88+
void registerConnectorFactories();
2789

2890
void registerConnectors();
2991

presto-native-execution/presto_cpp/main/tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
add_executable(
1313
presto_server_test
1414
AnnouncerTest.cpp
15+
ConnectorTest.cpp
1516
CoordinatorDiscovererTest.cpp
1617
HttpServerWrapper.cpp
1718
PeriodicMemoryCheckerTest.cpp

0 commit comments

Comments
 (0)