
[SPARK-52807][SDP] Proto changes to support analysis inside Declarative Pipelines query functions #51502

Open. Wants to merge 1 commit into base: master.
Conversation

@sryza (Contributor) commented Jul 15, 2025

What changes were proposed in this pull request?

Introduces a mechanism for lazy execution of Declarative Pipelines query functions. A query function is a function like mv1 in this example:

@materialized_view
def mv1():
    return spark.table("upstream_table").filter(some_condition)

Currently, query functions are always executed eagerly: the implementation of the materialized_view decorator immediately invokes the function that it decorates and then registers the resulting DataFrame with the server.

This PR introduces Spark Connect proto changes that enable executing query functions later, initiated by the server during graph resolution. After all datasets and flows have been registered, the server can tell the client to execute the query functions for flows that haven't yet been executed successfully. The client initiates an RPC with the server, and the server streams back responses that indicate when it's time to execute the query function for one of its flows. Relevant changes:

  • New QueryFunctionFailure message
  • New QueryFunctionResult message
  • Replace relation field in DefineFlow with query_function_result field
  • New DefineFlowQueryFunctionResult message
  • New GetQueryFunctionExecutionSignalStream message
  • New PipelineQueryFunctionExecutionSignal message
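The control flow those messages enable can be sketched as a simple client-side loop. This is an illustrative model only: `signal_stream` stands in for the streamed PipelineQueryFunctionExecutionSignal responses, and `define_flow` stands in for sending a DefineFlow message carrying a query_function_result.

```python
# Illustrative sketch of the deferred-execution loop on the client.
# All names here are hypothetical stand-ins for the real RPC plumbing.
def run_deferred_query_functions(signal_stream, query_functions, define_flow):
    # The server streams one signal per flow whose query function still
    # needs to run; the client executes it and sends the result back.
    executed = []
    for flow_name in signal_stream:
        plan = query_functions[flow_name]()
        define_flow(flow_name, plan)
        executed.append(flow_name)
    return executed
```

Because the server drives the stream, it can order the signals so that a flow's upstream tables are already resolved by the time its query function runs.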

This PR also introduces Spark Connect proto changes that enable carrying out plan analysis "relative to" a dataflow graph. "Relative to" means that, when determining the existence and schema of a table that's defined in the graph, the definitions from the graph are used instead of the definitions in the catalog. This will be used in cases where the code inside a query function triggers analysis. Relevant changes:

  • New FlowAnalysisContext message
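The "relative to the graph" lookup rule amounts to consulting the in-progress dataflow graph before falling back to the catalog. A toy sketch, with all names illustrative:

```python
# Toy model of graph-relative resolution: table lookups during analysis
# inside a query function prefer the pipeline's own definitions over
# what exists in physical storage. Names are hypothetical.
def resolve_schema(table_name, graph_definitions, catalog):
    if table_name in graph_definitions:
        # The table is defined by the pipeline; use the graph's schema
        # even if nothing has been materialized yet.
        return graph_definitions[table_name]
    # Otherwise fall back to the catalog / physical storage.
    return catalog[table_name]
```

This is what lets a query function reference a sibling table like mv1 before mv1 has ever been written out.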

Why are the changes needed?

There are some situations where we can't resolve the relation immediately at the time we're registering a flow.

E.g. consider this situation:
file 1:

@materialized_view
def mv1():
    data = [("Alice", 10), ("Bob", 15), ("Alice", 5)]
    return spark.createDataFrame(data, ["name", "amount"])

file 2:

@materialized_view
def mv2():
    return spark.table("mv1").groupBy("name").agg(sum("amount").alias("total_amount"))

Unlike some other transformations, which are analyzed lazily, groupBy can trigger an AnalyzePlan Spark Connect request immediately. If the query function for mv2 is executed before mv1's, it will hit an error, because mv1 doesn't exist yet. groupBy isn't the only example here.

Other examples of these kinds of situations:

  • The set of columns for a downstream table is determined from the set of columns in an upstream table.
  • When spark.sql is used.
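The first bullet can be illustrated with a small sketch: the downstream table's column list is computed from the upstream schema, so the query function cannot produce its plan until analysis of the upstream table succeeds. `upstream_schema` is a hypothetical stand-in for what an AnalyzePlan request would return.

```python
# Hedged illustration of the schema-dependent case: the set of columns
# selected downstream is derived from the upstream table's schema, so
# executing this query function requires analysis at execution time.
def downstream_columns(upstream_schema):
    # Keep every upstream column except internal ones (illustrative
    # "_"-prefix convention) -- this needs the upstream schema in hand.
    return [c for c in upstream_schema if not c.startswith("_")]
```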

Does this PR introduce any user-facing change?

No

How was this patch tested?

Was this patch authored or co-authored using generative AI tooling?

@sryza force-pushed the proto-changes branch 3 times, most recently from 90a077f to c6ee6a8 on July 16, 2025 at 16:01.
// Any plans that are analyzed within the RPC are analyzed "relative to" the dataflow graph.
// I.e., when determining the existence and schema of a data source that's defined in the graph,
// the definition from the graph is used instead of the definition in physical storage.
optional FlowAnalysisContext pipeline_flow_analysis_context = 3;
@sryza (Contributor, Author) commented Jul 16, 2025:
I don't have strong opinions here, but it feels a little weird to me to put this inside UserContext, because it's not related to the user. Another option would be to put it directly on the messages that the user context is attached to.

@sryza marked this pull request as ready for review July 16, 2025 16:09
@sryza requested a review from hvanhovell July 16, 2025 16:09