1717 */
1818package org .apache .drill .exec .planner .fragment ;
1919
20+ import com .fasterxml .jackson .core .JsonProcessingException ;
2021import org .apache .commons .lang3 .tuple .Pair ;
2122import org .apache .drill .common .exceptions .ExecutionSetupException ;
2223import org .apache .drill .common .util .function .CheckedConsumer ;
2930import org .apache .drill .exec .resourcemgr .config .QueryQueueConfig ;
3031import org .apache .drill .exec .resourcemgr .config .exception .QueueSelectionException ;
3132import org .apache .drill .exec .work .foreman .rm .QueryResourceManager ;
32-
33+ import com . fasterxml . jackson . databind . ObjectMapper ;
3334import java .util .ArrayList ;
3435import java .util .Collection ;
3536import java .util .HashMap ;
4647 * fragment is based on the cluster state and provided queue configuration.
4748 */
4849public class DistributedQueueParallelizer extends SimpleParallelizer {
50+ static final org .slf4j .Logger logger = org .slf4j .LoggerFactory .getLogger (DistributedQueueParallelizer .class );
4951 private final boolean planHasMemory ;
5052 private final QueryContext queryContext ;
5153 private final QueryResourceManager rm ;
@@ -65,9 +67,13 @@ public BiFunction<DrillbitEndpoint, PhysicalOperator, Long> getMemory() {
6567 if (!planHasMemory ) {
6668 final DrillNode drillEndpointNode = DrillNode .create (endpoint );
6769 if (operator .isBufferedOperator (queryContext )) {
68- return operators .get (drillEndpointNode ).get (operator );
70+ Long operatorsMemory = operators .get (drillEndpointNode ).get (operator );
71+ logger .debug (" Memory requirement for the operator {} in endpoint {} is {}" , operator , endpoint , operatorsMemory );
72+ return operatorsMemory ;
6973 } else {
70- return operator .getMaxAllocation ();
74+ Long nonBufferedMemory = (long )operator .getCost ().getMemoryCost ();
75+ logger .debug (" Memory requirement for the operator {} in endpoint {} is {}" , operator , endpoint , nonBufferedMemory );
76+ return nonBufferedMemory ;
7177 }
7278 }
7379 else {
@@ -92,10 +98,11 @@ public BiFunction<DrillbitEndpoint, PhysicalOperator, Long> getMemory() {
9298 */
9399 public void adjustMemory (PlanningSet planningSet , Set <Wrapper > roots ,
94100 Map <DrillbitEndpoint , String > onlineEndpointUUIDs ) throws ExecutionSetupException {
95-
96101 if (planHasMemory ) {
102+ logger .debug (" Plan already has memory settings. Adjustment of the memory is skipped" );
97103 return ;
98104 }
105+ logger .info (" Memory adjustment phase triggered" );
99106
100107 final Map <DrillNode , String > onlineDrillNodeUUIDs = onlineEndpointUUIDs .entrySet ().stream ()
101108 .collect (Collectors .toMap (x -> DrillNode .create (x .getKey ()), x -> x .getValue ()));
@@ -112,7 +119,7 @@ public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
112119
113120 for (Wrapper wrapper : roots ) {
114121 traverse (wrapper , CheckedConsumer .throwingConsumerWrapper ((Wrapper fragment ) -> {
115- MemoryCalculator calculator = new MemoryCalculator (planningSet , queryContext );
122+ MemoryCalculator calculator = new MemoryCalculator (planningSet , queryContext , rm . minimumOperatorMemory () );
116123 fragment .getNode ().getRoot ().accept (calculator , fragment );
117124 NodeResources .merge (totalNodeResources , fragment .getResourceMap ());
118125 operators .entrySet ()
@@ -122,6 +129,10 @@ public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
122129 }));
123130 }
124131
132+ if (logger .isDebugEnabled ()) {
133+ logger .debug (" Total node resource requirements for the plan is {}" , getJSONFromResourcesMap (totalNodeResources ));
134+ }
135+
125136 final QueryQueueConfig queueConfig ;
126137 try {
127138 queueConfig = this .rm .selectQueue (max (totalNodeResources .values ()));
@@ -130,8 +141,10 @@ public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
130141 }
131142
132143 Map <DrillNode ,
133- List <Pair <PhysicalOperator , Long >>> memoryAdjustedOperators = ensureOperatorMemoryWithinLimits (operators , totalNodeResources ,
134- queueConfig .getMaxQueryMemoryInMBPerNode ());
144+ List <Pair <PhysicalOperator , Long >>> memoryAdjustedOperators =
145+ ensureOperatorMemoryWithinLimits (operators , totalNodeResources ,
146+ convertMBToBytes (Math .min (queueConfig .getMaxQueryMemoryInMBPerNode (),
147+ queueConfig .getQueueTotalMemoryInMB (onlineEndpointUUIDs .size ()))));
135148 memoryAdjustedOperators .entrySet ().stream ().forEach ((x ) -> {
136149 Map <PhysicalOperator , Long > memoryPerOperator = x .getValue ().stream ()
137150 .collect (Collectors .toMap (operatorLongPair -> operatorLongPair .getLeft (),
@@ -140,9 +153,17 @@ public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
140153 this .operators .put (x .getKey (), memoryPerOperator );
141154 });
142155
156+ if (logger .isDebugEnabled ()) {
157+ logger .debug (" Total node resource requirements after adjustment {}" , getJSONFromResourcesMap (totalNodeResources ));
158+ }
159+
143160 this .rm .setCost (convertToUUID (totalNodeResources , onlineDrillNodeUUIDs ));
144161 }
145162
163+ private long convertMBToBytes (long value ) {
164+ return value * 1024 * 1024 ;
165+ }
166+
146167 private Map <String , NodeResources > convertToUUID (Map <DrillNode , NodeResources > nodeResourcesMap ,
147168 Map <DrillNode , String > onlineDrillNodeUUIDs ) {
148169 Map <String , NodeResources > nodeResourcesPerUUID = new HashMap <>();
@@ -172,50 +193,81 @@ private NodeResources max(Collection<NodeResources> resources) {
172193 */
173194 private Map <DrillNode , List <Pair <PhysicalOperator , Long >>>
174195 ensureOperatorMemoryWithinLimits (Map <DrillNode , List <Pair <PhysicalOperator , Long >>> memoryPerOperator ,
175- Map <DrillNode , NodeResources > nodeResourceMap , long nodeLimit ) {
196+ Map <DrillNode , NodeResources > nodeResourceMap , long nodeLimit ) throws ExecutionSetupException {
176197 // Get the physical operators which are above the node memory limit.
177- Map <DrillNode , List <Pair <PhysicalOperator , Long >>> onlyMemoryAboveLimitOperators = new HashMap <>();
178- memoryPerOperator .entrySet ().stream ().forEach ((entry ) -> {
179- onlyMemoryAboveLimitOperators .putIfAbsent (entry .getKey (), new ArrayList <>());
180- if (nodeResourceMap .get (entry .getKey ()).getMemoryInBytes () > nodeLimit ) {
181- onlyMemoryAboveLimitOperators .get (entry .getKey ()).addAll (entry .getValue ());
182- }
183- });
184-
198+ Map <DrillNode ,
199+ List <Pair <PhysicalOperator , Long >>> onlyMemoryAboveLimitOperators = memoryPerOperator .entrySet ()
200+ .stream ()
201+ .filter (entry -> nodeResourceMap .get (entry .getKey ()).getMemoryInBytes () > nodeLimit )
202+ .collect (Collectors .toMap (entry -> entry .getKey (), entry -> entry .getValue ()));
185203
186204 // Compute the total memory required by the physical operators on the drillbits which are above node limit.
187205 // Then use the total memory to adjust the memory requirement based on the permissible node limit.
188206 Map <DrillNode , List <Pair <PhysicalOperator , Long >>> memoryAdjustedDrillbits = new HashMap <>();
189207 onlyMemoryAboveLimitOperators .entrySet ().stream ().forEach (
190- entry -> {
191- Long totalMemory = entry .getValue ().stream ().mapToLong (Pair ::getValue ).sum ();
192- List <Pair <PhysicalOperator , Long >> adjustedMemory = entry .getValue ().stream ().map (operatorMemory -> {
208+ CheckedConsumer .throwingConsumerWrapper (entry -> {
209+ Long totalBufferedOperatorsMemoryReq = entry .getValue ().stream ().mapToLong (Pair ::getValue ).sum ();
210+ Long nonBufferedOperatorsMemoryReq = nodeResourceMap .get (entry .getKey ()).getMemoryInBytes () - totalBufferedOperatorsMemoryReq ;
211+ Long bufferedOperatorsMemoryLimit = nodeLimit - nonBufferedOperatorsMemoryReq ;
212+ if (bufferedOperatorsMemoryLimit < 0 || nonBufferedOperatorsMemoryReq < 0 ) {
213+ logger .error (" Operator memory requirements for buffered operators {} or non buffered operators {} is negative" , bufferedOperatorsMemoryLimit ,
214+ nonBufferedOperatorsMemoryReq );
215+ throw new ExecutionSetupException ("Operator memory requirements for buffered operators " + bufferedOperatorsMemoryLimit + " or non buffered operators " +
216+ nonBufferedOperatorsMemoryReq + " is less than zero" );
217+ }
218+ List <Pair <PhysicalOperator , Long >> adjustedMemory = entry .getValue ().stream ().map (operatorAndMemory -> {
193219 // formula to adjust the memory is (optimalMemory / totalMemory(this is for all the operators)) * permissible_node_limit.
194- return Pair .of (operatorMemory .getKey (), (long ) Math .ceil (operatorMemory .getValue ()/totalMemory * nodeLimit ));
220+ return Pair .of (operatorAndMemory .getKey (),
221+ Math .max (this .rm .minimumOperatorMemory (),
222+ (long ) Math .ceil (operatorAndMemory .getValue ()/totalBufferedOperatorsMemoryReq * bufferedOperatorsMemoryLimit )));
195223 }).collect (Collectors .toList ());
196224 memoryAdjustedDrillbits .put (entry .getKey (), adjustedMemory );
197225 NodeResources nodeResources = nodeResourceMap .get (entry .getKey ());
198- nodeResources .setMemoryInBytes (adjustedMemory .stream ().mapToLong (Pair ::getValue ).sum ());
199- }
226+ nodeResources .setMemoryInBytes (nonBufferedOperatorsMemoryReq + adjustedMemory .stream ().mapToLong (Pair ::getValue ).sum ());
227+ })
200228 );
201229
230+ checkIfWithinLimit (nodeResourceMap , nodeLimit );
231+
202232 // Get all the operations on drillbits which were adjusted for memory and merge them with operators which are not
203233 // adjusted for memory.
204- Map <DrillNode , List <Pair <PhysicalOperator , Long >>> allDrillbits = new HashMap <>();
205- memoryPerOperator .entrySet ().stream ().filter ((entry ) -> !memoryAdjustedDrillbits .containsKey (entry .getKey ())).forEach (
206- operatorMemory -> {
207- allDrillbits .put (operatorMemory .getKey (), operatorMemory .getValue ());
208- }
209- );
234+ Map <DrillNode ,
235+ List <Pair <PhysicalOperator , Long >>> allDrillbits = memoryPerOperator .entrySet ()
236+ .stream ()
237+ .filter ((entry ) -> !memoryAdjustedDrillbits .containsKey (entry .getKey ()))
238+ .collect (Collectors .toMap (entry -> entry .getKey (), entry -> entry .getValue ()));
210239
211240 memoryAdjustedDrillbits .entrySet ().stream ().forEach (
212- operatorMemory -> {
213- allDrillbits .put (operatorMemory .getKey (), operatorMemory .getValue ());
214- }
215- );
241+ operatorMemory -> allDrillbits .put (operatorMemory .getKey (), operatorMemory .getValue ()));
216242
217243 // At this point allDrillbits contains the operators on all drillbits. The memory also is adjusted based on the nodeLimit and
218244 // the ratio of their requirements.
219245 return allDrillbits ;
220246 }
247+
248+ private void checkIfWithinLimit (Map <DrillNode , NodeResources > nodeResourcesMap , long nodeLimit ) throws ExecutionSetupException {
249+ for (Map .Entry <DrillNode , NodeResources > entry : nodeResourcesMap .entrySet ()) {
250+ if (entry .getValue ().getMemoryInBytes () > nodeLimit ) {
251+ logger .error (" Memory requirement for the query cannot be adjusted." +
252+ " Memory requirement {} (in bytes) for a node {} is greater than limit {}" , entry .getValue ()
253+ .getMemoryInBytes (), entry .getKey (), nodeLimit );
254+ throw new ExecutionSetupException ("Minimum memory requirement "
255+ + entry .getValue ().getMemoryInBytes () + " for a node " + entry .getKey () + " is greater than limit: " + nodeLimit );
256+ }
257+ }
258+ }
259+
260+ private String getJSONFromResourcesMap (Map <DrillNode , NodeResources > resourcesMap ) {
261+ String json = "" ;
262+ try {
263+ json = new ObjectMapper ().writeValueAsString (resourcesMap .entrySet ()
264+ .stream ()
265+ .collect (Collectors .toMap (entry -> entry .getKey ()
266+ .toString (), Map .Entry ::getValue )));
267+ } catch (JsonProcessingException exception ) {
268+ logger .error (" Cannot convert the Node resources map to json " );
269+ }
270+
271+ return json ;
272+ }
221273}
0 commit comments