# **RFC0 for Presto**

See [CONTRIBUTING.md](CONTRIBUTING.md) for instructions on creating your RFC and the process surrounding it.

## Thrift Serialization for TaskStatus, TaskInfo and TaskUpdateRequest

Proposers

* Shang Ma
* Vivian Hsu

## Related Issues

Related issues may include GitHub issues, PRs, or other RFCs.

- prestodb/presto#25020
- prestodb/presto#25079

## Summary

Support thrift serialization of the TaskStatus, TaskInfo, and TaskUpdateRequest classes for the getTaskStatus and createOrUpdateTask APIs, for both Java and C++ worker types, to reduce CPU overhead.
21+
22+
## Background
23+
24+
Presto coordinator sends updates to workers and workers respond with taskInfo. Both the taskUpdateRequest and taskInfo are currently serialized using JSON, which can be CPU intensive. And in the case of high task currency, this can become a bottleneck for the coordinator which in turn becomes a bottleneck for the whole cluster.
25+
26+
27+
### [Optional] Goals
28+
1. Support thrift serde for TaskStatus, TaskInfo, and TaskRequestUpdate classes for both Java and C++ workers
29+
2. Maintain backward compatibility with existing JSON serialization
30+
3. Use drift IDL generator to produce the IDL file and use it to generate c++ classes for native workers
31+
4. Allow multiple serialization formats to coexist
32+
5. Support future serialization formats without SPI changes
33+
6. Allow gradual migration from current design to new design
34+
35+
36+
## Proposed Implementation

### Disclaimer: the following is pseudo code and will differ in the real implementation.

### Current Architecture for JSON Serde
```java
// Use Jackson annotations
public class Split {
    @JsonProperty
    private final ConnectorSplit connectorSplit;
    // ... other fields and methods

    @JsonCreator
    public Split(...);
}
```
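
For context, this is roughly how such an annotated class is serialized today. Below is a minimal sketch using Airlift's `JsonCodec`; the wrapper class and method names are illustrative, not the actual Presto call sites:

```java
// Hedged sketch: JSON round trip of the annotated class via Airlift's JsonCodec.
import com.facebook.airlift.json.JsonCodec;

public final class JsonSerdeSketch
{
    private static final JsonCodec<Split> SPLIT_CODEC = JsonCodec.jsonCodec(Split.class);

    private JsonSerdeSketch() {}

    public static byte[] serialize(Split split)
    {
        return SPLIT_CODEC.toJsonBytes(split); // coordinator -> worker payload
    }

    public static Split deserialize(byte[] data)
    {
        return SPLIT_CODEC.fromJson(data); // worker-side decode
    }
}
```
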
#### For Polymorphic Types, e.g. ConnectorSplit
```java
public class HandleJsonModule
        implements Module
{
    @Override
    public void configure(Binder binder)
    {
        jsonBinder(binder).addModuleBinding().to(TableHandleJacksonModule.class);
        jsonBinder(binder).addModuleBinding().to(TableLayoutHandleJacksonModule.class);
        jsonBinder(binder).addModuleBinding().to(ColumnHandleJacksonModule.class);
        jsonBinder(binder).addModuleBinding().to(SplitJacksonModule.class);
        jsonBinder(binder).addModuleBinding().to(OutputTableHandleJacksonModule.class);
        jsonBinder(binder).addModuleBinding().to(InsertTableHandleJacksonModule.class);
        jsonBinder(binder).addModuleBinding().to(DeleteTableHandleJacksonModule.class);
        jsonBinder(binder).addModuleBinding().to(IndexHandleJacksonModule.class);
        jsonBinder(binder).addModuleBinding().to(TransactionHandleJacksonModule.class);
        jsonBinder(binder).addModuleBinding().to(PartitioningHandleJacksonModule.class);
        jsonBinder(binder).addModuleBinding().to(FunctionHandleJacksonModule.class);
        jsonBinder(binder).addModuleBinding().to(MetadataUpdateJacksonModule.class);

        binder.bind(HandleResolver.class).in(Scopes.SINGLETON);
    }
}

// A handle resolver that returns the correct type info at runtime
public HandleResolver()
{
    handleResolvers.put(REMOTE_CONNECTOR_ID.toString(), new MaterializedHandleResolver(new RemoteHandleResolver()));
    handleResolvers.put("$system", new MaterializedHandleResolver(new SystemHandleResolver()));
    handleResolvers.put("$info_schema", new MaterializedHandleResolver(new InformationSchemaHandleResolver()));
    handleResolvers.put("$empty", new MaterializedHandleResolver(new EmptySplitHandleResolver()));

    functionHandleResolvers.put("$static", new MaterializedFunctionHandleResolver(new BuiltInFunctionNamespaceHandleResolver()));
    functionHandleResolvers.put("$session", new MaterializedFunctionHandleResolver(new SessionFunctionHandleResolver()));
}

// Register the correct serde methods for different types
protected AbstractTypedJacksonModule(
        Class<T> baseClass,
        Function<T, String> nameResolver,
        Function<String, Class<? extends T>> classResolver)
{
    super(baseClass.getSimpleName() + "Module", Version.unknownVersion());

    TypeIdResolver typeResolver = new InternalTypeResolver<>(nameResolver, classResolver);

    addSerializer(baseClass, new InternalTypeSerializer<>(baseClass, typeResolver));
    addDeserializer(baseClass, new InternalTypeDeserializer<>(baseClass, typeResolver));
}

// A class to bind the two above together
public class SplitJacksonModule
        extends AbstractTypedJacksonModule<ConnectorSplit>
{
    @Inject
    public SplitJacksonModule(HandleResolver handleResolver)
    {
        super(ConnectorSplit.class,
                handleResolver::getId,
                handleResolver::getSplitClass);
    }
}
```
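
To illustrate the effect of these typed modules, here is a hedged sketch (hypothetical wiring; in Presto the registration happens through Guice, not manually): once the module is registered, an ObjectMapper can serialize the ConnectorSplit interface polymorphically, embedding the type id that HandleResolver maps back to a concrete class.

```java
// Hedged sketch: polymorphic JSON round trip through the typed module.
import com.fasterxml.jackson.databind.ObjectMapper;

public class PolymorphicJsonSketch
{
    public static ConnectorSplit roundTrip(HandleResolver handleResolver, ConnectorSplit split)
            throws Exception
    {
        ObjectMapper mapper = new ObjectMapper();
        // In Presto this registration is done by Guice via jsonBinder
        mapper.registerModule(new SplitJacksonModule(handleResolver));

        // The serialized form carries the type id from handleResolver::getId
        String json = mapper.writeValueAsString(split);

        // Deserialization resolves the concrete class via handleResolver::getSplitClass
        return mapper.readValue(json, ConnectorSplit.class);
    }
}
```
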
### Option 1: Extend Current Architecture for Thrift Serde
```java
// Use Drift annotations alongside the existing Jackson ones
public class Split {
    @JsonProperty
    @ThriftField
    private final ConnectorSplit connectorSplit;
    // ... other fields and methods

    @JsonCreator
    @ThriftConstructor
    public Split(...);
}
```

#### For Polymorphic Types, e.g. ConnectorSplit
```java
// Similarly, we register the correct serde methods for a given type using the
// existing handle resolver; only the Thrift-specific names change relative to
// the Jackson version above
protected AbstractTypedThriftModule(
        Class<T> baseClass,
        Function<T, String> nameResolver,
        Function<String, Class<? extends T>> classResolver)
{
    TypeIdResolver typeResolver = new InternalTypeResolver<>(nameResolver, classResolver);

    addThriftSerializer(baseClass, new InternalTypeThriftSerializer<>(baseClass, typeResolver));
    addThriftDeserializer(baseClass, new InternalTypeThriftDeserializer<>(baseClass, typeResolver));
}
```
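
Under this option, the byte encoding would be driven by Drift. Below is a minimal sketch of how the dual-annotated class might be round-tripped, assuming Drift's ThriftCodecManager; the Split type is the pseudo-code class above and would additionally need a @ThriftStruct annotation:

```java
// Hedged sketch: obtaining a Drift codec for the dual-annotated Split.
import com.facebook.drift.codec.ThriftCodec;
import com.facebook.drift.codec.ThriftCodecManager;

public class ThriftSerdeSketch
{
    private final ThriftCodecManager codecManager = new ThriftCodecManager();

    public ThriftCodec<Split> splitCodec()
    {
        // Drift reflects over @ThriftStruct/@ThriftField/@ThriftConstructor,
        // much like Jackson reflects over @JsonProperty/@JsonCreator
        return codecManager.getCodec(Split.class);
    }
}
```
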
#### Pros
- Follows the existing design in the code base

#### Cons
- If we later want to switch to a different binary serde, we will have to redo the whole process.

### Option 2: Pluggable Serde for Polymorphic Types

```java
import java.util.HashMap;
import java.util.Map;

public class Split
{
    private final ConnectorId connectorId;
    private final ConnectorSplit connectorSplit;
    // ... other fields and methods

    public Split(...);
}

// In presto-spi, an interface for split serde
public interface Serializer<T>
{
    String getType();

    byte[] serialize(T object);

    T deserialize(byte[] data);
}

// A JSON serializer for HiveSplit
public class HiveSplitJsonSerializer
        implements Serializer<HiveSplit>
{
    private final ObjectMapper mapper;

    @Override
    public String getType()
    {
        return "json";
    }

    @Override
    public byte[] serialize(HiveSplit split)
    {
        return mapper.writeValueAsBytes(new HiveSplitSerializable(split));
    }

    @Override
    public HiveSplit deserialize(byte[] data)
    {
        HiveSplitSerializable serializable = mapper.readValue(data, HiveSplitSerializable.class);
        return serializable.toHiveSplit();
    }
}

// A thrift serializer for HiveSplit
public class HiveSplitThriftSerializer
        implements Serializer<HiveSplit>
{
    private final ThriftCodec<ThriftHiveSplit> codec;

    @Override
    public String getType()
    {
        return "thrift";
    }

    @Override
    public byte[] serialize(HiveSplit split)
    {
        ThriftHiveSplit thriftSplit = new ThriftHiveSplit();
        // ... populate fields ...
        return codec.serialize(thriftSplit);
    }

    @Override
    public HiveSplit deserialize(byte[] data)
    {
        ThriftHiveSplit thriftSplit = codec.deserialize(data);
        return new HiveSplit(/* construct from thrift object */);
    }
}

public class ConnectorManager
{
    private synchronized void addConnectorInternal(MaterializedConnector connector)
    {
        // existing code
        // ...
        connector.getSplitSerializerProvider()
                .ifPresent(splitSerializerProvider ->
                        connectorTypeSerdeManager.registerSerializer(connectorId, splitSerializerProvider));
    }
}

// Acts as a registry holding the serde methods, keyed by connector id and serde type
public class ConnectorTypeSerdeManager
{
    private final Map<String, Serializer> serializers = new HashMap<>();

    // Add a custom serializer for a given connector type
    public void registerSerializer(String connectorId, SerializerProvider serializerProvider) {...}

    public Serializer getSerializer(String connectorId) {...}
}

// Register the correct serde method within the corresponding connector factory
public class HiveMetadata
        implements TransactionalMetadata
{
    private final Serializer splitSerializer;

    public HiveMetadata(...)
    {
        this.splitSerializer = new HiveSplitThriftSerializer();
    }

    public Serializer getSplitSerializer()
    {
        return this.splitSerializer;
    }
}

// Use a custom codec for Split
public class SplitCodec
        implements ThriftCodec<Split>
{
    private final ConnectorTypeSerdeManager serdeManager;

    @Override
    public void write(...)
    {
        TMemoryBuffer transport = new TMemoryBuffer(1024);
        TProtocolWriter writer = new TBinaryProtocol(transport);

        // write the connector id/type
        writer.writeStructBegin(new TStruct("Split"));
        writer.writeFieldBegin(new TField("connectorId", TType.STRING, (short) 2));
        writer.writeString("hive");
        writer.writeFieldEnd();

        // write the real data (pseudo code)
        writer.writeBinary(ByteBuffer.wrap(serdeManager.getSerializer(HiveTransactionHandle.class).serialize(aHiveTransactionHandle)));
        writer.writeBinary(ByteBuffer.wrap(serdeManager.getSerializer(HiveSplit.class).serialize(aHiveSplitObject)));
        writer.writeBinary(aLifespan);
        writer.writeBinary(aSplitContext);
        writer.writeStructEnd();
    }

    @Override
    public Split read(...)
    {
        // first, read the connector id to know what type of split we are dealing with
        reader.read(connectorId);

        // say it is a hive split; then
        reader.read(aHiveTransactionHandle);
        reader.read(aHiveSplit);

        // lastly, read the rest
        reader.read(aLifespan);
        reader.read(aSplitContext);
    }
}

// Conceptually, within the byte array from serialization, we will find:
// * String connectorId; // hive or another connector
// * byte[] data;        // the real data of a serialized split
```
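
To make the registry flow concrete, here is a hedged usage sketch built only from the pseudo-code types above; the registerSerializer signature is simplified to take a Serializer directly, and none of this is the final API:

```java
// Hedged usage sketch of the pluggable registry (pseudo, per the disclaimer above).
ConnectorTypeSerdeManager serdeManager = new ConnectorTypeSerdeManager();

// At connector-registration time (ConnectorManager.addConnectorInternal),
// the Hive connector contributes its preferred serializer
serdeManager.registerSerializer("hive", new HiveSplitThriftSerializer());

// Coordinator side: SplitCodec looks the serializer up by connector id and
// frames the opaque bytes with that id on the wire
Serializer<HiveSplit> serializer = serdeManager.getSerializer("hive");
byte[] payload = serializer.serialize(hiveSplit);

// Worker side: read the connector id first, then dispatch to the same registry
HiveSplit restored = serdeManager.getSerializer("hive").deserialize(payload);
```
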
#### Pros
- Each connector can choose its own serialization format
- The internal details of how serialization is handled are hidden
- Connectors can evolve their serialization format independently
- New connectors can adopt newer, more efficient serialization formats without waiting for the entire system to upgrade
- Existing connectors can migrate to better formats without forcing other connectors to change
- Performance optimizations can be made on a per-connector basis

#### Cons
- This design is different from the existing Jackson serde flow in the code base.

### Q & A
1. What modules are involved?
   * presto-spi
   * presto-main-base
   * presto-main
   * presto-hive
2. Any new terminologies/concepts/SQL language additions?
   * N/A
3. Method/class/interface contracts which you deem fit for implementation?
   * See the code examples above
4. Code flow using bullet points or pseudo code as applicable
   * See the code examples above
5. Any new user-facing metrics that can be shown on the CLI or UI?
   * N/A

## [Optional] Metrics

How can we measure the impact of this feature?
1. taskUpdateSerializedCpuNanos
2. taskUpdateDeliveredWallTimeNanos
3. CPU usage for task update serde

## [Optional] Other Approaches Considered
1. See Option 1 above

## Adoption Plan

### Rollout
* As the first step, we will use Drift to annotate all primitive types within the three classes mentioned above, while keeping complicated data types (e.g. Split, MetadataUpdate, TableWriteInfo) serialized as JSON; see the sketch after this list.
* As the second step, we will add thrift support for those complicated data classes using one of the two options proposed above.
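
A hedged illustration of the step-1 hybrid encoding (the class and field names here are hypothetical, chosen only to show the shape): primitives become native thrift fields, while a complicated member stays JSON-encoded inside a string field until step 2.

```java
// Hypothetical step-1 shape: primitives are thrift-native, while the
// complicated member travels as a JSON string until step 2.
import com.facebook.drift.annotations.ThriftConstructor;
import com.facebook.drift.annotations.ThriftField;
import com.facebook.drift.annotations.ThriftStruct;

@ThriftStruct
public class HybridTaskStatus
{
    private final long version;        // primitive: native thrift field
    private final String failuresJson; // complicated type, still JSON-encoded

    @ThriftConstructor
    public HybridTaskStatus(long version, String failuresJson)
    {
        this.version = version;
        this.failuresJson = failuresJson;
    }

    @ThriftField(1)
    public long getVersion()
    {
        return version;
    }

    @ThriftField(2)
    public String getFailuresJson()
    {
        return failuresJson;
    }
}
```
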
- What impact (if any) will there be on existing users? Are there any new session parameters, configurations, SPI updates, client API updates, or SQL grammar?
  * The thrift serde will be disabled by default and can be enabled by a config.
- If we are changing behaviour, how will we phase out the older behaviour?
  * N/A
- If we need special migration tools, describe them here.
  * N/A
- When will we remove the existing behaviour, if applicable?
  * N/A
- How should this feature be taught to new and existing users? Basically, mention if documentation changes or a new blog are needed.
  * This feature will be documented in the Presto documentation.
- What related issues do you consider out of scope for this RFC that could be addressed in the future independently of the solution that comes out of this RFC?
  * N/A

## Test Plan

How do we ensure the feature works as expected? Mention if any functional tests/integration tests are needed. Special mention for product-test changes. If any PoC has been done already, please mention the relevant test results here that you think will bolster your case of getting this RFC approved.

- A PoC for step 1 (primitive types) can be found in the following two PRs:
  * prestodb/presto#25020
  * prestodb/presto#25079
