21
21
from time import sleep
22
22
from typing import List , Optional , Tuple , Dict
23
23
24
+ import openshift as oc
25
+ from kubernetes import config
24
26
from ray .job_submission import JobSubmissionClient
27
+ import urllib3
25
28
26
29
from .auth import config_check , api_config_handler
27
30
from ..utils import pretty_print
28
31
from ..utils .generate_yaml import generate_appwrapper
29
32
from ..utils .kube_api_helpers import _kube_api_error_handling
33
+ from ..utils .openshift_oauth import (
34
+ create_openshift_oauth_objects ,
35
+ delete_openshift_oauth_objects ,
36
+ download_tls_cert ,
37
+ )
30
38
from .config import ClusterConfiguration
31
39
from .model import (
32
40
AppWrapper ,
40
48
import os
41
49
import requests
42
50
51
+ from kubernetes import config
52
+
43
53
44
54
class Cluster :
45
55
"""
@@ -61,6 +71,39 @@ def __init__(self, config: ClusterConfiguration):
61
71
self .config = config
62
72
self .app_wrapper_yaml = self .create_app_wrapper ()
63
73
self .app_wrapper_name = self .app_wrapper_yaml .split ("." )[0 ]
74
+ self ._client = None
75
+
76
+ @property
77
+ def _client_headers (self ):
78
+ k8_client = api_config_handler () or client .ApiClient ()
79
+ return {
80
+ "Authorization" : k8_client .configuration .get_api_key_with_prefix (
81
+ "authorization"
82
+ )
83
+ }
84
+
85
+ @property
86
+ def _client_verify_tls (self ):
87
+ return not self .config .openshift_oauth
88
+
89
+ @property
90
+ def client (self ):
91
+ if self ._client :
92
+ return self ._client
93
+ if self .config .openshift_oauth :
94
+ print (
95
+ api_config_handler ().configuration .get_api_key_with_prefix (
96
+ "authorization"
97
+ )
98
+ )
99
+ self ._client = JobSubmissionClient (
100
+ self .cluster_dashboard_uri (),
101
+ headers = self ._client_headers ,
102
+ verify = self ._client_verify_tls ,
103
+ )
104
+ else :
105
+ self ._client = JobSubmissionClient (self .cluster_dashboard_uri ())
106
+ return self ._client
64
107
65
108
def evaluate_dispatch_priority (self ):
66
109
priority_class = self .config .dispatch_priority
@@ -147,6 +190,7 @@ def create_app_wrapper(self):
147
190
image_pull_secrets = image_pull_secrets ,
148
191
dispatch_priority = dispatch_priority ,
149
192
priority_val = priority_val ,
193
+ openshift_oauth = self .config .openshift_oauth ,
150
194
)
151
195
152
196
# creates a new cluster with the provided or default spec
@@ -156,6 +200,11 @@ def up(self):
156
200
the MCAD queue.
157
201
"""
158
202
namespace = self .config .namespace
203
+ if self .config .openshift_oauth :
204
+ create_openshift_oauth_objects (
205
+ cluster_name = self .config .name , namespace = namespace
206
+ )
207
+
159
208
try :
160
209
config_check ()
161
210
api_instance = client .CustomObjectsApi (api_config_handler ())
@@ -190,6 +239,11 @@ def down(self):
190
239
except Exception as e : # pragma: no cover
191
240
return _kube_api_error_handling (e )
192
241
242
+ if self .config .openshift_oauth :
243
+ delete_openshift_oauth_objects (
244
+ cluster_name = self .config .name , namespace = namespace
245
+ )
246
+
193
247
def status (
194
248
self , print_to_console : bool = True
195
249
) -> Tuple [CodeFlareClusterStatus , bool ]:
@@ -258,7 +312,16 @@ def status(
258
312
return status , ready
259
313
260
314
def is_dashboard_ready (self ) -> bool :
261
- response = requests .get (self .cluster_dashboard_uri (), timeout = 5 )
315
+ try :
316
+ response = requests .get (
317
+ self .cluster_dashboard_uri (),
318
+ headers = self ._client_headers ,
319
+ timeout = 5 ,
320
+ verify = self ._client_verify_tls ,
321
+ )
322
+ except requests .exceptions .SSLError :
323
+ # SSL exception occurs when oauth ingress has been created but cluster is not up
324
+ return False
262
325
if response .status_code == 200 :
263
326
return True
264
327
else :
@@ -330,7 +393,13 @@ def cluster_dashboard_uri(self) -> str:
330
393
return _kube_api_error_handling (e )
331
394
332
395
for route in routes ["items" ]:
333
- if route ["metadata" ]["name" ] == f"ray-dashboard-{ self .config .name } " :
396
+ if route ["metadata" ][
397
+ "name"
398
+ ] == f"ray-dashboard-{ self .config .name } " or route ["metadata" ][
399
+ "name"
400
+ ].startswith (
401
+ f"{ self .config .name } -ingress"
402
+ ):
334
403
protocol = "https" if route ["spec" ].get ("tls" ) else "http"
335
404
return f"{ protocol } ://{ route ['spec' ]['host' ]} "
336
405
return "Dashboard route not available yet, have you run cluster.up()?"
@@ -339,30 +408,24 @@ def list_jobs(self) -> List:
339
408
"""
340
409
This method accesses the head ray node in your cluster and lists the running jobs.
341
410
"""
342
- dashboard_route = self .cluster_dashboard_uri ()
343
- client = JobSubmissionClient (dashboard_route )
344
- return client .list_jobs ()
411
+ return self .client .list_jobs ()
345
412
346
413
def job_status (self , job_id : str ) -> str :
347
414
"""
348
415
This method accesses the head ray node in your cluster and returns the job status for the provided job id.
349
416
"""
350
- dashboard_route = self .cluster_dashboard_uri ()
351
- client = JobSubmissionClient (dashboard_route )
352
- return client .get_job_status (job_id )
417
+ return self .client .get_job_status (job_id )
353
418
354
419
def job_logs (self , job_id : str ) -> str :
355
420
"""
356
421
This method accesses the head ray node in your cluster and returns the logs for the provided job id.
357
422
"""
358
- dashboard_route = self .cluster_dashboard_uri ()
359
- client = JobSubmissionClient (dashboard_route )
360
- return client .get_job_logs (job_id )
423
+ return self .client .get_job_logs (job_id )
361
424
362
425
def torchx_config (
363
426
self , working_dir : str = None , requirements : str = None
364
427
) -> Dict [str , str ]:
365
- dashboard_address = f" { self .cluster_dashboard_uri (). lstrip ( 'http://' ) } "
428
+ dashboard_address = urllib3 . util . parse_url ( self .cluster_dashboard_uri ()). host
366
429
to_return = {
367
430
"cluster_name" : self .config .name ,
368
431
"dashboard_address" : dashboard_address ,
@@ -591,7 +654,7 @@ def _get_app_wrappers(
591
654
592
655
593
656
def _map_to_ray_cluster (rc ) -> Optional [RayCluster ]:
594
- if "status" in rc and " state" in rc ["status" ]:
657
+ if "state" in rc ["status" ]:
595
658
status = RayClusterStatus (rc ["status" ]["state" ].lower ())
596
659
else :
597
660
status = RayClusterStatus .UNKNOWN
@@ -606,7 +669,13 @@ def _map_to_ray_cluster(rc) -> Optional[RayCluster]:
606
669
)
607
670
ray_route = None
608
671
for route in routes ["items" ]:
609
- if route ["metadata" ]["name" ] == f"ray-dashboard-{ rc ['metadata' ]['name' ]} " :
672
+ if route ["metadata" ][
673
+ "name"
674
+ ] == f"ray-dashboard-{ rc ['metadata' ]['name' ]} " or route ["metadata" ][
675
+ "name"
676
+ ].startswith (
677
+ f"{ rc ['metadata' ]['name' ]} -ingress"
678
+ ):
610
679
protocol = "https" if route ["spec" ].get ("tls" ) else "http"
611
680
ray_route = f"{ protocol } ://{ route ['spec' ]['host' ]} "
612
681
0 commit comments