Skip to content

Commit fe45398

Browse files
shangm2Presto CUDF CI
authored andcommitted
Add support to provide thrift codec for connector specific fields (prestodb#25242)
## Description TODO: Clean up the json field prestodb#25671 To keep the trunk cleaner and also move this pr forward, [we had a offline discussion and agree to update this pr with the following items](https://prestodb.slack.com/archives/C091L9VBAFR/p1752721296470699?thread_ts=1752715197.618069&cid=C091L9VBAFR): 1. Keep ConnectorSplit and ConnectorTransactionHandle in ConnectorCodecProvider. 2. Remove Hive implementation, in particular Hive*Split and Hive*TransactionHandle removing mixed JSON/Thrift support 3. Make ExecutionWriterTarget a Union and add support in `ConnectorCodecProvider` for related handles . 4. Deprecate MetadataUpdate. The original description: 1. Add thrift support for split and transaction handle 2. but only activated if the feature toggle is on and a proper connector specific codec is provided 3. **We will be using byte array for serialization for now and will iterate on the _interface definition within SPI_ to avoid unnecessary allocations for better performance.** 4. Instructions on how to use the thrift .idl file can be found in the [rfc](prestodb/rfcs#38). <img width="1035" height="427" alt="Screenshot 2025-07-16 at 07 58 01" src="https://github.com/user-attachments/assets/7f043ce7-7faa-43be-be8d-62d4ebdc81ee" /> 5. cpp side changes: prestodb#25595 ## Motivation and Context prestodb/rfcs#38 ## Impact <!---Describe any public API or user-facing feature change or any performance impact--> ## Test Plan 1. passed verifier run ## Contributor checklist - [ ] Please make sure your submission complies with our [contributing guide](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md), in particular [code style](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#code-style) and [commit standards](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#commit-standards). - [ ] PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced. - [ ] Documented new properties (with its default value), SQL syntax, functions, or other functionality. - [ ] If release notes are required, they follow the [release notes guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines). - [ ] Adequate tests were added if applicable. - [ ] CI passed. ## Release Notes Please follow [release notes guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines) and fill in the release notes below. ``` == RELEASE NOTES == General Changes * Improve efficiency of coordinator by supporting thrift codec for connector-specific data. ```
1 parent 433b17c commit fe45398

File tree

62 files changed

+2198
-299
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

62 files changed

+2198
-299
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
<dep.okhttp.version>4.12.0</dep.okhttp.version>
5353
<dep.jdbi3.version>3.4.0</dep.jdbi3.version>
5454
<dep.oracle.version>19.3.0.0</dep.oracle.version>
55-
<dep.drift.version>1.43</dep.drift.version>
55+
<dep.drift.version>1.45</dep.drift.version>
5656
<!-- Changing joda version changes tzdata which must match deployed JVM tzdata
5757
Do not change this without also making sure it matches -->
5858
<dep.joda.version>2.13.1</dep.joda.version>

presto-hdfs-core/src/main/java/com/facebook/presto/hive/CacheQuotaRequirement.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
*/
1414
package com.facebook.presto.hive;
1515

16+
import com.facebook.drift.annotations.ThriftConstructor;
17+
import com.facebook.drift.annotations.ThriftField;
18+
import com.facebook.drift.annotations.ThriftStruct;
1619
import com.fasterxml.jackson.annotation.JsonCreator;
1720
import com.fasterxml.jackson.annotation.JsonProperty;
1821
import io.airlift.units.DataSize;
@@ -24,6 +27,7 @@
2427
import static com.google.common.base.MoreObjects.toStringHelper;
2528
import static java.util.Objects.requireNonNull;
2629

30+
@ThriftStruct
2731
public class CacheQuotaRequirement
2832
{
2933
public static final CacheQuotaRequirement NO_CACHE_REQUIREMENT = new CacheQuotaRequirement(GLOBAL, Optional.empty());
@@ -32,6 +36,7 @@ public class CacheQuotaRequirement
3236
private final Optional<DataSize> quota;
3337

3438
@JsonCreator
39+
@ThriftConstructor
3540
public CacheQuotaRequirement(
3641
@JsonProperty("cacheQuotaScope") CacheQuotaScope cacheQuotaScope,
3742
@JsonProperty("quota") Optional<DataSize> quota)
@@ -41,12 +46,14 @@ public CacheQuotaRequirement(
4146
}
4247

4348
@JsonProperty
49+
@ThriftField(1)
4450
public CacheQuotaScope getCacheQuotaScope()
4551
{
4652
return cacheQuotaScope;
4753
}
4854

4955
@JsonProperty
56+
@ThriftField(2)
5057
public Optional<DataSize> getQuota()
5158
{
5259
return quota;

presto-hdfs-core/src/main/java/com/facebook/presto/hive/CacheQuotaScope.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,28 @@
1313
*/
1414
package com.facebook.presto.hive;
1515

16+
import com.facebook.drift.annotations.ThriftEnum;
17+
import com.facebook.drift.annotations.ThriftEnumValue;
18+
19+
@ThriftEnum
1620
public enum CacheQuotaScope
1721
{
18-
GLOBAL, SCHEMA, TABLE, PARTITION
22+
GLOBAL(0),
23+
SCHEMA(1),
24+
TABLE(2),
25+
PARTITION(3),
26+
/**/;
27+
28+
private final int value;
29+
30+
CacheQuotaScope(int value)
31+
{
32+
this.value = value;
33+
}
34+
35+
@ThriftEnumValue
36+
public int getValue()
37+
{
38+
return value;
39+
}
1940
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.connector;
15+
16+
import com.facebook.drift.codec.ThriftCodecManager;
17+
import com.facebook.presto.spi.ConnectorCodec;
18+
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
19+
import com.facebook.presto.spi.ConnectorId;
20+
import com.facebook.presto.spi.ConnectorInsertTableHandle;
21+
import com.facebook.presto.spi.ConnectorOutputTableHandle;
22+
import com.facebook.presto.spi.ConnectorSplit;
23+
import com.facebook.presto.spi.ConnectorTableHandle;
24+
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
25+
import com.facebook.presto.spi.connector.ConnectorCodecProvider;
26+
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
27+
import com.facebook.presto.thrift.RemoteCodecProvider;
28+
import com.google.inject.Provider;
29+
30+
import javax.inject.Inject;
31+
32+
import java.util.Map;
33+
import java.util.Optional;
34+
import java.util.concurrent.ConcurrentHashMap;
35+
36+
import static com.facebook.presto.operator.ExchangeOperator.REMOTE_CONNECTOR_ID;
37+
import static java.util.Objects.requireNonNull;
38+
39+
public class ConnectorCodecManager
40+
{
41+
private final Map<String, ConnectorCodecProvider> connectorCodecProviders = new ConcurrentHashMap<>();
42+
43+
@Inject
44+
public ConnectorCodecManager(Provider<ThriftCodecManager> thriftCodecManagerProvider)
45+
{
46+
requireNonNull(thriftCodecManagerProvider, "thriftCodecManager is null");
47+
48+
connectorCodecProviders.put(REMOTE_CONNECTOR_ID.toString(), new RemoteCodecProvider(thriftCodecManagerProvider));
49+
}
50+
51+
public void addConnectorCodecProvider(ConnectorId connectorId, ConnectorCodecProvider connectorCodecProvider)
52+
{
53+
requireNonNull(connectorId, "connectorId is null");
54+
requireNonNull(connectorCodecProvider, "connectorThriftCodecProvider is null");
55+
connectorCodecProviders.put(connectorId.getCatalogName(), connectorCodecProvider);
56+
}
57+
58+
public Optional<ConnectorCodec<ConnectorSplit>> getConnectorSplitCodec(String connectorId)
59+
{
60+
requireNonNull(connectorId, "connectorId is null");
61+
return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorSplitCodec);
62+
}
63+
64+
public Optional<ConnectorCodec<ConnectorTransactionHandle>> getTransactionHandleCodec(String connectorId)
65+
{
66+
requireNonNull(connectorId, "connectorId is null");
67+
return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorTransactionHandleCodec);
68+
}
69+
70+
public Optional<ConnectorCodec<ConnectorOutputTableHandle>> getOutputTableHandleCodec(String connectorId)
71+
{
72+
requireNonNull(connectorId, "connectorId is null");
73+
return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorOutputTableHandleCodec);
74+
}
75+
76+
public Optional<ConnectorCodec<ConnectorInsertTableHandle>> getInsertTableHandleCodec(String connectorId)
77+
{
78+
requireNonNull(connectorId, "connectorId is null");
79+
return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorInsertTableHandleCodec);
80+
}
81+
82+
public Optional<ConnectorCodec<ConnectorDeleteTableHandle>> getDeleteTableHandleCodec(String connectorId)
83+
{
84+
requireNonNull(connectorId, "connectorId is null");
85+
return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorDeleteTableHandleCodec);
86+
}
87+
88+
public Optional<ConnectorCodec<ConnectorTableLayoutHandle>> getTableLayoutHandleCodec(String connectorId)
89+
{
90+
requireNonNull(connectorId, "connectorId is null");
91+
return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorTableLayoutHandleCodec);
92+
}
93+
94+
public Optional<ConnectorCodec<ConnectorTableHandle>> getTableHandleCodec(String connectorId)
95+
{
96+
requireNonNull(connectorId, "connectorId is null");
97+
return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorTableHandleCodec);
98+
}
99+
}

presto-main-base/src/main/java/com/facebook/presto/connector/ConnectorManager.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
4141
import com.facebook.presto.spi.connector.Connector;
4242
import com.facebook.presto.spi.connector.ConnectorAccessControl;
43+
import com.facebook.presto.spi.connector.ConnectorCodecProvider;
4344
import com.facebook.presto.spi.connector.ConnectorContext;
4445
import com.facebook.presto.spi.connector.ConnectorFactory;
4546
import com.facebook.presto.spi.connector.ConnectorIndexProvider;
@@ -118,6 +119,7 @@ public class ConnectorManager
118119
private final FilterStatsCalculator filterStatsCalculator;
119120
private final BlockEncodingSerde blockEncodingSerde;
120121
private final ConnectorSystemConfig connectorSystemConfig;
122+
private final ConnectorCodecManager connectorCodecManager;
121123

122124
@GuardedBy("this")
123125
private final ConcurrentMap<String, ConnectorFactory> connectorFactories = new ConcurrentHashMap<>();
@@ -151,7 +153,8 @@ public ConnectorManager(
151153
DeterminismEvaluator determinismEvaluator,
152154
FilterStatsCalculator filterStatsCalculator,
153155
BlockEncodingSerde blockEncodingSerde,
154-
FeaturesConfig featuresConfig)
156+
FeaturesConfig featuresConfig,
157+
ConnectorCodecManager connectorCodecManager)
155158
{
156159
this.metadataManager = requireNonNull(metadataManager, "metadataManager is null");
157160
this.catalogManager = requireNonNull(catalogManager, "catalogManager is null");
@@ -176,6 +179,7 @@ public ConnectorManager(
176179
this.filterStatsCalculator = requireNonNull(filterStatsCalculator, "filterStatsCalculator is null");
177180
this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
178181
this.connectorSystemConfig = () -> featuresConfig.isNativeExecutionEnabled();
182+
this.connectorCodecManager = requireNonNull(connectorCodecManager, "connectorThriftCodecManager is null");
179183
}
180184

181185
@PreDestroy
@@ -303,7 +307,7 @@ private synchronized void addConnectorInternal(MaterializedConnector connector)
303307
connector.getPlanOptimizerProvider()
304308
.ifPresent(planOptimizerProvider -> connectorPlanOptimizerManager.addPlanOptimizerProvider(connectorId, planOptimizerProvider));
305309
}
306-
310+
connector.getConnectorCodecProvider().ifPresent(connectorCodecProvider -> connectorCodecManager.addConnectorCodecProvider(connectorId, connectorCodecProvider));
307311
metadataManager.getProcedureRegistry().addProcedures(connectorId, connector.getProcedures());
308312

309313
connector.getAccessControl()
@@ -392,6 +396,7 @@ private static class MaterializedConnector
392396
private final Optional<ConnectorIndexProvider> indexProvider;
393397
private final Optional<ConnectorNodePartitioningProvider> partitioningProvider;
394398
private final Optional<ConnectorPlanOptimizerProvider> planOptimizerProvider;
399+
private final Optional<ConnectorCodecProvider> connectorCodecProvider;
395400
private final Optional<ConnectorAccessControl> accessControl;
396401
private final List<PropertyMetadata<?>> sessionProperties;
397402
private final List<PropertyMetadata<?>> tableProperties;
@@ -472,6 +477,15 @@ public MaterializedConnector(ConnectorId connectorId, Connector connector)
472477
}
473478
this.planOptimizerProvider = Optional.ofNullable(planOptimizerProvider);
474479

480+
ConnectorCodecProvider connectorCodecProvider = null;
481+
try {
482+
connectorCodecProvider = connector.getConnectorCodecProvider();
483+
requireNonNull(connectorCodecProvider, format("Connector %s returned null connector specific codec provider", connectorId));
484+
}
485+
catch (UnsupportedOperationException ignored) {
486+
}
487+
this.connectorCodecProvider = Optional.ofNullable(connectorCodecProvider);
488+
475489
ConnectorAccessControl accessControl = null;
476490
try {
477491
accessControl = connector.getAccessControl();
@@ -580,5 +594,10 @@ public List<PropertyMetadata<?>> getAnalyzeProperties()
580594
{
581595
return analyzeProperties;
582596
}
597+
598+
public Optional<ConnectorCodecProvider> getConnectorCodecProvider()
599+
{
600+
return connectorCodecProvider;
601+
}
583602
}
584603
}

presto-main-base/src/main/java/com/facebook/presto/execution/Location.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,24 +13,30 @@
1313
*/
1414
package com.facebook.presto.execution;
1515

16+
import com.facebook.drift.annotations.ThriftConstructor;
17+
import com.facebook.drift.annotations.ThriftField;
18+
import com.facebook.drift.annotations.ThriftStruct;
1619
import com.fasterxml.jackson.annotation.JsonCreator;
1720
import com.fasterxml.jackson.annotation.JsonProperty;
1821

1922
import java.net.URI;
2023

2124
import static java.util.Objects.requireNonNull;
2225

26+
@ThriftStruct
2327
public class Location
2428
{
2529
private final String location;
2630

2731
@JsonCreator
32+
@ThriftConstructor
2833
public Location(@JsonProperty("location") String location)
2934
{
3035
this.location = requireNonNull(location, "location is null");
3136
}
3237

3338
@JsonProperty
39+
@ThriftField(1)
3440
public String getLocation()
3541
{
3642
return location;

0 commit comments

Comments
 (0)