@@ -216,12 +216,14 @@ fn attempt_place_unassigned_shards(
216216)  -> Result < SchedulingSolution ,  NotEnoughCapacity >  { 
217217    let  mut  solution = partial_solution. clone ( ) ; 
218218    for  source in  unassigned_shards { 
219-         let  indexers_with_most_available_capacity =
220-             compute_indexer_available_capacity ( problem,  & solution) 
221-                 . sorted_by_key ( |( indexer_ord,  capacity) | Reverse ( ( * capacity,  * indexer_ord) ) ) ; 
219+         let  mut  indexers_with_most_available_capacity =
220+             compute_indexer_available_capacity ( problem,  & solution) . collect_vec ( ) ; 
221+         indexers_with_most_available_capacity
222+             . sort_by_key ( |( indexer_ord,  capacity) | Reverse ( ( * capacity,  * indexer_ord) ) ) ; 
222223        place_unassigned_shards_single_source ( 
223224            source, 
224225            indexers_with_most_available_capacity, 
226+             problem. unscaled_indexer_cpu_capacities ( ) , 
225227            & mut  solution, 
226228        ) ?; 
227229    } 
@@ -241,7 +243,7 @@ fn place_unassigned_shards_with_affinity(
241243    for  source in  & unassigned_shards { 
242244        // List of indexer with a non-null affinity and some available capacity, sorted by 
243245        // (affinity, available capacity) in that order. 
244-         let  indexers_with_affinity_and_available_capacity  = source
246+         let  indexers_with_available_capacity  = source
245247            . affinities 
246248            . iter ( ) 
247249            . filter ( |& ( _,  & affinity) | affinity != 0u32 ) 
@@ -254,10 +256,12 @@ fn place_unassigned_shards_with_affinity(
254256            . sorted_by_key ( |( indexer_ord,  affinity,  capacity) | { 
255257                Reverse ( ( * affinity,  * capacity,  * indexer_ord) ) 
256258            } ) 
257-             . map ( |( indexer_ord,  _,  capacity) | ( indexer_ord,  capacity) ) ; 
259+             . map ( |( indexer_ord,  _,  capacity) | ( indexer_ord,  capacity) ) 
260+             . collect_vec ( ) ; 
258261        let  _ = place_unassigned_shards_single_source ( 
259262            source, 
260-             indexers_with_affinity_and_available_capacity, 
263+             indexers_with_available_capacity, 
264+             problem. unscaled_indexer_cpu_capacities ( ) , 
261265            solution, 
262266        ) ; 
263267    } 
@@ -346,26 +350,73 @@ struct NotEnoughCapacity;
346350/// amongst the node with their given node capacity. 
347351fn  place_unassigned_shards_single_source ( 
348352    source :  & Source , 
349-     mut  indexer_with_capacities :  impl  Iterator < Item  = ( IndexerOrd ,  CpuCapacity ) > , 
353+     mut  indexer_with_capacities :  Vec < ( IndexerOrd ,  CpuCapacity ) > , 
354+     unscaled_capacities :  & [ CpuCapacity ] , 
350355    solution :  & mut  SchedulingSolution , 
351356)  -> Result < ( ) ,  NotEnoughCapacity >  { 
352357    let  mut  num_shards = source. num_shards ; 
353-     while  num_shards > 0  { 
354-         let  Some ( ( indexer_ord,  available_capacity) )  = indexer_with_capacities. next ( )  else  { 
355-             return  Err ( NotEnoughCapacity ) ; 
356-         } ; 
357-         let  num_placable_shards = available_capacity. cpu_millis ( )  / source. load_per_shard ; 
358-         let  num_shards_to_place = num_placable_shards. min ( num_shards) ; 
358+     let  mut  previous_num_shards = u32:: MAX ; 
359+     while  previous_num_shards > num_shards { 
360+         previous_num_shards = num_shards; 
361+         let  indexer_with_capacities_iter = indexer_with_capacities
362+             . iter_mut ( ) 
363+             . map ( |( indexer_ord,  available_capacity) | ( * indexer_ord,  available_capacity) ) ; 
364+         place_unassigned_shards_single_source_iteration ( 
365+             source, 
366+             & mut  num_shards, 
367+             indexer_with_capacities_iter, 
368+             unscaled_capacities, 
369+             solution, 
370+         ) ; 
371+         if  num_shards == 0  { 
372+             // All shards have been placed. 
373+             return  Ok ( ( ) ) ; 
374+         } 
375+     } 
376+     // Last placement iteration didn't make progress, 
377+     // we won't be able to place the remaining shards 
378+     Err ( NotEnoughCapacity ) 
379+ } 
380+ 
381+ /// Places as many shards as possible to indexers while respecting both the the 
382+ /// remaining scaled node capacities and the original unscaled node capacities. 
383+ fn  place_unassigned_shards_single_source_iteration < ' a > ( 
384+     source :  & Source , 
385+     remaining_shards_to_place :  & mut  u32 , 
386+     indexer_with_capacities :  impl  Iterator < Item  = ( IndexerOrd ,  & ' a  mut  CpuCapacity ) > , 
387+     unscaled_capacities :  & [ CpuCapacity ] , 
388+     solution :  & mut  SchedulingSolution , 
389+ )  { 
390+     for  ( indexer_ord,  available_capacity)  in  indexer_with_capacities { 
391+         if  * remaining_shards_to_place == 0  { 
392+             return ; 
393+         } 
394+         let  num_placable_shards_into_scaled_capacity =
395+             available_capacity. cpu_millis ( )  / source. load_per_shard ; 
396+ 
397+         // We limit each node's shard allocation per iteration to what fits in 
398+         // its original capacity. This introduces a behavior that distributes 
399+         // shards more evenly accross nodes when the system capacity is 
400+         // over-subscribed. If the shard's load doesn't fit into the original 
401+         // capacity, we still allow one shard to be placed. 
402+         let  num_placable_shards_into_original_capacity =
403+             ( unscaled_capacities[ indexer_ord] . cpu_millis ( )  / source. load_per_shard ) . max ( 1 ) ; 
404+ 
405+         let  num_shards_to_place = num_placable_shards_into_scaled_capacity
406+             . min ( num_placable_shards_into_original_capacity) 
407+             . min ( * remaining_shards_to_place) ; 
408+ 
359409        // Update the solution, the shard load, and the number of shards to place. 
360410        if  num_shards_to_place == 0u32  { 
361411            // No need to fill indexer_assignments with empty assignments. 
362412            continue ; 
363413        } 
364414        solution. indexer_assignments [ indexer_ord] 
365415            . add_shards ( source. source_ord ,  num_shards_to_place) ; 
366-         num_shards -= num_shards_to_place; 
416+         * remaining_shards_to_place -= num_shards_to_place; 
417+         * available_capacity = * available_capacity
418+             - CpuCapacity :: from_cpu_millis ( num_shards_to_place *  source. load_per_shard . get ( ) ) ; 
367419    } 
368-     Ok ( ( ) ) 
369420} 
370421
371422/// Compute the sources/shards that have not been assigned to any indexer yet. 
@@ -419,7 +470,7 @@ mod tests {
419470    use  std:: num:: NonZeroU32 ; 
420471
421472    use  proptest:: prelude:: * ; 
422-     use  quickwit_proto:: indexing:: mcpu; 
473+     use  quickwit_proto:: indexing:: { PIPELINE_FULL_CAPACITY ,   mcpu} ; 
423474
424475    use  super :: * ; 
425476
@@ -783,4 +834,28 @@ mod tests {
783834            solve( problem,  solution) ; 
784835        } 
785836    } 
837+ 
838+     #[ test]  
839+     fn  test_oversubscribing_sources_get_balanced ( )  { 
840+         let  mut  problem:  SchedulingProblem  = SchedulingProblem :: with_indexer_cpu_capacities ( vec ! [ 
841+             mcpu( 8000 ) , 
842+             mcpu( 8000 ) , 
843+             mcpu( 8000 ) , 
844+             mcpu( 8000 ) , 
845+         ] ) ; 
846+         for  _ in  0 ..12  { 
847+             problem. add_source ( 
848+                 4 , 
849+                 NonZeroU32 :: new ( PIPELINE_FULL_CAPACITY . cpu_millis ( ) ) . unwrap ( ) , 
850+             ) ; 
851+         } 
852+ 
853+         let  old_solution = problem. new_solution ( ) ; 
854+         let  solution = solve ( problem,  old_solution) ; 
855+         for  assignement in  & solution. indexer_assignments  { 
856+             for  & num_shards in  assignement. num_shards_per_source . values ( )  { 
857+                 assert_eq ! ( num_shards,  2 ) ; 
858+             } 
859+         } 
860+     } 
786861} 
0 commit comments