1212// See the License for the specific language governing permissions and
1313// limitations under the License.
1414
15- use std:: cmp:: Reverse ;
15+ use std:: cmp:: { Ordering , Reverse } ;
1616use std:: collections:: BTreeMap ;
1717use std:: collections:: btree_map:: Entry ;
1818
19- use itertools:: Itertools ;
2019use quickwit_proto:: indexing:: CpuCapacity ;
2120
2221use super :: scheduling_logic_model:: * ;
@@ -229,21 +228,61 @@ fn assert_enforce_nodes_cpu_capacity_post_condition(
229228// If this algorithm fails to place all remaining shards, we inflate
230229// the node capacities by 20% in the scheduling problem and start from the beginning.
231230
231+ #[ derive( Debug , PartialEq , Eq , Ord ) ]
232+ struct PlacementCandidate {
233+ indexer_ord : IndexerOrd ,
234+ current_num_shards : u32 ,
235+ available_capacity : CpuCapacity ,
236+ affinity : u32 ,
237+ }
238+
239+ impl PartialOrd for PlacementCandidate {
240+ fn partial_cmp ( & self , other : & Self ) -> Option < Ordering > {
241+ // Higher affinity is better
242+ match self . affinity . cmp ( & other. affinity ) {
243+ Ordering :: Equal => { }
244+ ordering => return Some ( ordering. reverse ( ) ) ,
245+ }
246+ // If tie, pick the node with shards already assigned first
247+ match self . current_num_shards . cmp ( & other. current_num_shards ) {
248+ Ordering :: Equal => { }
249+ ordering => return Some ( ordering. reverse ( ) ) ,
250+ }
251+ // If tie, pick the node with the highest available capacity
252+ match self . available_capacity . cmp ( & other. available_capacity ) {
253+ Ordering :: Equal => { }
254+ ordering => return Some ( ordering. reverse ( ) ) ,
255+ }
256+ // Final tie-breaker: indexer ID for deterministic ordering
257+ Some ( self . indexer_ord . cmp ( & other. indexer_ord ) . reverse ( ) )
258+ }
259+ }
260+
232261fn attempt_place_unassigned_shards (
233262 unassigned_shards : & [ Source ] ,
234263 problem : & SchedulingProblem ,
235264 partial_solution : & SchedulingSolution ,
236265) -> Result < SchedulingSolution , NotEnoughCapacity > {
237266 let mut solution = partial_solution. clone ( ) ;
238267 for source in unassigned_shards {
239- let indexers_with_most_available_capacity =
240- compute_indexer_available_capacity ( problem, & solution)
241- . sorted_by_key ( |( indexer_ord, capacity) | Reverse ( ( * capacity, * indexer_ord) ) ) ;
242- place_unassigned_shards_single_source (
243- source,
244- indexers_with_most_available_capacity,
245- & mut solution,
246- ) ?;
268+ let mut placements: Vec < PlacementCandidate > = solution
269+ . indexer_assignments
270+ . iter ( )
271+ . map ( |indexer_assignment : & IndexerAssignment | {
272+ let available_capacity = indexer_assignment. indexer_available_capacity ( problem) ;
273+ assert ! ( available_capacity >= 0i32 ) ;
274+ let available_capacity = CpuCapacity :: from_cpu_millis ( available_capacity as u32 ) ;
275+ let current_num_shards = indexer_assignment. num_shards ( source. source_ord ) ;
276+ PlacementCandidate {
277+ affinity : 0 ,
278+ current_num_shards,
279+ available_capacity,
280+ indexer_ord : indexer_assignment. indexer_ord ,
281+ }
282+ } )
283+ . collect ( ) ;
284+ placements. sort ( ) ;
285+ place_unassigned_shards_single_source ( source, & placements, & mut solution) ?;
247286 }
248287 assert_place_unassigned_shards_post_condition ( problem, & solution) ;
249288 Ok ( solution)
@@ -259,27 +298,26 @@ fn place_unassigned_shards_with_affinity(
259298 Reverse ( load)
260299 } ) ;
261300 for source in & unassigned_shards {
262- // List of indexer with a non-null affinity and some available capacity, sorted by
263- // (affinity, available capacity) in that order.
264- let indexers_with_affinity_and_available_capacity = source
301+ let mut placements: Vec < PlacementCandidate > = source
265302 . affinities
266303 . iter ( )
267304 . filter ( |& ( _, & affinity) | affinity != 0u32 )
268- . map ( |( & indexer_ord, affinity) | {
305+ . map ( |( & indexer_ord, & affinity) | {
269306 let available_capacity =
270307 solution. indexer_assignments [ indexer_ord] . indexer_available_capacity ( problem) ;
271- let capacity = CpuCapacity :: from_cpu_millis ( available_capacity as u32 ) ;
272- ( indexer_ord, affinity, capacity)
273- } )
274- . sorted_by_key ( |( indexer_ord, affinity, capacity) | {
275- Reverse ( ( * affinity, * capacity, * indexer_ord) )
308+ let available_capacity = CpuCapacity :: from_cpu_millis ( available_capacity as u32 ) ;
309+ let current_num_shards =
310+ solution. indexer_assignments [ indexer_ord] . num_shards ( source. source_ord ) ;
311+ PlacementCandidate {
312+ affinity,
313+ current_num_shards,
314+ available_capacity,
315+ indexer_ord,
316+ }
276317 } )
277- . map ( |( indexer_ord, _, capacity) | ( indexer_ord, capacity) ) ;
278- let _ = place_unassigned_shards_single_source (
279- source,
280- indexers_with_affinity_and_available_capacity,
281- solution,
282- ) ;
318+ . collect ( ) ;
319+ placements. sort ( ) ;
320+ let _ = place_unassigned_shards_single_source ( source, & placements, solution) ;
283321 }
284322}
285323
@@ -350,22 +388,27 @@ struct NotEnoughCapacity;
350388/// amongst the node with their given node capacity.
351389fn place_unassigned_shards_single_source (
352390 source : & Source ,
353- mut indexer_with_capacities : impl Iterator < Item = ( IndexerOrd , CpuCapacity ) > ,
391+ sorted_candidates : & [ PlacementCandidate ] ,
354392 solution : & mut SchedulingSolution ,
355393) -> Result < ( ) , NotEnoughCapacity > {
356394 let mut num_shards = source. num_shards ;
357- while num_shards > 0 {
358- let Some ( ( indexer_ord, available_capacity) ) = indexer_with_capacities. next ( ) else {
359- return Err ( NotEnoughCapacity ) ;
360- } ;
395+ for PlacementCandidate {
396+ indexer_ord,
397+ available_capacity,
398+ ..
399+ } in sorted_candidates
400+ {
361401 let num_placable_shards = available_capacity. cpu_millis ( ) / source. load_per_shard ;
362402 let num_shards_to_place = num_placable_shards. min ( num_shards) ;
363403 // Update the solution, the shard load, and the number of shards to place.
364- solution. indexer_assignments [ indexer_ord]
404+ solution. indexer_assignments [ * indexer_ord]
365405 . add_shards ( source. source_ord , num_shards_to_place) ;
366406 num_shards -= num_shards_to_place;
407+ if num_shards == 0 {
408+ return Ok ( ( ) ) ;
409+ }
367410 }
368- Ok ( ( ) )
411+ Err ( NotEnoughCapacity )
369412}
370413
371414/// Compute the sources/shards that have not been assigned to any indexer yet.
@@ -394,30 +437,11 @@ fn compute_unassigned_sources(
394437 unassigned_sources. into_values ( ) . collect ( )
395438}
396439
397- /// Builds a BinaryHeap with the different indexer capacities.
398- ///
399- /// Panics if one of the indexer is over-assigned.
400- fn compute_indexer_available_capacity < ' a > (
401- problem : & ' a SchedulingProblem ,
402- solution : & ' a SchedulingSolution ,
403- ) -> impl Iterator < Item = ( IndexerOrd , CpuCapacity ) > + ' a {
404- solution
405- . indexer_assignments
406- . iter ( )
407- . map ( |indexer_assignment| {
408- let available_capacity: i32 = indexer_assignment. indexer_available_capacity ( problem) ;
409- assert ! ( available_capacity >= 0i32 ) ;
410- (
411- indexer_assignment. indexer_ord ,
412- CpuCapacity :: from_cpu_millis ( available_capacity as u32 ) ,
413- )
414- } )
415- }
416-
417440#[ cfg( test) ]
418441mod tests {
419442 use std:: num:: NonZeroU32 ;
420443
444+ use itertools:: Itertools ;
421445 use proptest:: prelude:: * ;
422446 use quickwit_proto:: indexing:: mcpu;
423447
0 commit comments