Allow connectors to pass through richer representations of commit metadata for query event listeners #26331
…adata for query event listeners

Summary:
Currently, the `Input` and `Output` query metadata classes retain two sources of connector-specific information that can be useful for reporting via an `EventListener`:

```
Optional<Object> connectorInfo;
String serializedCommitOutput;
```

* `connectorInfo` can be cast back to the correct type in an `EventListener` implementation, allowing rich access to the underlying data.
* `serializedCommitOutput`, however, is serialized in a given format by the `ConnectorCommitHandle` implementation, which makes it difficult to correctly represent the reporting requirements in an `EventListener` (which may need correlation with data in the `connectorInfo` result).

For example, `HiveCommitHandle` retains the lastDataCommitTime for each partition in a simple array associated with the table name, whereas the partition names are retained in the `HiveInputInfo` instance carried through in `connectorInfo`. For these times to be mapped back to individual partitions, the entries must be in exactly the same order as the entries in `HiveInputInfo`.

This change replaces the `serializedCommitOutput` property with an `Optional<ConnectorCommitHandle>` instance, providing parity with `connectorInfo` and allowing `EventListener` implementations to cast the commit handle back to the correct type for richer access to the underlying data.

Differential Revision: D84382446
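To make the ordering constraint concrete, here is a minimal sketch of how a listener might consume a typed commit handle and pair it with `connectorInfo`. Every type in it is a simplified, hypothetical stand-in rather than the real Presto SPI or Hive class: `ConnectorCommitHandle` is reduced to an empty marker, and `TableCommitHandle`, `PartitionInputInfo`, and `CommitTimeReporter` are invented for illustration. Only the cast-and-correlate pattern is the point.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper an EventListener implementation might call.
final class CommitTimeReporter {
    private CommitTimeReporter() {}

    // Pairs each partition with its commit time by position.
    // Returns an empty map if either value is absent, has an unexpected type,
    // or the two lists differ in length (a positional mapping would be unsafe).
    static Map<String, Long> commitTimesByPartition(
            String table,
            Optional<Object> connectorInfo,
            Optional<ConnectorCommitHandle> commitHandle) {
        Map<String, Long> result = new LinkedHashMap<>();
        if (!connectorInfo.isPresent() || !commitHandle.isPresent()) {
            return result;
        }
        Object info = connectorInfo.get();
        ConnectorCommitHandle handle = commitHandle.get();
        if (!(info instanceof PartitionInputInfo) || !(handle instanceof TableCommitHandle)) {
            return result;
        }
        List<String> partitions = ((PartitionInputInfo) info).getPartitionIds();
        List<Long> times = ((TableCommitHandle) handle).getLastDataCommitTimes(table);
        if (partitions.size() != times.size()) {
            return result;
        }
        for (int i = 0; i < partitions.size(); i++) {
            result.put(partitions.get(i), times.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        Optional<Object> info = Optional.<Object>of(
                new PartitionInputInfo(Arrays.asList("ds=2024-01-01", "ds=2024-01-02")));
        Optional<ConnectorCommitHandle> handle = Optional.<ConnectorCommitHandle>of(
                new TableCommitHandle(Collections.singletonMap("orders", Arrays.asList(100L, 200L))));
        System.out.println(commitTimesByPartition("orders", info, handle));
    }
}

// Hypothetical marker standing in for the SPI ConnectorCommitHandle interface.
interface ConnectorCommitHandle {}

// Hypothetical handle: commit times per table, one entry per partition, in read order.
final class TableCommitHandle implements ConnectorCommitHandle {
    private final Map<String, List<Long>> lastDataCommitTimes;

    TableCommitHandle(Map<String, List<Long>> lastDataCommitTimes) {
        this.lastDataCommitTimes = lastDataCommitTimes;
    }

    List<Long> getLastDataCommitTimes(String table) {
        List<Long> times = lastDataCommitTimes.get(table);
        return times == null ? Collections.<Long>emptyList() : times;
    }
}

// Hypothetical stand-in for the connectorInfo payload (HiveInputInfo in the description).
final class PartitionInputInfo {
    private final List<String> partitionIds;

    PartitionInputInfo(List<String> partitionIds) {
        this.partitionIds = partitionIds;
    }

    List<String> getPartitionIds() {
        return partitionIds;
    }
}
```

With typed access, the listener can at least detect a length mismatch before zipping the two lists, which is harder to do reliably when the commit times arrive as an opaque serialized string.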
Reviewer's Guide

This PR replaces the string-based `serializedCommitOutput` with an `Optional<ConnectorCommitHandle>` across core metadata and execution classes. It updates constructors, getters, JSON annotations, and equals/hashCode/toString, and propagates the richer commit handle through QueryStateMachine, QueryMonitor, the SQL planner extractors, SPI event metadata, and tests, so that event listeners can obtain connector-specific commit data directly.

Sequence diagram for propagation of ConnectorCommitHandle in QueryStateMachine and QueryMonitor

sequenceDiagram
participant "QueryStateMachine"
participant "Input/Output"
participant "QueryMonitor"
participant "EventListener"
"QueryStateMachine"->>"Input/Output": attach commitHandle (Optional<ConnectorCommitHandle>)
"Input/Output"->>"QueryMonitor": pass commitHandle via getCommitHandle()
"QueryMonitor"->>"EventListener": provide commitHandle for richer event reporting
Class diagram for updated Input and Output metadata classes

classDiagram
class Input {
- ConnectorId connectorId
- String schema
- String table
- Optional<Object> connectorInfo
- List<Column> columns
- Optional<TableStatistics> statistics
- Optional<ConnectorCommitHandle> commitHandle
+ getConnectorId()
+ getSchema()
+ getTable()
+ getConnectorInfo()
+ getColumns()
+ getStatistics()
+ getCommitHandle()
}
class Output {
- ConnectorId connectorId
- String schema
- String table
- Optional<List<OutputColumnMetadata>> columns
- Optional<ConnectorCommitHandle> commitHandle
+ getConnectorId()
+ getSchema()
+ getTable()
+ getColumns()
+ getCommitHandle()
}
Input --> ConnectorCommitHandle
Output --> ConnectorCommitHandle
Input --> TableStatistics
Output --> OutputColumnMetadata
Input --> Column
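As a rough illustration of what carrying an `Optional<ConnectorCommitHandle>` field implies for a value class, the sketch below uses a trimmed-down, hypothetical `InputSketch` with only three of the fields from the diagram. `NamedCommitHandle` and the empty `ConnectorCommitHandle` marker are also stand-ins invented for this example, not the real classes.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical, trimmed-down analogue of Input; the real class has more fields.
final class InputSketch {
    private final String schema;
    private final String table;
    private final Optional<ConnectorCommitHandle> commitHandle;

    InputSketch(String schema, String table, Optional<ConnectorCommitHandle> commitHandle) {
        this.schema = Objects.requireNonNull(schema, "schema is null");
        this.table = Objects.requireNonNull(table, "table is null");
        this.commitHandle = Objects.requireNonNull(commitHandle, "commitHandle is null");
    }

    Optional<ConnectorCommitHandle> getCommitHandle() {
        return commitHandle;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof InputSketch)) {
            return false;
        }
        InputSketch other = (InputSketch) o;
        // Optional.equals delegates to the wrapped handle's equals method.
        return schema.equals(other.schema)
                && table.equals(other.table)
                && commitHandle.equals(other.commitHandle);
    }

    @Override
    public int hashCode() {
        return Objects.hash(schema, table, commitHandle);
    }

    public static void main(String[] args) {
        InputSketch a = new InputSketch("schema", "table", Optional.of(new NamedCommitHandle("c1")));
        InputSketch b = new InputSketch("schema", "table", Optional.of(new NamedCommitHandle("c1")));
        System.out.println(a.equals(b));
    }
}

// Hypothetical marker standing in for the SPI ConnectorCommitHandle interface.
interface ConnectorCommitHandle {}

// Hypothetical handle with value-based equality.
final class NamedCommitHandle implements ConnectorCommitHandle {
    private final String id;

    NamedCommitHandle(String id) {
        this.id = Objects.requireNonNull(id, "id is null");
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof NamedCommitHandle && id.equals(((NamedCommitHandle) o).id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }
}
```

One consequence worth noting: once the handle participates in `equals` and `hashCode`, equality of the enclosing object depends on whether the concrete handle type defines value-based equality. A handle that inherits `Object.equals` would make two otherwise identical inputs compare as different.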
Class diagram for updated QueryInputMetadata and QueryOutputMetadata

classDiagram
class QueryInputMetadata {
- String catalogName
- String schema
- String table
- List<Column> columnObjects
- Optional<Object> connectorInfo
- Optional<TableStatistics> statistics
- Optional<ConnectorCommitHandle> commitHandle
+ getCatalogName()
+ getSchema()
+ getTable()
+ getColumnObjects()
+ getConnectorInfo()
+ getStatistics()
+ getCommitHandle()
+ getSerializedCommitOutput()
}
class QueryOutputMetadata {
- String catalogName
- String schema
- String table
- Optional<String> connectorOutputMetadata
- Optional<Boolean> jsonLengthLimitExceeded
- Optional<List<OutputColumnMetadata>> columns
- Optional<ConnectorCommitHandle> commitHandle
+ getCatalogName()
+ getSchema()
+ getTable()
+ getConnectorOutputMetadata()
+ getJsonLengthLimitExceeded()
+ getColumns()
+ getCommitHandle()
+ getSerializedCommitOutput()
}
QueryInputMetadata --> ConnectorCommitHandle
QueryOutputMetadata --> ConnectorCommitHandle
QueryInputMetadata --> TableStatistics
QueryOutputMetadata --> OutputColumnMetadata
QueryInputMetadata --> Column
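The diagram lists both `getCommitHandle()` and `getSerializedCommitOutput()` while showing only a `commitHandle` field. One way such a getter could remain available is to derive the string from the handle on demand. The sketch below works under that assumption and is not taken from the PR's implementation: `QueryInputMetadataSketch` and the simplified `SchemaTableName` are hypothetical, and `ConnectorCommitHandle` is reduced to the single `getSerializedCommitOutputForRead` method that appears in the `HiveCommitHandle` diagram.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical analogue of QueryInputMetadata with only the fields needed here.
final class QueryInputMetadataSketch {
    private final String schema;
    private final String table;
    private final Optional<ConnectorCommitHandle> commitHandle;

    QueryInputMetadataSketch(String schema, String table, Optional<ConnectorCommitHandle> commitHandle) {
        this.schema = Objects.requireNonNull(schema, "schema is null");
        this.table = Objects.requireNonNull(table, "table is null");
        this.commitHandle = Objects.requireNonNull(commitHandle, "commitHandle is null");
    }

    Optional<ConnectorCommitHandle> getCommitHandle() {
        return commitHandle;
    }

    // Assumed behavior: derive the legacy string from the handle when one is present,
    // and fall back to an empty string otherwise.
    String getSerializedCommitOutput() {
        return commitHandle
                .map(handle -> handle.getSerializedCommitOutputForRead(new SchemaTableName(schema, table)))
                .orElse("");
    }

    public static void main(String[] args) {
        ConnectorCommitHandle handle = name -> "commit:" + name;
        QueryInputMetadataSketch metadata =
                new QueryInputMetadataSketch("sales", "orders", Optional.of(handle));
        System.out.println(metadata.getSerializedCommitOutput());
    }
}

// Simplified stand-in for the SPI interface; only the read-side method is modeled.
interface ConnectorCommitHandle {
    String getSerializedCommitOutputForRead(SchemaTableName table);
}

// Simplified stand-in for the SPI SchemaTableName class.
final class SchemaTableName {
    private final String schema;
    private final String table;

    SchemaTableName(String schema, String table) {
        this.schema = schema;
        this.table = table;
    }

    @Override
    public String toString() {
        return schema + "." + table;
    }
}
```

Under this approach, existing listeners that read the string keep working, while new listeners can call `getCommitHandle()` and cast to the connector-specific type.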
Class diagram for HiveCommitHandle with new JSON ignore properties

classDiagram
class HiveCommitHandle {
+ getSerializedCommitOutputForRead(SchemaTableName)
+ getSerializedCommitOutputForWrite(SchemaTableName)
<<implements ConnectorCommitHandle>>
<<@JsonIgnoreProperties>>
lastDataCommitTimesForRead
lastDataCommitTimesForWrite
}
HiveCommitHandle ..|> ConnectorCommitHandle
Class diagram for propagation in SQL planner extractors

classDiagram
class OutputExtractor {
+ extractOutput(PlanNode): Optional<Output>
}
class InputExtractor {
+ createInput(TableMetadata, TableHandle, Set<Column>, Optional<TableStatistics>): Input
}
OutputExtractor --> Output
InputExtractor --> Input
Output --> ConnectorCommitHandle
Input --> ConnectorCommitHandle
Hey there - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `presto-main/src/test/java/com/facebook/presto/eventlistener/TestEventListenerManager.java:396` </location>
<code_context>
- "dummySerializedCommitOutput",
- Optional.of(columns));
+ Optional.of(columns),
+ Optional.of(new TestCommitHandle("", "dummySerializedCommitOutput")));
return new QueryIOMetadata(inputs, Optional.of(outputMetadata));
}
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding assertions that verify the correct handling and casting of ConnectorCommitHandle in event listener scenarios.
Adding targeted tests for casting and accessing custom fields in ConnectorCommitHandle within EventListener will help validate the new API and prevent regressions.
Suggested implementation:
```java
import com.facebook.presto.spi.connector.ConnectorCommitHandle;
import com.facebook.presto.spi.eventlistener.CTEInformation;
import com.facebook.presto.spi.eventlistener.Column;
import com.facebook.presto.spi.eventlistener.EventListener;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
```
```java
// Add after createDummySplitCompletedEvent() in TestEventListenerManager.
@Test
public void testConnectorCommitHandleCastingInEventListener() {
    // Simulate the event listener receiving a ConnectorCommitHandle
    ConnectorCommitHandle handle = new TestCommitHandle("testId", "testSerializedOutput");

    // Assert correct type and field access
    assertTrue(handle instanceof TestCommitHandle, "Handle should be instance of TestCommitHandle");
    TestCommitHandle testHandle = (TestCommitHandle) handle;
    assertEquals(testHandle.getId(), "testId", "Custom field 'id' should be accessible and correct");
    assertEquals(testHandle.getSerializedCommitOutput(), "testSerializedOutput", "Custom field 'serializedCommitOutput' should be accessible and correct");
}
```
</issue_to_address>
### Comment 2
<location> `presto-main-base/src/test/java/com/facebook/presto/execution/TestOutput.java:46` </location>
<code_context>
new Column("column3", "string")),
Optional.empty(),
- "");
+ Optional.empty());
String json = codec.toJson(expected);
</code_context>
<issue_to_address>
**suggestion (testing):** Add tests for non-empty ConnectorCommitHandle in Output round-trip serialization.
Please add a test with a non-empty commitHandle containing custom data to verify correct serialization and deserialization for this field.
</issue_to_address>
### Comment 3
<location> `presto-main-base/src/test/java/com/facebook/presto/execution/TestInput.java:37` </location>
<code_context>
new Column("column3", "string")),
Optional.empty(),
- "");
+ Optional.empty());
String json = codec.toJson(expected);
</code_context>
<issue_to_address>
**suggestion (testing):** Add tests for non-empty ConnectorCommitHandle in Input round-trip serialization.
Please include a test with a non-empty commitHandle to ensure proper handling during serialization and deserialization, particularly for custom implementations.
</issue_to_address>
### Comment 4
<location> `presto-main-base/src/test/java/com/facebook/presto/execution/TestQueryInfo.java:186` </location>
<code_context>
null,
ImmutableList.of(new PrestoWarning(new WarningCode(1, "name"), "message")),
- ImmutableSet.of(new Input(new ConnectorId("connector"), "schema", "table", Optional.empty(), ImmutableList.of(new Column("name", "type")), Optional.empty(), "")),
+ ImmutableSet.of(new Input(new ConnectorId("connector"), "schema", "table", Optional.empty(), ImmutableList.of(new Column("name", "type")), Optional.empty(), Optional.empty())),
Optional.empty(),
true,
</code_context>
<issue_to_address>
**suggestion (testing):** Test coverage for Input with non-empty commitHandle in QueryInfo is missing.
Add a test where commitHandle in Input is non-empty to verify correct handling in QueryInfo.
Suggested implementation:
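```java
ImmutableSet.of(new Input(new ConnectorId("connector"), "schema", "table", Optional.empty(), ImmutableList.of(new Column("name", "type")), Optional.empty(), Optional.empty())),
// Test Input with non-empty commitHandle
ImmutableSet.of(new Input(
        new ConnectorId("connector"),
        "schema",
        "table",
        Optional.empty(),
        ImmutableList.of(new Column("name", "type")),
        Optional.empty(),
        // commitHandle is Optional<ConnectorCommitHandle>, so a raw String would not compile here.
        // TestCommitHandle is assumed to be a test helper like the one used in TestEventListenerManager.
        Optional.of(new TestCommitHandle("", "commit-handle-value")))),
```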
You should also:
1. Add assertions after constructing the QueryInfo object to verify that the Input with non-empty commitHandle is present and its value is as expected.
2. If this is inside a method like `testQueryInfoInputs()`, ensure the new Input is included in the QueryInfo instance and checked in the test logic.
</issue_to_address>