Commit 51ef4df — Add RFC to add thrift support for task status/update/info

# **RFC0 for Presto**

See [CONTRIBUTING.md](CONTRIBUTING.md) for instructions on creating your RFC and the process surrounding it.

## Thrift Serialization for TaskStatus, TaskInfo and TaskUpdateRequest

Proposers

* Shang Ma
* Vivian Hsu

## Related Issues

Related issues may include Github issues, PRs or other RFCs.

- prestodb/presto#25020
- prestodb/presto#25079

## Summary

Support Thrift serialization of the TaskStatus, TaskInfo, and TaskUpdateRequest classes for the getTaskStatus and createOrUpdateTask APIs, for both Java and C++ worker types, to reduce CPU overhead.
## Background

The Presto coordinator sends task updates to workers, and workers respond with a TaskInfo. Both the TaskUpdateRequest and the TaskInfo are currently serialized using JSON, which can be CPU intensive. Under high task concurrency, this can become a bottleneck for the coordinator, which in turn becomes a bottleneck for the whole cluster.

### [Optional] Goals

1. Support Thrift serde for the TaskStatus, TaskInfo, and TaskUpdateRequest classes for both Java and C++ workers
2. Maintain backward compatibility with existing JSON serialization
3. Use the Drift IDL generator to produce the IDL file and use it to generate C++ classes for native workers
4. Allow multiple serialization formats to coexist
5. Support future serialization formats without SPI changes
6. Allow gradual migration from the current design to the new design

## Proposed Implementation

### Current Architecture for JSON Serde

```java
// Use Jackson annotations
public class Split
{
    @JsonProperty
    private final ConnectorSplit connectorSplit;
    // ... other fields and methods

    @JsonCreator
    public Split(...);
}
```

#### For Polymorphic Types e.g. ConnectorSplit

```java
// A handle resolver to return the correct type info at runtime
public HandleResolver()
{
    handleResolvers.put(REMOTE_CONNECTOR_ID.toString(), new MaterializedHandleResolver(new RemoteHandleResolver()));
    handleResolvers.put("$system", new MaterializedHandleResolver(new SystemHandleResolver()));
    handleResolvers.put("$info_schema", new MaterializedHandleResolver(new InformationSchemaHandleResolver()));
    handleResolvers.put("$empty", new MaterializedHandleResolver(new EmptySplitHandleResolver()));

    functionHandleResolvers.put("$static", new MaterializedFunctionHandleResolver(new BuiltInFunctionNamespaceHandleResolver()));
    functionHandleResolvers.put("$session", new MaterializedFunctionHandleResolver(new SessionFunctionHandleResolver()));
}

// Register the correct serde methods for different types
protected AbstractTypedJacksonModule(
        Class<T> baseClass,
        Function<T, String> nameResolver,
        Function<String, Class<? extends T>> classResolver)
{
    super(baseClass.getSimpleName() + "Module", Version.unknownVersion());

    TypeIdResolver typeResolver = new InternalTypeResolver<>(nameResolver, classResolver);

    addSerializer(baseClass, new InternalTypeSerializer<>(baseClass, typeResolver));
    addDeserializer(baseClass, new InternalTypeDeserializer<>(baseClass, typeResolver));
}

// A class to bind the two above together
public class SplitJacksonModule
        extends AbstractTypedJacksonModule<ConnectorSplit>
{
    @Inject
    public SplitJacksonModule(HandleResolver handleResolver)
    {
        super(ConnectorSplit.class,
                handleResolver::getId,
                handleResolver::getSplitClass);
    }
}
```

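The resolver pair that `AbstractTypedJacksonModule` receives can be illustrated with a small, self-contained sketch. This is plain JDK code with no Jackson dependency; `FakeSplit`, `FakeHiveSplit`, and the map-backed resolvers are stand-ins invented for this example, not Presto classes:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Toy stand-ins for ConnectorSplit and a connector-specific split class.
interface FakeSplit {}
class FakeHiveSplit implements FakeSplit {}

public class TypeResolverSketch
{
    public static void main(String[] args)
    {
        // nameResolver maps an instance to its type id; classResolver maps the
        // type id back to a concrete class, mirroring the two functions passed
        // to AbstractTypedJacksonModule.
        Map<Class<?>, String> names = new HashMap<>();
        Map<String, Class<? extends FakeSplit>> classes = new HashMap<>();
        names.put(FakeHiveSplit.class, "hive");
        classes.put("hive", FakeHiveSplit.class);

        Function<FakeSplit, String> nameResolver = split -> names.get(split.getClass());
        Function<String, Class<? extends FakeSplit>> classResolver = classes::get;

        // On serialization the type id is embedded alongside the payload;
        // on deserialization it selects the concrete class to instantiate.
        String typeId = nameResolver.apply(new FakeHiveSplit());
        System.out.println(typeId);                                       // hive
        System.out.println(classResolver.apply(typeId).getSimpleName());  // FakeHiveSplit
    }
}
```

The key property this round trip demonstrates is that the base type (`ConnectorSplit` in Presto) never needs to know its subclasses; the resolver maps are populated by the handle resolver at registration time.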
### Option 1: Extend Current Architecture for Thrift Serde

```java
// Use Drift annotations alongside the Jackson ones
public class Split
{
    @JsonProperty
    @ThriftField
    private final ConnectorSplit connectorSplit;
    // ... other fields and methods

    @JsonCreator
    @ThriftConstructor
    public Split(...);
}
```

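The effect of dual annotations is that a single class can be served in either wire format, chosen per request. The idea can be sketched in plain JDK code; `TaskStatusLite`, `encode`, and the Base64 stand-in for a binary codec are all invented for this illustration and are not Presto or Drift APIs:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// One value class, two coexisting wire formats selected by a format tag,
// mirroring a class annotated for both Jackson (JSON) and Drift (Thrift).
public class DualFormatSketch
{
    record TaskStatusLite(long taskId, String state) {}

    static String encode(TaskStatusLite status, String format)
    {
        String json = "{\"taskId\":" + status.taskId() + ",\"state\":\"" + status.state() + "\"}";
        if (format.equals("json")) {
            return json;
        }
        // Stand-in for a binary codec: an opaque encoding of the same payload.
        return Base64.getEncoder().encodeToString(json.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args)
    {
        TaskStatusLite status = new TaskStatusLite(42, "RUNNING");
        System.out.println(encode(status, "json"));  // {"taskId":42,"state":"RUNNING"}
    }
}
```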
#### For Polymorphic Types e.g. ConnectorSplit

```java
// Similarly, we register the correct serde methods for a given type using the existing handle resolver
protected AbstractTypedThriftModule(
        Class<T> baseClass,
        Function<T, String> nameResolver,
        Function<String, Class<? extends T>> classResolver)
{
    TypeIdResolver typeResolver = new InternalTypeResolver<>(nameResolver, classResolver);

    addThriftSerializer(baseClass, new InternalTypeThriftSerializer<>(baseClass, typeResolver));
    addThriftDeserializer(baseClass, new InternalTypeThriftDeserializer<>(baseClass, typeResolver));
}
```

#### Pros
- Follows the existing design in the code base

#### Cons
- If we later switch to a different binary serde, we will have to redo the whole process.

### Option 2: Pluggable Serde for Polymorphic Types

```java
import java.util.HashMap;
import java.util.Map;

public class Split
{
    private final ConnectorId connectorId;
    private final ConnectorSplit connectorSplit;
    // ... other fields and methods

    public Split(...);
}

// In presto-spi, an interface for split serde
public interface Serializable<T>
{
    String getType();

    byte[] serialize(T object);

    T deserialize(byte[] data);
}

// A JSON serializer for HiveSplit
public class HiveSplitJsonSerializer
        implements Serializable<HiveSplit>
{
    private final ObjectMapper mapper;

    @Override
    public String getType()
    {
        return "json";
    }

    @Override
    public byte[] serialize(HiveSplit split)
    {
        return mapper.writeValueAsBytes(new HiveSplitSerializable(split));
    }

    @Override
    public HiveSplit deserialize(byte[] data)
    {
        HiveSplitSerializable serializable = mapper.readValue(data, HiveSplitSerializable.class);
        return serializable.toHiveSplit();
    }
}

// A Thrift serializer for HiveSplit
public class HiveSplitThriftSerializer
        implements Serializable<HiveSplit>
{
    private final ThriftCodec<ThriftHiveSplit> codec;

    @Override
    public String getType()
    {
        return "thrift";
    }

    @Override
    public byte[] serialize(HiveSplit split)
    {
        ThriftHiveSplit thriftSplit = new ThriftHiveSplit();
        // ... populate fields ...
        return codec.serialize(thriftSplit);
    }

    @Override
    public HiveSplit deserialize(byte[] data)
    {
        ThriftHiveSplit thriftSplit = codec.deserialize(data);
        return new HiveSplit(/* construct from thrift object */);
    }
}

// A registry to hold the serde methods, keyed by connector id and serde type
public class ConnectorSplitSerializerRegistry
{
    private final Map<String, Serializable<ConnectorSplit>> serializers = new HashMap<>();

    // One custom serializer per connector type
    public void registerSerializer(String connectorId, Serializable<ConnectorSplit> serializer) {...}

    public Serializable<ConnectorSplit> getSerializer(String connectorId) {...}
}

// Register the correct serde method within the corresponding connector factory
public class HiveConnectorFactory
        implements ConnectorFactory
{
    @Override
    public Connector create(...)
    {
        // Register serializers
        serializerRegistry.registerSerializer(getName(), new HiveSplitThriftSerializer());
        // ... create connector ...
    }
}

// Use a custom codec for ConnectorSplit
public class ConnectorSplitCodec
        implements ThriftCodec<ConnectorSplit>
{
    private final ConnectorSplitSerializerRegistry registry;

    @Override
    public void write(...);

    @Override
    public ConnectorSplit read(...);
}

// Within the byte array produced by serialization, we will find:
// * String connectorId; // hive or another connector
// * byte[] data;        // the connector-specific payload
```

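The registry-plus-envelope flow above can be demonstrated end to end with a runnable, pure-JDK sketch. `SplitSerde`, `wrap`, and `unwrap` are names invented for this example; the real interfaces would live in presto-spi as described:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Sketch of Option 2: a per-connector serde registry plus the
// [connectorId, payload] envelope written into the outer byte array.
interface SplitSerde
{
    byte[] serialize(String split);
    String deserialize(byte[] data);
}

public class RegistrySketch
{
    private static final Map<String, SplitSerde> REGISTRY = new HashMap<>();

    // Envelope: length-prefixed connectorId, then the connector-specific payload.
    static byte[] wrap(String connectorId, byte[] payload) throws IOException
    {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream data = new DataOutputStream(out);
        data.writeUTF(connectorId);
        data.write(payload);
        return out.toByteArray();
    }

    static String unwrap(byte[] envelope) throws IOException
    {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(envelope));
        String connectorId = in.readUTF();      // pick the serde by connector id
        byte[] payload = in.readAllBytes();     // hand the rest to that serde
        return REGISTRY.get(connectorId).deserialize(payload);
    }

    public static void main(String[] args) throws IOException
    {
        // A trivial pass-through serde registered for the "hive" connector id.
        REGISTRY.put("hive", new SplitSerde() {
            public byte[] serialize(String split) { return split.getBytes(StandardCharsets.UTF_8); }
            public String deserialize(byte[] data) { return new String(data, StandardCharsets.UTF_8); }
        });

        byte[] envelope = wrap("hive", REGISTRY.get("hive").serialize("s3://bucket/part-0"));
        System.out.println(unwrap(envelope));   // s3://bucket/part-0
    }
}
```

Note that the outer codec only needs to parse the connector id; the payload stays opaque, which is what lets each connector evolve its format independently.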
#### Pros
- Each connector can choose its own serialization format
- The internal details of how serialization is handled are hidden
- Connectors can evolve their serialization format independently
- New connectors can adopt newer, more efficient serialization formats without waiting for the entire system to upgrade
- Existing connectors can migrate to better formats without forcing other connectors to change
- Performance optimizations can be made on a per-connector basis

#### Cons
- This design is different from the existing Jackson serde flow in the code base.

### Q & A
1. What modules are involved?
   * presto-spi
   * presto-main-base
   * presto-main
   * presto-hive
2. Any new terminologies/concepts/SQL language additions?
   * N/A
3. Method/class/interface contracts which you deem fit for implementation.
   * See the code examples above
4. Code flow using bullet points or pseudo code as applicable.
   * See the code examples above
5. Any new user facing metrics that can be shown on CLI or UI.
   * N/A

## [Optional] Metrics

How can we measure the impact of this feature?
1. taskUpdateSerializedCpuNanos
2. taskUpdateDeliveredWallTimeNanos
3. CPU usage for task update serde

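As a sketch of how a counter like taskUpdateSerializedCpuNanos could be sampled around a serialization call, the JDK's `ThreadMXBean` provides per-thread CPU time. The string-building workload below is only a stand-in for serializing a TaskUpdateRequest; how Presto actually records this metric is not specified here:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

public class SerdeCpuSketch
{
    public static void main(String[] args)
    {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        if (!threads.isCurrentThreadCpuTimeSupported()) {
            System.out.println("cpu time not supported");
            return;
        }
        long start = threads.getCurrentThreadCpuTime();

        // Stand-in workload for serializing a TaskUpdateRequest
        StringBuilder payload = new StringBuilder();
        for (int i = 0; i < 100_000; i++) {
            payload.append(i).append(',');
        }

        // CPU nanos consumed by this thread during "serialization"
        long taskUpdateSerializedCpuNanos = threads.getCurrentThreadCpuTime() - start;
        System.out.println(taskUpdateSerializedCpuNanos >= 0);
    }
}
```

Measuring CPU time rather than wall time isolates serde cost from network and scheduling delays, which is why the RFC tracks both taskUpdateSerializedCpuNanos and taskUpdateDeliveredWallTimeNanos separately.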
## [Optional] Other Approaches Considered
1. See Option 1

## Adoption Plan

### Rollout
* As the first step, we will use Drift to annotate all primitive types within the three classes mentioned above, while keeping complex data types, e.g. Split, MetadataUpdate, and TableWriteInfo, as JSON.
* As the second step, we will add Thrift support for those complex data classes using one of the two options proposed above.

- What impact (if any) will there be on existing users? Are there any new session parameters, configurations, SPI updates, client API updates, or SQL grammar?
  * The Thrift serde will be disabled by default and can be enabled by a config.
- If we are changing behaviour how will we phase out the older behaviour?
  * N/A
- If we need special migration tools, describe them here.
  * N/A
- When will we remove the existing behaviour, if applicable.
  * N/A
- How should this feature be taught to new and existing users? Basically mention if documentation changes/new blog are needed?
  * This feature will be documented in the Presto documentation.
- What related issues do you consider out of scope for this RFC that could be addressed in the future independently of the solution that comes out of this RFC?
  * N/A

## Test Plan
306+
307+
How do we ensure the feature works as expected? Mention if any functional tests/integration tests are needed. Special mention for product-test changes. If any PoC has been done already, please mention the relevant test results here that you think will bolster your case of getting this RFC approved.
308+
309+
- A PoC for step 1 about primitive type can be found from the following 2 PRs:
310+
* prestodb/presto#25020
311+
* prestodb/presto#25079
