From 147aa323016a15619aae6045fe2628c6709f2671 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Tue, 15 Jul 2025 12:51:07 -0700 Subject: [PATCH 1/2] proto changes --- .../main/protobuf/spark/connect/base.proto | 13 +++- .../protobuf/spark/connect/pipelines.proto | 66 ++++++++++++++++++- 2 files changed, 74 insertions(+), 5 deletions(-) diff --git a/sql/connect/common/src/main/protobuf/spark/connect/base.proto b/sql/connect/common/src/main/protobuf/spark/connect/base.proto index 7f317defd47b5..ceb976da593f7 100644 --- a/sql/connect/common/src/main/protobuf/spark/connect/base.proto +++ b/sql/connect/common/src/main/protobuf/spark/connect/base.proto @@ -43,14 +43,19 @@ message Plan { } } - - // User Context is used to refer to one particular user session that is executing // queries in the backend. message UserContext { string user_id = 1; string user_name = 2; + // (Optional) Should be non-null for RPCs that are sent during the execution of a Declarative + // Pipelines flow query function. Identifies the flow and the dataflow graph that it's a part of. + // Any plans that are analyzed within the RPC are analyzed "relative to" the dataflow graph. + // I.e., when determining the existence and schema of a data source that's defined in the graph, + // the definition from the graph is used instead of the definition in physical storage. + optional FlowAnalysisContext pipeline_flow_analysis_context = 3; + // To extend the existing user context message that is used to identify incoming requests, // Spark Connect leverages the Any protobuf type that can be used to inject arbitrary other // messages into this message. Extensions are stored as a `repeated` type to be able to @@ -406,6 +411,10 @@ message ExecutePlanResponse { // Pipeline command response PipelineCommandResult pipeline_command_result = 22; + // A signal from the server to the client to execute the query function for one or more flows, + // and to register their results with the server. 
+ PipelineQueryFunctionExecutionSignal pipeline_query_function_execution_signal = 23; + // Support arbitrary result objects. google.protobuf.Any extension = 999; } diff --git a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto index c5a6312645902..0d5d845a739fb 100644 --- a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto +++ b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto @@ -90,6 +90,24 @@ message PipelineCommand { optional string format = 8; } + // Metadata about why a query function failed to be executed successfully. + message QueryFunctionFailure { + // Identifier for a dataset within the graph that the query function needed to know the schema + // of but which had not yet been analyzed itself. + optional string missing_dependency = 1; + } + + // The result of executing a user-defined query function. + message QueryFunctionResult { + oneof flow_function_evaluation_result { + // If the query function executed successfully, the unresolved logical plan produced by it. + spark.connect.Relation plan = 1; + + // If the query function failed, metadata about the failure. + QueryFunctionFailure failure = 2; + } + } + // Request to define a flow targeting a dataset. message DefineFlow { // The graph to attach this flow to. @@ -101,14 +119,18 @@ message PipelineCommand { // Name of the dataset this flow writes to. Can be partially or fully qualified. optional string target_dataset_name = 3; - // An unresolved relation that defines the dataset's flow. - optional spark.connect.Relation relation = 4; + // The result of executing the flow's query function. + optional QueryFunctionResult result = 4; // SQL configurations set when running this flow. map<string, string> sql_conf = 5; // If true, this flow will only be run once per full refresh. optional bool once = 6; + + // Identifier for the client making the request. 
The server uses this to determine which flow + // evaluation request stream to dispatch evaluation requests to for this flow. + optional string client_id = 7; } // Resolves all datasets and flows and start a pipeline update. Should be called after all @@ -129,8 +151,30 @@ message PipelineCommand { // The contents of the SQL file. optional string sql_text = 3; } -} + // Request to get the stream of query function execution signals for a graph. Responses should + // be a stream of PipelineQueryFunctionExecutionSignal messages. + message GetQueryFunctionExecutionSignalStream { + // The graph to get the query function execution signal stream for. + optional string dataflow_graph_id = 1; + + // Identifier for the client that is requesting the stream. + optional string client_id = 2; + } + + // Request from the client to update the flow function evaluation result + // for a previously unanalyzed flow. + message DefineFlowQueryFunctionResult { + // The fully qualified name of the flow being updated. + optional string flow_name = 1; + + // The ID of the graph this flow belongs to. + optional string dataflow_graph_id = 2; + + // The result of executing the flow's query function. + optional QueryFunctionResult result = 3; + } +} // Dispatch object for pipelines command results. message PipelineCommandResult { @@ -166,3 +210,19 @@ message PipelineEvent { // The message that should be displayed to users. optional string message = 2; } + +// A signal from the server to the client to execute the query function for one or more flows, and +// to register their results with the server. +message PipelineQueryFunctionExecutionSignal { + repeated string flow_names = 1; +} + +// Context describing the flow being resolved within a graph. +message FlowAnalysisContext { + // (Required) The ID of the dataflow graph that the flow belongs to. I.e. the value returned by + // CreateDataflowGraph. + string dataflow_graph_id = 1; + + // (Required) The name of the flow within the dataflow graph. 
+ string flow_name = 2; +} From a9b7e11db9f2bf6c5e6152c43326e69a1ee9f2ed Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Thu, 31 Jul 2025 09:29:01 -0700 Subject: [PATCH 2/2] take out the flow analysis context stuff --- .../common/src/main/protobuf/spark/connect/base.proto | 7 ------- .../src/main/protobuf/spark/connect/pipelines.proto | 10 ---------- 2 files changed, 17 deletions(-) diff --git a/sql/connect/common/src/main/protobuf/spark/connect/base.proto b/sql/connect/common/src/main/protobuf/spark/connect/base.proto index ceb976da593f7..0a2bb4c3eef27 100644 --- a/sql/connect/common/src/main/protobuf/spark/connect/base.proto +++ b/sql/connect/common/src/main/protobuf/spark/connect/base.proto @@ -49,13 +49,6 @@ message UserContext { string user_id = 1; string user_name = 2; - // (Optional) Should be non-null for RPCs that are sent during the execution of a Declarative - // Pipelines flow query function. Identifies the flow and the dataflow graph that it's a part of. - // Any plans that are analyzed within the RPC are analyzed "relative to" the dataflow graph. - // I.e., when determining the existence and schema of a data source that's defined in the graph, - // the definition from the graph is used instead of the definition in physical storage. - optional FlowAnalysisContext pipeline_flow_analysis_context = 3; - // To extend the existing user context message that is used to identify incoming requests, // Spark Connect leverages the Any protobuf type that can be used to inject arbitrary other // messages into this message. 
Extensions are stored as a `repeated` type to be able to diff --git a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto index 0d5d845a739fb..1159b6a2e4991 100644 --- a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto +++ b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto @@ -216,13 +216,3 @@ message PipelineEvent { message PipelineQueryFunctionExecutionSignal { repeated string flow_names = 1; } - -// Context describing the flow being resolved within a graph. -message FlowAnalysisContext { - // (Required) The ID of the dataflow graph that the flow belongs to. I.e. the value returned by - // CreateDataflowGraph. - string dataflow_graph_id = 1; - - // (Required) The name of the flow within the dataflow graph. - string flow_name = 2; -}