logger: logging.Logger = logging.getLogger(__name__)

+RESERVED_MILLICPU = 100
+RESERVED_MEMMB = 1024
+
RETRY_POLICIES: Mapping[str, Iterable[Mapping[str, str]]] = {
    RetryPolicy.REPLICA: [],
    RetryPolicy.APPLICATION: [
ANNOTATION_ISTIO_SIDECAR = "sidecar.istio.io/inject"

+LABEL_INSTANCE_TYPE = "node.kubernetes.io/instance-type"
+

def sanitize_for_serialization(obj: object) -> object:
    from kubernetes import client
@@ -176,21 +181,35 @@ def role_to_pod(name: str, role: Role, service_account: Optional[str]) -> "V1Pod
        V1EmptyDirVolumeSource,
    )

+    # limits puts an upper cap on the resources a pod may consume.
+    # requests is how much the scheduler allocates. We assume that the jobs will
+    # be allocated the whole machine, so requests is slightly lower than the
+    # requested resources to account for the Kubernetes node reserved resources.
+    limits = {}
    requests = {}

    resource = role.resource
-    if resource.cpu >= 0:
-        requests["cpu"] = f"{int(resource.cpu * 1000)}m"
-    if resource.memMB >= 0:
-        requests["memory"] = f"{int(resource.memMB)}M"
-    if resource.gpu >= 0:
-        requests["nvidia.com/gpu"] = str(resource.gpu)
+    if resource.cpu > 0:
+        mcpu = int(resource.cpu * 1000)
+        limits["cpu"] = f"{mcpu}m"
+        request_mcpu = max(mcpu - RESERVED_MILLICPU, 0)
+        requests["cpu"] = f"{request_mcpu}m"
+    if resource.memMB > 0:
+        limits["memory"] = f"{int(resource.memMB)}M"
+        request_memMB = max(int(resource.memMB) - RESERVED_MEMMB, 0)
+        requests["memory"] = f"{request_memMB}M"
+    if resource.gpu > 0:
+        requests["nvidia.com/gpu"] = limits["nvidia.com/gpu"] = str(resource.gpu)

    resources = V1ResourceRequirements(
-        limits=requests,
+        limits=limits,
        requests=requests,
    )

+    node_selector: Dict[str, str] = {}
+    if LABEL_INSTANCE_TYPE in resource.capabilities:
+        node_selector[LABEL_INSTANCE_TYPE] = resource.capabilities[LABEL_INSTANCE_TYPE]
+
    # To support PyTorch dataloaders we need to set /dev/shm to larger than the
    # 64M default so we mount an unlimited sized tmpfs directory on it.
    SHM_VOL = "dshm"
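As a quick illustration of the request/limit split described in the comment above (hypothetical numbers, not taken from the patch): a whole-host resource of 8 CPUs, 32768 MB and 2 GPUs would come out as below; only CPU and memory requests are reduced by the reserved amounts, the GPU count is passed through unchanged.

    # Hypothetical illustration mirroring the limits/requests computation above.
    RESERVED_MILLICPU = 100
    RESERVED_MEMMB = 1024

    cpu, memMB, gpu = 8, 32768, 2
    limits = {"cpu": f"{cpu * 1000}m", "memory": f"{memMB}M", "nvidia.com/gpu": str(gpu)}
    requests = {
        "cpu": f"{max(cpu * 1000 - RESERVED_MILLICPU, 0)}m",  # "7900m"
        "memory": f"{max(memMB - RESERVED_MEMMB, 0)}M",        # "31744M"
        "nvidia.com/gpu": str(gpu),                            # GPU count is not reduced
    }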
@@ -264,6 +283,7 @@ def role_to_pod(name: str, role: Role, service_account: Optional[str]) -> "V1Pod
            restart_policy="Never",
            service_account_name=service_account,
            volumes=volumes,
+            node_selector=node_selector,
        ),
        metadata=V1ObjectMeta(
            annotations={
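The node_selector wired into V1PodSpec above is derived from the resource capabilities earlier in role_to_pod. A minimal sketch of that flow, using a hypothetical instance type value:

    # Sketch of the capability -> node_selector mapping introduced in this diff.
    LABEL_INSTANCE_TYPE = "node.kubernetes.io/instance-type"
    capabilities = {LABEL_INSTANCE_TYPE: "p3.2xlarge"}  # hypothetical value

    node_selector = {}
    if LABEL_INSTANCE_TYPE in capabilities:
        node_selector[LABEL_INSTANCE_TYPE] = capabilities[LABEL_INSTANCE_TYPE]

    # node_selector is then passed to V1PodSpec(..., node_selector=node_selector),
    # so the pod only schedules onto nodes labeled with that instance type.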
@@ -416,6 +436,29 @@ class KubernetesScheduler(Scheduler, DockerWorkspace):
    External docs: https://kubernetes.io/docs/concepts/storage/persistent-volumes/

+    **Resources / Allocation**
+
+    To select a specific machine type you can add a capability to your resources
+    with ``node.kubernetes.io/instance-type``, which will constrain the launched
+    jobs to nodes of that instance type.
+
+    >>> from torchx import specs
+    >>> specs.Resource(
+    ...     cpu=4,
+    ...     memMB=16000,
+    ...     gpu=2,
+    ...     capabilities={
+    ...         "node.kubernetes.io/instance-type": "<cloud instance type>",
+    ...     },
+    ... )
+    Resource(...)
+
+    Kubernetes may reserve some CPU and memory for the host. TorchX assumes you're
+    scheduling on whole hosts and thus will automatically reduce the resource
+    request by a small amount to account for the node reserved CPU and memory.
+    If you run into scheduling issues you may need to reduce the requested CPU
+    and memory from the host values.
+
    **Compatibility**

    .. compatibility::
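For the Resource shown in the doctest above (cpu=4, memMB=16000, gpu=2), the automatic reduction described in that docstring works out as follows, using the RESERVED_MILLICPU and RESERVED_MEMMB constants added at the top of this diff (illustrative arithmetic only):

    # Reduction applied to the docstring's example resource.
    RESERVED_MILLICPU = 100
    RESERVED_MEMMB = 1024

    cpu_limit_m = 4 * 1000                                   # limit:   "4000m"
    cpu_request_m = max(cpu_limit_m - RESERVED_MILLICPU, 0)  # request: "3900m"
    mem_limit_mb = 16000                                     # limit:   "16000M"
    mem_request_mb = max(mem_limit_mb - RESERVED_MEMMB, 0)   # request: "14976M"
    # The GPU request and limit stay at the full count of 2.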