35
35
import java .util .IdentityHashMap ;
36
36
import java .util .Iterator ;
37
37
import java .util .List ;
38
+ import java .util .Locale ;
38
39
import java .util .Map ;
39
40
import java .util .PriorityQueue ;
40
41
import java .util .Set ;
@@ -552,6 +553,8 @@ private static ByteSizeValue getFreeBytesThreshold(
552
553
}
553
554
554
555
static class MergeTaskPriorityBlockingQueue extends PriorityBlockingQueueWithBudget <MergeTask > {
556
+ private static final Logger LOGGER = LogManager .getLogger (MergeTaskPriorityBlockingQueue .class );
557
+
555
558
MergeTaskPriorityBlockingQueue () {
556
559
// by default, start with 0 budget (so takes on this queue will always block until the first {@link #updateBudget} is invoked)
557
560
// use the estimated *remaining* merge size as the budget function so that the disk space budget of elements is updated
@@ -567,6 +570,55 @@ long getAvailableBudget() {
567
570
MergeTask peekQueue () {
568
571
return enqueuedByBudget .peek ().v1 ();
569
572
}
573
+
574
+ @ Override
575
+ void postBudgetUpdate () {
576
+ assert super .lock .isHeldByCurrentThread ();
577
+ Tuple <MergeTask , Long > head = enqueuedByBudget .peek ();
578
+ if (head != null && head .v2 () > availableBudget ) {
579
+ LOGGER .warn (
580
+ String .format (
581
+ Locale .ROOT ,
582
+ "There are merge tasks enqueued but there's insufficient disk space available to execute them "
583
+ + "(the smallest merge task requires [%d] bytes, but the available disk space is only [%d] bytes)" ,
584
+ head .v2 (),
585
+ availableBudget
586
+ )
587
+ );
588
+ if (LOGGER .isDebugEnabled ()) {
589
+ if (unreleasedBudgetPerElement .isEmpty ()) {
590
+ LOGGER .debug (
591
+ String .format (
592
+ Locale .ROOT ,
593
+ "There are no merge tasks currently running, "
594
+ + "but there are [%d] enqueued ones that are blocked because of insufficient disk space "
595
+ + "(the smallest merge task requires [%d] bytes, but the available disk space is only [%d] bytes)" ,
596
+ enqueuedByBudget .size (),
597
+ head .v2 (),
598
+ availableBudget
599
+ )
600
+ );
601
+ } else {
602
+ StringBuilder messageBuilder = new StringBuilder ();
603
+ messageBuilder .append ("The following merge tasks are currently running [" );
604
+ for (var runningMergeTask : super .unreleasedBudgetPerElement .entrySet ()) {
605
+ messageBuilder .append (runningMergeTask .getKey ().element ().toString ());
606
+ messageBuilder .append (" with disk space budgets in bytes " ).append (runningMergeTask .getValue ()).append (" , " );
607
+ }
608
+ messageBuilder .delete (messageBuilder .length () - 3 , messageBuilder .length ());
609
+ messageBuilder .append ("], and there are [" )
610
+ .append (enqueuedByBudget .size ())
611
+ .append ("] additional enqueued ones that are blocked because of insufficient disk space" );
612
+ messageBuilder .append (" (the smallest merge task requires [" )
613
+ .append (head .v2 ())
614
+ .append ("] bytes, but the available disk space is only [" )
615
+ .append (availableBudget )
616
+ .append ("] bytes)" );
617
+ LOGGER .debug (messageBuilder .toString ());
618
+ }
619
+ }
620
+ }
621
+ }
570
622
}
571
623
572
624
/**
@@ -576,7 +628,7 @@ MergeTask peekQueue() {
576
628
static class PriorityBlockingQueueWithBudget <E > {
577
629
private final ToLongFunction <? super E > budgetFunction ;
578
630
protected final PriorityQueue <Tuple <E , Long >> enqueuedByBudget ;
579
- private final IdentityHashMap <ElementWithReleasableBudget , Long > unreleasedBudgetPerElement ;
631
+ protected final IdentityHashMap <ElementWithReleasableBudget , Budgets > unreleasedBudgetPerElement ;
580
632
private final ReentrantLock lock ;
581
633
private final Condition elementAvailable ;
582
634
protected long availableBudget ;
@@ -637,15 +689,23 @@ void updateBudget(long availableBudget) {
637
689
// updates the budget of enqueued elements (and possibly reorders the priority queue)
638
690
updateBudgetOfEnqueuedElementsAndReorderQueue ();
639
691
// update the budget of dequeued, but still in-use elements (these are the elements that are consuming budget)
640
- unreleasedBudgetPerElement .replaceAll ((e , v ) -> budgetFunction .applyAsLong (e .element ()));
692
+ unreleasedBudgetPerElement .replaceAll ((e , v ) -> v . updateBudgetEstimation ( budgetFunction .applyAsLong (e .element () )));
641
693
// the available budget is decreased by the budget of still in-use elements (dequeued elements that are still in-use)
642
- this .availableBudget -= unreleasedBudgetPerElement .values ().stream ().mapToLong (i -> i ).sum ();
694
+ this .availableBudget -= unreleasedBudgetPerElement .values ()
695
+ .stream ()
696
+ .mapToLong (i -> i .latestBudgetEstimationForElement )
697
+ .sum ();
643
698
elementAvailable .signalAll ();
699
+ postBudgetUpdate ();
644
700
} finally {
645
701
lock .unlock ();
646
702
}
647
703
}
648
704
705
+ void postBudgetUpdate () {
706
+ assert lock .isHeldByCurrentThread ();
707
+ };
708
+
649
709
private void updateBudgetOfEnqueuedElementsAndReorderQueue () {
650
710
assert this .lock .isHeldByCurrentThread ();
651
711
int queueSizeBefore = enqueuedByBudget .size ();
@@ -686,7 +746,7 @@ private ElementWithReleasableBudget newElementWithReleasableBudget(E element, lo
686
746
ElementWithReleasableBudget elementWithReleasableBudget = new ElementWithReleasableBudget (element );
687
747
assert this .lock .isHeldByCurrentThread ();
688
748
// the taken element holds up some budget
689
- var prev = this .unreleasedBudgetPerElement .put (elementWithReleasableBudget , budget );
749
+ var prev = this .unreleasedBudgetPerElement .put (elementWithReleasableBudget , new Budgets ( budget , budget , this . availableBudget ) );
690
750
assert prev == null ;
691
751
this .availableBudget -= budget ;
692
752
assert this .availableBudget >= 0L ;
@@ -736,6 +796,16 @@ E element() {
736
796
return element ;
737
797
}
738
798
}
799
+
800
+ record Budgets (long initialBudgetEstimationForElement , long latestBudgetEstimationForElement , long initialTotalAvailableBudget ) {
801
+ Budgets updateBudgetEstimation (long latestBudgetEstimationForElement ) {
802
+ return new Budgets (
803
+ this .initialBudgetEstimationForElement ,
804
+ latestBudgetEstimationForElement ,
805
+ this .initialTotalAvailableBudget
806
+ );
807
+ }
808
+ }
739
809
}
740
810
741
811
private static long newTargetIORateBytesPerSec (
0 commit comments