Skip to content

dvc: introduce merge-driver #4298

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 85 additions & 1 deletion dvc/cache/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@
from shortuuid import uuid

import dvc.prompt as prompt
from dvc.exceptions import CheckoutError, ConfirmRemoveError, DvcException
from dvc.exceptions import (
CheckoutError,
ConfirmRemoveError,
DvcException,
MergeError,
)
from dvc.path_info import WindowsPathInfo
from dvc.progress import Tqdm
from dvc.remote.slow_link_detection import slow_link_guard
Expand Down Expand Up @@ -552,3 +557,82 @@ def get_files_number(self, path_info, hash_, filter_info):
filter_info.isin_or_eq(path_info / entry[self.tree.PARAM_CHECKSUM])
for entry in self.get_dir_cache(hash_)
)

def _to_dict(self, dir_info):
return {
entry[self.tree.PARAM_RELPATH]: entry[self.tree.PARAM_CHECKSUM]
for entry in dir_info
}

def _from_dict(self, dir_dict):
return [
{
self.tree.PARAM_RELPATH: relpath,
self.tree.PARAM_CHECKSUM: checksum,
}
for relpath, checksum in dir_dict.items()
]

@staticmethod
def _diff(ancestor, other, allow_removed=False):
from dictdiffer import diff

allowed = ["add"]
if allow_removed:
allowed.append("remove")

result = list(diff(ancestor, other))
for typ, _, _ in result:
if typ not in allowed:
raise MergeError(
"unable to auto-merge directories with diff that contains "
f"'{typ}'ed files"
)
return result

def _merge_dirs(self, ancestor_info, our_info, their_info):
from operator import itemgetter

from dictdiffer import patch

ancestor = self._to_dict(ancestor_info)
our = self._to_dict(our_info)
their = self._to_dict(their_info)

our_diff = self._diff(ancestor, our)
if not our_diff:
return self._from_dict(their)

their_diff = self._diff(ancestor, their)
if not their_diff:
return self._from_dict(our)

# make sure there are no conflicting files
self._diff(our, their, allow_removed=True)

merged = patch(our_diff + their_diff, ancestor, in_place=True)

# Sorting the list by path to ensure reproducibility
return sorted(
self._from_dict(merged), key=itemgetter(self.tree.PARAM_RELPATH)
)

def merge(self, ancestor_info, our_info, their_info):
assert our_info
assert their_info

if ancestor_info:
ancestor_hash = ancestor_info[self.tree.PARAM_CHECKSUM]
ancestor = self.get_dir_cache(ancestor_hash)
else:
ancestor = []

our_hash = our_info[self.tree.PARAM_CHECKSUM]
our = self.get_dir_cache(our_hash)

their_hash = their_info[self.tree.PARAM_CHECKSUM]
their = self.get_dir_cache(their_hash)

merged = self._merge_dirs(ancestor, our, their)
typ, merged_hash = self.tree.save_dir_info(merged)
return {typ: merged_hash}
44 changes: 44 additions & 0 deletions dvc/command/git_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,26 @@ def _run(self):
return main(["push"])


class CmdMergeDriver(CmdHookBase):
def _run(self):
from dvc.dvcfile import Dvcfile
from dvc.repo import Repo

dvc = Repo()

try:
with dvc.state:
ancestor = Dvcfile(dvc, self.args.ancestor, verify=False)
our = Dvcfile(dvc, self.args.our, verify=False)
their = Dvcfile(dvc, self.args.their, verify=False)

our.merge(ancestor, their)

return 0
finally:
dvc.close()


def add_parser(subparsers, parent_parser):
GIT_HOOK_HELP = "Run GIT hook."

Expand Down Expand Up @@ -113,3 +133,27 @@ def add_parser(subparsers, parent_parser):
"args", nargs="*", help="Arguments passed by GIT or pre-commit tool.",
)
pre_push_parser.set_defaults(func=CmdPrePush)

MERGE_DRIVER_HELP = "Run GIT merge driver."
merge_driver_parser = git_hook_subparsers.add_parser(
"merge-driver",
parents=[parent_parser],
description=MERGE_DRIVER_HELP,
help=MERGE_DRIVER_HELP,
)
merge_driver_parser.add_argument(
"--ancestor",
required=True,
help="Ancestor's version of the conflicting file.",
)
merge_driver_parser.add_argument(
"--our",
required=True,
help="Current version of the conflicting file.",
)
merge_driver_parser.add_argument(
"--their",
required=True,
help="Other branch's version of the conflicting file.",
)
merge_driver_parser.set_defaults(func=CmdMergeDriver)
29 changes: 25 additions & 4 deletions dvc/dvcfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,10 @@ def check_dvc_filename(path):
class FileMixin:
SCHEMA = None

def __init__(self, repo, path, **kwargs):
def __init__(self, repo, path, verify=True, **kwargs):
self.repo = repo
self.path = path
self.verify = verify

def __repr__(self):
return "{}: {}".format(
Expand Down Expand Up @@ -90,7 +91,8 @@ def _load(self):
# 3. path doesn't represent a regular file
if not self.exists():
raise StageFileDoesNotExistError(self.path)
check_dvc_filename(self.path)
if self.verify:
check_dvc_filename(self.path)
if not self.repo.tree.isfile(self.path):
raise StageFileIsNotDvcFileError(self.path)

Expand All @@ -115,6 +117,9 @@ def remove(self, force=False): # pylint: disable=unused-argument
def dump(self, stage, **kwargs):
raise NotImplementedError

def merge(self, ancestor, other):
raise NotImplementedError


class SingleStageFile(FileMixin):
from dvc.schema import COMPILED_SINGLE_STAGE_SCHEMA as SCHEMA
Expand All @@ -134,7 +139,8 @@ def dump(self, stage, **kwargs):
from dvc.stage import PipelineStage

assert not isinstance(stage, PipelineStage)
check_dvc_filename(self.path)
if self.verify:
check_dvc_filename(self.path)
logger.debug(
"Saving information to '{file}'.".format(file=relpath(self.path))
)
Expand All @@ -144,6 +150,14 @@ def dump(self, stage, **kwargs):
def remove_stage(self, stage): # pylint: disable=unused-argument
self.remove()

def merge(self, ancestor, other):
assert isinstance(ancestor, SingleStageFile)
assert isinstance(other, SingleStageFile)

stage = self.stage
stage.merge(ancestor.stage, other.stage)
self.dump(stage)


class PipelineFile(FileMixin):
"""Abstraction for pipelines file, .yaml + .lock combined."""
Expand All @@ -161,7 +175,8 @@ def dump(
from dvc.stage import PipelineStage

assert isinstance(stage, PipelineStage)
check_dvc_filename(self.path)
if self.verify:
check_dvc_filename(self.path)

if update_pipeline and not stage.is_data_source:
self._dump_pipeline_file(stage)
Expand Down Expand Up @@ -239,6 +254,9 @@ def remove_stage(self, stage):
else:
super().remove()

def merge(self, ancestor, other):
raise NotImplementedError

Comment on lines +257 to +259
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These stubs are for the future, as merging dvcfiles is potentially a handy thing to have.


class Lockfile(FileMixin):
from dvc.schema import COMPILED_LOCKFILE_SCHEMA as SCHEMA
Expand Down Expand Up @@ -295,6 +313,9 @@ def remove_stage(self, stage):
else:
self.remove()

def merge(self, ancestor, other):
raise NotImplementedError


class Dvcfile:
def __new__(cls, repo, path, **kwargs):
Expand Down
4 changes: 4 additions & 0 deletions dvc/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,3 +351,7 @@ def __init__(self, target, file):
f"'{target}' "
f"does not exist as an output or a stage name in '{file}'"
)


class MergeError(DvcException):
pass
35 changes: 35 additions & 0 deletions dvc/output/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from dvc.exceptions import (
CollectCacheError,
DvcException,
MergeError,
RemoteCacheRequiredError,
)

Expand Down Expand Up @@ -516,3 +517,37 @@ def _validate_output_path(cls, path, stage=None):
check = stage.repo.tree.dvcignore.check_ignore(path)
if check.match:
raise cls.IsIgnoredError(check)

def _check_can_merge(self, out):
if self.scheme != out.scheme:
raise MergeError("unable to auto-merge outputs of different types")

my = self.dumpd()
other = out.dumpd()

my.pop(self.tree.PARAM_CHECKSUM)
other.pop(self.tree.PARAM_CHECKSUM)

if my != other:
raise MergeError(
"unable to auto-merge outputs with different options"
)

if not out.is_dir_checksum:
raise MergeError(
"unable to auto-merge outputs that are not directories"
)

def merge(self, ancestor, other):
assert other

if ancestor:
self._check_can_merge(ancestor)
ancestor_info = ancestor.info
else:
ancestor_info = None

self._check_can_merge(self)
self._check_can_merge(other)

self.info = self.cache.merge(ancestor_info, self.info, other.info)
14 changes: 14 additions & 0 deletions dvc/scm/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,21 @@ def _install_hook(self, name):

os.chmod(hook, 0o777)

def _install_merge_driver(self):
self.repo.git.config("merge.dvc.name", "DVC merge driver")
self.repo.git.config(
"merge.dvc.driver",
(
"dvc git-hook merge-driver "
"--ancestor %O "
"--our %A "
"--their %B "
),
)

def install(self, use_pre_commit_tool=False):
self._install_merge_driver()

if not use_pre_commit_tool:
self._verify_dvc_hooks()
self._install_hook("post-checkout")
Expand Down
43 changes: 42 additions & 1 deletion dvc/stage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import dvc.dependency as dependency
import dvc.prompt as prompt
from dvc.exceptions import CheckoutError, DvcException
from dvc.exceptions import CheckoutError, DvcException, MergeError
from dvc.utils import relpath

from . import params
Expand Down Expand Up @@ -538,6 +538,44 @@ def get_used_cache(self, *args, **kwargs):

return cache

@staticmethod
def _check_can_merge(stage, ancestor_out=None):
if isinstance(stage, PipelineStage):
raise MergeError("unable to auto-merge pipeline stages")

if not stage.is_data_source or stage.deps or len(stage.outs) > 1:
raise MergeError(
"unable to auto-merge DVC-files that weren't "
"created by `dvc add`"
)

if ancestor_out and not stage.outs:
raise MergeError(
"unable to auto-merge DVC-files with deleted outputs"
)

def merge(self, ancestor, other):
assert other

if not other.outs:
return

if not self.outs:
self.outs = other.outs
return

if ancestor:
self._check_can_merge(ancestor)
outs = ancestor.outs
ancestor_out = outs[0] if outs else None
else:
ancestor_out = None

self._check_can_merge(self, ancestor_out)
self._check_can_merge(other, ancestor_out)

self.outs[0].merge(ancestor_out, other.outs[0])


class PipelineStage(Stage):
def __init__(self, *args, name=None, **kwargs):
Expand Down Expand Up @@ -577,3 +615,6 @@ def changed_stage(self):

def _changed_stage_entry(self):
return f"'cmd' of {self} has changed."

def merge(self, ancestor, other):
raise NotImplementedError
Loading