30
30
from .config import JobConfiguration
31
31
32
32
import typing
33
+
33
34
if typing .TYPE_CHECKING :
34
35
from ..cluster .cluster import Cluster
35
36
36
37
all_jobs : List ["Job" ] = []
37
38
torchx_runner : Runner = get_runner ()
38
39
40
+
39
41
class JobDefinition (metaclass = abc .ABCMeta ):
40
42
"""
41
43
A job definition to be submitted to a generic backend cluster.
@@ -54,6 +56,7 @@ def submit(self, cluster: "Cluster"):
54
56
"""
55
57
pass
56
58
59
+
57
60
class Job (metaclass = abc .ABCMeta ):
58
61
"""
59
62
An abstract class that defines the necessary methods for authenticating to a remote environment.
@@ -91,13 +94,13 @@ def _dry_run(self, cluster: "Cluster", *script_args) -> AppDryRunInfo:
91
94
return torchx_runner .dryrun (
92
95
app = ddp (
93
96
* script_args ,
94
- script = self .config .script ,
97
+ script = self .config .script ,
95
98
m = self .config .m ,
96
99
name = self .config .name ,
97
100
h = None , # for custom resource types
98
101
cpu = cluster .config .max_cpus ,
99
- gpu = cluster .config .gpu ,
100
- memMB = 1024 * cluster .config .max_memory , # cluster memory is in GB
102
+ gpu = cluster .config .gpu ,
103
+ memMB = 1024 * cluster .config .max_memory , # cluster memory is in GB
101
104
j = j ,
102
105
env = self .config .env ,
103
106
# max_retries=0, # default
@@ -110,7 +113,7 @@ def _dry_run(self, cluster: "Cluster", *script_args) -> AppDryRunInfo:
110
113
"working_dir" : self .config .working_dir ,
111
114
"requirements" : self .config .requirements ,
112
115
},
113
- workspace = f"file://{ Path .cwd ()} "
116
+ workspace = f"file://{ Path .cwd ()} " ,
114
117
)
115
118
116
119
def submit (self , cluster : "Cluster" ) -> "TorchXRayJob" :
@@ -124,13 +127,18 @@ class TorchXRayJob(Job):
124
127
"""
125
128
Active submission of a dist.ddp job to a Ray cluster which can be used to get logs and status.
126
129
"""
127
- def __init__ (self , job_definition : TorchXJobDefinition , cluster : "Cluster" , * script_args ):
130
+
131
+ def __init__ (
132
+ self , job_definition : TorchXJobDefinition , cluster : "Cluster" , * script_args
133
+ ):
128
134
"""
129
135
Creates job which maximizes resource usage on the passed cluster.
130
136
"""
131
137
self .job_definition : TorchXJobDefinition = job_definition
132
138
self .cluster : "Cluster" = cluster
133
- self ._app_handle = torchx_runner .schedule (job_definition ._dry_run (cluster , * script_args ))
139
+ self ._app_handle = torchx_runner .schedule (
140
+ job_definition ._dry_run (cluster , * script_args )
141
+ )
134
142
all_jobs .append (self )
135
143
136
144
@property
0 commit comments