@@ -757,8 +757,8 @@ mod tests {
757757 convert_scheduling_solution_to_physical_plan_single_node_single_source,
758758 } ;
759759 use crate :: indexing_plan:: PhysicalIndexingPlan ;
760- use crate :: indexing_scheduler:: get_shard_locality_metrics;
761760 use crate :: indexing_scheduler:: scheduling:: assign_shards;
761+ use crate :: indexing_scheduler:: { MAX_LOAD_PER_PIPELINE , get_shard_locality_metrics} ;
762762 use crate :: model:: ShardLocations ;
763763
764764 fn source_id ( ) -> SourceUid {
@@ -939,6 +939,146 @@ mod tests {
939939 }
940940 }
941941
942+ #[ test]
943+ fn test_build_physical_plan_with_pipeline_limit ( ) {
944+ let indexer1 = "indexer1" . to_string ( ) ;
945+ let indexer2 = "indexer2" . to_string ( ) ;
946+ let source_uid0 = source_id ( ) ;
947+ let source_uid1 = source_id ( ) ;
948+ let source_0 = SourceToSchedule {
949+ source_uid : source_uid0. clone ( ) ,
950+ source_type : SourceToScheduleType :: Sharded {
951+ shard_ids : ( 0 ..16 ) . map ( ShardId :: from) . collect ( ) ,
952+ load_per_shard : NonZeroU32 :: new ( 800 ) . unwrap ( ) ,
953+ } ,
954+ params_fingerprint : 0 ,
955+ } ;
956+ let source_1 = SourceToSchedule {
957+ source_uid : source_uid1. clone ( ) ,
958+ source_type : SourceToScheduleType :: NonSharded {
959+ num_pipelines : 4 ,
960+ load_per_pipeline : NonZeroU32 :: new ( MAX_LOAD_PER_PIPELINE . cpu_millis ( ) ) . unwrap ( ) ,
961+ } ,
962+ params_fingerprint : 0 ,
963+ } ;
964+ let mut indexer_id_to_cpu_capacities = FnvHashMap :: default ( ) ;
965+ indexer_id_to_cpu_capacities. insert ( indexer1. clone ( ) , mcpu ( 16_000 ) ) ;
966+ indexer_id_to_cpu_capacities. insert ( indexer2. clone ( ) , mcpu ( 16_000 ) ) ;
967+ let shard_locations = ShardLocations :: default ( ) ;
968+ let indexing_plan = build_physical_indexing_plan (
969+ & [ source_0, source_1] ,
970+ & indexer_id_to_cpu_capacities,
971+ None ,
972+ & shard_locations,
973+ ) ;
974+ assert_eq ! ( indexing_plan. indexing_tasks_per_indexer( ) . len( ) , 2 ) ;
975+
976+ let node1_plan = indexing_plan. indexer ( & indexer1) . unwrap ( ) ;
977+ let node2_plan = indexing_plan. indexer ( & indexer2) . unwrap ( ) ;
978+
979+ let source_0_on_node1 = node1_plan
980+ . iter ( )
981+ . filter ( |task| task. source_id == source_uid0. source_id )
982+ . count ( ) ;
983+ let source_0_on_node2 = node2_plan
984+ . iter ( )
985+ . filter ( |task| task. source_id == source_uid0. source_id )
986+ . count ( ) ;
987+ assert ! ( source_0_on_node1 <= 3 ) ;
988+ assert ! ( source_0_on_node2 <= 3 ) ;
989+ assert_eq ! ( source_0_on_node1 + source_0_on_node2, 4 ) ;
990+
991+ let source_1_on_node1 = node1_plan
992+ . iter ( )
993+ . filter ( |task| task. source_id == source_uid1. source_id )
994+ . count ( ) ;
995+ let source_1_on_node2 = node2_plan
996+ . iter ( )
997+ . filter ( |task| task. source_id == source_uid1. source_id )
998+ . count ( ) ;
999+ assert ! ( source_1_on_node1 <= 3 ) ;
1000+ assert ! ( source_1_on_node2 <= 3 ) ;
1001+ assert_eq ! ( source_1_on_node1 + source_1_on_node2, 4 ) ;
1002+ }
1003+
1004+ #[ test]
1005+ fn test_build_physical_plan_second_iteration ( ) {
1006+ let indexer1 = "indexer1" . to_string ( ) ;
1007+ let indexer2 = "indexer2" . to_string ( ) ;
1008+ let indexer3 = "indexer3" . to_string ( ) ;
1009+ let mut sources = Vec :: new ( ) ;
1010+ for _ in 0 ..10 {
1011+ sources. push ( SourceToSchedule {
1012+ source_uid : source_id ( ) ,
1013+ source_type : SourceToScheduleType :: NonSharded {
1014+ num_pipelines : 4 ,
1015+ load_per_pipeline : NonZeroU32 :: new ( MAX_LOAD_PER_PIPELINE . cpu_millis ( ) ) . unwrap ( ) ,
1016+ } ,
1017+ params_fingerprint : 0 ,
1018+ } ) ;
1019+ }
1020+ let mut indexer_id_to_cpu_capacities = FnvHashMap :: default ( ) ;
1021+ indexer_id_to_cpu_capacities. insert ( indexer1. clone ( ) , mcpu ( 16_000 ) ) ;
1022+ indexer_id_to_cpu_capacities. insert ( indexer2. clone ( ) , mcpu ( 16_000 ) ) ;
1023+ indexer_id_to_cpu_capacities. insert ( indexer3. clone ( ) , mcpu ( 16_000 ) ) ;
1024+ let shard_locations = ShardLocations :: default ( ) ;
1025+ let indexing_plan = build_physical_indexing_plan (
1026+ & sources,
1027+ & indexer_id_to_cpu_capacities,
1028+ None ,
1029+ & shard_locations,
1030+ ) ;
1031+ assert_eq ! ( indexing_plan. indexing_tasks_per_indexer( ) . len( ) , 3 ) ;
1032+
1033+ for source in & sources {
1034+ let pipelines_per_indexer_for_source = indexing_plan
1035+ . indexing_tasks_per_indexer ( )
1036+ . values ( )
1037+ . map ( |tasks| {
1038+ tasks
1039+ . iter ( )
1040+ . filter ( |t| t. source_id == source. source_uid . source_id )
1041+ . count ( )
1042+ } )
1043+ . collect_vec ( ) ;
1044+ assert ! ( pipelines_per_indexer_for_source. contains( & 3 ) ) ;
1045+ assert ! ( pipelines_per_indexer_for_source. contains( & 1 ) ) ;
1046+ assert ! ( pipelines_per_indexer_for_source. contains( & 0 ) ) ;
1047+ assert_eq ! ( pipelines_per_indexer_for_source. iter( ) . sum:: <usize >( ) , 4 ) ;
1048+ }
1049+
1050+ for source in & mut sources {
1051+ if let SourceToScheduleType :: NonSharded { num_pipelines, .. } = & mut source. source_type
1052+ {
1053+ * num_pipelines = 5 ;
1054+ }
1055+ }
1056+
1057+ let new_indexing_plan = build_physical_indexing_plan (
1058+ & sources,
1059+ & indexer_id_to_cpu_capacities,
1060+ Some ( & indexing_plan) ,
1061+ & shard_locations,
1062+ ) ;
1063+
1064+ for source in & sources {
1065+ let pipelines_per_indexer_for_source = new_indexing_plan
1066+ . indexing_tasks_per_indexer ( )
1067+ . values ( )
1068+ . map ( |tasks| {
1069+ tasks
1070+ . iter ( )
1071+ . filter ( |t| t. source_id == source. source_uid . source_id )
1072+ . count ( )
1073+ } )
1074+ . collect_vec ( ) ;
1075+ assert ! ( pipelines_per_indexer_for_source. contains( & 3 ) ) ;
1076+ assert ! ( pipelines_per_indexer_for_source. contains( & 2 ) ) ;
1077+ assert ! ( pipelines_per_indexer_for_source. contains( & 0 ) ) ;
1078+ assert_eq ! ( pipelines_per_indexer_for_source. iter( ) . sum:: <usize >( ) , 5 ) ;
1079+ }
1080+ }
1081+
9421082 fn make_indexing_tasks (
9431083 source_uid : & SourceUid ,
9441084 shards : & [ ( PipelineUid , & [ ShardId ] ) ] ,
0 commit comments