@@ -757,8 +757,8 @@ mod tests {
757757        convert_scheduling_solution_to_physical_plan_single_node_single_source, 
758758    } ; 
759759    use  crate :: indexing_plan:: PhysicalIndexingPlan ; 
760-     use  crate :: indexing_scheduler:: get_shard_locality_metrics; 
761760    use  crate :: indexing_scheduler:: scheduling:: assign_shards; 
761+     use  crate :: indexing_scheduler:: { MAX_LOAD_PER_PIPELINE ,  get_shard_locality_metrics} ; 
762762    use  crate :: model:: ShardLocations ; 
763763
764764    fn  source_id ( )  -> SourceUid  { 
@@ -939,6 +939,146 @@ mod tests {
939939        } 
940940    } 
941941
942+     #[ test]  
943+     fn  test_build_physical_plan_with_pipeline_limit ( )  { 
944+         let  indexer1 = "indexer1" . to_string ( ) ; 
945+         let  indexer2 = "indexer2" . to_string ( ) ; 
946+         let  source_uid0 = source_id ( ) ; 
947+         let  source_uid1 = source_id ( ) ; 
948+         let  source_0 = SourceToSchedule  { 
949+             source_uid :  source_uid0. clone ( ) , 
950+             source_type :  SourceToScheduleType :: Sharded  { 
951+                 shard_ids :  ( 0 ..16 ) . map ( ShardId :: from) . collect ( ) , 
952+                 load_per_shard :  NonZeroU32 :: new ( 800 ) . unwrap ( ) , 
953+             } , 
954+             params_fingerprint :  0 , 
955+         } ; 
956+         let  source_1 = SourceToSchedule  { 
957+             source_uid :  source_uid1. clone ( ) , 
958+             source_type :  SourceToScheduleType :: NonSharded  { 
959+                 num_pipelines :  4 , 
960+                 load_per_pipeline :  NonZeroU32 :: new ( MAX_LOAD_PER_PIPELINE . cpu_millis ( ) ) . unwrap ( ) , 
961+             } , 
962+             params_fingerprint :  0 , 
963+         } ; 
964+         let  mut  indexer_id_to_cpu_capacities = FnvHashMap :: default ( ) ; 
965+         indexer_id_to_cpu_capacities. insert ( indexer1. clone ( ) ,  mcpu ( 16_000 ) ) ; 
966+         indexer_id_to_cpu_capacities. insert ( indexer2. clone ( ) ,  mcpu ( 16_000 ) ) ; 
967+         let  shard_locations = ShardLocations :: default ( ) ; 
968+         let  indexing_plan = build_physical_indexing_plan ( 
969+             & [ source_0,  source_1] , 
970+             & indexer_id_to_cpu_capacities, 
971+             None , 
972+             & shard_locations, 
973+         ) ; 
974+         assert_eq ! ( indexing_plan. indexing_tasks_per_indexer( ) . len( ) ,  2 ) ; 
975+ 
976+         let  node1_plan = indexing_plan. indexer ( & indexer1) . unwrap ( ) ; 
977+         let  node2_plan = indexing_plan. indexer ( & indexer2) . unwrap ( ) ; 
978+ 
979+         let  source_0_on_node1 = node1_plan
980+             . iter ( ) 
981+             . filter ( |task| task. source_id  == source_uid0. source_id ) 
982+             . count ( ) ; 
983+         let  source_0_on_node2 = node2_plan
984+             . iter ( ) 
985+             . filter ( |task| task. source_id  == source_uid0. source_id ) 
986+             . count ( ) ; 
987+         assert ! ( source_0_on_node1 <= 3 ) ; 
988+         assert ! ( source_0_on_node2 <= 3 ) ; 
989+         assert_eq ! ( source_0_on_node1 + source_0_on_node2,  4 ) ; 
990+ 
991+         let  source_1_on_node1 = node1_plan
992+             . iter ( ) 
993+             . filter ( |task| task. source_id  == source_uid1. source_id ) 
994+             . count ( ) ; 
995+         let  source_1_on_node2 = node2_plan
996+             . iter ( ) 
997+             . filter ( |task| task. source_id  == source_uid1. source_id ) 
998+             . count ( ) ; 
999+         assert ! ( source_1_on_node1 <= 3 ) ; 
1000+         assert ! ( source_1_on_node2 <= 3 ) ; 
1001+         assert_eq ! ( source_1_on_node1 + source_1_on_node2,  4 ) ; 
1002+     } 
1003+ 
1004+     #[ test]  
1005+     fn  test_build_physical_plan_second_iteration ( )  { 
1006+         let  indexer1 = "indexer1" . to_string ( ) ; 
1007+         let  indexer2 = "indexer2" . to_string ( ) ; 
1008+         let  indexer3 = "indexer3" . to_string ( ) ; 
1009+         let  mut  sources = Vec :: new ( ) ; 
1010+         for  _ in  0 ..10  { 
1011+             sources. push ( SourceToSchedule  { 
1012+                 source_uid :  source_id ( ) , 
1013+                 source_type :  SourceToScheduleType :: NonSharded  { 
1014+                     num_pipelines :  4 , 
1015+                     load_per_pipeline :  NonZeroU32 :: new ( MAX_LOAD_PER_PIPELINE . cpu_millis ( ) ) . unwrap ( ) , 
1016+                 } , 
1017+                 params_fingerprint :  0 , 
1018+             } ) ; 
1019+         } 
1020+         let  mut  indexer_id_to_cpu_capacities = FnvHashMap :: default ( ) ; 
1021+         indexer_id_to_cpu_capacities. insert ( indexer1. clone ( ) ,  mcpu ( 16_000 ) ) ; 
1022+         indexer_id_to_cpu_capacities. insert ( indexer2. clone ( ) ,  mcpu ( 16_000 ) ) ; 
1023+         indexer_id_to_cpu_capacities. insert ( indexer3. clone ( ) ,  mcpu ( 16_000 ) ) ; 
1024+         let  shard_locations = ShardLocations :: default ( ) ; 
1025+         let  indexing_plan = build_physical_indexing_plan ( 
1026+             & sources, 
1027+             & indexer_id_to_cpu_capacities, 
1028+             None , 
1029+             & shard_locations, 
1030+         ) ; 
1031+         assert_eq ! ( indexing_plan. indexing_tasks_per_indexer( ) . len( ) ,  3 ) ; 
1032+ 
1033+         for  source in  & sources { 
1034+             let  pipelines_per_indexer_for_source = indexing_plan
1035+                 . indexing_tasks_per_indexer ( ) 
1036+                 . values ( ) 
1037+                 . map ( |tasks| { 
1038+                     tasks
1039+                         . iter ( ) 
1040+                         . filter ( |t| t. source_id  == source. source_uid . source_id ) 
1041+                         . count ( ) 
1042+                 } ) 
1043+                 . collect_vec ( ) ; 
1044+             assert ! ( pipelines_per_indexer_for_source. contains( & 3 ) ) ; 
1045+             assert ! ( pipelines_per_indexer_for_source. contains( & 1 ) ) ; 
1046+             assert ! ( pipelines_per_indexer_for_source. contains( & 0 ) ) ; 
1047+             assert_eq ! ( pipelines_per_indexer_for_source. iter( ) . sum:: <usize >( ) ,  4 ) ; 
1048+         } 
1049+ 
1050+         for  source in  & mut  sources { 
1051+             if  let  SourceToScheduleType :: NonSharded  {  num_pipelines,  .. }  = & mut  source. source_type 
1052+             { 
1053+                 * num_pipelines = 5 ; 
1054+             } 
1055+         } 
1056+ 
1057+         let  new_indexing_plan = build_physical_indexing_plan ( 
1058+             & sources, 
1059+             & indexer_id_to_cpu_capacities, 
1060+             Some ( & indexing_plan) , 
1061+             & shard_locations, 
1062+         ) ; 
1063+ 
1064+         for  source in  & sources { 
1065+             let  pipelines_per_indexer_for_source = new_indexing_plan
1066+                 . indexing_tasks_per_indexer ( ) 
1067+                 . values ( ) 
1068+                 . map ( |tasks| { 
1069+                     tasks
1070+                         . iter ( ) 
1071+                         . filter ( |t| t. source_id  == source. source_uid . source_id ) 
1072+                         . count ( ) 
1073+                 } ) 
1074+                 . collect_vec ( ) ; 
1075+             assert ! ( pipelines_per_indexer_for_source. contains( & 3 ) ) ; 
1076+             assert ! ( pipelines_per_indexer_for_source. contains( & 2 ) ) ; 
1077+             assert ! ( pipelines_per_indexer_for_source. contains( & 0 ) ) ; 
1078+             assert_eq ! ( pipelines_per_indexer_for_source. iter( ) . sum:: <usize >( ) ,  5 ) ; 
1079+         } 
1080+     } 
1081+ 
9421082    fn  make_indexing_tasks ( 
9431083        source_uid :  & SourceUid , 
9441084        shards :  & [ ( PipelineUid ,  & [ ShardId ] ) ] , 
0 commit comments