Skip to content

create classes for submitting and watching DDP jobs #70

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,873 changes: 78 additions & 1,795 deletions demo-notebooks/batch-job/batch_mnist.ipynb

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
openshift-client==1.0.18
rich==12.5.1
ray[default]==2.1.0
git+https://github.com/project-codeflare/torchx@6517d5b060e4fe32b9ad41019c3bef647095c35f#egg=torchx
18 changes: 17 additions & 1 deletion src/codeflare_sdk/cluster/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from os import stat
from time import sleep
from typing import List, Optional, Tuple
from typing import List, Optional, Tuple, Dict

import openshift as oc
from ray.job_submission import JobSubmissionClient
Expand All @@ -45,6 +45,8 @@ class Cluster:
Note that currently, the underlying implementation is a Ray cluster.
"""

torchx_scheduler = "ray"

def __init__(self, config: ClusterConfiguration):
"""
Create the resource cluster object by passing in a ClusterConfiguration
Expand Down Expand Up @@ -268,6 +270,20 @@ def job_logs(self, job_id: str) -> str:
client = JobSubmissionClient(dashboard_route)
return client.get_job_logs(job_id)

def torchx_config(
    self, working_dir: str = None, requirements: str = None
) -> Dict[str, str]:
    """
    Build the TorchX Ray-scheduler config dict for this cluster.

    Always includes the cluster name and the dashboard address (scheme
    stripped, since the Ray scheduler expects host:port); optionally adds
    the job working directory and a requirements file path.

    :param working_dir: optional working directory to ship with the job
    :param requirements: optional path to a pip requirements file
    :return: dict suitable for passing as TorchX scheduler cfg
    """
    # NOTE: str.lstrip("http://") strips *characters* (any of h/t/p/:/ /),
    # not the prefix — it would also eat the leading "h" of a hostname like
    # "host.example.com". Split on the scheme separator instead so both
    # "http://" and "https://" URIs are handled correctly.
    uri = self.cluster_dashboard_uri()
    dashboard_address = uri.split("//", 1)[-1]
    to_return = {
        "cluster_name": self.config.name,
        "dashboard_address": dashboard_address,
    }
    if working_dir:
        to_return["working_dir"] = working_dir
    if requirements:
        to_return["requirements"] = requirements
    return to_return


def get_current_namespace() -> str:
"""
Expand Down
Empty file.
126 changes: 126 additions & 0 deletions src/codeflare_sdk/job/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# Copyright 2023 IBM, Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import abc
from typing import TYPE_CHECKING, Optional, Dict, List
from pathlib import Path

from torchx.components.dist import ddp
from torchx.runner import get_runner
from torchx.specs import AppHandle, parse_app_handle, AppDryRunInfo

if TYPE_CHECKING:
from ..cluster.cluster import Cluster

# Module-level registry of every Job created in this process
# (DDPJob.__init__ appends to it).
all_jobs: List["Job"] = []
# Single shared TorchX runner used for dryrun/schedule/status/log calls.
torchx_runner = get_runner()


class JobDefinition(metaclass=abc.ABCMeta):
    """Abstract base for job definitions that can be dry-run against or submitted to a cluster."""

    def _dry_run(self, cluster: "Cluster"):
        """Build (but do not submit) the scheduler request for this job. No-op in the base class."""
        return None

    def submit(self, cluster: "Cluster"):
        """Submit this job definition to the given cluster. No-op in the base class."""
        return None


class Job(metaclass=abc.ABCMeta):
    """Abstract base for a submitted job whose status and logs can be queried."""

    def status(self):
        """Return the job's current status. No-op in the base class."""
        return None

    def logs(self):
        """Return the job's logs. No-op in the base class."""
        return None


class DDPJobDefinition(JobDefinition):
    """
    Definition of a TorchX distributed data-parallel (DDP) job.

    Captures the training entry point (a script path or a module name),
    resource requests, and scheduler options. Resources left as None are
    filled in from the target cluster's configuration at dry-run time.
    """

    def __init__(
        self,
        script: Optional[str] = None,
        m: Optional[str] = None,
        script_args: Optional[List[str]] = None,
        name: Optional[str] = None,
        cpu: Optional[int] = None,
        gpu: Optional[int] = None,
        memMB: Optional[int] = None,
        h: Optional[str] = None,
        j: Optional[str] = None,
        env: Optional[Dict[str, str]] = None,
        max_retries: int = 0,
        mounts: Optional[List[str]] = None,
        rdzv_port: int = 29500,
        scheduler_args: Optional[Dict[str, str]] = None,
    ):
        # Exactly one entry point is allowed: a script path OR a module name.
        if bool(script) == bool(m):
            raise ValueError(
                "Exactly one of the following arguments must be defined: [script, m]."
            )
        self.script = script
        self.m = m
        self.name = name
        self.cpu = cpu
        self.gpu = gpu
        self.memMB = memMB
        self.h = h
        self.j = j
        self.max_retries = max_retries
        self.rdzv_port = rdzv_port
        # Keep the caller's container objects when supplied; only substitute
        # fresh ones for None (preserves aliasing semantics).
        self.script_args: List[str] = [] if script_args is None else script_args
        self.env: Dict[str, str] = dict() if env is None else env
        self.mounts: List[str] = [] if mounts is None else mounts
        self.scheduler_args: Dict[str, str] = (
            dict() if scheduler_args is None else scheduler_args
        )

    def _dry_run(self, cluster: "Cluster"):
        """
        Build the TorchX AppDryRunInfo for this job against `cluster`,
        filling any unspecified resources from the cluster configuration.
        """
        # Default procs-per-node: one per GPU, but at least one so
        # CPU-only clusters still get a worker process.
        default_j = f"{cluster.config.max_worker}x{max(cluster.config.gpu, 1)}"
        cpu = cluster.config.max_cpus if self.cpu is None else self.cpu
        gpu = cluster.config.gpu if self.gpu is None else self.gpu
        memMB = cluster.config.max_memory * 1024 if self.memMB is None else self.memMB
        j = default_j if self.j is None else self.j
        app = ddp(
            *self.script_args,
            script=self.script,
            m=self.m,
            name=self.name,
            h=self.h,
            cpu=cpu,
            gpu=gpu,
            memMB=memMB,
            j=j,
            env=self.env,
            max_retries=self.max_retries,
            rdzv_port=self.rdzv_port,
            mounts=self.mounts,
        )
        return torchx_runner.dryrun(
            app=app,
            scheduler=cluster.torchx_scheduler,
            cfg=cluster.torchx_config(**self.scheduler_args),
            workspace=f"file://{Path.cwd()}",
        )

    def submit(self, cluster: "Cluster") -> "Job":
        """Schedule this definition on `cluster` and return the live DDPJob handle."""
        return DDPJob(self, cluster)


class DDPJob(Job):
    """A DDP job that has been scheduled on a cluster via the shared TorchX runner."""

    def __init__(self, job_definition: "DDPJobDefinition", cluster: "Cluster"):
        self.job_definition = job_definition
        self.cluster = cluster
        # Schedule immediately; the returned app handle identifies this job
        # in later status()/logs() queries.
        dry_run_info = job_definition._dry_run(cluster)
        self._app_handle = torchx_runner.schedule(dry_run_info)
        all_jobs.append(self)

    def status(self) -> str:
        """Return the scheduler-reported status for this job."""
        return torchx_runner.status(self._app_handle)

    def logs(self) -> str:
        """Return the job's log lines concatenated into a single string."""
        lines = torchx_runner.log_lines(self._app_handle, None)
        return "".join(lines)
195 changes: 195 additions & 0 deletions tests/unit_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import sys
import filecmp
import os
import re

parent = Path(__file__).resolve().parents[1]
sys.path.append(str(parent) + "/src")
Expand Down Expand Up @@ -46,10 +47,20 @@
RayClusterStatus,
CodeFlareClusterStatus,
)
from codeflare_sdk.job.jobs import (
JobDefinition,
Job,
DDPJobDefinition,
DDPJob,
torchx_runner,
)
import openshift
from openshift import OpenShiftPythonException
from openshift.selector import Selector
import ray
from torchx.specs import AppDryRunInfo, AppDef
from torchx.runner import get_runner, Runner
from torchx.schedulers.ray_scheduler import RayJob
import pytest


Expand Down Expand Up @@ -1535,6 +1546,7 @@ def test_cluster_status(mocker):
mocker.patch(
"codeflare_sdk.cluster.cluster._ray_cluster_status", return_value=fake_ray
)

status, ready = cf.status()
assert status == CodeFlareClusterStatus.STARTING
assert ready == False
Expand Down Expand Up @@ -1594,3 +1606,186 @@ def test_cmd_line_generation():
def test_cleanup():
    """Remove the YAML artifacts generated by earlier tests."""
    for generated_file in ("test.yaml", "raytest2.yaml"):
        os.remove(generated_file)


def test_jobdefinition_coverage():
    """Exercise the abstract JobDefinition's no-op methods for coverage."""
    stub = JobDefinition()
    target_cluster = Cluster(test_config_creation())
    stub._dry_run(target_cluster)
    stub.submit(target_cluster)


def test_job_coverage():
    """Exercise the abstract Job's no-op methods for coverage."""
    stub = Job()
    stub.status()
    stub.logs()


def test_DDPJobDefinition_creation():
    """
    Create a fully-specified DDPJobDefinition and verify every constructor
    argument is stored unchanged. Returns the definition for reuse by the
    dry-run/submit tests.
    """
    ddp = DDPJobDefinition(
        script="test.py",
        m=None,
        script_args=["test"],
        name="test",
        cpu=1,
        gpu=0,
        memMB=1024,
        h=None,
        j="2x1",
        env={"test": "test"},
        max_retries=0,
        mounts=[],
        rdzv_port=29500,
        scheduler_args={"requirements": "test"},
    )
    assert ddp.script == "test.py"
    # `is None` is the correct identity check (PEP 8), not `== None`.
    assert ddp.m is None
    assert ddp.script_args == ["test"]
    assert ddp.name == "test"
    assert ddp.cpu == 1
    assert ddp.gpu == 0
    assert ddp.memMB == 1024
    assert ddp.h is None
    assert ddp.j == "2x1"
    assert ddp.env == {"test": "test"}
    assert ddp.max_retries == 0
    assert ddp.mounts == []
    assert ddp.rdzv_port == 29500
    assert ddp.scheduler_args == {"requirements": "test"}
    return ddp


def test_DDPJobDefinition_dry_run():
    """
    Test that the dry run method returns the correct type: AppDryRunInfo,
    that the attributes of the returned object are of the correct type,
    and that the values from cluster and job definition are correctly passed.
    """
    ddp = test_DDPJobDefinition_creation()
    cluster = Cluster(test_config_creation())
    ddp_job = ddp._dry_run(cluster)
    # isinstance is the idiomatic type check (type(x) == T rejects subclasses
    # and `type(dict())` is just a roundabout spelling of `dict`).
    assert isinstance(ddp_job, AppDryRunInfo)
    assert ddp_job._fmt is not None
    assert isinstance(ddp_job.request, RayJob)
    assert isinstance(ddp_job._app, AppDef)
    assert isinstance(ddp_job._cfg, dict)
    assert isinstance(ddp_job._scheduler, str)

    assert ddp_job.request.app_id.startswith("test")
    assert ddp_job.request.working_dir.startswith("/tmp/torchx_workspace")
    assert ddp_job.request.cluster_name == "unit-test-cluster"
    assert ddp_job.request.requirements == "test"

    assert ddp_job._app.roles[0].resource.cpu == 1
    assert ddp_job._app.roles[0].resource.gpu == 0
    assert ddp_job._app.roles[0].resource.memMB == 1024

    assert ddp_job._cfg["cluster_name"] == "unit-test-cluster"
    assert ddp_job._cfg["requirements"] == "test"

    assert ddp_job._scheduler == "ray"


def test_DDPJobDefinition_dry_run_no_resource_args():
    """
    Test that the dry run correctly gets resources from the cluster object
    when the job definition does not specify resources.
    """
    cluster = Cluster(test_config_creation())
    job_def = DDPJobDefinition(
        script="test.py",
        m=None,
        script_args=["test"],
        name="test",
        h=None,
        env={"test": "test"},
        max_retries=0,
        mounts=[],
        rdzv_port=29500,
        scheduler_args={"requirements": "test"},
    )
    dry_run_info = job_def._dry_run(cluster)
    role_resource = dry_run_info._app.roles[0].resource

    # Unspecified resources fall back to the cluster's configured limits.
    assert role_resource.cpu == cluster.config.max_cpus
    assert role_resource.gpu == cluster.config.gpu
    assert role_resource.memMB == cluster.config.max_memory * 1024
    expected_j = f"{cluster.config.max_worker}x{cluster.config.gpu}"
    assert parse_j(dry_run_info._app.roles[0].args[1]) == expected_j


def test_DDPJobDefinition_submit(mocker):
    """
    Tests that the submit method returns the correct type: DDPJob
    And that the attributes of the returned object are of the correct type
    """
    ddp_def = test_DDPJobDefinition_creation()
    cluster = Cluster(test_config_creation())
    mocker.patch(
        "codeflare_sdk.job.jobs.torchx_runner.schedule",
        return_value="fake-dashboard-url",
    )  # a fake app_handle
    ddp_job = ddp_def.submit(cluster)
    # isinstance is the idiomatic type check (type(x) == T rejects subclasses).
    assert isinstance(ddp_job, DDPJob)
    assert isinstance(ddp_job.job_definition, DDPJobDefinition)
    assert isinstance(ddp_job.cluster, Cluster)
    assert isinstance(ddp_job._app_handle, str)
    assert ddp_job._app_handle == "fake-dashboard-url"


def test_DDPJob_creation(mocker):
    """
    Construct a DDPJob directly (with a mocked scheduler) and verify its
    attributes and the dry-run info passed to torchx_runner.schedule.
    Returns the job for reuse by the status/logs tests.
    """
    ddp_def = test_DDPJobDefinition_creation()
    cluster = Cluster(test_config_creation())
    mocker.patch(
        "codeflare_sdk.job.jobs.torchx_runner.schedule",
        return_value="fake-dashboard-url",
    )  # a fake app_handle
    ddp_job = DDPJob(ddp_def, cluster)
    # isinstance is the idiomatic type check (type(x) == T rejects subclasses
    # and `type(dict())`/`type(str())` are roundabout spellings of dict/str).
    assert isinstance(ddp_job, DDPJob)
    assert isinstance(ddp_job.job_definition, DDPJobDefinition)
    assert isinstance(ddp_job.cluster, Cluster)
    assert isinstance(ddp_job._app_handle, str)
    assert ddp_job._app_handle == "fake-dashboard-url"
    _, args, kwargs = torchx_runner.schedule.mock_calls[0]
    assert isinstance(args[0], AppDryRunInfo)
    job_info = args[0]
    assert isinstance(job_info.request, RayJob)
    assert isinstance(job_info._app, AppDef)
    assert isinstance(job_info._cfg, dict)
    assert isinstance(job_info._scheduler, str)
    return ddp_job


def test_DDPJob_status(mocker):
    """status() should proxy to torchx_runner.status with the stored app handle."""
    ddp_job = test_DDPJob_creation(mocker)
    mocker.patch(
        "codeflare_sdk.job.jobs.torchx_runner.status", return_value="fake-status"
    )
    assert ddp_job.status() == "fake-status"
    _, call_args, _call_kwargs = torchx_runner.status.mock_calls[0]
    assert call_args[0] == "fake-dashboard-url"


def test_DDPJob_logs(mocker):
    """logs() should proxy to torchx_runner.log_lines with the stored app handle."""
    ddp_job = test_DDPJob_creation(mocker)
    mocker.patch(
        "codeflare_sdk.job.jobs.torchx_runner.log_lines", return_value="fake-logs"
    )
    assert ddp_job.logs() == "fake-logs"
    _, call_args, _call_kwargs = torchx_runner.log_lines.mock_calls[0]
    assert call_args[0] == "fake-dashboard-url"


def parse_j(cmd):
    """
    Extract "<nnodes>x<nproc_per_node>" from a torchrun-style command line.

    Returns None when the command does not contain the
    `--nnodes N --nproc_per_node M` flag pair.
    """
    # Capture both counts in one pass instead of matching then re-splitting.
    match = re.search(r"--nnodes\s+(\d+)\s+--nproc_per_node\s+(\d+)", cmd)
    if match is None:
        return None
    return f"{match.group(1)}x{match.group(2)}"