@@ -232,7 +232,7 @@ pub mod auto {
232232}
233233
234234mod fixed {
235- use std:: collections:: HashMap ;
235+ use std:: collections:: { HashMap , HashSet } ;
236236
237237 use risingwave_common:: bail;
238238 use risingwave_common:: catalog:: ObjectId ;
@@ -243,16 +243,65 @@ mod fixed {
243243 use crate :: optimizer:: backfill_order_strategy:: common:: {
244244 bind_backfill_relation_id_by_name, has_cycle,
245245 } ;
246+ use crate :: optimizer:: plan_node:: { StreamPlanNodeType , StreamPlanRef } ;
246247 use crate :: session:: SessionImpl ;
247248
249+ /// Collect all relation IDs (tables and sources) that are scanned in the plan.
250+ fn collect_scanned_relation_ids ( plan : StreamPlanRef ) -> HashSet < ObjectId > {
251+ let mut relation_ids = HashSet :: new ( ) ;
252+
253+ fn visit ( plan : StreamPlanRef , relation_ids : & mut HashSet < ObjectId > ) {
254+ match plan. node_type ( ) {
255+ StreamPlanNodeType :: StreamTableScan => {
256+ let table_scan = plan. as_stream_table_scan ( ) . expect ( "table scan" ) ;
257+ let relation_id = table_scan. core ( ) . table_catalog . id ( ) . into ( ) ;
258+ relation_ids. insert ( relation_id) ;
259+ }
260+ StreamPlanNodeType :: StreamSourceScan => {
261+ let source_scan = plan. as_stream_source_scan ( ) . expect ( "source scan" ) ;
262+ let relation_id = source_scan. source_catalog ( ) . id ;
263+ relation_ids. insert ( relation_id) ;
264+ }
265+ _ => { }
266+ }
267+
268+ // Recursively visit all inputs
269+ for child in plan. inputs ( ) {
270+ visit ( child, relation_ids) ;
271+ }
272+ }
273+
274+ visit ( plan, & mut relation_ids) ;
275+ relation_ids
276+ }
277+
248278 pub ( super ) fn plan_fixed_strategy (
249279 session : & SessionImpl ,
250280 orders : Vec < ( ObjectName , ObjectName ) > ,
281+ plan : StreamPlanRef ,
251282 ) -> Result < HashMap < ObjectId , Uint32Vector > > {
283+ // Collect all scanned relation IDs from the plan
284+ let scanned_relation_ids = collect_scanned_relation_ids ( plan) ;
285+
252286 let mut order: HashMap < ObjectId , Uint32Vector > = HashMap :: new ( ) ;
253287 for ( start_name, end_name) in orders {
254- let start_relation_id = bind_backfill_relation_id_by_name ( session, start_name) ?;
255- let end_relation_id = bind_backfill_relation_id_by_name ( session, end_name) ?;
288+ let start_relation_id = bind_backfill_relation_id_by_name ( session, start_name. clone ( ) ) ?;
289+ let end_relation_id = bind_backfill_relation_id_by_name ( session, end_name. clone ( ) ) ?;
290+
291+ // Validate that both relations are present in the query plan
292+ if !scanned_relation_ids. contains ( & start_relation_id) {
293+ bail ! (
294+ "Table or source '{}' specified in backfill_order is not used in the query" ,
295+ start_name
296+ ) ;
297+ }
298+ if !scanned_relation_ids. contains ( & end_relation_id) {
299+ bail ! (
300+ "Table or source '{}' specified in backfill_order is not used in the query" ,
301+ end_name
302+ ) ;
303+ }
304+
256305 order
257306 . entry ( start_relation_id)
258307 . or_default ( )
@@ -423,7 +472,7 @@ pub fn plan_backfill_order(
423472 let order = match backfill_order_strategy {
424473 BackfillOrderStrategy :: Default | BackfillOrderStrategy :: None => Default :: default ( ) ,
425474 BackfillOrderStrategy :: Auto => plan_auto_strategy ( session, plan) ,
426- BackfillOrderStrategy :: Fixed ( orders) => plan_fixed_strategy ( session, orders) ?,
475+ BackfillOrderStrategy :: Fixed ( orders) => plan_fixed_strategy ( session, orders, plan ) ?,
427476 } ;
428477 Ok ( BackfillOrder { order } )
429478}
0 commit comments