# **RFC0 for Presto**

See [CONTRIBUTING.md](CONTRIBUTING.md) for instructions on creating your RFC and the process surrounding it.

## [Thrift Serialization for TaskStatus, TaskInfo and TaskUpdateRequest]

Proposers

* Shang Ma
* Vivian Hsu

## [Related Issues]

Related issues may include GitHub issues, PRs or other RFCs.
- prestodb/presto#25020
- prestodb/presto#25079

## Summary

Support Thrift serialization of the TaskStatus, TaskInfo, and TaskUpdateRequest classes for the getTaskStatus and createOrUpdateTask APIs, for both Java and C++ worker types, to reduce CPU overhead.

## Background

The Presto coordinator sends task updates to workers, and workers respond with a TaskInfo. Both the TaskUpdateRequest and the TaskInfo are currently serialized using JSON, which can be CPU intensive. Under high task concurrency, this serialization can become a bottleneck for the coordinator, which in turn becomes a bottleneck for the whole cluster.
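
For context, the current exchange boils down to something like the sketch below. This is a simplified illustration, not the actual request-handling code: it assumes Airlift's `JsonCodec` and the current `TaskUpdateRequest`/`TaskInfo` classes, and omits all HTTP plumbing.

```java
import static java.nio.charset.StandardCharsets.UTF_8;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.execution.TaskInfo;
import com.facebook.presto.server.TaskUpdateRequest;

// Simplified view of today's JSON path: the coordinator JSON-encodes the
// TaskUpdateRequest, POSTs it to the worker, and JSON-decodes the TaskInfo
// response. The codec calls are the CPU-intensive part under high task concurrency.
public class CurrentJsonExchangeSketch
{
    private final JsonCodec<TaskUpdateRequest> requestCodec = JsonCodec.jsonCodec(TaskUpdateRequest.class);
    private final JsonCodec<TaskInfo> taskInfoCodec = JsonCodec.jsonCodec(TaskInfo.class);

    public byte[] encodeRequest(TaskUpdateRequest request)
    {
        return requestCodec.toJson(request).getBytes(UTF_8);
    }

    public TaskInfo decodeResponse(byte[] responseBody)
    {
        return taskInfoCodec.fromJson(new String(responseBody, UTF_8));
    }
}
```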

### [Optional] Goals
1. Support Thrift serde for the TaskStatus, TaskInfo, and TaskUpdateRequest classes for both Java and C++ workers
2. Maintain backward compatibility with the existing JSON serialization
3. Use the Drift IDL generator to produce the IDL file and use it to generate C++ classes for native workers (see the sketch after this list)
4. Allow multiple serialization formats to coexist
5. Support future serialization formats without SPI changes
6. Allow gradual migration from the current design to the new design
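
To illustrate goal 3, the sketch below shows the kind of Drift annotations a thrift-serializable class would carry. The class name, fields, and field ids are purely illustrative, not the actual TaskStatus layout.

```java
import com.facebook.drift.annotations.ThriftConstructor;
import com.facebook.drift.annotations.ThriftField;
import com.facebook.drift.annotations.ThriftStruct;

// Illustrative only: a tiny stand-in for a thrift-annotated status class.
// Real classes (TaskStatus, TaskInfo, TaskUpdateRequest) have many more fields.
@ThriftStruct
public class ExampleTaskState
{
    private final long taskInstanceId;
    private final String state;

    @ThriftConstructor
    public ExampleTaskState(long taskInstanceId, String state)
    {
        this.taskInstanceId = taskInstanceId;
        this.state = state;
    }

    @ThriftField(1)
    public long getTaskInstanceId()
    {
        return taskInstanceId;
    }

    @ThriftField(2)
    public String getState()
    {
        return state;
    }
}
```

The Drift project provides an IDL generator that can emit a `.thrift` definition from classes annotated this way, which is how the C++ worker classes in goal 3 would be produced.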

## Proposed Implementation

### Current Architecture for JSON Serde
```java
// Use Jackson annotations
public class Split {
    @JsonProperty
    private final ConnectorSplit connectorSplit;
    // ... other fields and methods
    @JsonCreator
    public Split(...);
}
```
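
For reference, a minimal Jackson round-trip over such an annotated class looks roughly like the sketch below; in Presto the `ObjectMapper` is configured and injected rather than built inline, so this only shows where the annotations come into play.

```java
import com.fasterxml.jackson.databind.ObjectMapper;

// Minimal round-trip for the annotated Split above. Deserializing the nested
// ConnectorSplit additionally requires the polymorphic module described below.
public class JsonRoundTripSketch
{
    private final ObjectMapper mapper = new ObjectMapper();

    public Split roundTrip(Split split)
            throws Exception
    {
        byte[] json = mapper.writeValueAsBytes(split);   // serialize via @JsonProperty fields
        return mapper.readValue(json, Split.class);      // deserialize via the @JsonCreator constructor
    }
}
```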

#### For Polymorphic Types, e.g. ConnectorSplit
```java
// A handle resolver that returns the correct type information at runtime
public HandleResolver()
{
    handleResolvers.put(REMOTE_CONNECTOR_ID.toString(), new MaterializedHandleResolver(new RemoteHandleResolver()));
    handleResolvers.put("$system", new MaterializedHandleResolver(new SystemHandleResolver()));
    handleResolvers.put("$info_schema", new MaterializedHandleResolver(new InformationSchemaHandleResolver()));
    handleResolvers.put("$empty", new MaterializedHandleResolver(new EmptySplitHandleResolver()));

    functionHandleResolvers.put("$static", new MaterializedFunctionHandleResolver(new BuiltInFunctionNamespaceHandleResolver()));
    functionHandleResolvers.put("$session", new MaterializedFunctionHandleResolver(new SessionFunctionHandleResolver()));
}

// Register the correct serde methods for different types
protected AbstractTypedJacksonModule(
        Class<T> baseClass,
        Function<T, String> nameResolver,
        Function<String, Class<? extends T>> classResolver)
{
    super(baseClass.getSimpleName() + "Module", Version.unknownVersion());

    TypeIdResolver typeResolver = new InternalTypeResolver<>(nameResolver, classResolver);

    addSerializer(baseClass, new InternalTypeSerializer<>(baseClass, typeResolver));
    addDeserializer(baseClass, new InternalTypeDeserializer<>(baseClass, typeResolver));
}

// A class that binds the two pieces above together
public class SplitJacksonModule
        extends AbstractTypedJacksonModule<ConnectorSplit>
{
    @Inject
    public SplitJacksonModule(HandleResolver handleResolver)
    {
        super(ConnectorSplit.class,
                handleResolver::getId,
                handleResolver::getSplitClass);
    }
}
```
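
The typed module takes effect once it is registered on the `ObjectMapper` used for internal communication. The sketch below shows the registration conceptually; in Presto this wiring happens through dependency-injection bindings rather than by hand.

```java
import com.fasterxml.jackson.databind.ObjectMapper;

// Conceptual registration of the typed module; the serialized form then carries
// a type identifier that the HandleResolver maps back to the concrete class.
public class ObjectMapperSetupSketch
{
    public static ObjectMapper createMapper(HandleResolver handleResolver)
    {
        ObjectMapper mapper = new ObjectMapper();
        mapper.registerModule(new SplitJacksonModule(handleResolver));
        return mapper;
    }
}
```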

### Option 1: Extend Current Architecture for Thrift Serde
```java
// Use Drift annotations alongside the existing Jackson annotations
@ThriftStruct
public class Split {
    @JsonProperty
    @ThriftField
    private final ConnectorSplit connectorSplit;
    // ... other fields and methods
    @JsonCreator
    @ThriftConstructor
    public Split(...);
}
```

#### For Polymorphic Types, e.g. ConnectorSplit
```java
// Similarly, we register the correct serde methods for a given type using the existing handle resolver
protected AbstractTypedThriftModule(
        Class<T> baseClass,
        Function<T, String> nameResolver,
        Function<String, Class<? extends T>> classResolver)
{
    TypeIdResolver typeResolver = new InternalTypeResolver<>(nameResolver, classResolver);

    addThriftSerializer(baseClass, new InternalTypeThriftSerializer<>(baseClass, typeResolver));
    addThriftDeserializer(baseClass, new InternalTypeThriftDeserializer<>(baseClass, typeResolver));
}
```
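
A binding class mirroring `SplitJacksonModule` would then tie the new module to `ConnectorSplit`. This is a hypothetical sketch assembled from the names above; only `HandleResolver` and its resolver methods exist in the code base today.

```java
// Hypothetical thrift counterpart of SplitJacksonModule, reusing the existing
// HandleResolver so JSON and thrift serde resolve types the same way.
public class SplitThriftModule
        extends AbstractTypedThriftModule<ConnectorSplit>
{
    @Inject
    public SplitThriftModule(HandleResolver handleResolver)
    {
        super(ConnectorSplit.class,
                handleResolver::getId,
                handleResolver::getSplitClass);
    }
}
```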

#### Pros
- Follows the existing design in the code base

#### Cons
- If we later switch to a different binary serde format, we will have to repeat this annotation and registration work.


### Option 2: Pluggable Serde for Polymorphic Types

```java
import java.util.HashMap;
import java.util.Map;

public class Split
{
    private final ConnectorId connectorId;
    private final ConnectorSplit connectorSplit;
    // ... other fields and methods

    public Split(...);
}

// In presto-spi, an interface for split serde
public interface ConnectorSplitSerializer<T extends ConnectorSplit>
{
    String getType();

    byte[] serialize(T split);

    T deserialize(byte[] data);
}

// A JSON serializer for HiveSplit
public class HiveSplitJsonSerializer
        implements ConnectorSplitSerializer<HiveSplit>
{
    private final ObjectMapper mapper;

    @Override
    public String getType()
    {
        return "json";
    }

    @Override
    public byte[] serialize(HiveSplit split)
    {
        return mapper.writeValueAsBytes(new HiveSplitSerializable(split));
    }

    @Override
    public HiveSplit deserialize(byte[] data)
    {
        HiveSplitSerializable serializable = mapper.readValue(data, HiveSplitSerializable.class);
        return serializable.toHiveSplit();
    }
}

// A thrift serializer for HiveSplit
public class HiveSplitThriftSerializer
        implements ConnectorSplitSerializer<HiveSplit>
{
    private final ThriftCodec<ThriftHiveSplit> codec;

    @Override
    public String getType()
    {
        return "thrift";
    }

    @Override
    public byte[] serialize(HiveSplit split)
    {
        ThriftHiveSplit thriftSplit = new ThriftHiveSplit();
        // ... populate fields ...
        return codec.serialize(thriftSplit);
    }

    @Override
    public HiveSplit deserialize(byte[] data)
    {
        ThriftHiveSplit thriftSplit = codec.deserialize(data);
        return new HiveSplit(/* construct from thrift object */);
    }
}

// A registry to hold the serde methods, keyed by connector id
public class ConnectorSplitSerializerRegistry
{
    private final Map<String, ConnectorSplitSerializer<?>> serializers = new HashMap<>();

    // One custom serializer per connector type
    public void registerSerializer(String connectorId, ConnectorSplitSerializer<?> serializer) {...}
    public ConnectorSplitSerializer<?> getSerializer(String connectorId) {...}
}

// Register the correct serde method within the corresponding connector factory
public class HiveConnectorFactory implements ConnectorFactory {
    @Override
    public Connector create(...) {
        // Register serializers
        serializerRegistry.registerSerializer("hive", new HiveSplitThriftSerializer());
        // ... create connector ...
    }
}

// Use a custom codec for ConnectorSplit
public class ConnectorSplitCodec implements ThriftCodec<ConnectorSplit>
{
    private final ConnectorSplitSerializerRegistry registry;

    @Override
    public void write(...);

    @Override
    public ConnectorSplit read(...);
}

// Within the byte array produced by serialization, we will find:
// * String connectorId;  // "hive" or another connector id
// * String type;         // optional; "json" or "thrift"
// * byte[] data;         // the serialized split payload
```
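
To make the flow concrete, the sketch below wires the hypothetical pieces above together: look up the serializer registered for the connector, serialize the split, and wrap the result in the connectorId/format/payload envelope. The envelope encoding shown here is illustrative; in the proposal that responsibility belongs to `ConnectorSplitCodec`.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical end-to-end flow for Option 2; all Connector* types come from the sketches above.
public final class SplitSerdeFlowSketch
{
    private SplitSerdeFlowSketch() {}

    @SuppressWarnings("unchecked")
    public static byte[] serializeSplit(ConnectorSplitSerializerRegistry registry, String connectorId, HiveSplit split)
    {
        // Look up whichever serializer the connector factory registered (JSON or thrift)
        ConnectorSplitSerializer<HiveSplit> serializer =
                (ConnectorSplitSerializer<HiveSplit>) registry.getSerializer(connectorId);
        byte[] payload = serializer.serialize(split);
        return encodeEnvelope(connectorId, serializer.getType(), payload);
    }

    // Illustrative envelope encoding: connector id, format tag, then the raw payload
    private static byte[] encodeEnvelope(String connectorId, String format, byte[] payload)
    {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(connectorId);   // which connector produced the split
            out.writeUTF(format);        // "json" or "thrift"
            out.writeInt(payload.length);
            out.write(payload);
            out.flush();
            return bytes.toByteArray();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```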

#### Pros
- We are free to adopt a different binary format for serde later, as long as the resulting byte array carries the connector id and format information.

#### Cons
- This design diverges from the existing Jackson serde flow in the code base.


### Q & A
1. What modules are involved?
   * presto-spi
   * presto-main-base
   * presto-main
   * presto-hive
2. Any new terminologies/concepts/SQL language additions?
   * N/A
3. Method/class/interface contracts which you deem fit for implementation.
   * See the code examples above
4. Code flow using bullet points or pseudo code as applicable
   * See the code examples above
5. Any new user facing metrics that can be shown on CLI or UI.
   * N/A

## [Optional] Metrics

How can we measure the impact of this feature?
1. taskUpdateSerializedCpuNanos
2. taskUpdateDeliveredWallTimeNanos
3. CPU usage for task update serde

## [Optional] Other Approaches Considered
1. See Option 1

## Adoption Plan

### Rollout
* As the first step, we will use Drift to annotate all primitive types within the three classes mentioned above, while keeping complex data types (e.g. Split, MetadataUpdate, TableWriteInfo) serialized as JSON.
* As the second step, we will add Thrift support for those complex data classes using one of the two options proposed above.

- What impact (if any) will there be on existing users? Are there any new session parameters, configurations, SPI updates, client API updates, or SQL grammar?
  * The Thrift serde will be disabled by default and can be enabled by a configuration property (see the sketch after this list).
- If we are changing behaviour how will we phase out the older behaviour?
  * N/A
- If we need special migration tools, describe them here.
  * N/A
- When will we remove the existing behaviour, if applicable.
  * N/A
- How should this feature be taught to new and existing users? Basically mention if documentation changes/new blog are needed?
  * This feature will be documented in the Presto documentation.
- What related issues do you consider out of scope for this RFC that could be addressed in the future independently of the solution that comes out of this RFC?
  * N/A
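
As a sketch of that toggle, a coordinator-side config class along the following lines could gate the new serde. The class and the property name are hypothetical placeholders (assuming Airlift-style `@Config` binding); the final name would be decided during implementation.

```java
import com.facebook.airlift.configuration.Config;

// Hypothetical config class; the property name is illustrative, not final.
public class TaskSerdeConfig
{
    private boolean taskThriftSerdeEnabled;

    public boolean isTaskThriftSerdeEnabled()
    {
        return taskThriftSerdeEnabled;
    }

    @Config("experimental.task-thrift-serde-enabled")
    public TaskSerdeConfig setTaskThriftSerdeEnabled(boolean taskThriftSerdeEnabled)
    {
        this.taskThriftSerdeEnabled = taskThriftSerdeEnabled;
        return this;
    }
}
```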

## Test Plan

How do we ensure the feature works as expected? Mention if any functional tests/integration tests are needed. Special mention for product-test changes. If any PoC has been done already, please mention the relevant test results here that you think will bolster your case of getting this RFC approved.

- A PoC for the first step (primitive types) can be found in the following two PRs:
  * prestodb/presto#25020
  * prestodb/presto#25079