diff --git a/ci/travis/ci.sh b/ci/travis/ci.sh index c1abacac75cb..5ab102f115ea 100755 --- a/ci/travis/ci.sh +++ b/ci/travis/ci.sh @@ -165,6 +165,7 @@ test_python() { -python/ray/serve:test_get_deployment # address violation -python/ray/tests:test_global_gc -python/ray/tests:test_job + -python/ray/tests:test_job_manager -python/ray/tests:test_memstat -python/ray/tests:test_metrics -python/ray/tests:test_metrics_agent # timeout diff --git a/dashboard/modules/job/data_types.py b/dashboard/modules/job/data_types.py index cc7c879d9074..7663ae3fa381 100644 --- a/dashboard/modules/job/data_types.py +++ b/dashboard/modules/job/data_types.py @@ -1,11 +1,6 @@ from enum import Enum from typing import Any, Dict - -try: - from pydantic import BaseModel -except ImportError: - # Lazy import without breaking class def - BaseModel = object +from dataclasses import dataclass class JobStatus(str, Enum): @@ -19,67 +14,45 @@ def __str__(self): FAILED = "FAILED" -class JobSpec(BaseModel): - # Dict to setup execution environment, better to have schema for this - runtime_env: Dict[str, Any] - # Command to start execution, ex: "python script.py" - entrypoint: str - # Metadata to pass in to configure job behavior or use as tags - # Required by Anyscale product and already supported in Ray drivers - metadata: Dict[str, str] - # Likely there will be more fields needed later on for different apps - # but we should keep it minimal and delegate policies to job manager - - # ==== Get Package ==== -class GetPackageRequest(BaseModel): - package_uri: str - - -class GetPackageResponse(BaseModel): +@dataclass +class GetPackageResponse: package_exists: bool -# ==== Upload Package ==== - - -class UploadPackageRequest(BaseModel): - package_uri: str - encoded_package_bytes: str - - # ==== Job Submit ==== -class JobSubmitRequest(BaseModel): - job_spec: JobSpec +@dataclass +class JobSubmitRequest: + # Dict to setup execution environment. + runtime_env: Dict[str, Any] + # Command to start execution, ex: "python script.py" + entrypoint: str + # Metadata to pass in to the JobConfig. 
+ metadata: Dict[str, str] -class JobSubmitResponse(BaseModel): +@dataclass +class JobSubmitResponse: job_id: str # ==== Job Status ==== -class JobStatusRequest(BaseModel): - job_id: str - - -class JobStatusResponse(BaseModel): +@dataclass +class JobStatusResponse: job_status: JobStatus # ==== Job Logs ==== -class JobLogsRequest(BaseModel): - job_id: str - - # TODO(jiaodong): Support log streaming #19415 -class JobLogsResponse(BaseModel): +@dataclass +class JobLogsResponse: stdout: str stderr: str diff --git a/dashboard/modules/job/job_head.py b/dashboard/modules/job/job_head.py index 366553f21cda..7cb905a81257 100644 --- a/dashboard/modules/job/job_head.py +++ b/dashboard/modules/job/job_head.py @@ -1,8 +1,9 @@ import aiohttp.web -from base64 import b64decode from functools import wraps import logging from typing import Callable +import json +import dataclasses import ray import ray.dashboard.utils as dashboard_utils @@ -10,9 +11,8 @@ from ray._private.runtime_env.packaging import (package_exists, upload_package_to_gcs) from ray.dashboard.modules.job.data_types import ( - GetPackageRequest, GetPackageResponse, UploadPackageRequest, JobStatus, - JobSubmitRequest, JobSubmitResponse, JobStatusRequest, JobStatusResponse, - JobLogsRequest, JobLogsResponse) + GetPackageResponse, JobStatus, JobSubmitRequest, JobSubmitResponse, + JobStatusResponse, JobLogsResponse) logger = logging.getLogger(__name__) routes = dashboard_utils.ClassMethodRouteTable @@ -40,81 +40,61 @@ def __init__(self, dashboard_head): @_ensure_ray_initialized async def get_package(self, req: aiohttp.web.Request) -> aiohttp.web.Response: - req_data = await req.json() - package_uri = GetPackageRequest(**req_data).package_uri - already_exists = package_exists(package_uri) - exists_str = "exists" if already_exists else "does not exist" - return dashboard_utils.rest_response( - success=True, - convert_google_style=False, - data=GetPackageResponse(package_exists=already_exists).dict(), - message=f"Package {package_uri} {exists_str}.") + package_uri = req.query["package_uri"] + resp = GetPackageResponse(package_exists=package_exists(package_uri)) + return aiohttp.web.Response( + text=json.dumps(dataclasses.asdict(resp)), + content_type="application/json") @routes.put("/package") @_ensure_ray_initialized - async def upload_package(self, - req: aiohttp.web.Request) -> aiohttp.web.Response: - req_data = await req.json() - upload_req = UploadPackageRequest(**req_data) - package_uri = upload_req.package_uri + async def upload_package(self, req: aiohttp.web.Request): + package_uri = req.query["package_uri"] logger.info(f"Uploading package {package_uri} to the GCS.") - upload_package_to_gcs(package_uri, - b64decode(upload_req.encoded_package_bytes)) - return dashboard_utils.rest_response( - success=True, - convert_google_style=False, - message=f"Successfully uploaded {package_uri}.") + upload_package_to_gcs(package_uri, await req.read()) + + return aiohttp.web.Response() @routes.post("/submit") @_ensure_ray_initialized async def submit(self, req: aiohttp.web.Request) -> aiohttp.web.Response: - req_data = await req.json() - submit_request = JobSubmitRequest(**req_data) + # TODO: (jiaodong) Validate if job request is valid without using + # pydantic. 
+ submit_request = JobSubmitRequest(**(await req.json())) job_id = self._job_manager.submit_job( - submit_request.job_spec.entrypoint, - runtime_env=submit_request.job_spec.runtime_env, - metadata=submit_request.job_spec.metadata) + entrypoint=submit_request.entrypoint, + runtime_env=submit_request.runtime_env, + metadata=submit_request.metadata) resp = JobSubmitResponse(job_id=job_id) - return dashboard_utils.rest_response( - success=True, - convert_google_style=False, - data=resp.dict(), - message=f"Submitted job {job_id}.") + return aiohttp.web.Response( + text=json.dumps(dataclasses.asdict(resp)), + content_type="application/json") @routes.get("/status") @_ensure_ray_initialized async def status(self, req: aiohttp.web.Request) -> aiohttp.web.Response: - req_data = dict(await req.json()) - status_request = JobStatusRequest(**req_data) + job_id = req.query["job_id"] + status: JobStatus = self._job_manager.get_job_status(job_id) - status: JobStatus = self._job_manager.get_job_status( - status_request.job_id) resp = JobStatusResponse(job_status=status) - return dashboard_utils.rest_response( - success=True, - convert_google_style=False, - data=resp.dict(), - message=f"Queried status for job {status_request.job_id}") + return aiohttp.web.Response( + text=json.dumps(dataclasses.asdict(resp)), + content_type="application/json") @routes.get("/logs") @_ensure_ray_initialized async def logs(self, req: aiohttp.web.Request) -> aiohttp.web.Response: - req_data = dict(await req.json()) - logs_request = JobLogsRequest(**req_data) - - stdout: bytes = self._job_manager.get_job_stdout(logs_request.job_id) - stderr: bytes = self._job_manager.get_job_stderr(logs_request.job_id) + job_id = req.query["job_id"] + stdout: bytes = self._job_manager.get_job_stdout(job_id) + stderr: bytes = self._job_manager.get_job_stderr(job_id) # TODO(jiaodong): Support log streaming #19415 resp = JobLogsResponse( stdout=stdout.decode("utf-8"), stderr=stderr.decode("utf-8")) - - return dashboard_utils.rest_response( - success=True, - convert_google_style=False, - data=resp.dict(), - message=f"Logs returned for job {logs_request.job_id}") + return aiohttp.web.Response( + text=json.dumps(dataclasses.asdict(resp)), + content_type="application/json") async def run(self, server): if not self._job_manager: diff --git a/dashboard/modules/job/sdk.py b/dashboard/modules/job/sdk.py index 1c64c80c0e3e..ddaccf9e62c5 100644 --- a/dashboard/modules/job/sdk.py +++ b/dashboard/modules/job/sdk.py @@ -1,25 +1,17 @@ -from base64 import b64encode +import dataclasses import logging from pathlib import Path import tempfile from typing import Any, Dict, List, Optional, Tuple -try: - from pydantic import BaseModel - from pydantic.main import ModelMetaclass -except ImportError: - BaseModel = object - ModelMetaclass = object - import requests from ray._private.runtime_env.packaging import ( create_package, get_uri_for_directory, parse_uri) from ray._private.job_manager import JobStatus from ray.dashboard.modules.job.data_types import ( - GetPackageRequest, GetPackageResponse, UploadPackageRequest, JobSpec, - JobSubmitRequest, JobSubmitResponse, JobStatusRequest, JobStatusResponse, - JobLogsRequest, JobLogsResponse) + GetPackageResponse, JobSubmitRequest, JobSubmitResponse, JobStatusResponse, + JobLogsResponse) logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) @@ -37,32 +29,33 @@ def _test_connection(self): raise ConnectionError( f"Failed to connect to Ray at address: {self._address}.") - def _do_request( - self, - method: str, - 
endpoint: str, - data: BaseModel, - response_type: Optional[ModelMetaclass] = None) -> Dict[Any, Any]: + def _do_request(self, + method: str, + endpoint: str, + *, + data: Optional[bytes] = None, + json_data: Optional[dict] = None, + params: Optional[dict] = None, + response_type: Optional[type] = None) -> Optional[object]: url = f"{self._address}/{endpoint}" - json_payload = data.dict() - logger.debug(f"Sending request to {url} with payload {json_payload}.") - r = requests.request(method, url, json=json_payload) - r.raise_for_status() - - response_json = r.json() - if not response_json["result"]: # Indicates failure. - raise Exception(response_json["msg"]) + logger.info(f"Sending request to {url} with json: {json_data}.") + r = requests.request( + method, url, data=data, json=json_data, params=params) + r.raise_for_status() if response_type is None: return None else: - # Dashboard "framework" returns double-nested "data" field... - return response_type(**response_json["data"]["data"]) + response = r.json() + logger.info(f"Got response: {response}.") + return response_type(**response) def _package_exists(self, package_uri: str) -> bool: - req = GetPackageRequest(package_uri=package_uri) resp = self._do_request( - "GET", "package", req, response_type=GetPackageResponse) + "GET", + "package", + params={"package_uri": package_uri}, + response_type=GetPackageResponse) return resp.package_exists def _upload_package(self, @@ -78,10 +71,11 @@ def _upload_package(self, package_file, include_parent_dir=include_parent_dir, excludes=excludes) - req = UploadPackageRequest( - package_uri=package_uri, - encoded_package_bytes=b64encode(package_file.read_bytes())) - self._do_request("PUT", "package", req) + self._do_request( + "PUT", + "package", + data=package_file.read_bytes(), + params={"package_uri": package_uri}) package_file.unlink() def _upload_package_if_needed(self, @@ -122,18 +116,27 @@ def submit_job(self, metadata = metadata or {} self._upload_working_dir_if_needed(runtime_env) - job_spec = JobSpec( + req = JobSubmitRequest( entrypoint=entrypoint, runtime_env=runtime_env, metadata=metadata) - req = JobSubmitRequest(job_spec=job_spec) - resp = self._do_request("POST", "submit", req, JobSubmitResponse) + resp = self._do_request( + "POST", + "submit", + json_data=dataclasses.asdict(req), + response_type=JobSubmitResponse) return resp.job_id def get_job_status(self, job_id: str) -> JobStatus: - req = JobStatusRequest(job_id=job_id) - resp = self._do_request("GET", "status", req, JobStatusResponse) + resp = self._do_request( + "GET", + "status", + params={"job_id": job_id}, + response_type=JobStatusResponse) return resp.job_status def get_job_logs(self, job_id: str) -> Tuple[str, str]: - req = JobLogsRequest(job_id=job_id) - resp = self._do_request("GET", "logs", req, JobLogsResponse) + resp = self._do_request( + "GET", + "logs", + params={"job_id": job_id}, + response_type=JobLogsResponse) return resp.stdout, resp.stderr diff --git a/dashboard/modules/job/tests/test_http_job_server.py b/dashboard/modules/job/tests/test_http_job_server.py index 4640113519ef..ce735f958d75 100644 --- a/dashboard/modules/job/tests/test_http_job_server.py +++ b/dashboard/modules/job/tests/test_http_job_server.py @@ -12,6 +12,7 @@ from ray.dashboard.modules.job.sdk import JobSubmissionClient logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) @pytest.fixture diff --git a/python/ray/_private/job_manager/job_manager.py b/python/ray/_private/job_manager/job_manager.py index 5788b6b69d14..740bed1bd628 
100644 --- a/python/ray/_private/job_manager/job_manager.py +++ b/python/ray/_private/job_manager/job_manager.py @@ -1,14 +1,18 @@ -import json -import os +import asyncio +from asyncio.tasks import FIRST_COMPLETED import pickle +import os +import json +import logging +import traceback import subprocess + from typing import Any, Dict, Tuple, Optional from uuid import uuid4 import ray import ray.ray_constants as ray_constants from ray.actor import ActorHandle -from ray.exceptions import GetTimeoutError, RayActorError from ray.experimental.internal_kv import ( _internal_kv_initialized, _internal_kv_get, @@ -16,6 +20,16 @@ ) from ray.dashboard.modules.job.data_types import JobStatus from ray._private.runtime_env.constants import RAY_JOB_CONFIG_JSON_ENV_VAR +# Used in testing only to cover job status under concurrency +from ray._private.test_utils import SignalActor + +logger = logging.getLogger(__name__) + +# asyncio python version compatibility +try: + create_task = asyncio.create_task +except AttributeError: + create_task = asyncio.ensure_future JOB_ID_METADATA_KEY = "job_submission_id" @@ -85,43 +99,20 @@ def get_status(self, job_id: str) -> JobStatus: return pickle.loads(pickled_status) -def exec_cmd_logs_to_file( - cmd: str, - stdout_file: str, - stderr_file: str, -) -> int: - """ - Runs a command as a child process, streaming stderr & stdout to given - log files. - """ - - with open(stdout_file, "a+") as stdout_in, open(stderr_file, - "a+") as stderr_in: - child = subprocess.Popen( - cmd, - shell=True, - universal_newlines=True, - stdout=stdout_in, - stderr=stderr_in) - - exit_code = child.wait() - return exit_code - - class JobSupervisor: """ Ray actor created by JobManager for each submitted job, responsible to setup runtime_env, execute given shell command in subprocess, update job - status and persist job logs. + status, persist job logs and manage subprocess group cleanup. One job supervisor actor maps to one subprocess, for one job_id. - Job supervisor actor should fate share with subprocess it created. """ + SUBPROCESS_POLL_PERIOD_S = 0.1 + def __init__(self, job_id: str, metadata: Dict[str, str]): self._job_id = job_id - self._status = JobStatus.PENDING self._status_client = JobStatusStorageClient() self._log_client = JobLogStorageClient() self._runtime_env = ray.get_runtime_context().runtime_env @@ -129,19 +120,100 @@ def __init__(self, job_id: str, metadata: Dict[str, str]): self._metadata = metadata self._metadata[JOB_ID_METADATA_KEY] = job_id - def ready(self): + # Set by a fire-and-forget stop() call from the outer job manager. + self._stop_event = asyncio.Event() + + async def ready(self): + """Dummy object ref. Returning from this function means the job + supervisor actor started successfully with runtime_env configured, + and is ready to move on to the RUNNING state. + """ pass - def run(self, cmd: str): - """Run the command, then exit afterwards. + async def _exec_entrypoint_cmd(self, entrypoint_cmd: str, stdout_path: str, + stderr_path: str) -> subprocess.Popen: + """ + Runs a command as a child process, streaming stderr & stdout to given + log files. + + Meanwhile we start a daemon process and put the driver subprocess in + its own pgid, so that if the job actor dies, the entire process group + fate-shares with it. + + Args: + entrypoint_cmd: Driver command to execute in subprocess. + stdout_path: File path on head node's local disk to store driver + command's stdout. + stderr_path: File path on head node's local disk to store driver + command's stderr.
+ Returns: + child_process: Child process that runs the driver command. Can be + terminated or killed upon the user calling stop(). + """ + with open(stdout_path, "a+") as stdout, open(stderr_path, + "a+") as stderr: + child_process = subprocess.Popen( + entrypoint_cmd, + shell=True, + start_new_session=True, + stdout=stdout, + stderr=stderr) + parent_pid = os.getpid() + # Create new pgid with new subprocess to execute driver command + child_pid = child_process.pid + child_pgid = os.getpgid(child_pid) + + # Open a new subprocess to kill the child process when the parent + # process dies: kill -s 0 parent_pid will succeed if the parent is + # alive. If it fails, SIGKILL the child process group and exit. + subprocess.Popen( + f"while kill -s 0 {parent_pid}; do sleep 1; done; kill -9 -{child_pgid}", # noqa: E501 + shell=True, + # Suppress output + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + return child_process + + async def _polling(self, child_process) -> int: + try: + while child_process is not None: + return_code = child_process.poll() + if return_code is not None: + # subprocess finished with return code + return return_code + else: + # still running, yield control, 0.1s by default + await asyncio.sleep(self.SUBPROCESS_POLL_PERIOD_S) + except Exception: + if child_process: + # TODO (jiaodong): Improve this with SIGTERM then SIGKILL + child_process.kill() + return 1 + + async def run( + self, + entrypoint_cmd: str, + # Signal actor used in testing to capture PENDING -> RUNNING cases + _start_signal_actor: Optional[SignalActor] = None): + """ + Stop and start both happen asynchronously, coordinated by an asyncio + event and a coroutine, respectively. - Should update state and logs. + 1) Sets the job status to RUNNING. + 2) Passes the runtime env and metadata to the subprocess as + serialized env variables. + 3) Handles concurrent events of driver execution and a stop request, + whichever happens first. """ - assert self._status == JobStatus.PENDING, ( + cur_status = self._get_status() + assert cur_status == JobStatus.PENDING, ( "Run should only be called once.") - self._status = JobStatus.RUNNING - self._status_client.put_status(self._job_id, self._status) - exit_code = None + + if _start_signal_actor: + # Block in PENDING state until start signal received. + await _start_signal_actor.wait.remote() + + self._status_client.put_status(self._job_id, JobStatus.RUNNING) try: # Set JobConfig for the child process (runtime_env, metadata). @@ -153,24 +225,50 @@ def run(self, cmd: str): ) os.environ[ray_constants. RAY_ADDRESS_ENVIRONMENT_VARIABLE] = ray_redis_address + stdout_path, stderr_path = self._log_client.get_log_file_paths( self._job_id) - - exit_code = exec_cmd_logs_to_file(cmd, stdout_path, stderr_path) - finally: - # 3) Once command finishes, update status to SUCCEEDED or FAILED.
- if exit_code == 0: - self._status = JobStatus.SUCCEEDED + child_process = await self._exec_entrypoint_cmd( + entrypoint_cmd, stdout_path, stderr_path) + + polling_task = create_task(self._polling(child_process)) + finished, _ = await asyncio.wait( + [polling_task, self._stop_event.wait()], + return_when=FIRST_COMPLETED) + + if self._stop_event.is_set(): + polling_task.cancel() + # TODO (jiaodong): Improve this with SIGTERM then SIGKILL + child_process.kill() + self._status_client.put_status(self._job_id, JobStatus.STOPPED) else: - self._status = JobStatus.FAILED - self._status_client.put_status(self._job_id, self.get_status()) + # Child process finished execution and no stop event was set + # at the same time + assert len( + finished) == 1, "Should have only one coroutine done" + [child_process_task] = finished + return_code = child_process_task.result() + if return_code == 0: + self._status_client.put_status(self._job_id, + JobStatus.SUCCEEDED) + else: + self._status_client.put_status(self._job_id, + JobStatus.FAILED) + except Exception: + logger.error( + "Got unexpected exception while trying to execute driver " + f"command. {traceback.format_exc()}") + finally: + # Clean up the actor after tasks are finished ray.actor.exit_actor() - def get_status(self) -> JobStatus: - return self._status + def _get_status(self) -> JobStatus: + return self._status_client.get_status(self._job_id) def stop(self): - pass + """Set stop_event and let run() handle the rest in its asyncio.wait(). + """ + self._stop_event.set() class JobManager: @@ -180,7 +278,6 @@ class JobManager: as lost once the ray cluster running job manager instance is down. """ JOB_ACTOR_NAME = "_ray_internal_job_actor_{job_id}" - START_ACTOR_TIMEOUT_S = 10 def __init__(self): self._status_client = JobStatusStorageClient() @@ -211,64 +308,117 @@ def _get_current_node_resource_key(self) -> str: raise ValueError( "Cannot found the node dictionary for current node.") - def submit_job( - self, - entrypoint: str, - runtime_env: Optional[Dict[str, Any]] = None, - metadata: Optional[Dict[str, str]] = None, - ) -> str: + def submit_job(self, + entrypoint: str, + runtime_env: Optional[Dict[str, Any]] = None, + metadata: Optional[Dict[str, str]] = None, + _start_signal_actor: Optional[SignalActor] = None) -> str: """ - 1) Create new detached actor with same runtime_env as job spec - 2) Get task / actor level runtime_env as env var and pass into - subprocess - 3) subprocess.run(entrypoint) - - Returns unique job_id. + Job execution happens asynchronously. + + 1) Generate a new unique id for this job submission; each call of this + method is treated as an independent submission with its own new + uuid, job supervisor actor and child process. + 2) Create a new detached actor with the same runtime_env as the job + spec. + + Setting up the runtime_env, subprocess group, driver command + execution, subprocess cleanup and status updates to the GCS are + all handled by the job supervisor actor. + + Args: + entrypoint: Driver command to execute in a subprocess shell. + Represents the entrypoint to start the user application. + runtime_env: Runtime environment used to execute the driver + command, which could contain its own ray.init() to configure + the runtime env at the Ray cluster, task and actor level. For + now, we assume the same runtime_env is used for the job + supervisor actor and the driver command. + metadata: Arbitrary metadata to pass through to the driver + command, if needed. + _start_signal_actor: Used in testing only to capture state + transitions between PENDING -> RUNNING.
Regular users shouldn't need this. + + Returns: + job_id: Generated uuid for further job management. Only valid + within the same ray cluster. """ job_id = str(uuid4()) - supervisor = self._supervisor_actor_cls.options( - lifetime="detached", - name=self.JOB_ACTOR_NAME.format(job_id=job_id), - # Currently we assume JobManager is created by dashboard server - # running on headnode, same for job supervisor actors scheduled - resources={ - self._get_current_node_resource_key(): 0.001, - }, - # For now we assume supervisor actor and driver script have same - # runtime_env. - runtime_env=runtime_env, - ).remote(job_id, metadata or {}) + self._status_client.put_status(job_id, JobStatus.PENDING) + supervisor = None try: - ray.get( - supervisor.ready.remote(), timeout=self.START_ACTOR_TIMEOUT_S) - except GetTimeoutError: - ray.kill(supervisor, no_restart=True) - raise RuntimeError(f"Failed to start actor for job {job_id}.") + logger.debug( + f"Submitting job with generated internal job_id: {job_id}") + supervisor = self._supervisor_actor_cls.options( + lifetime="detached", + name=self.JOB_ACTOR_NAME.format(job_id=job_id), + num_cpus=0, + # Currently we assume JobManager is created by dashboard server + # running on headnode, same for job supervisor actors scheduled + resources={ + self._get_current_node_resource_key(): 0.001, + }, + # For now we assume supervisor actor and driver script have + # same runtime_env. + runtime_env=runtime_env).remote(job_id, metadata or {}) + ray.get(supervisor.ready.remote()) + except Exception as e: + if supervisor: + ray.kill(supervisor, no_restart=True) + self._status_client.put_status(job_id, JobStatus.FAILED) + raise RuntimeError( + f"Failed to start actor for job {job_id}. This could be " + "a runtime_env configuration failure or an invalid " + "runtime_env. " + f"Exception message: {str(e)}") # Kick off the job to run in the background. - supervisor.run.remote(entrypoint) + supervisor.run.remote(entrypoint, _start_signal_actor) return job_id def stop_job(self, job_id) -> bool: - """Request job to exit.""" + """Request the job to exit, fire and forget. + + Args: + job_id: Generated uuid from submit_job. Only valid in same ray + cluster. + Returns: + stopped: + True if there is a running job to stop. + False if no running job was found. + """ job_supervisor_actor = self._get_actor_for_job(job_id) if job_supervisor_actor is not None: - # Actor is still alive, signal it to stop the driver. + # Actor is still alive, signal it to stop the driver, fire and + # forget job_supervisor_actor.stop.remote() + return True + else: + return False + + def get_job_status(self, job_id: str) -> JobStatus: + """Get the latest status of a job. If the job supervisor actor is no + longer alive, it will also attempt to make the adjustments needed to + bring the job to the correct termination state. + + All job status is stored in and read only from the GCS. - def get_job_status(self, job_id: str): + Args: + job_id: Generated uuid from submit_job. Only valid in same ray + cluster. + Returns: + job_status: Latest known job status + """ job_supervisor_actor = self._get_actor_for_job(job_id) - # Actor is still alive, try to get status from it. - if job_supervisor_actor is not None: - try: - return ray.get(job_supervisor_actor.get_status.remote()) - except RayActorError: - # Actor exited, so we should fall back to internal_kv. - pass + if job_supervisor_actor is None: + # Job actor either exited or failed. We need to ensure the job is + # never left in a non-terminal status in case the actor failed + # without updating the GCS with the latest status.
+ last_status = self._status_client.get_status(job_id) + if last_status in {JobStatus.PENDING, JobStatus.RUNNING}: + self._status_client.put_status(job_id, JobStatus.FAILED) - # Fall back to storage if the actor is dead. return self._status_client.get_status(job_id) def get_job_stdout(self, job_id: str) -> bytes: diff --git a/python/ray/_private/job_manager/tests/test_job_manager.py b/python/ray/_private/job_manager/tests/test_job_manager.py deleted file mode 100644 index bb20687c2a03..000000000000 --- a/python/ray/_private/job_manager/tests/test_job_manager.py +++ /dev/null @@ -1,198 +0,0 @@ -import pytest - -import ray -from ray._private.job_manager import (JobManager, JobStatus, - JOB_ID_METADATA_KEY) -from ray._private.test_utils import wait_for_condition - -TEST_NAMESPACE = "jobs_test_namespace" - - -@pytest.fixture(scope="session") -def shared_ray_instance(): - yield ray.init(num_cpus=16, namespace=TEST_NAMESPACE) - - -@pytest.fixture -def job_manager(shared_ray_instance): - yield JobManager() - - -def check_job_succeeded(job_manager, job_id): - status = job_manager.get_job_status(job_id) - if status == JobStatus.FAILED: - stdout = job_manager.get_job_stdout(job_id) - stderr = job_manager.get_job_stderr(job_id) - raise RuntimeError(f"Job failed! stdout:\n{stdout}\nstderr:\n{stderr}") - assert status in {JobStatus.RUNNING, JobStatus.SUCCEEDED} - return status == JobStatus.SUCCEEDED - - -def check_job_failed(job_manager, job_id): - status = job_manager.get_job_status(job_id) - assert status in {JobStatus.RUNNING, JobStatus.FAILED} - return status == JobStatus.FAILED - - -def test_submit_basic_echo(job_manager): - job_id = job_manager.submit_job("echo hello") - - wait_for_condition( - check_job_succeeded, job_manager=job_manager, job_id=job_id) - assert job_manager.get_job_stdout(job_id) == b"hello" - - -def test_submit_stderr(job_manager): - job_id = job_manager.submit_job("echo error 1>&2") - - wait_for_condition( - check_job_succeeded, job_manager=job_manager, job_id=job_id) - assert job_manager.get_job_stderr(job_id) == b"error" - - -def test_submit_ls_grep(job_manager): - job_id = job_manager.submit_job("ls | grep test_job_manager.py") - - wait_for_condition( - check_job_succeeded, job_manager=job_manager, job_id=job_id) - assert job_manager.get_job_stdout(job_id) == b"test_job_manager.py" - - -def test_subprocess_exception(job_manager): - """ - Run a python script with exception, ensure: - 1) Job status is marked as failed - 2) Job manager can surface exception message back to stderr api - 3) Job no hanging job supervisor actor - 4) Empty stdout - """ - job_id = job_manager.submit_job( - "python subprocess_driver_scripts/script_with_exception.py") - - wait_for_condition( - check_job_failed, job_manager=job_manager, job_id=job_id) - stderr = job_manager.get_job_stderr(job_id).decode("utf-8") - last_line = stderr.strip().splitlines()[-1] - assert last_line == "Exception: Script failed with exception !" - assert job_manager._get_actor_for_job(job_id) is None - assert job_manager.get_job_stdout(job_id) == b"" - - -def test_submit_with_s3_runtime_env(job_manager): - job_id = job_manager.submit_job( - "python script.py", - runtime_env={"working_dir": "s3://runtime-env-test/script.zip"}) - - wait_for_condition( - check_job_succeeded, job_manager=job_manager, job_id=job_id) - assert job_manager.get_job_stdout( - job_id) == b"Executing main() from script.py !!" 
- - -class TestRuntimeEnv: - def test_inheritance(self, job_manager): - # Test that the driver and actors/tasks inherit the right runtime_env. - pass - - def test_pass_env_var(self, job_manager): - """Test we can pass env vars in the subprocess that executes job's - driver script. - """ - job_id = job_manager.submit_job( - "echo $TEST_SUBPROCESS_JOB_CONFIG_ENV_VAR", - runtime_env={ - "env_vars": { - "TEST_SUBPROCESS_JOB_CONFIG_ENV_VAR": "233" - } - }) - - wait_for_condition( - check_job_succeeded, job_manager=job_manager, job_id=job_id) - assert job_manager.get_job_stdout(job_id) == b"233" - - def test_multiple_runtime_envs(self, job_manager): - # Test that you can run two jobs in different envs without conflict. - job_id_1 = job_manager.submit_job( - "python subprocess_driver_scripts/print_runtime_env.py", - runtime_env={ - "env_vars": { - "TEST_SUBPROCESS_JOB_CONFIG_ENV_VAR": "JOB_1_VAR" - } - }) - - wait_for_condition( - check_job_succeeded, job_manager=job_manager, job_id=job_id_1) - assert job_manager.get_job_stdout( - job_id_1 - ) == b"{'env_vars': {'TEST_SUBPROCESS_JOB_CONFIG_ENV_VAR': 'JOB_1_VAR'}}" # noqa: E501 - - job_id_2 = job_manager.submit_job( - "python subprocess_driver_scripts/print_runtime_env.py", - runtime_env={ - "env_vars": { - "TEST_SUBPROCESS_JOB_CONFIG_ENV_VAR": "JOB_2_VAR" - } - }) - - wait_for_condition( - check_job_succeeded, job_manager=job_manager, job_id=job_id_2) - assert job_manager.get_job_stdout( - job_id_2 - ) == b"{'env_vars': {'TEST_SUBPROCESS_JOB_CONFIG_ENV_VAR': 'JOB_2_VAR'}}" # noqa: E501 - - def test_env_var_and_driver_job_config_warning(self, job_manager): - """Ensure we got error message from worker.py and job stderr - if user provided runtime_env in both driver script and submit() - """ - job_id = job_manager.submit_job( - "python subprocess_driver_scripts/override_env_var.py", - runtime_env={ - "env_vars": { - "TEST_SUBPROCESS_JOB_CONFIG_ENV_VAR": "JOB_1_VAR" - } - }) - - wait_for_condition( - check_job_succeeded, job_manager=job_manager, job_id=job_id) - assert job_manager.get_job_stdout(job_id) == b"JOB_1_VAR" - stderr = job_manager.get_job_stderr(job_id).decode("utf-8") - assert stderr.startswith( - "Both RAY_JOB_CONFIG_JSON_ENV_VAR and ray.init(runtime_env) " - "are provided") - - -def test_pass_metadata(job_manager): - def dict_to_binary(d): - return str(dict(sorted(d.items()))).encode("utf-8") - - print_metadata_cmd = ( - "python -c\"" - "import ray;" - "ray.init();" - "job_config=ray.worker.global_worker.core_worker.get_job_config();" - "print(dict(sorted(job_config.metadata.items())))" - "\"") - - # Check that we default to no metadata. - job_id = job_manager.submit_job(print_metadata_cmd) - - wait_for_condition( - check_job_succeeded, job_manager=job_manager, job_id=job_id) - assert job_manager.get_job_stdout(job_id) == dict_to_binary({ - JOB_ID_METADATA_KEY: job_id - }) - - # Check that we can pass custom metadata. 
- job_id = job_manager.submit_job( - print_metadata_cmd, metadata={ - "key1": "val1", - "key2": "val2" - }) - - wait_for_condition( - check_job_succeeded, job_manager=job_manager, job_id=job_id) - assert job_manager.get_job_stdout(job_id) == dict_to_binary({ - JOB_ID_METADATA_KEY: job_id, - "key1": "val1", - "key2": "val2" - }) diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index a5cc8aabd822..4db5d7e25ba0 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -295,6 +295,14 @@ py_test( deps = ["//:ray_lib"], ) +py_test( + name = "test_job_manager", + size = "small", + srcs = SRCS + ["test_job_manager.py"] + glob(["subprocess_driver_scripts/**/*.py"]), + tags = ["exclusive", "team:serve"], + deps = ["//:ray_lib"], +) + # TODO: use py_test(env = ...) in the build file with bazel 4.0 py_test( name = "test_tracing", diff --git a/python/ray/_private/job_manager/tests/subprocess_driver_scripts/__init__.py b/python/ray/tests/subprocess_driver_scripts/__init__.py similarity index 100% rename from python/ray/_private/job_manager/tests/subprocess_driver_scripts/__init__.py rename to python/ray/tests/subprocess_driver_scripts/__init__.py diff --git a/python/ray/_private/job_manager/tests/subprocess_driver_scripts/override_env_var.py b/python/ray/tests/subprocess_driver_scripts/override_env_var.py similarity index 100% rename from python/ray/_private/job_manager/tests/subprocess_driver_scripts/override_env_var.py rename to python/ray/tests/subprocess_driver_scripts/override_env_var.py diff --git a/python/ray/_private/job_manager/tests/subprocess_driver_scripts/print_namespace.py b/python/ray/tests/subprocess_driver_scripts/print_namespace.py similarity index 100% rename from python/ray/_private/job_manager/tests/subprocess_driver_scripts/print_namespace.py rename to python/ray/tests/subprocess_driver_scripts/print_namespace.py diff --git a/python/ray/_private/job_manager/tests/subprocess_driver_scripts/print_runtime_env.py b/python/ray/tests/subprocess_driver_scripts/print_runtime_env.py similarity index 100% rename from python/ray/_private/job_manager/tests/subprocess_driver_scripts/print_runtime_env.py rename to python/ray/tests/subprocess_driver_scripts/print_runtime_env.py diff --git a/python/ray/_private/job_manager/tests/subprocess_driver_scripts/script_with_exception.py b/python/ray/tests/subprocess_driver_scripts/script_with_exception.py similarity index 100% rename from python/ray/_private/job_manager/tests/subprocess_driver_scripts/script_with_exception.py rename to python/ray/tests/subprocess_driver_scripts/script_with_exception.py diff --git a/python/ray/tests/test_job_manager.py b/python/ray/tests/test_job_manager.py new file mode 100644 index 000000000000..3f54cbda6b34 --- /dev/null +++ b/python/ray/tests/test_job_manager.py @@ -0,0 +1,391 @@ +from uuid import uuid4 +import tempfile +import os +import time +import psutil + +import pytest + +import ray +from ray._private.job_manager import (JobManager, JobStatus, + JOB_ID_METADATA_KEY) +from ray._private.test_utils import SignalActor, wait_for_condition + +TEST_NAMESPACE = "jobs_test_namespace" + + +@pytest.fixture(scope="session") +def shared_ray_instance(): + yield ray.init(num_cpus=16, namespace=TEST_NAMESPACE, log_to_driver=True) + + +@pytest.fixture +def job_manager(shared_ray_instance): + yield JobManager() + + +def _driver_script_path(file_name: str) -> str: + return os.path.join( + os.path.dirname(__file__), "subprocess_driver_scripts", file_name) + + +def check_job_succeeded(job_manager, job_id): + 
status = job_manager.get_job_status(job_id) + if status == JobStatus.FAILED: + stdout = job_manager.get_job_stdout(job_id) + stderr = job_manager.get_job_stderr(job_id) + raise RuntimeError(f"Job failed! stdout:\n{stdout}\nstderr:\n{stderr}") + assert status in { + JobStatus.PENDING, JobStatus.RUNNING, JobStatus.SUCCEEDED + } + return status == JobStatus.SUCCEEDED + + +def check_job_failed(job_manager, job_id): + status = job_manager.get_job_status(job_id) + assert status in {JobStatus.PENDING, JobStatus.RUNNING, JobStatus.FAILED} + return status == JobStatus.FAILED + + +def check_job_stopped(job_manager, job_id): + status = job_manager.get_job_status(job_id) + assert status in {JobStatus.PENDING, JobStatus.RUNNING, JobStatus.STOPPED} + return status == JobStatus.STOPPED + + +def check_subprocess_cleaned(pid): + return psutil.pid_exists(pid) is False + + +class TestShellScriptExecution: + def test_submit_basic_echo(self, job_manager): + job_id = job_manager.submit_job("echo hello") + + wait_for_condition( + check_job_succeeded, job_manager=job_manager, job_id=job_id) + assert job_manager.get_job_stdout(job_id) == b"hello" + + def test_submit_stderr(self, job_manager): + job_id = job_manager.submit_job("echo error 1>&2") + + wait_for_condition( + check_job_succeeded, job_manager=job_manager, job_id=job_id) + assert job_manager.get_job_stderr(job_id) == b"error" + + def test_submit_ls_grep(self, job_manager): + job_id = job_manager.submit_job( + f"ls {os.path.dirname(__file__)} | grep test_job_manager.py") + + wait_for_condition( + check_job_succeeded, job_manager=job_manager, job_id=job_id) + assert job_manager.get_job_stdout(job_id) == b"test_job_manager.py" + + def test_subprocess_exception(self, job_manager): + """ + Run a python script with exception, ensure: + 1) Job status is marked as failed + 2) Job manager can surface exception message back to stderr api + 3) Job no hanging job supervisor actor + 4) Empty stdout + """ + job_id = job_manager.submit_job( + f"python {_driver_script_path('script_with_exception.py')}") + + wait_for_condition( + check_job_failed, job_manager=job_manager, job_id=job_id) + stderr = job_manager.get_job_stderr(job_id).decode("utf-8") + last_line = stderr.strip().splitlines()[-1] + assert last_line == "Exception: Script failed with exception !" + assert job_manager._get_actor_for_job(job_id) is None + assert job_manager.get_job_stdout(job_id) == b"" + + def test_submit_with_s3_runtime_env(self, job_manager): + job_id = job_manager.submit_job( + "python script.py", + runtime_env={"working_dir": "s3://runtime-env-test/script.zip"}) + + wait_for_condition( + check_job_succeeded, job_manager=job_manager, job_id=job_id) + assert job_manager.get_job_stdout( + job_id) == b"Executing main() from script.py !!" + + +class TestRuntimeEnv: + def test_inheritance(self, job_manager): + # Test that the driver and actors/tasks inherit the right runtime_env. + pass + + def test_pass_env_var(self, job_manager): + """Test we can pass env vars in the subprocess that executes job's + driver script. + """ + job_id = job_manager.submit_job( + "echo $TEST_SUBPROCESS_JOB_CONFIG_ENV_VAR", + runtime_env={ + "env_vars": { + "TEST_SUBPROCESS_JOB_CONFIG_ENV_VAR": "233" + } + }) + + wait_for_condition( + check_job_succeeded, job_manager=job_manager, job_id=job_id) + assert job_manager.get_job_stdout(job_id) == b"233" + + def test_multiple_runtime_envs(self, job_manager): + # Test that you can run two jobs in different envs without conflict. 
+ job_id_1 = job_manager.submit_job( + f"python {_driver_script_path('print_runtime_env.py')}", + runtime_env={ + "env_vars": { + "TEST_SUBPROCESS_JOB_CONFIG_ENV_VAR": "JOB_1_VAR" + } + }) + + wait_for_condition( + check_job_succeeded, job_manager=job_manager, job_id=job_id_1) + assert job_manager.get_job_stdout( + job_id_1 + ) == b"{'env_vars': {'TEST_SUBPROCESS_JOB_CONFIG_ENV_VAR': 'JOB_1_VAR'}}" # noqa: E501 + + job_id_2 = job_manager.submit_job( + f"python {_driver_script_path('print_runtime_env.py')}", + runtime_env={ + "env_vars": { + "TEST_SUBPROCESS_JOB_CONFIG_ENV_VAR": "JOB_2_VAR" + } + }) + + wait_for_condition( + check_job_succeeded, job_manager=job_manager, job_id=job_id_2) + assert job_manager.get_job_stdout( + job_id_2 + ) == b"{'env_vars': {'TEST_SUBPROCESS_JOB_CONFIG_ENV_VAR': 'JOB_2_VAR'}}" # noqa: E501 + + def test_env_var_and_driver_job_config_warning(self, job_manager): + """Ensure we got error message from worker.py and job stderr + if user provided runtime_env in both driver script and submit() + """ + job_id = job_manager.submit_job( + f"python {_driver_script_path('override_env_var.py')}", + runtime_env={ + "env_vars": { + "TEST_SUBPROCESS_JOB_CONFIG_ENV_VAR": "JOB_1_VAR" + } + }) + + wait_for_condition( + check_job_succeeded, job_manager=job_manager, job_id=job_id) + assert job_manager.get_job_stdout(job_id) == b"JOB_1_VAR" + stderr = job_manager.get_job_stderr(job_id).decode("utf-8") + assert stderr.startswith( + "Both RAY_JOB_CONFIG_JSON_ENV_VAR and ray.init(runtime_env) " + "are provided") + + def test_failed_runtime_env_configuration(self, job_manager): + """Ensure job status is correctly set as failed if job supervisor + actor failed to setup runtime_env. + """ + with pytest.raises(RuntimeError): + job_id = job_manager.submit_job( + f"python {_driver_script_path('override_env_var.py')}", + runtime_env={"working_dir": "path_not_exist"}) + + assert job_manager.get_job_status(job_id) == JobStatus.FAILED + + def test_pass_metadata(self, job_manager): + def dict_to_binary(d): + return str(dict(sorted(d.items()))).encode("utf-8") + + print_metadata_cmd = ( + "python -c\"" + "import ray;" + "ray.init();" + "job_config=ray.worker.global_worker.core_worker.get_job_config();" + "print(dict(sorted(job_config.metadata.items())))" + "\"") + + # Check that we default to only the job ID. + job_id = job_manager.submit_job(print_metadata_cmd) + + wait_for_condition( + check_job_succeeded, job_manager=job_manager, job_id=job_id) + assert job_manager.get_job_stdout(job_id) == dict_to_binary({ + JOB_ID_METADATA_KEY: job_id + }) + + # Check that we can pass custom metadata. + job_id = job_manager.submit_job( + print_metadata_cmd, metadata={ + "key1": "val1", + "key2": "val2" + }) + + wait_for_condition( + check_job_succeeded, job_manager=job_manager, job_id=job_id) + assert job_manager.get_job_stdout(job_id) == dict_to_binary({ + JOB_ID_METADATA_KEY: job_id, + "key1": "val1", + "key2": "val2" + }) + + +class TestAsyncAPI: + def _run_hanging_command(self, + job_manager, + tmp_dir, + _start_signal_actor=None): + tmp_file = os.path.join(tmp_dir, "hello") + pid_file = os.path.join(tmp_dir, "pid") + + # Write subprocess pid to pid_file and block until tmp_file is present. + wait_for_file_cmd = (f"echo $$ > {pid_file} && " + f"until [ -f {tmp_file} ]; " + "do echo 'Waiting...' 
&& sleep 1; " + "done") + job_id = job_manager.submit_job( + wait_for_file_cmd, _start_signal_actor=_start_signal_actor) + + for _ in range(10): + time.sleep(0.1) + status = job_manager.get_job_status(job_id) + if _start_signal_actor: + assert status == JobStatus.PENDING + stdout = job_manager.get_job_stdout(job_id) + assert b"No stdout log available yet." in stdout + else: + assert status == JobStatus.RUNNING + stdout = job_manager.get_job_stdout(job_id) + assert b"Waiting..." in stdout + + return pid_file, tmp_file, job_id + + def test_status_and_logs_while_blocking(self, job_manager): + with tempfile.TemporaryDirectory() as tmp_dir: + pid_file, tmp_file, job_id = self._run_hanging_command( + job_manager, tmp_dir) + with open(pid_file, "r") as file: + pid = int(file.read()) + assert psutil.pid_exists(pid), ( + "driver subprocess should be running") + + # Signal the job to exit by writing to the file. + with open(tmp_file, "w") as f: + print("hello", file=f) + + wait_for_condition( + check_job_succeeded, job_manager=job_manager, job_id=job_id) + # Ensure driver subprocess gets cleaned up after job reached + # termination state + wait_for_condition(check_subprocess_cleaned, pid=pid) + + def test_stop_job(self, job_manager): + with tempfile.TemporaryDirectory() as tmp_dir: + _, _, job_id = self._run_hanging_command(job_manager, tmp_dir) + + assert job_manager.stop_job(job_id) is True + wait_for_condition( + check_job_stopped, job_manager=job_manager, job_id=job_id) + + # Assert re-stopping a stopped job also returns False + assert job_manager.stop_job(job_id) is False + # Assert stopping non-existent job returns False + assert job_manager.stop_job(str(uuid4())) is False + + def test_kill_job_actor_in_before_driver_finish(self, job_manager): + """ + Test submitting a long-running / blocking driver script, and kill + the job supervisor actor before the script returns, and ensure + + 1) Job status is correctly marked as failed + 2) No hanging subprocess from failed job + """ + + with tempfile.TemporaryDirectory() as tmp_dir: + pid_file, _, job_id = self._run_hanging_command( + job_manager, tmp_dir) + with open(pid_file, "r") as file: + pid = int(file.read()) + assert psutil.pid_exists(pid), ( + "driver subprocess should be running") + + actor = job_manager._get_actor_for_job(job_id) + ray.kill(actor, no_restart=True) + wait_for_condition( + check_job_failed, job_manager=job_manager, job_id=job_id) + + # Ensure driver subprocess gets cleaned up after job reached + # termination state + wait_for_condition(check_subprocess_cleaned, pid=pid) + + def test_stop_job_in_pending(self, job_manager): + """ + Kick off a job that is in PENDING state, stop the job and ensure + + 1) Job can correctly be stopped immediately with the correct JobStatus + 2) No dangling subprocess left.
+ """ + _start_signal_actor = SignalActor.remote() + + with tempfile.TemporaryDirectory() as tmp_dir: + pid_file, _, job_id = self._run_hanging_command( + job_manager, tmp_dir, _start_signal_actor=_start_signal_actor) + assert not os.path.exists(pid_file), ( + "driver subprocess should NOT be running while job is " + "still PENDING.") + + assert job_manager.stop_job(job_id) is True + # Send run signal to unblock run function + ray.get(_start_signal_actor.send.remote()) + wait_for_condition( + check_job_stopped, job_manager=job_manager, job_id=job_id) + + def test_kill_job_actor_in_pending(self, job_manager): + """ + Kick off a job that is in PENDING state, kill the job actor and ensure + + 1) Job can correctly be stop immediately with correct JobStatus + 2) No dangling subprocess left. + """ + _start_signal_actor = SignalActor.remote() + + with tempfile.TemporaryDirectory() as tmp_dir: + pid_file, _, job_id = self._run_hanging_command( + job_manager, tmp_dir, _start_signal_actor=_start_signal_actor) + + assert not os.path.exists(pid_file), ( + "driver subprocess should NOT be running while job is " + "still PENDING.") + + actor = job_manager._get_actor_for_job(job_id) + ray.kill(actor, no_restart=True) + wait_for_condition( + check_job_failed, job_manager=job_manager, job_id=job_id) + + def test_stop_job_subprocess_cleanup_upon_stop(self, job_manager): + """ + Ensure driver scripts' subprocess is cleaned up properly when we + stopped a running job. + + SIGTERM first, SIGKILL after 3 seconds. + """ + with tempfile.TemporaryDirectory() as tmp_dir: + pid_file, _, job_id = self._run_hanging_command( + job_manager, tmp_dir) + with open(pid_file, "r") as file: + pid = int(file.read()) + assert psutil.pid_exists(pid), ( + "driver subprocess should be running") + + assert job_manager.stop_job(job_id) is True + wait_for_condition( + check_job_stopped, job_manager=job_manager, job_id=job_id) + + # Ensure driver subprocess gets cleaned up after job reached + # termination state + wait_for_condition(check_subprocess_cleaned, pid=pid) + + +if __name__ == "__main__": + import sys + import pytest + sys.exit(pytest.main(["-v", __file__]))