 all_jobs: List["Job"] = []
 torchx_runner: Runner = get_runner()
 
-torchx_runner.run(app, scheduler, cfg, workspace)
-
-torchx_runner.run_component(component, component_args, scheduler, cfg, workspace)
-
 class JobDefinition(metaclass=abc.ABCMeta):
     """
     A job definition to be submitted to a generic backend cluster.
     """
 
-    def submit(self, cluster: Cluster):
+    def submit(self, cluster: "Cluster"):
         """
         Method for creating a job on a specific cluster
         """
@@ -76,7 +72,7 @@ def __init__(self, config: JobConfiguration):
         """
         self.config = config
 
-    def submit(self, cluster: Cluster) -> "TorchXRayJob":
+    def submit(self, cluster: "Cluster") -> "TorchXRayJob":
         """
         Submit the job definition to a specific cluster, resulting in a Job object.
         """
@@ -87,48 +83,53 @@ class TorchXRayJob(Job):
     """
     Active submission of a dist.ddp job to a Ray cluster which can be used to get logs and status.
     """
-    def __init__(self, job_definition: TorchXJobDefinition, cluster: Cluster, *script_args):
+    def __init__(self, job_definition: TorchXJobDefinition, cluster: "Cluster", *script_args):
         """
-        TODO
+        Creates job which maximizes resource usage on the passed cluster.
         """
         self.job_definition: TorchXJobDefinition = job_definition
-        self.cluster: Cluster = cluster
+        self.cluster: "Cluster" = cluster
         j = f"{cluster.config.max_worker}x{max(cluster.config.gpu, 1)}"  # # of proc. = # of gpus
-        # TODO: allow user to override resource allocation for job
         _app_handle: AppHandle = torchx_runner.run(
             app=ddp(
                 *script_args,
                 script=job_definition.config.script,
-                m=None,  # python module to run (might be worth exposing)
-                name=job_definition.config.name,
+                m=job_definition.config.m,
+                name=job_definition.config.name,
                 h=None,  # for custom resource types
-                cpu=cluster.config.max_cpus,  # TODO: get from cluster config
+                cpu=cluster.config.max_cpus,
                 gpu=cluster.config.gpu,
                 memMB=1024 * cluster.config.max_memory,  # cluster memory is in GB
                 j=j,
-                env=None,  # TODO: should definitely expose Dict[str, str]
-                max_retries=0,  # TODO: maybe expose
-                mounts=None,  # TODO: maybe expose
-                debug=False  # TODO: expose
+                env=job_definition.config.env,
+                debug=job_definition.config.debug  # TODO: expose
+                # max_retries=0,  # default
+                # mounts=None,  # default
             ),
-            scheduler="ray", cfg="fo",
+            scheduler="ray",  # can be determined by type of cluster if more are introduced
+            cfg={
+                "cluster_name": self.cluster.config.name,
+                "dashboard_address": self.cluster.cluster_dashboard_uri(self.cluster.config.namespace).split(":")[0],
+                "working_dir": job_definition.config.working_dir,
+                "requirements": job_definition.config.requirements,
+            },
         )
 
         _, _, self.job_id = parse_app_handle(_app_handle)
         all_jobs.append(self)
 
-    def status(self):
+    def status(self) -> str:
         """
-        TODO
+        Get running job status.
         """
-        dashboard_route = self.cluster.cluster_dashboard_uri()
+        dashboard_route = self.cluster.cluster_dashboard_uri(namespace=self.cluster.config.namespace)
         client = JobSubmissionClient(dashboard_route)
         return client.get_job_status(self.job_id)
 
-    def logs(self):
+    def logs(self) -> str:
         """
-        TODO
+        Get job logs.
         """
-        dashboard_route = self.cluster_dashboard_uri(namespace=self.config.namespace)
+        dashboard_route = self.cluster.cluster_dashboard_uri(namespace=self.cluster.config.namespace)
         client = JobSubmissionClient(dashboard_route)
-        return client.get_job_logs(job_id)
+        return client.get_job_logs(self.job_id)
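
For context, here is a minimal usage sketch of the API this diff converges on. The import paths, the `ClusterConfiguration` helper, and the exact constructor fields are assumptions inferred from the attribute names used above (`job_definition.config.*`, `cluster.config.*`), not something this change defines.

```python
# Hypothetical usage sketch -- module paths and constructor fields are assumptions
# inferred from this diff, not confirmed APIs.
from jobs import JobConfiguration, TorchXJobDefinition      # assumed module layout
from cluster import Cluster, ClusterConfiguration           # assumed module layout

# Describe the script to run; field names mirror job_definition.config.* above.
job_def = TorchXJobDefinition(
    config=JobConfiguration(
        script="train.py",
        name="mnist-ddp",
        env={"LOGLEVEL": "INFO"},
        working_dir="./",
        requirements="requirements.txt",
    )
)

# A Ray cluster handle; field names mirror cluster.config.* above.
cluster = Cluster(
    ClusterConfiguration(
        name="raytest",
        namespace="default",
        max_worker=2,
        gpu=1,
        max_cpus=4,
        max_memory=8,  # GB; converted to memMB at submission time
    )
)

# submit() wraps the script in a TorchX dist.ddp component, hands it to the Ray
# scheduler with the cfg dict built above, and returns a TorchXRayJob.
job = job_def.submit(cluster)
print(job.status())  # Ray job status via JobSubmissionClient
print(job.logs())    # stdout/stderr captured by the Ray job
```

Routing the scheduler options (`cluster_name`, `dashboard_address`, `working_dir`, `requirements`) through a single `cfg` dict keeps the Ray-specific wiring in one place, so introducing another scheduler later would mostly mean building a different `cfg` mapping, as the comment on `scheduler="ray"` suggests.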