Skip to content

[SPARK-52807][SDP] Proto changes to support analysis inside Declarative Pipelines query functions #51502

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions sql/connect/common/src/main/protobuf/spark/connect/base.proto
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,19 @@ message Plan {
}
}



// User Context is used to refer to one particular user session that is executing
// queries in the backend.
message UserContext {
string user_id = 1;
string user_name = 2;

// (Optional) Should be non-null for RPCs that are sent during the execution of a Declarative
// Pipelines flow query function. Identifies the flow and the dataflow graph that it's a part of.
// Any plans that are analyzed within the RPC are analyzed "relative to" the dataflow graph.
// I.e., when determining the existence and schema of a data source that's defined in the graph,
// the definition from the graph is used instead of the definition in physical storage.
optional FlowAnalysisContext pipeline_flow_analysis_context = 3;
Copy link
Contributor Author

@sryza sryza Jul 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have strong opinions here, but it feels a little weird to me to put this inside UserContext, because it's not related to the user. Another option could be to put it directly on the messages that the user context is on.


// To extend the existing user context message that is used to identify incoming requests,
// Spark Connect leverages the Any protobuf type that can be used to inject arbitrary other
// messages into this message. Extensions are stored as a `repeated` type to be able to
Expand Down Expand Up @@ -406,6 +411,10 @@ message ExecutePlanResponse {
// Pipeline command response
PipelineCommandResult pipeline_command_result = 22;

// A signal from the server to the client to execute the query function for a flow, and to
// register its result with the server.
PipelineQueryFunctionExecutionSignal pipeline_query_function_execution_signal = 23;

// Support arbitrary result objects.
google.protobuf.Any extension = 999;
}
Expand Down
66 changes: 63 additions & 3 deletions sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,24 @@ message PipelineCommand {
optional string format = 8;
}

// Metadata about why a query function failed to be executed successfully.
message QueryFunctionFailure {
  // Identifier for a dataset within the graph that the query function needed to know the schema
  // of but which had not yet been analyzed itself. The server can use this to order analysis:
  // analyze the missing dependency first, then re-request execution of this query function.
  // NOTE(review): presumably unset when the failure had some other cause — confirm with the
  // server-side handling.
  optional string missing_dependency = 1;
}

// The result of executing a user-defined query function.
message QueryFunctionResult {
  // Exactly one of the following is set, depending on whether the query function could be
  // executed successfully.
  oneof flow_function_evaluation_result {
    // If the query function executed successfully, the unresolved logical plan produced by it.
    spark.connect.Relation plan = 1;

    // If the query function failed, metadata about the failure (e.g. a dependency that has not
    // been analyzed yet — see QueryFunctionFailure).
    QueryFunctionFailure failure = 2;
  }
}

// Request to define a flow targeting a dataset.
message DefineFlow {
// The graph to attach this flow to.
Expand All @@ -101,14 +119,18 @@ message PipelineCommand {
// Name of the dataset this flow writes to. Can be partially or fully qualified.
optional string target_dataset_name = 3;

// An unresolved relation that defines the dataset's flow.
optional spark.connect.Relation relation = 4;
// The result of executing the flow's query function
optional QueryFunctionResult result = 4;

// SQL configurations set when running this flow.
map<string, string> sql_conf = 5;

// If true, this flow will only be run once per full refresh.
optional bool once = 6;

// Identifier for the client making the request. The server uses this to determine what flow
// evaluation request stream to dispatch evaluation requests to for this flow.
optional string client_id = 7;
}

// Resolves all datasets and flows and start a pipeline update. Should be called after all
Expand All @@ -129,8 +151,30 @@ message PipelineCommand {
// The contents of the SQL file.
optional string sql_text = 3;
}
}

// Request to get the stream of query function execution signals for a graph. Responses should
// be a stream of PipelineQueryFunctionExecutionSignal messages.
message GetQueryFunctionExecutionSignalStream {
  // The ID of the dataflow graph to get the query function execution signal stream for.
  // I.e. the value returned by CreateDataflowGraph.
  optional string dataflow_graph_id = 1;

  // Identifier for the client that is requesting the stream.
  // NOTE(review): presumably the same identifier the client sends as DefineFlow.client_id, so
  // the server can dispatch evaluation requests for that flow to this stream — confirm.
  optional string client_id = 2;
}

// Request from the client to update the flow function evaluation result
// for a previously unanalyzed flow, typically in response to a
// PipelineQueryFunctionExecutionSignal from the server.
message DefineFlowQueryFunctionResult {
  // The fully qualified name of the flow being updated.
  optional string flow_name = 1;

  // The ID of the graph this flow belongs to. I.e. the value returned by CreateDataflowGraph.
  optional string dataflow_graph_id = 2;

  // The result of executing the flow's query function: either the produced unresolved plan or
  // metadata about why execution failed.
  optional QueryFunctionResult result = 3;
}
}

// Dispatch object for pipelines command results.
message PipelineCommandResult {
Expand Down Expand Up @@ -166,3 +210,19 @@ message PipelineEvent {
// The message that should be displayed to users.
optional string message = 2;
}

// A signal from the server to the client to execute the query function for one or more flows, and
// to register their results with the server.
message PipelineQueryFunctionExecutionSignal {
  // Names of the flows whose query functions the client should execute. Results are registered
  // back with the server via DefineFlowQueryFunctionResult.
  // NOTE(review): presumably fully qualified, matching DefineFlowQueryFunctionResult.flow_name —
  // confirm.
  repeated string flow_names = 1;
}

// Context describing the flow being resolved within a graph.
message FlowAnalysisContext {
  // (Required) The ID of the dataflow graph that the flow belongs to. I.e. the value returned by
  // CreateDataflowGraph.
  //
  // Declared `optional` for explicit field presence (has_-semantics), consistent with the other
  // messages introduced in this change; "(Required)" is enforced at the application level. This
  // change from implicit to explicit presence is wire-compatible for proto3 strings.
  optional string dataflow_graph_id = 1;

  // (Required) The name of the flow within the dataflow graph.
  optional string flow_name = 2;
}