Skip to content

[jobs] two-hop file_mounts upload #4708

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -3233,7 +3233,14 @@ def _sync_file_mounts(
all_file_mounts: Optional[Dict[Path, Path]],
storage_mounts: Optional[Dict[Path, storage_lib.Storage]],
) -> None:
"""Mounts all user files to the remote nodes."""
"""Mounts all user files to the remote nodes.

Note: This does not handle COPY storage_mounts. These should have
already been translated into file_mounts by task.sync_storage_mounts().

TODO: Delete COPY storage_mounts in task.sync_storage_mounts(), and
assert here that all storage_mounts are MOUNT mode.
"""
with rich_utils.safe_status(ux_utils.spinner_message('Syncing files')):
controller_utils.replace_skypilot_config_path_in_file_mounts(
handle.launched_resources.cloud, all_file_mounts)
Expand Down
17 changes: 17 additions & 0 deletions sky/jobs/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import multiprocessing
import os
import pathlib
import shutil
import time
import traceback
import typing
Expand All @@ -17,6 +18,7 @@
from sky import sky_logging
from sky.backends import backend_utils
from sky.backends import cloud_vm_ray_backend
from sky.data import data_utils
from sky.jobs import recovery_strategy
from sky.jobs import scheduler
from sky.jobs import state as managed_job_state
Expand Down Expand Up @@ -488,6 +490,7 @@ def _cleanup(job_id: int, dag_yaml: str):
cluster_name = managed_job_utils.generate_managed_job_cluster_name(
task.name, job_id)
managed_job_utils.terminate_cluster(cluster_name)

# Clean up Storages with persistent=False.
# TODO(zhwu): this assumes the specific backend.
backend = cloud_vm_ray_backend.CloudVmRayBackend()
Expand All @@ -499,6 +502,20 @@ def _cleanup(job_id: int, dag_yaml: str):
storage.construct()
backend.teardown_ephemeral_storage(task)

# Clean up any files mounted from the local disk, such as two-hop file
# mounts.
for file_mount in (task.file_mounts or {}).values():
try:
if not data_utils.is_cloud_store_url(file_mount):
path = os.path.expanduser(file_mount)
if os.path.isdir(path):
shutil.rmtree(path)
else:
os.remove(path)
except Exception as e: # pylint: disable=broad-except
logger.warning(
f'Failed to clean up file mount {file_mount}: {e}')


def start(job_id, dag_yaml):
"""Start the controller."""
Expand Down
34 changes: 31 additions & 3 deletions sky/jobs/server/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from sky import task as task_lib
from sky.backends import backend_utils
from sky.clouds.service_catalog import common as service_catalog_common
from sky.data import storage as storage_lib
from sky.jobs import constants as managed_job_constants
from sky.jobs import utils as managed_job_utils
from sky.provision import common
Expand Down Expand Up @@ -100,9 +101,35 @@ def launch(

with rich_utils.safe_status(
ux_utils.spinner_message('Initializing managed job')):
for task_ in dag.tasks:
controller_utils.maybe_translate_local_file_mounts_and_sync_up(
task_, task_type='jobs')

local_to_controller_file_mounts = {}

if storage_lib.get_cached_enabled_storage_clouds_or_refresh():
for task_ in dag.tasks:
controller_utils.maybe_translate_local_file_mounts_and_sync_up(
task_, task_type='jobs')

else:
# We do not have any cloud storage available, so fall back to
# two-hop file_mount uploading.
# Note: we can't easily hack sync_storage_mounts() to upload
# directly to the controller, because the controller may not
# even be up yet.
for task_ in dag.tasks:
if task_.storage_mounts:
# Technically, we could convert COPY storage_mounts that
# have a local source and do not specify `store`, but we
# will not do that for now. Only plain file_mounts are
# supported.
raise exceptions.NotSupportedError(
'Cloud-based file_mounts are specified, but no cloud '
'storage is available. Please specify local '
'file_mounts only.')

# Merge file mounts from all tasks.
local_to_controller_file_mounts.update(
controller_utils.translate_local_file_mounts_to_two_hop(
task_))

with tempfile.NamedTemporaryFile(prefix=f'managed-dag-{dag.name}-',
mode='w') as f:
Expand All @@ -119,6 +146,7 @@ def launch(
vars_to_fill = {
'remote_user_yaml_path': remote_user_yaml_path,
'user_yaml_path': f.name,
'local_to_controller_file_mounts': local_to_controller_file_mounts,
'jobs_controller': controller_name,
# Note: actual cluster name will be <task.name>-<managed job ID>
'dag_name': dag.name,
Expand Down
9 changes: 3 additions & 6 deletions sky/skylet/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,12 +281,9 @@
# linking. E.g., in our API server deployment on k8s, ~/.sky/ is mounted from a
# persistent volume, so any contents in ~/.sky/ cannot be hard linked elsewhere.
FILE_MOUNTS_LOCAL_TMP_BASE_PATH = '~/.sky/tmp/'

# Used when an managed jobs are created and
# files are synced up to the cloud.
FILE_MOUNTS_WORKDIR_SUBPATH = 'job-{run_id}/workdir'
FILE_MOUNTS_SUBPATH = 'job-{run_id}/local-file-mounts/{i}'
FILE_MOUNTS_TMP_SUBPATH = 'job-{run_id}/tmp-files'
# Base path for two-hop file mounts translation. See
# controller_utils.translate_local_file_mounts_to_two_hop().
FILE_MOUNTS_CONTROLLER_TMP_BASE_PATH = '~/.sky/tmp/controller'

# Used when managed jobs are created and
# files are synced up to the cloud.
Expand Down
6 changes: 6 additions & 0 deletions sky/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -1132,6 +1132,12 @@ def sync_storage_mounts(self) -> None:
raise ValueError(f'Storage Type {store_type} '
'does not exist!')

# TODO: Delete from storage_mounts, now that the storage is
# translated into file_mounts. Note: as is, this will break
# controller_utils.
# maybe_translate_local_file_mounts_and_sync_up(), which still
# needs the storage, but not the file_mounts.

def get_local_to_remote_file_mounts(self) -> Optional[Dict[str, str]]:
"""Returns file mounts of the form (dst=VM path, src=local path).

Expand Down
3 changes: 3 additions & 0 deletions sky/templates/jobs-controller.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ file_mounts:
{%- for remote_catalog_path, local_catalog_path in modified_catalogs.items() %}
{{remote_catalog_path}}: {{local_catalog_path}}
{%- endfor %}
{%- for controller_file_mount_path, local_file_mount_path in local_to_controller_file_mounts.items() %}
{{controller_file_mount_path}}: {{local_file_mount_path}}
{%- endfor %}

setup: |
{{ sky_activate_python_env }}
Expand Down
68 changes: 66 additions & 2 deletions sky/utils/controller_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,66 @@ def replace_skypilot_config_path_in_file_mounts(
f'with the real path in file mounts: {file_mounts}')


def _generate_run_uuid() -> str:
    """Return a short, unique run id (8 base36 chars) for this launch."""
    random_hex = uuid.uuid4().hex
    encoded = common_utils.base36_encode(random_hex)
    return encoded[:8]


def translate_local_file_mounts_to_two_hop(
        task: 'task_lib.Task') -> Dict[str, str]:
    """Translates local->VM mounts into two-hop file mounts.

    This strategy will upload the local files to the controller first, using a
    normal rsync as part of sky.launch() for the controller. Then, when the
    controller launches the task, it will also use local file_mounts from the
    destination path of the first hop.

    Local machine/API server Controller          Job cluster
    ------------------------  -----------------------  --------------------
    |    local path ----|--|-> controller path --|--|-> job dst path      |
    ------------------------  -----------------------  --------------------

    Args:
        task: The task whose file_mounts/workdir should be translated. The
            task is updated in-place to do the second hop: its file_mounts
            are replaced with controller-path sources, and its workdir (if
            any) is folded into the file_mounts and cleared.

    Returns:
        A dict mapping from controller file mount path to local file mount
        path for the first hop.

    Raises:
        exceptions.NotSupportedError: If any mount has a cloud-store URL on
            either side. In that case the task is left unmodified.
    """
    first_hop_file_mounts = {}
    second_hop_file_mounts = {}

    run_id = _generate_run_uuid()
    base_tmp_dir = os.path.join(constants.FILE_MOUNTS_CONTROLLER_TMP_BASE_PATH,
                                run_id)

    # Use a simple counter to create unique paths within the base_tmp_dir for
    # each mount.
    file_mount_id = 0

    # Copy the dict: `task.file_mounts or {}` would alias task.file_mounts,
    # and inserting the workdir entry below would mutate the task before we
    # have validated all mounts.
    file_mounts_to_translate = dict(task.file_mounts or {})
    if task.workdir is not None:
        file_mounts_to_translate[constants.SKY_REMOTE_WORKDIR] = task.workdir

    for job_cluster_path, local_path in file_mounts_to_translate.items():
        if data_utils.is_cloud_store_url(
                local_path) or data_utils.is_cloud_store_url(job_cluster_path):
            raise exceptions.NotSupportedError(
                'Cloud-based file_mounts are specified, but no cloud storage '
                'is available. Please specify local file_mounts only.')

        controller_path = os.path.join(base_tmp_dir, f'{file_mount_id}')
        file_mount_id += 1
        first_hop_file_mounts[controller_path] = local_path
        second_hop_file_mounts[job_cluster_path] = controller_path

    # Only mutate the task once every mount has validated, so that a raised
    # NotSupportedError leaves the task unchanged.
    task.workdir = None
    # Use set_file_mounts to override existing file mounts, if they exist.
    task.set_file_mounts(second_hop_file_mounts)

    # Return the first hop info so that it can be added to the jobs-controller
    # YAML.
    return first_hop_file_mounts


# (maybe translate local file mounts) and (sync up)
def maybe_translate_local_file_mounts_and_sync_up(task: 'task_lib.Task',
task_type: str) -> None:
"""Translates local->VM mounts into Storage->VM, then syncs up any Storage.
Expand Down Expand Up @@ -694,7 +754,7 @@ def _sub_path_join(sub_path: Optional[str], path: str) -> str:
# We should not use common_utils.get_usage_run_id() here, because when
# Python API is used, the run id will be the same across multiple
# jobs.launch/serve.up calls after the sky is imported.
run_id = common_utils.base36_encode(uuid.uuid4().hex)[:8]
run_id = _generate_run_uuid()
user_hash = common_utils.get_user_hash()
original_file_mounts = task.file_mounts if task.file_mounts else {}
original_storage_mounts = task.storage_mounts if task.storage_mounts else {}
Expand Down Expand Up @@ -853,7 +913,11 @@ def _sub_path_join(sub_path: Optional[str], path: str) -> str:
# Step 4: Upload storage from sources
# Upload the local source to a bucket. The task will not be executed
# locally, so we need to upload the files/folders to the bucket manually
# here before sending the task to the remote jobs controller.
# here before sending the task to the remote jobs controller. This will
# also upload any storage mounts that are not translated. After
# sync_storage_mounts, we will also have file_mounts in the task, but
# these aren't used since the storage_mounts for the same paths take
# precedence.
if task.storage_mounts:
# There may be existing (non-translated) storage mounts, so log this
# whenever task.storage_mounts is non-empty.
Expand Down