# **RFC0 for Presto**

See [CONTRIBUTING.md](CONTRIBUTING.md) for instructions on creating your RFC and the process surrounding it.

## Thrift Serialization for TaskStatus, TaskInfo and TaskUpdateRequest

Proposers

* Shang Ma
* Vivian Hsu

## Related Issues

Related issues may include Github issues, PRs or other RFCs.

- prestodb/presto#25020
- prestodb/presto#25079

## Summary

Support Thrift serialization of the TaskStatus, TaskInfo, and TaskUpdateRequest classes for the getTaskStatus and createOrUpdateTask APIs, for both Java and C++ worker types, to reduce CPU overhead.

## Background

The Presto coordinator sends task updates to workers, and workers respond with TaskInfo. Both TaskUpdateRequest and TaskInfo are currently serialized as JSON, which can be CPU intensive. Under high task concurrency, this serialization can become a bottleneck for the coordinator, which in turn becomes a bottleneck for the whole cluster.

### [Optional] Goals

1. Support Thrift serde for the TaskStatus, TaskInfo, and TaskUpdateRequest classes for both Java and C++ workers
2. Maintain backward compatibility with existing JSON serialization
3. Use the Drift IDL generator to produce the IDL file and use it to generate C++ classes for native workers
4. Allow multiple serialization formats to coexist
5. Support future serialization formats without SPI changes
6. Allow gradual migration from the current design to the new design

## Proposed Implementation

### Disclaimer: The code below is pseudocode and will differ in the real implementation.

### Current Architecture for JSON Serde

```java
// Use Jackson annotations
public class Split
{
    @JsonProperty
    private final ConnectorSplit connectorSplit;
    // ... other fields and methods

    @JsonCreator
    public Split(...);
}
```
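
For context, these annotations are consumed by a Jackson `ObjectMapper`. A minimal round-trip sketch against the pseudocode above (the helper class and method names are illustrative; in Presto the mapper is built with additional modules):

```java
import com.fasterxml.jackson.databind.ObjectMapper;

public final class JsonSerdeExample
{
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Round-trips a Split through JSON using the annotations above
    public static Split roundTrip(Split split)
            throws Exception
    {
        // writeValueAsString reads the @JsonProperty fields
        String json = MAPPER.writeValueAsString(split);
        // readValue rebuilds the object via the @JsonCreator constructor
        return MAPPER.readValue(json, Split.class);
    }
}
```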

#### For Polymorphic Types, e.g. ConnectorSplit

```java
// A handle resolver to return the correct type info at runtime
public HandleResolver()
{
    handleResolvers.put(REMOTE_CONNECTOR_ID.toString(), new MaterializedHandleResolver(new RemoteHandleResolver()));
    handleResolvers.put("$system", new MaterializedHandleResolver(new SystemHandleResolver()));
    handleResolvers.put("$info_schema", new MaterializedHandleResolver(new InformationSchemaHandleResolver()));
    handleResolvers.put("$empty", new MaterializedHandleResolver(new EmptySplitHandleResolver()));

    functionHandleResolvers.put("$static", new MaterializedFunctionHandleResolver(new BuiltInFunctionNamespaceHandleResolver()));
    functionHandleResolvers.put("$session", new MaterializedFunctionHandleResolver(new SessionFunctionHandleResolver()));
}

// Register the correct serde methods for different types
protected AbstractTypedJacksonModule(
        Class<T> baseClass,
        Function<T, String> nameResolver,
        Function<String, Class<? extends T>> classResolver)
{
    super(baseClass.getSimpleName() + "Module", Version.unknownVersion());

    TypeIdResolver typeResolver = new InternalTypeResolver<>(nameResolver, classResolver);

    addSerializer(baseClass, new InternalTypeSerializer<>(baseClass, typeResolver));
    addDeserializer(baseClass, new InternalTypeDeserializer<>(baseClass, typeResolver));
}

// A class to bind the two above together
public class SplitJacksonModule
        extends AbstractTypedJacksonModule<ConnectorSplit>
{
    @Inject
    public SplitJacksonModule(HandleResolver handleResolver)
    {
        super(ConnectorSplit.class,
                handleResolver::getId,
                handleResolver::getSplitClass);
    }
}
```
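
Once the module is registered on the shared `ObjectMapper`, polymorphic round trips work transparently. A simplified sketch of the wiring (in Presto the module is bound via Guice rather than constructed by hand):

```java
import com.fasterxml.jackson.databind.ObjectMapper;

public final class PolymorphicJsonExample
{
    // Round-trips a polymorphic ConnectorSplit through JSON
    public static ConnectorSplit roundTrip(HandleResolver handleResolver, ConnectorSplit split)
            throws Exception
    {
        ObjectMapper mapper = new ObjectMapper();
        // The module records the concrete type id on write and uses the
        // handle resolver to find the right subclass on read
        mapper.registerModule(new SplitJacksonModule(handleResolver));

        byte[] bytes = mapper.writeValueAsBytes(split);
        return mapper.readValue(bytes, ConnectorSplit.class);
    }
}
```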

### Option 1: Extend Current Architecture for Thrift Serde

```java
// Use Drift annotations alongside the Jackson ones
public class Split
{
    @JsonProperty
    @ThriftField
    private final ConnectorSplit connectorSplit;
    // ... other fields and methods

    @JsonCreator
    @ThriftConstructor
    public Split(...);
}
```

#### For Polymorphic Types, e.g. ConnectorSplit

```java
// Similarly, we register the correct method for a given type using the existing handle resolver
protected AbstractTypedThriftModule(
        Class<T> baseClass,
        Function<T, String> nameResolver,
        Function<String, Class<? extends T>> classResolver)
{
    TypeIdResolver typeResolver = new InternalTypeResolver<>(nameResolver, classResolver);

    addThriftSerializer(baseClass, new InternalTypeThriftSerializer<>(baseClass, typeResolver));
    addThriftDeserializer(baseClass, new InternalTypeThriftDeserializer<>(baseClass, typeResolver));
}
```
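
Under this option, serde on the wire goes through Drift codecs instead of Jackson. The sketch below follows the Swift-style codec API that Drift descends from; the exact transport and protocol classes and signatures are assumptions and may differ in the current Drift version:

```java
import com.facebook.drift.codec.ThriftCodec;
import com.facebook.drift.codec.ThriftCodecManager;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TMemoryBuffer;

public final class ThriftSerdeExample
{
    private static final ThriftCodecManager CODEC_MANAGER = new ThriftCodecManager();

    // Round-trips a Split through Thrift using the @ThriftField/@ThriftConstructor
    // annotations above; the in-memory transport stands in for the real HTTP exchange
    public static Split roundTrip(Split split)
            throws Exception
    {
        // Drift builds the codec from the annotations at runtime
        ThriftCodec<Split> codec = CODEC_MANAGER.getCodec(Split.class);

        TMemoryBuffer transport = new TMemoryBuffer(1024);
        codec.write(split, new TBinaryProtocol(transport));
        return codec.read(new TBinaryProtocol(transport));
    }
}
```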

#### Pros
- Follows the existing design in the code base

#### Cons
- If we want to change to a different binary serde later, we will have to redo the process.

### Option 2: Pluggable Serde for Polymorphic Types

```java
import java.util.HashMap;
import java.util.Map;

public class Split
{
    private final ConnectorId connectorId;
    private final ConnectorSplit connectorSplit;
    // ... other fields and methods

    public Split(...);
}

// In presto-spi, an interface for split serde
public interface Serializable<T>
{
    String getType();

    byte[] serialize(T object);

    T deserialize(byte[] data);
}

// A JSON serializer for HiveSplit
public class HiveSplitJsonSerializer
        implements Serializable<HiveSplit>
{
    private final ObjectMapper mapper;

    @Override
    public String getType()
    {
        return "json";
    }

    @Override
    public byte[] serialize(HiveSplit split)
    {
        return mapper.writeValueAsBytes(new HiveSplitSerializable(split));
    }

    @Override
    public HiveSplit deserialize(byte[] data)
    {
        HiveSplitSerializable serializable = mapper.readValue(data, HiveSplitSerializable.class);
        return serializable.toHiveSplit();
    }
}

// A Thrift serializer for HiveSplit
public class HiveSplitThriftSerializer
        implements Serializable<HiveSplit>
{
    private final ThriftCodec<ThriftHiveSplit> codec;

    @Override
    public String getType()
    {
        return "thrift";
    }

    @Override
    public byte[] serialize(HiveSplit split)
    {
        ThriftHiveSplit thriftSplit = new ThriftHiveSplit();
        // ... populate fields ...
        return codec.serialize(thriftSplit);
    }

    @Override
    public HiveSplit deserialize(byte[] data)
    {
        ThriftHiveSplit thriftSplit = codec.deserialize(data);
        return new HiveSplit(/* construct from thrift object */);
    }
}

// A registry to hold the serde methods based on connector id and serde type
public class ConnectorSplitSerializerRegistry
{
    private final Map<String, Serializable<?>> serializers = new HashMap<>();

    // One custom serializer per connector type
    public void registerSerializer(String connectorId, Serializable<?> serializer) {...}

    public Serializable<?> getSerializer(String connectorId) {...}
}

// Register the correct serde method within the corresponding connector factory
public class HiveConnectorFactory
        implements ConnectorFactory
{
    @Override
    public Connector create(...)
    {
        // Register serializers
        serializerRegistry.registerSerializer(getName(), new HiveSplitThriftSerializer());
        // ... create connector ...
    }
}

// Use a custom codec for ConnectorSplit
public class ConnectorSplitCodec
        implements ThriftCodec<ConnectorSplit>
{
    private final ConnectorSplitSerializerRegistry registry;

    @Override
    public void write(...);

    @Override
    public ConnectorSplit read(...);
}

// Within the byte array produced by serialization, we will find:
// * String connectorId; // hive or another connector
// * byte[] data;        // the connector-specific payload
```
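
To make the dispatch concrete, here is a hypothetical sketch of the envelope that `ConnectorSplitCodec.write` could produce using the registry; the length-prefixed layout and the `SplitEnvelopes` helper are illustrative, not part of the proposal:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public final class SplitEnvelopes
{
    // Writes the connector id first so the reader can look up the right
    // deserializer from the registry before decoding the payload
    @SuppressWarnings("unchecked")
    public static byte[] write(ConnectorSplitSerializerRegistry registry, String connectorId, ConnectorSplit split)
            throws IOException
    {
        Serializable<ConnectorSplit> serializer =
                (Serializable<ConnectorSplit>) registry.getSerializer(connectorId);
        byte[] data = serializer.serialize(split);

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream dataOut = new DataOutputStream(out);
        byte[] id = connectorId.getBytes(StandardCharsets.UTF_8);
        dataOut.writeInt(id.length);   // length-prefixed connector id
        dataOut.write(id);
        dataOut.writeInt(data.length); // length-prefixed payload
        dataOut.write(data);
        dataOut.flush();
        return out.toByteArray();
    }
}
```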

#### Pros
- Each connector can choose its own serialization format
- The internal details of how serialization is handled are hidden
- Connectors can evolve their serialization formats independently
- New connectors can adopt newer, more efficient serialization formats without waiting for the entire system to upgrade
- Existing connectors can migrate to better formats without forcing other connectors to change
- Performance optimizations can be made on a per-connector basis

#### Cons
- This design differs from the existing Jackson serde flow in the code base.

### Q & A
1. What modules are involved?
   * presto-spi
   * presto-main-base
   * presto-main
   * presto-hive
2. Any new terminologies/concepts/SQL language additions?
   * N/A
3. Method/class/interface contracts which you deem fit for implementation.
   * See the code examples above
4. Code flow using bullet points or pseudo code as applicable
   * See the code examples above
5. Any new user facing metrics that can be shown on CLI or UI.
   * N/A

## [Optional] Metrics

How can we measure the impact of this feature?
1. taskUpdateSerializedCpuNanos
2. taskUpdateDeliveredWallTimeNanos
3. CPU usage for task update serde

## [Optional] Other Approaches Considered
1. See Option 1

## Adoption Plan

### Rollout
* As the first step, we will use Drift to annotate all primitive types within the three classes mentioned above, while keeping complex data types, e.g. Split, MetadataUpdate, and TableWriteInfo, as JSON (a sketch of this follows below)
* As the next step, we will add Thrift support for those complex data classes using one of the two options proposed above
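
A hypothetical sketch of what step 1 looks like on a TaskStatus-like class: primitive fields get Drift annotations directly, while complex fields ride along as JSON strings until step 2 gives them first-class Thrift definitions. The class name, field selection, and field numbering here are illustrative:

```java
import com.facebook.drift.annotations.ThriftConstructor;
import com.facebook.drift.annotations.ThriftField;
import com.facebook.drift.annotations.ThriftStruct;

@ThriftStruct
public class TaskStatusExample
{
    private final long taskInstanceIdLeastSignificantBits; // primitive: native Thrift field
    private final int queuedPartitionedDrivers;            // primitive: native Thrift field
    private final String failuresJson;                     // complex type kept as JSON for now

    @ThriftConstructor
    public TaskStatusExample(
            long taskInstanceIdLeastSignificantBits,
            int queuedPartitionedDrivers,
            String failuresJson)
    {
        this.taskInstanceIdLeastSignificantBits = taskInstanceIdLeastSignificantBits;
        this.queuedPartitionedDrivers = queuedPartitionedDrivers;
        this.failuresJson = failuresJson;
    }

    @ThriftField(1)
    public long getTaskInstanceIdLeastSignificantBits()
    {
        return taskInstanceIdLeastSignificantBits;
    }

    @ThriftField(2)
    public int getQueuedPartitionedDrivers()
    {
        return queuedPartitionedDrivers;
    }

    @ThriftField(3)
    public String getFailuresJson()
    {
        return failuresJson;
    }
}
```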

- What impact (if any) will there be on existing users? Are there any new session parameters, configurations, SPI updates, client API updates, or SQL grammar?
  * The Thrift serde will be disabled by default and can be enabled by a config (see the sketch after this list)
- If we are changing behaviour how will we phase out the older behaviour?
  * N/A
- If we need special migration tools, describe them here.
  * N/A
- When will we remove the existing behaviour, if applicable.
  * N/A
- How should this feature be taught to new and existing users? Basically mention if documentation changes/new blog are needed?
  * This feature will be documented in the Presto documentation.
- What related issues do you consider out of scope for this RFC that could be addressed in the future independently of the solution that comes out of this RFC?
  * N/A
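
A minimal sketch of the kind of toggle this implies, assuming an Airlift-style config class as used elsewhere in Presto; the class name and property name are hypothetical and not part of this RFC:

```java
import com.facebook.airlift.configuration.Config;

public class ThriftTaskSerdeConfig
{
    // Disabled by default; JSON remains the default wire format
    private boolean thriftTaskSerdeEnabled;

    public boolean isThriftTaskSerdeEnabled()
    {
        return thriftTaskSerdeEnabled;
    }

    @Config("experimental.thrift-task-serde-enabled")
    public ThriftTaskSerdeConfig setThriftTaskSerdeEnabled(boolean thriftTaskSerdeEnabled)
    {
        this.thriftTaskSerdeEnabled = thriftTaskSerdeEnabled;
        return this;
    }
}
```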

## Test Plan

How do we ensure the feature works as expected? Mention if any functional tests/integration tests are needed. Special mention for product-test changes. If any PoC has been done already, please mention the relevant test results here that you think will bolster your case of getting this RFC approved.

- A PoC for step 1 (primitive types) can be found in the following two PRs:
  * prestodb/presto#25020
  * prestodb/presto#25079
