24
24
import org .apache .hadoop .yarn .api .records .ContainerExitStatus ;
25
25
import org .apache .hadoop .yarn .api .records .ContainerId ;
26
26
import org .apache .hadoop .yarn .api .records .ExecutionType ;
27
+ import org .apache .hadoop .yarn .api .records .Resource ;
27
28
import org .apache .hadoop .yarn .api .records .ResourceUtilization ;
28
29
import org .apache .hadoop .yarn .conf .YarnConfiguration ;
29
30
import org .apache .hadoop .yarn .event .AsyncDispatcher ;
46
47
import org .apache .hadoop .yarn .server .nodemanager .recovery .NMStateStoreService
47
48
.RecoveredContainerState ;
48
49
import org .apache .hadoop .yarn .server .nodemanager .recovery .NMStateStoreService .RecoveredContainerStatus ;
50
+ import org .apache .hadoop .yarn .util .resource .Resources ;
49
51
import org .slf4j .Logger ;
50
52
import org .slf4j .LoggerFactory ;
51
53
@@ -74,6 +76,7 @@ public class ContainerScheduler extends AbstractService implements
74
76
private final Context context ;
75
77
// Capacity of the queue for opportunistic Containers.
76
78
private final int maxOppQueueLength ;
79
+ private final boolean forceStartGuaranteedContainers ;
77
80
78
81
// Queue of Guaranteed Containers waiting for resources to run
79
82
private final LinkedHashMap <ContainerId , Container >
@@ -106,9 +109,39 @@ public class ContainerScheduler extends AbstractService implements
106
109
107
110
private final AsyncDispatcher dispatcher ;
108
111
private final NodeManagerMetrics metrics ;
112
+ private final OpportunisticContainersQueuePolicy oppContainersQueuePolicy ;
109
113
110
114
private Boolean usePauseEventForPreemption = false ;
111
115
116
+ private static int getMaxOppQueueLengthFromConf (final Context context ) {
117
+ if (context == null || context .getConf () == null ) {
118
+ return YarnConfiguration
119
+ .DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH ;
120
+ }
121
+
122
+ return context .getConf ().getInt (
123
+ YarnConfiguration .NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH ,
124
+ YarnConfiguration .DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH
125
+ );
126
+ }
127
+
128
+ private static OpportunisticContainersQueuePolicy
129
+ getOppContainersQueuePolicyFromConf (final Context context ) {
130
+ final String queuePolicy ;
131
+ if (context == null || context .getConf () == null ) {
132
+ queuePolicy = YarnConfiguration
133
+ .DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_QUEUE_POLICY ;
134
+ } else {
135
+ queuePolicy = context .getConf ().get (
136
+ YarnConfiguration .NM_OPPORTUNISTIC_CONTAINERS_QUEUE_POLICY ,
137
+ YarnConfiguration
138
+ .DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_QUEUE_POLICY
139
+ );
140
+ }
141
+
142
+ return OpportunisticContainersQueuePolicy .valueOf (queuePolicy );
143
+ }
144
+
112
145
@ VisibleForTesting
113
146
ResourceHandlerChain resourceHandlerChain = null ;
114
147
@@ -120,10 +153,9 @@ public class ContainerScheduler extends AbstractService implements
120
153
*/
121
154
public ContainerScheduler (Context context , AsyncDispatcher dispatcher ,
122
155
NodeManagerMetrics metrics ) {
// Delegates to the constructor taking an explicit queue policy and queue
// length. The added (+) lines obtain both values through the static helpers
// getOppContainersQueuePolicyFromConf / getMaxOppQueueLengthFromConf, which
// return the YarnConfiguration defaults when the context or its
// configuration is null; the removed (-) lines dereferenced
// context.getConf() directly and passed only the queue length.
123
- this (context , dispatcher , metrics , context .getConf ().getInt (
124
- YarnConfiguration .NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH ,
125
- YarnConfiguration .
126
- DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH ));
156
+ this (context , dispatcher , metrics ,
157
+ getOppContainersQueuePolicyFromConf (context ),
158
+ getMaxOppQueueLengthFromConf (context ));
127
159
}
128
160
129
161
@@ -149,13 +181,35 @@ public void serviceInit(Configuration conf) throws Exception {
149
181
@ VisibleForTesting
150
182
public ContainerScheduler (Context context , AsyncDispatcher dispatcher ,
151
183
NodeManagerMetrics metrics , int qLength ) {
// Test-facing overload: the caller supplies the queue length (qLength)
// while the queue policy is still resolved from the context's
// configuration via getOppContainersQueuePolicyFromConf; both are then
// forwarded to the constructor that takes an explicit policy.
184
+ this (context , dispatcher , metrics ,
185
+ getOppContainersQueuePolicyFromConf (context ), qLength );
186
+ }
187
+
188
/**
 * Creates a scheduler with an explicit opportunistic queue policy and
 * queue length.
 *
 * Under BY_RESOURCES the queue length is ignored: it is fixed at 0 and
 * guaranteed containers are never force-started. Under BY_QUEUE_LEN (and
 * any other policy) the supplied length is used, and guaranteed containers
 * are force-started when that length is not positive.
 *
 * @param context NodeManager context
 * @param dispatcher dispatcher stored for later use by the scheduler
 * @param metrics NodeManager metrics sink
 * @param oppContainersQueuePolicy policy deciding how opportunistic
 *        containers are admitted to the queue; must not be null
 * @param qLength requested maximum number of queued opportunistic
 *        containers; values below 0 are treated as 0
 */
@VisibleForTesting
public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
    NodeManagerMetrics metrics,
    OpportunisticContainersQueuePolicy oppContainersQueuePolicy,
    int qLength) {
  super(ContainerScheduler.class.getName());
  this.context = context;
  this.dispatcher = dispatcher;
  this.metrics = metrics;
  this.utilizationTracker =
      new AllocationBasedResourceUtilizationTracker(this);
  this.oppContainersQueuePolicy = oppContainersQueuePolicy;
  switch (oppContainersQueuePolicy) {
  case BY_RESOURCES:
    this.maxOppQueueLength = 0;
    this.forceStartGuaranteedContainers = false;
    LOG.info("Setting max opportunistic queue length to 0,"
        + " as {} is incompatible with queue length",
        oppContainersQueuePolicy);
    break;
  case BY_QUEUE_LEN:
  default:
    // Keep the stored length non-negative, as the previous implementation
    // did, so a negative configuration value cannot leak into later
    // arithmetic or comparisons on the queue length.
    this.maxOppQueueLength = Math.max(qLength, 0);
    this.forceStartGuaranteedContainers = (maxOppQueueLength <= 0);
  }
  this.opportunisticContainersStatus =
      OpportunisticContainersStatus.newInstance();
}
@@ -187,7 +241,7 @@ public void handle(ContainerSchedulerEvent event) {
187
241
shedQueuedOpportunisticContainers ();
188
242
break ;
189
243
case RECOVERY_COMPLETED :
190
- startPendingContainers (maxOppQueueLength <= 0 );
244
+ startPendingContainers (forceStartGuaranteedContainers );
191
245
metrics .setQueuedContainers (queuedOpportunisticContainers .size (),
192
246
queuedGuaranteedContainers .size ());
193
247
break ;
@@ -243,7 +297,7 @@ private void onUpdateContainer(UpdateContainerSchedulerEvent updateEvent) {
243
297
LOG .warn (String .format ("Could not update resources on " +
244
298
"continer update of %s" , containerId ), ex );
245
299
}
246
- startPendingContainers (maxOppQueueLength <= 0 );
300
+ startPendingContainers (forceStartGuaranteedContainers );
247
301
metrics .setQueuedContainers (queuedOpportunisticContainers .size (),
248
302
queuedGuaranteedContainers .size ());
249
303
}
@@ -371,7 +425,6 @@ private void onResourcesReclaimed(Container container) {
371
425
ExecutionType .OPPORTUNISTIC ) {
372
426
this .metrics .completeOpportunisticContainer (container .getResource ());
373
427
}
374
- boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0 );
375
428
startPendingContainers (forceStartGuaranteedContainers );
376
429
}
377
430
this .metrics .setQueuedContainers (queuedOpportunisticContainers .size (),
@@ -380,13 +433,13 @@ private void onResourcesReclaimed(Container container) {
380
433
381
434
/**
382
435
* Start pending containers in the queue.
383
- * @param forceStartGuaranteedContaieners When this is true, start guaranteed
436
+ * @param forceStartGContainers When this is true, start guaranteed
384
437
* container without looking at available resource
385
438
*/
386
- private void startPendingContainers (boolean forceStartGuaranteedContaieners ) {
439
+ private void startPendingContainers (boolean forceStartGContainers ) {
387
440
// Start guaranteed containers that are paused, if resources available.
388
441
boolean resourcesAvailable = startContainers (
389
- queuedGuaranteedContainers .values (), forceStartGuaranteedContaieners );
442
+ queuedGuaranteedContainers .values (), forceStartGContainers );
390
443
// Start opportunistic containers, if resources available.
391
444
if (resourcesAvailable ) {
392
445
startContainers (queuedOpportunisticContainers .values (), false );
@@ -429,6 +482,21 @@ private boolean resourceAvailableToStartContainer(Container container) {
429
482
return this .utilizationTracker .hasResourcesAvailable (container );
430
483
}
431
484
485
+ private boolean resourceAvailableToQueueOppContainer (
486
+ Container newOppContainer ) {
487
+ final Resource cumulativeResource = Resource .newInstance (Resources .none ());
488
+ for (final Container container : queuedGuaranteedContainers .values ()) {
489
+ Resources .addTo (cumulativeResource , container .getResource ());
490
+ }
491
+
492
+ for (final Container container : queuedOpportunisticContainers .values ()) {
493
+ Resources .addTo (cumulativeResource , container .getResource ());
494
+ }
495
+
496
+ Resources .addTo (cumulativeResource , newOppContainer .getResource ());
497
+ return this .utilizationTracker .hasResourcesAvailable (cumulativeResource );
498
+ }
499
+
432
500
private boolean enqueueContainer (Container container ) {
433
501
boolean isGuaranteedContainer = container .getContainerTokenIdentifier ().
434
502
getExecutionType () == ExecutionType .GUARANTEED ;
@@ -438,7 +506,21 @@ private boolean enqueueContainer(Container container) {
438
506
queuedGuaranteedContainers .put (container .getContainerId (), container );
439
507
isQueued = true ;
440
508
} else {
441
- if (queuedOpportunisticContainers .size () < maxOppQueueLength ) {
509
+ switch (oppContainersQueuePolicy ) {
510
+ case BY_RESOURCES :
511
+ isQueued = resourceAvailableToQueueOppContainer (container );
512
+ break ;
513
+ case BY_QUEUE_LEN :
514
+ default :
515
+ if (maxOppQueueLength <= 0 ) {
516
+ isQueued = false ;
517
+ } else {
518
+ isQueued =
519
+ queuedOpportunisticContainers .size () < maxOppQueueLength ;
520
+ }
521
+ }
522
+
523
+ if (isQueued ) {
442
524
LOG .info ("Opportunistic container {} will be queued at the NM." ,
443
525
container .getContainerId ());
444
526
queuedOpportunisticContainers .put (
@@ -451,7 +533,6 @@ private boolean enqueueContainer(Container container) {
451
533
container .sendKillEvent (
452
534
ContainerExitStatus .KILLED_BY_CONTAINER_SCHEDULER ,
453
535
"Opportunistic container queue is full." );
454
- isQueued = false ;
455
536
}
456
537
}
457
538
@@ -484,7 +565,6 @@ protected void scheduleContainer(Container container) {
484
565
// When opportunistic container not allowed (which is determined by
485
566
// max-queue length of pending opportunistic containers <= 0), start
486
567
// guaranteed containers without looking at available resources.
487
- boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0 );
488
568
startPendingContainers (forceStartGuaranteedContainers );
489
569
490
570
// if the guaranteed container is queued, we need to preempt opportunistic
0 commit comments