36
36
RayClusterStatus ,
37
37
)
38
38
39
+ from kubernetes import client , config
40
+
41
+ import yaml
42
+
39
43
40
44
class Cluster :
41
45
"""
@@ -108,8 +112,17 @@ def up(self):
108
112
"""
109
113
namespace = self .config .namespace
110
114
try :
111
- with oc .project (namespace ):
112
- oc .invoke ("apply" , ["-f" , self .app_wrapper_yaml ])
115
+ config .load_kube_config ()
116
+ api_instance = client .CustomObjectsApi ()
117
+ with open (self .app_wrapper_yaml ) as f :
118
+ aw = yaml .load (f , Loader = yaml .FullLoader )
119
+ api_instance .create_namespaced_custom_object (
120
+ group = "mcad.ibm.com" ,
121
+ version = "v1beta1" ,
122
+ namespace = namespace ,
123
+ plural = "appwrappers" ,
124
+ body = aw ,
125
+ )
113
126
except oc .OpenShiftPythonException as osp : # pragma: no cover
114
127
error_msg = osp .result .err ()
115
128
if "Unauthorized" in error_msg :
@@ -125,8 +138,15 @@ def down(self):
125
138
"""
126
139
namespace = self .config .namespace
127
140
try :
128
- with oc .project (namespace ):
129
- oc .invoke ("delete" , ["AppWrapper" , self .app_wrapper_name ])
141
+ config .load_kube_config ()
142
+ api_instance = client .CustomObjectsApi ()
143
+ api_instance .delete_namespaced_custom_object (
144
+ group = "mcad.ibm.com" ,
145
+ version = "v1beta1" ,
146
+ namespace = namespace ,
147
+ plural = "appwrappers" ,
148
+ name = self .app_wrapper_name ,
149
+ )
130
150
except oc .OpenShiftPythonException as osp : # pragma: no cover
131
151
error_msg = osp .result .err ()
132
152
if (
@@ -320,14 +340,16 @@ def list_all_queued(namespace: str, print_to_console: bool = True):
320
340
321
341
322
342
def _app_wrapper_status (name , namespace = "default" ) -> Optional [AppWrapper ]:
323
- cluster = None
324
343
try :
325
- with oc .project (namespace ), oc .timeout (10 * 60 ):
326
- cluster = oc .selector (f"appwrapper/{ name } " ).object ()
344
+ config .load_kube_config ()
345
+ api_instance = client .CustomObjectsApi ()
346
+ aws = api_instance .list_namespaced_custom_object (
347
+ group = "mcad.ibm.com" ,
348
+ version = "v1beta1" ,
349
+ namespace = namespace ,
350
+ plural = "appwrappers" ,
351
+ )
327
352
except oc .OpenShiftPythonException as osp : # pragma: no cover
328
- msg = osp .msg
329
- if "Expected a single object, but selected 0" in msg :
330
- return cluster
331
353
error_msg = osp .result .err ()
332
354
if not (
333
355
'the server doesn\' t have a resource type "appwrapper"' in error_msg
@@ -337,21 +359,23 @@ def _app_wrapper_status(name, namespace="default") -> Optional[AppWrapper]:
337
359
):
338
360
raise osp
339
361
340
- if cluster :
341
- return _map_to_app_wrapper ( cluster )
342
-
343
- return cluster
362
+ for aw in aws [ "items" ] :
363
+ if aw [ "metadata" ][ "name" ] == name :
364
+ return _map_to_app_wrapper ( aw )
365
+ return None
344
366
345
367
346
368
def _ray_cluster_status (name , namespace = "default" ) -> Optional [RayCluster ]:
347
- cluster = None
348
369
try :
349
- with oc .project (namespace ), oc .timeout (10 * 60 ):
350
- cluster = oc .selector (f"rayclusters/{ name } " ).object ()
370
+ config .load_kube_config ()
371
+ api_instance = client .CustomObjectsApi ()
372
+ rcs = api_instance .list_namespaced_custom_object (
373
+ group = "ray.io" ,
374
+ version = "v1alpha1" ,
375
+ namespace = namespace ,
376
+ plural = "rayclusters" ,
377
+ )
351
378
except oc .OpenShiftPythonException as osp : # pragma: no cover
352
- msg = osp .msg
353
- if "Expected a single object, but selected 0" in msg :
354
- return cluster
355
379
error_msg = osp .result .err ()
356
380
if not (
357
381
'the server doesn\' t have a resource type "rayclusters"' in error_msg
@@ -361,17 +385,23 @@ def _ray_cluster_status(name, namespace="default") -> Optional[RayCluster]:
361
385
):
362
386
raise osp
363
387
364
- if cluster :
365
- return _map_to_ray_cluster ( cluster )
366
-
367
- return cluster
388
+ for rc in rcs [ "items" ] :
389
+ if rc [ "metadata" ][ "name" ] == name :
390
+ return _map_to_ray_cluster ( rc )
391
+ return None
368
392
369
393
370
394
def _get_ray_clusters (namespace = "default" ) -> List [RayCluster ]:
371
395
list_of_clusters = []
372
396
try :
373
- with oc .project (namespace ), oc .timeout (10 * 60 ):
374
- ray_clusters = oc .selector ("rayclusters" ).objects ()
397
+ config .load_kube_config ()
398
+ api_instance = client .CustomObjectsApi ()
399
+ rcs = api_instance .list_namespaced_custom_object (
400
+ group = "ray.io" ,
401
+ version = "v1alpha1" ,
402
+ namespace = namespace ,
403
+ plural = "rayclusters" ,
404
+ )
375
405
except oc .OpenShiftPythonException as osp : # pragma: no cover
376
406
error_msg = osp .result .err ()
377
407
if (
@@ -386,8 +416,8 @@ def _get_ray_clusters(namespace="default") -> List[RayCluster]:
386
416
else :
387
417
raise osp
388
418
389
- for cluster in ray_clusters :
390
- list_of_clusters .append (_map_to_ray_cluster (cluster ))
419
+ for rc in rcs [ "items" ] :
420
+ list_of_clusters .append (_map_to_ray_cluster (rc ))
391
421
return list_of_clusters
392
422
393
423
@@ -397,8 +427,14 @@ def _get_app_wrappers(
397
427
list_of_app_wrappers = []
398
428
399
429
try :
400
- with oc .project (namespace ), oc .timeout (10 * 60 ):
401
- app_wrappers = oc .selector ("appwrappers" ).objects ()
430
+ config .load_kube_config ()
431
+ api_instance = client .CustomObjectsApi ()
432
+ aws = api_instance .list_namespaced_custom_object (
433
+ group = "mcad.ibm.com" ,
434
+ version = "v1beta1" ,
435
+ namespace = namespace ,
436
+ plural = "appwrappers" ,
437
+ )
402
438
except oc .OpenShiftPythonException as osp : # pragma: no cover
403
439
error_msg = osp .result .err ()
404
440
if (
@@ -413,7 +449,7 @@ def _get_app_wrappers(
413
449
else :
414
450
raise osp
415
451
416
- for item in app_wrappers :
452
+ for item in aws [ "items" ] :
417
453
app_wrapper = _map_to_app_wrapper (item )
418
454
if filter and app_wrapper .status in filter :
419
455
list_of_app_wrappers .append (app_wrapper )
@@ -423,48 +459,46 @@ def _get_app_wrappers(
423
459
return list_of_app_wrappers
424
460
425
461
426
def _map_to_ray_cluster(rc) -> Optional[RayCluster]:
    """Build a ``RayCluster`` record from a RayCluster custom-object dict.

    Args:
        rc: A single RayCluster object in plain-dict form, as found in the
            ``"items"`` list of the ``rayclusters`` custom-object listing.
            ``metadata`` and ``spec`` are required; ``status`` is optional.

    Returns:
        A ``RayCluster`` populated from the first worker group of ``rc`` and
        from the host of the ``ray-dashboard-<name>`` route.

    Raises:
        KeyError / IndexError: if ``metadata`` or the first worker group's
            container resources are missing from ``rc``.
    """
    metadata = rc["metadata"]
    name = metadata["name"]
    namespace = metadata["namespace"]

    # Tolerate an object with no (or an empty) status block instead of
    # failing with KeyError; such a cluster is reported as UNKNOWN.
    state = (rc.get("status") or {}).get("state")
    status = RayClusterStatus(state.lower()) if state else RayClusterStatus.UNKNOWN

    # The dashboard host is still looked up through the OpenShift client.
    with oc.project(namespace), oc.timeout(10 * 60):
        route = oc.selector(f"route/ray-dashboard-{name}").object().model.spec.host

    # Only the first worker group / first container is inspected.
    worker_group = rc["spec"]["workerGroupSpecs"][0]
    resources = worker_group["template"]["spec"]["containers"][0]["resources"]

    return RayCluster(
        name=name,
        status=status,
        # for now we are not using autoscaling so same replicas is fine
        min_workers=worker_group["replicas"],
        max_workers=worker_group["replicas"],
        worker_mem_max=resources["limits"]["memory"],
        worker_mem_min=resources["requests"]["memory"],
        worker_cpu=resources["limits"]["cpu"],
        worker_gpu=0,  # hard to detect currently how many gpus, can override it with what the user asked for
        namespace=namespace,
        dashboard=route,
    )
459
494
460
495
461
def _map_to_app_wrapper(aw) -> AppWrapper:
    """Build an ``AppWrapper`` record from an AppWrapper custom-object dict.

    Args:
        aw: A single AppWrapper object in plain-dict form, as found in the
            ``"items"`` list of the ``appwrappers`` custom-object listing.

    Returns:
        An ``AppWrapper`` carrying the object's name, its lower-cased
        ``status.state`` converted to ``AppWrapperStatus``, and the raw
        ``status.canrun`` / ``status.queuejobstate`` values.

    Raises:
        KeyError: if ``metadata.name`` or any of the three ``status`` fields
            is absent from ``aw``.
    """
    name = aw["metadata"]["name"]
    status = aw["status"]
    return AppWrapper(
        name=name,
        status=AppWrapperStatus(status["state"].lower()),
        can_run=status["canrun"],
        job_state=status["queuejobstate"],
    )
469
503
470
504
0 commit comments