# **RFC0 for Presto**

See [CONTRIBUTING.md](CONTRIBUTING.md) for instructions on creating your RFC and the process surrounding it.

## Thrift Serialization for TaskStatus, TaskInfo, and TaskUpdateRequest

Proposers

* Shang Ma
* Vivian Hsu

## Related Issues

Related issues may include GitHub issues, PRs, or other RFCs.

- prestodb/presto#25020
- prestodb/presto#25079

## Summary

Support thrift serialization of the TaskStatus, TaskInfo, and TaskUpdateRequest classes for the getTaskStatus and createOrUpdateTask APIs, for both Java and C++ worker types, to reduce CPU overhead.

## Background

The Presto coordinator sends task updates to workers, and workers respond with TaskInfo. Both the TaskUpdateRequest and TaskInfo are currently serialized as JSON, which can be CPU intensive. Under high task concurrency, this serialization can become a bottleneck for the coordinator, which in turn becomes a bottleneck for the whole cluster.

### Goals

1. Support thrift serde for the TaskStatus, TaskInfo, and TaskUpdateRequest classes for both Java and C++ workers
2. Maintain backward compatibility with existing JSON serialization
3. Use the drift IDL generator to produce the IDL file and use it to generate C++ classes for native workers
   * The final IDL for all thrift structs needed for TaskStatus, TaskInfo, and TaskUpdateRequest can be generated automatically by building the "presto-thrift-spec" module in the presto repo. This module is also built automatically as part of the presto build.
   * For the C++ worker, there is an extra step to generate some utility code by running a `make` command with presto-native-execution/presto_cpp/main/thrift/Makefile
4. Allow multiple serialization formats to coexist
5. Support future serialization formats without SPI changes
6. Allow gradual migration from the current design to the new design

## Proposed Implementation

### Disclaimer: The code below is pseudocode and will differ in the real implementation.
### Current Architecture for JSON Serde
```java
// Use Jackson annotations
public class Split {
    @JsonProperty
    private final ConnectorSplit connectorSplit;
    // ... other fields and methods

    @JsonCreator
    public Split(...);
}
```
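
For reference, this is roughly how such a class is serialized today. A minimal sketch, assuming airlift's `JsonCodec` API as used in Presto, with `TaskStatus` (one of the classes this RFC targets) standing in for any Jackson-annotated class:

```java
import com.facebook.airlift.json.JsonCodec;

// A minimal sketch of today's JSON path (assumed airlift JsonCodec API).
public final class JsonSerdeSketch
{
    private static final JsonCodec<TaskStatus> CODEC = JsonCodec.jsonCodec(TaskStatus.class);

    public static TaskStatus roundTrip(TaskStatus status)
    {
        byte[] json = CODEC.toJsonBytes(status);  // serialize: the CPU-intensive step at scale
        return CODEC.fromJson(json);              // deserialize on the receiving side
    }
}
```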

#### For Polymorphic Types, e.g., ConnectorSplit
```java
public class HandleJsonModule
        implements Module
{
    @Override
    public void configure(Binder binder)
    {
        jsonBinder(binder).addModuleBinding().to(TableHandleJacksonModule.class);
        jsonBinder(binder).addModuleBinding().to(TableLayoutHandleJacksonModule.class);
        jsonBinder(binder).addModuleBinding().to(ColumnHandleJacksonModule.class);
        jsonBinder(binder).addModuleBinding().to(SplitJacksonModule.class);
        jsonBinder(binder).addModuleBinding().to(OutputTableHandleJacksonModule.class);
        jsonBinder(binder).addModuleBinding().to(InsertTableHandleJacksonModule.class);
        jsonBinder(binder).addModuleBinding().to(DeleteTableHandleJacksonModule.class);
        jsonBinder(binder).addModuleBinding().to(IndexHandleJacksonModule.class);
        jsonBinder(binder).addModuleBinding().to(TransactionHandleJacksonModule.class);
        jsonBinder(binder).addModuleBinding().to(PartitioningHandleJacksonModule.class);
        jsonBinder(binder).addModuleBinding().to(FunctionHandleJacksonModule.class);
        jsonBinder(binder).addModuleBinding().to(MetadataUpdateJacksonModule.class);

        binder.bind(HandleResolver.class).in(Scopes.SINGLETON);
    }
}

// A handle resolver to return the correct type info at runtime
public HandleResolver()
{
    handleResolvers.put(REMOTE_CONNECTOR_ID.toString(), new MaterializedHandleResolver(new RemoteHandleResolver()));
    handleResolvers.put("$system", new MaterializedHandleResolver(new SystemHandleResolver()));
    handleResolvers.put("$info_schema", new MaterializedHandleResolver(new InformationSchemaHandleResolver()));
    handleResolvers.put("$empty", new MaterializedHandleResolver(new EmptySplitHandleResolver()));

    functionHandleResolvers.put("$static", new MaterializedFunctionHandleResolver(new BuiltInFunctionNamespaceHandleResolver()));
    functionHandleResolvers.put("$session", new MaterializedFunctionHandleResolver(new SessionFunctionHandleResolver()));
}

// Register the correct serde methods for different types
protected AbstractTypedJacksonModule(
        Class<T> baseClass,
        Function<T, String> nameResolver,
        Function<String, Class<? extends T>> classResolver)
{
    super(baseClass.getSimpleName() + "Module", Version.unknownVersion());

    TypeIdResolver typeResolver = new InternalTypeResolver<>(nameResolver, classResolver);

    addSerializer(baseClass, new InternalTypeSerializer<>(baseClass, typeResolver));
    addDeserializer(baseClass, new InternalTypeDeserializer<>(baseClass, typeResolver));
}

// The JSON property that carries the type information
private static final String TYPE_PROPERTY = "@type";

public InternalTypeSerializer(Class<T> baseClass, TypeIdResolver typeIdResolver)
{
    super(baseClass);
    this.typeSerializer = new AsPropertyTypeSerializer(typeIdResolver, null, TYPE_PROPERTY);
}

public InternalTypeDeserializer(Class<T> baseClass, TypeIdResolver typeIdResolver)
{
    super(baseClass);
    this.typeDeserializer = new AsPropertyTypeDeserializer(
            TypeFactory.defaultInstance().constructType(baseClass),
            typeIdResolver,
            TYPE_PROPERTY,
            false,
            null);
}
```
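
To make the runtime flow concrete, here is a minimal sketch, assuming a mapper with typed modules like the above registered; the `@type` value and JSON shape are illustrative:

```java
import com.fasterxml.jackson.databind.ObjectMapper;

// Hedged sketch: with a typed module registered, Jackson embeds a "@type"
// property on write and uses it on read to restore the concrete subclass.
public final class PolymorphicJsonSketch
{
    public static ConnectorSplit roundTrip(ObjectMapper mapperWithTypedModules, ConnectorSplit split)
            throws Exception
    {
        String json = mapperWithTypedModules.writeValueAsString(split);   // e.g. {"@type":"hive", ...}
        return mapperWithTypedModules.readValue(json, ConnectorSplit.class);  // concrete class restored via @type
    }
}
```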

### Option 1: Extend Current Architecture for Thrift Serde
```java
// Use drift annotations alongside the existing Jackson ones
public class Split {
    @JsonProperty
    @ThriftField
    private final ConnectorSplit connectorSplit;
    // ... other fields and methods

    @JsonCreator
    @ThriftConstructor
    public Split(...);
}
```
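
Once annotated, the class can be round-tripped through a binary thrift protocol without Jackson. A minimal sketch, assuming drift's `ThriftCodecManager` and protocol classes; the buffer size is illustrative:

```java
import com.facebook.drift.codec.ThriftCodec;
import com.facebook.drift.codec.ThriftCodecManager;
import com.facebook.drift.protocol.TBinaryProtocol;
import com.facebook.drift.protocol.TMemoryBuffer;

// Hedged sketch of a drift round trip for a drift-annotated class.
public final class ThriftSerdeSketch
{
    private static final ThriftCodecManager CODEC_MANAGER = new ThriftCodecManager();

    public static Split roundTrip(Split split)
            throws Exception
    {
        ThriftCodec<Split> codec = CODEC_MANAGER.getCodec(Split.class);
        TMemoryBuffer transport = new TMemoryBuffer(1024);    // illustrative size
        codec.write(split, new TBinaryProtocol(transport));   // binary bytes, no JSON
        return codec.read(new TBinaryProtocol(transport));
    }
}
```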

#### For Polymorphic Types, e.g., ConnectorSplit
```java
// Similarly, we register the correct method for a given type using the existing handle resolver
protected AbstractTypedThriftModule(
        Class<T> baseClass,
        Function<T, String> nameResolver,
        Function<String, Class<? extends T>> classResolver)
{
    TypeIdResolver typeResolver = new InternalTypeResolver<>(nameResolver, classResolver);

    addThriftSerializer(baseClass, new InternalTypeThriftSerializer<>(baseClass, typeResolver));
    addThriftDeserializer(baseClass, new InternalTypeThriftDeserializer<>(baseClass, typeResolver));
}
```
156+
157+
#### Pros
158+
- Follow existing design in the code base
159+
160+
#### Cons
161+
- If we want to change to a different binary serde, we will have to redo the process.
162+
163+
164+
165+
### Option 2 (preferred): Pluggable Serde for Polymorphic Types
```java
import java.util.HashMap;
import java.util.Map;

public class Split
{
    private final ConnectorId connectorId;
    private final ConnectorSplit connectorSplit;
    // ... other fields and methods

    public Split(...);
}

// In presto-spi, an interface for split serde
public interface Serializer<T>
{
    String getType();

    byte[] serialize(T object);

    T deserialize(byte[] data);
}

// A JSON serializer for the hive split
public class HiveSplitJsonSerializer
        implements Serializer<HiveSplit>
{
    private final ObjectMapper mapper;

    @Override
    public String getType()
    {
        return "json";
    }

    @Override
    public byte[] serialize(HiveSplit split)
    {
        return mapper.writeValueAsBytes(new HiveSplitSerializable(split));
    }

    @Override
    public HiveSplit deserialize(byte[] data)
    {
        HiveSplitSerializable serializable = mapper.readValue(data, HiveSplitSerializable.class);
        return serializable.toHiveSplit();
    }
}

// A thrift serializer for the hive split
public class HiveSplitThriftSerializer
        implements Serializer<HiveSplit>
{
    private final ThriftCodec<ThriftHiveSplit> codec;

    @Override
    public String getType()
    {
        return "thrift";
    }

    @Override
    public byte[] serialize(HiveSplit split)
    {
        ThriftHiveSplit thriftSplit = new ThriftHiveSplit();
        // ... populate fields ...
        return codec.serialize(thriftSplit);
    }

    @Override
    public HiveSplit deserialize(byte[] data)
    {
        ThriftHiveSplit thriftSplit = codec.deserialize(data);
        return new HiveSplit(/* construct from thrift object */);
    }
}

public class ConnectorManager
{
    private synchronized void addConnectorInternal(MaterializedConnector connector)
    {
        // existing code
        // ...
        // ...
        connector.getSplitSerializerProvider()
                .ifPresent(serializerProvider ->
                        connectorTypeSerdeManager.registerSerializer(connectorId, serializerProvider));
    }
}

// Acts as a registry holding the serde methods keyed by connector id and serde type
public class ConnectorTypeSerdeManager
{
    private final Map<String, Serializer> serializers = new HashMap<>();

    // Add a custom serializer for a given connector type
    public void registerSerializer(String connectorId, SerializerProvider serializerProvider) {...}

    public Serializer getSerializer(String connectorId) {...}
}

// Use a custom codec for ConnectorSplit
public class SplitCodec
        implements ThriftCodec<Split>
{
    private final ConnectorTypeSerdeManager serdeManager;

    @Override
    public void write(...)
    {
        TMemoryBuffer transport = new TMemoryBuffer(1024);
        TProtocolWriter writer = new TBinaryProtocol(transport);

        // write the connector id/type
        writer.writeStructBegin(new TStruct("Split"));
        writer.writeFieldBegin(new TField("connectorId", TType.STRING, (short) 2));
        writer.writeString("hive");
        writer.writeFieldEnd();

        // write the real data (pseudocode)
        writer.writeBinary(ByteBuffer.wrap(serdeManager.getSerializer(HiveTransactionHandle.class).serialize(aHiveTransactionHandle)));
        writer.writeBinary(ByteBuffer.wrap(serdeManager.getSerializer(HiveSplit.class).serialize(aHiveSplitObject)));
        writer.writeBinary(aLifespan);
        writer.writeBinary(aSplitContext);
        writer.writeStructEnd();
    }

    @Override
    public Split read(...)
    {
        TMemoryBuffer transport = new TMemoryBuffer(serializedData);
        TProtocolReader reader = new TBinaryProtocol(transport);

        // First, read the connector id to know what type of split we are dealing with
        aConnectorId.readFirstField();

        // Given aConnectorId, we know which connector we are dealing with;
        // say it is a hive split, then:
        serdeManager.getSerializer(HiveTransactionHandle.class).deserialize(reader.readNextField());
        serdeManager.getSerializer(HiveSplit.class).deserialize(reader.readNextField());

        aLifespan.readNextField();
        aSplitContext.readNextField();
    }
}

// Conceptually, within the byte array from serialization, we will find:
//   String connectorId;  // "hive" or another connector
//   byte[] data;         // the real data of a serialized split
```
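
A usage sketch tying the pieces above together; all names follow this RFC's pseudocode rather than a real SPI:

```java
// Pseudocode usage: the coordinator looks up whichever serde the connector
// registered (JSON or thrift) and round-trips a split with it.
public static HiveSplit roundTrip(ConnectorTypeSerdeManager serdeManager, HiveSplit split)
{
    Serializer<HiveSplit> serde = serdeManager.getSerializer("hive");
    byte[] bytes = serde.serialize(split);  // format is the connector's choice
    return serde.deserialize(bytes);
}
```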

#### Pros
- Each connector can choose its own serialization format
- The internal details of how serialization is handled are hidden
- Connectors can evolve their serialization format independently
- New connectors can adopt newer, more efficient serialization formats without waiting for the entire system to upgrade
- Existing connectors can migrate to better formats without forcing other connectors to change
- Performance optimizations can be made on a per-connector basis

#### Cons
- This design is different from the existing Jackson serde flow in the code base.

### Q & A
1. What modules are involved?
   * presto-spi
   * presto-main-base
   * presto-main
   * presto-hive
2. Any new terminologies/concepts/SQL language additions?
   * N/A
3. Method/class/interface contracts which you deem fit for implementation.
   * See the code examples above
4. Code flow using bullet points or pseudocode as applicable.
   * See the code examples above
5. Any new user-facing metrics that can be shown on CLI or UI?
   * N/A

## [Optional] Metrics

How can we measure the impact of this feature?
1. taskUpdateSerializedCpuNanos
2. taskUpdateDeliveredWallTimeNanos
3. CPU usage for task update serde

## [Optional] Other Approaches Considered

1. See Option 1

## Adoption Plan

### Rollout
* As the first step, we will use drift to annotate all primitive types within the three classes mentioned above, while keeping complex data types such as Split, MetadataUpdate, and TableWriteInfo as JSON (see the sketch after this list).
* During the second step, we will add thrift support for those complex data classes using one of the two options proposed above.
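
A hedged sketch of what step 1 could look like: primitive fields carry drift annotations directly, while a complex field rides along as a JSON payload until step 2. The class, field names, and field ids below are illustrative, not the actual task classes:

```java
import com.facebook.drift.annotations.ThriftConstructor;
import com.facebook.drift.annotations.ThriftField;
import com.facebook.drift.annotations.ThriftStruct;

// Illustrative step-1 hybrid: primitives are thrift-native, while the complex
// member (hypothetically TableWriteInfo) stays JSON-encoded inside the struct.
@ThriftStruct
public class TaskUpdateRequestSketch
{
    private final long version;
    private final String tableWriteInfoJson;  // complex type kept as JSON for now

    @ThriftConstructor
    public TaskUpdateRequestSketch(long version, String tableWriteInfoJson)
    {
        this.version = version;
        this.tableWriteInfoJson = tableWriteInfoJson;
    }

    @ThriftField(1)
    public long getVersion()
    {
        return version;
    }

    @ThriftField(2)
    public String getTableWriteInfoJson()
    {
        return tableWriteInfoJson;
    }
}
```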

- What impact (if any) will there be on existing users? Are there any new session parameters, configurations, SPI updates, client API updates, or SQL grammar?
  * The thrift serde will be disabled by default and can be enabled by a config (a hypothetical config sketch follows this list).
- If we are changing behaviour, how will we phase out the older behaviour?
  * We will NOT maintain two serde schemes at the same time. If the feature toggle for thrift serde is enabled, only thrift serde will be available for APIs including getTaskStatus and createOrUpdateTask; if thrift is not working, the system should fail so that developers get a clean signal. The JSON serde will still work when the feature toggle is disabled.
- If we need special migration tools, describe them here.
  * N/A
- When will we remove the existing behaviour, if applicable?
  * N/A
- How should this feature be taught to new and existing users? Mention whether documentation changes or a new blog post are needed.
  * This feature will be documented in the Presto documentation.
- What related issues do you consider out of scope for this RFC that could be addressed in the future independently of the solution that comes out of this RFC?
  * N/A
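
For illustration, the toggle could follow Presto's airlift-style config pattern. A hedged sketch with a hypothetical class and property name, neither of which is final:

```java
import com.facebook.airlift.configuration.Config;

// Hypothetical feature toggle (property name is an assumption, not the final config).
public class TaskSerdeConfig
{
    private boolean thriftTaskSerdeEnabled;  // disabled by default, per the rollout plan

    public boolean isThriftTaskSerdeEnabled()
    {
        return thriftTaskSerdeEnabled;
    }

    @Config("experimental.thrift-task-serde-enabled")
    public TaskSerdeConfig setThriftTaskSerdeEnabled(boolean thriftTaskSerdeEnabled)
    {
        this.thriftTaskSerdeEnabled = thriftTaskSerdeEnabled;
        return this;
    }
}
```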

## Test Plan

How do we ensure the feature works as expected? Mention if any functional tests/integration tests are needed. Special mention for product-test changes. If any PoC has been done already, please mention the relevant test results here that you think will bolster your case of getting this RFC approved.

- A PoC for step 1 (primitive types) can be found in the following two PRs:
  * prestodb/presto#25020
  * prestodb/presto#25079
