cache: drop dos2unix behavior and move cache to files/md5/ prefix #9538


Merged: 4 commits (Jun 9, 2023)

Changes from all commits
41 changes: 38 additions & 3 deletions dvc/cachemgr.py
@@ -1,17 +1,32 @@
 import os
+from typing import Optional, Tuple
 
 from dvc.fs import GitFileSystem, Schemes
 from dvc_data.hashfile.db import get_odb
+from dvc_data.hashfile.hash import DEFAULT_ALGORITHM
+
+LEGACY_HASH_NAMES = {"md5-dos2unix", "params"}
 
-def _get_odb(repo, settings, fs=None):
+
+def _get_odb(
+    repo,
+    settings,
+    fs=None,
+    prefix: Optional[Tuple[str, ...]] = None,
+    hash_name: Optional[str] = None,
+    **kwargs,
+):
     from dvc.fs import get_cloud_fs
 
     if not settings:
         return None
 
     cls, config, fs_path = get_cloud_fs(repo, **settings)
     fs = fs or cls(**config)
+    if prefix:
+        fs_path = fs.path.join(fs_path, *prefix)
+    if hash_name:
+        config["hash_name"] = hash_name
     return get_odb(fs, fs_path, state=repo.state, **config)


@@ -24,6 +39,7 @@ class CacheManager:
         Schemes.HDFS,
         Schemes.WEBHDFS,
     ]
+    FILES_DIR = "files"
 
     def __init__(self, repo):
         self._repo = repo
@@ -53,15 +69,26 @@ def __init__(self, repo):
         if not isinstance(repo.fs, GitFileSystem):
             kwargs["fs"] = repo.fs
 
-        odb = _get_odb(repo, settings, **kwargs)
+        odb = _get_odb(
+            repo,
+            settings,
+            prefix=(self.FILES_DIR, DEFAULT_ALGORITHM),
+            **kwargs,
+        )
         self._odb["repo"] = odb
         self._odb[Schemes.LOCAL] = odb
+        legacy_odb = _get_odb(repo, settings, hash_name="md5-dos2unix", **kwargs)
+        self._odb["legacy"] = legacy_odb
 
     def _init_odb(self, schemes):
         for scheme in schemes:
             remote = self.config.get(scheme)
             settings = {"name": remote} if remote else None
-            self._odb[scheme] = _get_odb(self._repo, settings)
+            self._odb[scheme] = _get_odb(
+                self._repo,
+                settings,
+                prefix=(self.FILES_DIR, DEFAULT_ALGORITHM),
+            )
 
     def __getattr__(self, name):
         if name not in self._odb and name in self.CLOUD_SCHEMES:
@@ -75,3 +102,11 @@ def __getattr__(self, name):
     def by_scheme(self):
         self._init_odb(self.CLOUD_SCHEMES)
         yield from self._odb.items()
+
+    @property
+    def local_cache_dir(self) -> str:
+        """Return base local cache directory without any prefixes.
+
+        (i.e. `dvc cache dir`).
+        """
+        return self.legacy.path
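
Note on the resulting layout: new-style md5 objects now land under a files/md5/ prefix inside the cache directory, while the legacy ODB (md5-dos2unix) keeps writing to the bare cache root that `local_cache_dir` reports. A minimal sketch, assuming DVC's usual two-character sharding and a made-up digest:

```python
import os

# Made-up digest, for illustration only.
digest = "d3b07384d113edec49eaa6238ad5ff00"

# Base cache directory, i.e. what CacheManager.local_cache_dir / `dvc cache dir` report.
cache_dir = os.path.join(".dvc", "cache")

# Legacy ODB ("md5-dos2unix"): objects stay at the cache root, as before this PR.
legacy_path = os.path.join(cache_dir, digest[:2], digest[2:])

# New ODB: the same sharding, but under the files/md5/ prefix joined in _get_odb().
new_path = os.path.join(cache_dir, "files", "md5", digest[:2], digest[2:])

print(legacy_path)  # .dvc/cache/d3/b07384d113edec49eaa6238ad5ff00 (on POSIX)
print(new_path)     # .dvc/cache/files/md5/d3/b07384d113edec49eaa6238ad5ff00
```
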
145 changes: 131 additions & 14 deletions dvc/data_cloud.py
@@ -1,18 +1,18 @@
 """Manages dvc remotes that user can use with push/pull/status commands."""
 
 import logging
-from typing import TYPE_CHECKING, Iterable, Optional
+from typing import TYPE_CHECKING, Iterable, Optional, Set, Tuple
 
 from dvc.config import NoRemoteError, RemoteConfigError
 from dvc.utils.objects import cached_property
 from dvc_data.hashfile.db import get_index
+from dvc_data.hashfile.transfer import TransferResult
 
 if TYPE_CHECKING:
     from dvc.fs import FileSystem
     from dvc_data.hashfile.db import HashFileDB
     from dvc_data.hashfile.hash_info import HashInfo
     from dvc_data.hashfile.status import CompareStatusResult
-    from dvc_data.hashfile.transfer import TransferResult
 
 logger = logging.getLogger(__name__)

@@ -29,13 +29,40 @@ def __init__(self, name: str, path: str, fs: "FileSystem", *, index=None, **conf
 
     @cached_property
     def odb(self) -> "HashFileDB":
+        from dvc.cachemgr import CacheManager
         from dvc_data.hashfile.db import get_odb
+        from dvc_data.hashfile.hash import DEFAULT_ALGORITHM
 
         path = self.path
         if self.worktree:
-            path = self.fs.path.join(path, ".dvc", "cache")
+            path = self.fs.path.join(
+                path, ".dvc", CacheManager.FILES_DIR, DEFAULT_ALGORITHM
+            )
+        else:
+            path = self.fs.path.join(path, CacheManager.FILES_DIR, DEFAULT_ALGORITHM)
+        return get_odb(self.fs, path, hash_name=DEFAULT_ALGORITHM, **self.config)
+
+    @cached_property
+    def legacy_odb(self) -> "HashFileDB":
+        from dvc_data.hashfile.db import get_odb
 
-        return get_odb(self.fs, path, hash_name="md5", **self.config)
+        path = self.path
+        return get_odb(self.fs, path, hash_name="md5-dos2unix", **self.config)
+
+
+def _split_legacy_hash_infos(
+    hash_infos: Iterable["HashInfo"],
+) -> Tuple[Set["HashInfo"], Set["HashInfo"]]:
+    from dvc.cachemgr import LEGACY_HASH_NAMES
+
+    legacy = set()
+    default = set()
+    for hi in hash_infos:
+        if hi.name in LEGACY_HASH_NAMES:
+            legacy.add(hi)
+        else:
+            default.add(hi)
+    return legacy, default
 
 
 class DataCloud:
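
`_split_legacy_hash_infos` is the routing primitive for the push/pull/status changes below: it partitions incoming hashes by name so each group can be sent to the matching ODB. A self-contained illustration, using a namedtuple as a stand-in for dvc_data's `HashInfo` (only `.name` is consulted here):

```python
from collections import namedtuple

# Stand-in for dvc_data.hashfile.hash_info.HashInfo.
HashInfo = namedtuple("HashInfo", ["name", "value"])

LEGACY_HASH_NAMES = {"md5-dos2unix", "params"}

def split_legacy_hash_infos(hash_infos):
    legacy, default = set(), set()
    for hi in hash_infos:
        (legacy if hi.name in LEGACY_HASH_NAMES else default).add(hi)
    return legacy, default

objs = {
    HashInfo("md5", "1111"),
    HashInfo("md5-dos2unix", "2222"),
    HashInfo("params", "3333"),
}
legacy, default = split_legacy_hash_infos(objs)
assert {hi.name for hi in legacy} == {"md5-dos2unix", "params"}
assert {hi.name for hi in default} == {"md5"}
```
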
@@ -101,12 +128,17 @@ def get_remote_odb(
         self,
         name: Optional[str] = None,
         command: str = "<command>",
+        hash_name: str = "md5",
     ) -> "HashFileDB":
+        from dvc.cachemgr import LEGACY_HASH_NAMES
+
         remote = self.get_remote(name=name, command=command)
         if remote.fs.version_aware or remote.worktree:
             raise NoRemoteError(
                 f"'{command}' is unsupported for cloud versioned remotes"
             )
+        if hash_name in LEGACY_HASH_NAMES:
+            return remote.legacy_odb
         return remote.odb
 
     def _log_missing(self, status: "CompareStatusResult"):
@@ -150,14 +182,40 @@ def push(
                 By default remote from core.remote config option is used.
             odb: optional ODB to push to. Overrides remote.
         """
-        odb = odb or self.get_remote_odb(remote, "push")
+        if odb is not None:
+            return self._push(objs, jobs=jobs, odb=odb)
+        legacy_objs, default_objs = _split_legacy_hash_infos(objs)
+        result = TransferResult(set(), set())
+        if legacy_objs:
+            odb = self.get_remote_odb(remote, "push", hash_name="md5-dos2unix")
+            t, f = self._push(legacy_objs, jobs=jobs, odb=odb)
+            result.transferred.update(t)
+            result.failed.update(f)
+        if default_objs:
+            odb = self.get_remote_odb(remote, "push")
+            t, f = self._push(default_objs, jobs=jobs, odb=odb)
+            result.transferred.update(t)
+            result.failed.update(f)
+        return result
+
+    def _push(
+        self,
+        objs: Iterable["HashInfo"],
+        *,
+        jobs: Optional[int] = None,
+        odb: "HashFileDB",
+    ) -> "TransferResult":
+        if odb.hash_name == "md5-dos2unix":
+            cache = self.repo.cache.legacy
+        else:
+            cache = self.repo.cache.local
         return self.transfer(
-            self.repo.cache.local,
+            cache,
             odb,
             objs,
             jobs=jobs,
             dest_index=get_index(odb),
-            cache_odb=self.repo.cache.local,
+            cache_odb=cache,
             validate_status=self._log_missing,
         )
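
`push` now fans out instead of assuming a single remote ODB: legacy hashes go to the remote's bare-root `legacy_odb`, new-style hashes to the prefixed `odb`, and the two `TransferResult`s are merged (which works because `TransferResult` holds mutable sets). `pull` and `status` below repeat the same dispatch. A condensed sketch of the pattern; the helper names (`split`, `get_odb`, `do_push`) are illustrative stubs, not DVC APIs:

```python
from collections import namedtuple

TransferResult = namedtuple("TransferResult", ["transferred", "failed"])

def dispatch_push(objs, split, get_odb, do_push):
    """Route each hash-name group to its ODB and merge the per-group results."""
    legacy_objs, default_objs = split(objs)
    result = TransferResult(set(), set())
    for group, hash_name in ((legacy_objs, "md5-dos2unix"), (default_objs, "md5")):
        if not group:
            continue  # skip the remote round-trip entirely for an empty group
        t, f = do_push(group, get_odb(hash_name))  # legacy_odb vs. odb
        result.transferred.update(t)
        result.failed.update(f)
    return result
```
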

@@ -177,14 +235,41 @@ def pull(
                 By default remote from core.remote config option is used.
             odb: optional ODB to pull from. Overrides remote.
         """
-        odb = odb or self.get_remote_odb(remote, "pull")
+        if odb is not None:
+            return self._pull(objs, jobs=jobs, odb=odb)
+        legacy_objs, default_objs = _split_legacy_hash_infos(objs)
+        result = TransferResult(set(), set())
+        if legacy_objs:
+            odb = self.get_remote_odb(remote, "pull", hash_name="md5-dos2unix")
+            assert odb.hash_name == "md5-dos2unix"
+            t, f = self._pull(legacy_objs, jobs=jobs, odb=odb)
+            result.transferred.update(t)
+            result.failed.update(f)
+        if default_objs:
+            odb = self.get_remote_odb(remote, "pull")
+            t, f = self._pull(default_objs, jobs=jobs, odb=odb)
+            result.transferred.update(t)
+            result.failed.update(f)
+        return result
+
+    def _pull(
+        self,
+        objs: Iterable["HashInfo"],
+        *,
+        jobs: Optional[int] = None,
+        odb: "HashFileDB",
+    ) -> "TransferResult":
+        if odb.hash_name == "md5-dos2unix":
+            cache = self.repo.cache.legacy
+        else:
+            cache = self.repo.cache.local
         return self.transfer(
             odb,
-            self.repo.cache.local,
+            cache,
             objs,
             jobs=jobs,
             src_index=get_index(odb),
-            cache_odb=self.repo.cache.local,
+            cache_odb=cache,
             verify=odb.verify,
             validate_status=self._log_missing,
         )
@@ -206,17 +291,49 @@ def status(
             is used.
             odb: optional ODB to check status from. Overrides remote.
         """
+        from dvc_data.hashfile.status import CompareStatusResult
+
+        if odb is not None:
+            return self._status(objs, jobs=jobs, odb=odb)
+        result = CompareStatusResult(set(), set(), set(), set())
+        legacy_objs, default_objs = _split_legacy_hash_infos(objs)
+        if legacy_objs:
+            odb = self.get_remote_odb(remote, "status", hash_name="md5-dos2unix")
+            assert odb.hash_name == "md5-dos2unix"
+            o, m, n, d = self._status(legacy_objs, jobs=jobs, odb=odb)
+            result.ok.update(o)
+            result.missing.update(m)
+            result.new.update(n)
+            result.deleted.update(d)
+        if default_objs:
+            odb = self.get_remote_odb(remote, "status")
+            o, m, n, d = self._status(default_objs, jobs=jobs, odb=odb)
+            result.ok.update(o)
+            result.missing.update(m)
+            result.new.update(n)
+            result.deleted.update(d)
+        return result
+
+    def _status(
+        self,
+        objs: Iterable["HashInfo"],
+        *,
+        jobs: Optional[int] = None,
+        odb: "HashFileDB",
+    ):
         from dvc_data.hashfile.status import compare_status
 
-        if not odb:
-            odb = self.get_remote_odb(remote, "status")
+        if odb.hash_name == "md5-dos2unix":
+            cache = self.repo.cache.legacy
+        else:
+            cache = self.repo.cache.local
         return compare_status(
-            self.repo.cache.local,
+            cache,
             odb,
             objs,
             jobs=jobs,
             dest_index=get_index(odb),
-            cache_odb=self.repo.cache.local,
+            cache_odb=cache,
         )
 
     def get_url_for(self, remote, checksum):
3 changes: 2 additions & 1 deletion dvc/dependency/__init__.py
@@ -35,7 +35,8 @@ def loadd_from(stage, d_list):
     for d in d_list:
         p = d.pop(Output.PARAM_PATH, None)
         files = d.pop(Output.PARAM_FILES, None)
-        ret.append(_get(stage, p, d, files=files))
+        hash_name = d.pop(Output.PARAM_HASH, None)
+        ret.append(_get(stage, p, d, files=files, hash_name=hash_name))
     return ret
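
On the serialization side, a dependency entry may now carry an explicit hash-name key that `loadd_from` pops before constructing the dependency; entries without it presumably fall back to the legacy behavior. A toy sketch of that branch, assuming `Output.PARAM_HASH` is the string "hash":

```python
# Two hypothetical lockfile-style dependency entries.
d_new = {"path": "data.csv", "md5": "1111", "hash": "md5"}  # written by new DVC
d_old = {"path": "data.csv", "md5": "2222"}                 # older entry: no hash key

for d in (d_new, d_old):
    hash_name = d.pop("hash", None)  # mirrors d.pop(Output.PARAM_HASH, None)
    print(d["path"], "->", hash_name or "legacy (md5-dos2unix)")
```
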
1 change: 1 addition & 0 deletions dvc/dependency/param.py
@@ -46,6 +46,7 @@ def __init__(self, stage, path, params=None, repo=None):
         repo = repo or stage.repo
         path = path or os.path.join(repo.root_dir, self.DEFAULT_PARAMS_FILE)
         super().__init__(stage, path, repo=repo)
+        self.hash_name = self.PARAM_PARAMS
         self.hash_info = hash_info
 
     def dumpd(self, **kwargs):
2 changes: 1 addition & 1 deletion dvc/dependency/repo.py
@@ -80,7 +80,7 @@ def _make_fs(
         from dvc.fs import DVCFileSystem
 
         config = {"cache": self.repo.config["cache"]}
-        config["cache"]["dir"] = self.repo.cache.local.path
+        config["cache"]["dir"] = self.repo.cache.local_cache_dir
 
         return DVCFileSystem(
             url=self.def_repo[self.PARAM_URL],
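
The subrepo config must receive the un-prefixed cache root: `cache.local.path` now ends in files/md5 (the prefix is joined in `_get_odb`), and the subrepo's own `CacheManager` would stack another prefix on top of whatever directory it is handed. A small sanity sketch of the failure that `local_cache_dir` avoids:

```python
import os

cache_root = os.path.join(".dvc", "cache")           # what local_cache_dir returns
prefixed = os.path.join(cache_root, "files", "md5")  # what cache.local.path points at now

# Passing the prefixed path down would let the subrepo's CacheManager
# re-apply the prefix, yielding a doubled files/md5/files/md5 layout:
doubled = os.path.join(prefixed, "files", "md5")
assert doubled.endswith(os.path.join("files", "md5", "files", "md5"))
```
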
2 changes: 1 addition & 1 deletion dvc/info.py
@@ -67,7 +67,7 @@ def _get_caches(cache):
     caches = (
         cache_type
         for cache_type, cache_instance in cache.by_scheme()
-        if cache_instance and cache_type != "repo"
+        if cache_instance and cache_type not in ("repo", "legacy")
     )
 
     # Caches will be always non-empty including the local cache
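
Since `repo` and `legacy` are both aliases over the local cache, `_get_caches` now filters both out of the cache list reported by `dvc version` / `dvc doctor`. A toy check of the new filter:

```python
def get_caches(by_scheme):
    # Mirrors _get_caches: keep configured caches, drop the aliases.
    return [t for t, inst in by_scheme if inst and t not in ("repo", "legacy")]

# Hypothetical by_scheme() pairs: the aliases, the local cache, an unconfigured s3.
entries = [("repo", object()), ("legacy", object()), ("local", object()), ("s3", None)]
assert get_caches(entries) == ["local"]
```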