diff --git a/sql/connect/common/src/main/protobuf/spark/connect/base.proto b/sql/connect/common/src/main/protobuf/spark/connect/base.proto index 7f317defd47b5..0a2bb4c3eef27 100644 --- a/sql/connect/common/src/main/protobuf/spark/connect/base.proto +++ b/sql/connect/common/src/main/protobuf/spark/connect/base.proto @@ -43,8 +43,6 @@ message Plan { } } - - // User Context is used to refer to one particular user session that is executing // queries in the backend. message UserContext { @@ -406,6 +404,10 @@ message ExecutePlanResponse { // Pipeline command response PipelineCommandResult pipeline_command_result = 22; + // A signal from the server to the client to execute the query function for a flow, and to + // register its result with the server. + PipelineQueryFunctionExecutionSignal pipeline_query_function_execution_signal = 23; + // Support arbitrary result objects. google.protobuf.Any extension = 999; } diff --git a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto index c5a6312645902..1159b6a2e4991 100644 --- a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto +++ b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto @@ -90,6 +90,24 @@ message PipelineCommand { optional string format = 8; } + // Metadata about why a query function failed to be executed successfully. + message QueryFunctionFailure { + // Identifier for a dataset within the graph that the query function needed to know the schema + // of but which had not yet been analyzed itself. + optional string missing_dependency = 1; + } + + // The result of executing a user-defined query function. + message QueryFunctionResult { + oneof flow_function_evaluation_result { + // If the query function executed successfully, the unresolved logical plan produced by it. + spark.connect.Relation plan = 1; + + // If the query function failed, metadata about the failure. + QueryFunctionFailure failure = 2; + } + } + // Request to define a flow targeting a dataset. message DefineFlow { // The graph to attach this flow to. @@ -101,14 +119,18 @@ message PipelineCommand { // Name of the dataset this flow writes to. Can be partially or fully qualified. optional string target_dataset_name = 3; - // An unresolved relation that defines the dataset's flow. - optional spark.connect.Relation relation = 4; + // The result of executing the flow's query function + optional QueryFunctionResult result = 4; // SQL configurations set when running this flow. map sql_conf = 5; // If true, this flow will only be run once per full refresh. optional bool once = 6; + + // Identifier for the client making the request. The server uses this to determine what flow + // evaluation request stream to dispatch evaluation requests to for this flow. + optional string client_id = 7; } // Resolves all datasets and flows and start a pipeline update. Should be called after all @@ -129,8 +151,30 @@ message PipelineCommand { // The contents of the SQL file. optional string sql_text = 3; } -} + // Request to get the stream of query function execution signals for a graph. Responses should + // be a stream of PipelineQueryFunctionExecutionSignal messages. + message GetQueryFunctionExecutionSignalStream { + // The graph to get the query function execution signal stream for. + optional string dataflow_graph_id = 1; + + // Identifier for the client that is requesting the stream. + optional string client_id = 2; + } + + // Request from the client to update the flow function evaluation result + // for a previously unanalyzed flow. + message DefineFlowQueryFunctionResult { + // The fully qualified name of the flow being updated. + optional string flow_name = 1; + + // The ID of the graph this flow belongs to. + optional string dataflow_graph_id = 2; + + // The result of executing the flow's query function + optional QueryFunctionResult result = 3; + } +} // Dispatch object for pipelines command results. message PipelineCommandResult { @@ -166,3 +210,9 @@ message PipelineEvent { // The message that should be displayed to users. optional string message = 2; } + +// A signal from the server to the client to execute the query function for one or more flows, and +// to register their results with the server. +message PipelineQueryFunctionExecutionSignal { + repeated string flow_names = 1; +}