Skip to content

tests: StageLoader for Pipeline file stages #3976

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 17 additions & 33 deletions dvc/stage/loader.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,19 @@
import logging
import os
from collections.abc import Mapping
from copy import deepcopy
from itertools import chain

from funcy import lcat, project
from funcy import get_in, lcat, project

from dvc import dependency, output

from ..dependency import ParamsDependency
from . import fill_stage_dependencies
from . import PipelineStage, Stage, loads_from
from .exceptions import StageNameUnspecified, StageNotFound
from .params import StageParams
from .utils import fill_stage_dependencies, resolve_paths

logger = logging.getLogger(__name__)

DEFAULT_PARAMS_FILE = ParamsDependency.DEFAULT_PARAMS_FILE


def resolve_paths(path, wdir=None):
    """Return absolute versions of a stage file path and its working dir.

    `wdir` is interpreted relative to the directory that contains `path`;
    when it is falsy, that directory itself is used.

    Returns:
        A `(path, wdir)` tuple of absolute paths.
    """
    abs_path = os.path.abspath(path)
    base_dir = os.path.dirname(abs_path)
    rel_wdir = wdir if wdir else os.curdir
    return abs_path, os.path.abspath(os.path.join(base_dir, rel_wdir))


class StageLoader(Mapping):
def __init__(self, dvcfile, stages_data, lockfile_data=None):
Expand All @@ -31,10 +22,12 @@ def __init__(self, dvcfile, stages_data, lockfile_data=None):
self.lockfile_data = lockfile_data or {}

@staticmethod
def fill_from_lock(stage, lock_data):
def fill_from_lock(stage, lock_data=None):
"""Fill values for params, checksums for outs and deps from lock."""
from .params import StageParams
if not lock_data:
return

assert isinstance(lock_data, dict)
items = chain(
((StageParams.PARAM_DEPS, dep) for dep in stage.deps),
((StageParams.PARAM_OUTS, out) for out in stage.outs),
Expand All @@ -45,21 +38,16 @@ def fill_from_lock(stage, lock_data):
for key in [StageParams.PARAM_DEPS, StageParams.PARAM_OUTS]
}
for key, item in items:
if isinstance(item, ParamsDependency):
# load the params with values inside lock dynamically
lock_params = lock_data.get(stage.PARAM_PARAMS, {})
item.fill_values(lock_params.get(item.def_path, {}))
path = item.def_path
if isinstance(item, dependency.ParamsDependency):
item.fill_values(get_in(lock_data, [stage.PARAM_PARAMS, path]))
continue

item.checksum = (
checksums.get(key, {})
.get(item.def_path, {})
.get(item.checksum_type)
)
item.checksum = get_in(checksums, [key, path, item.checksum_type])

@classmethod
def load_stage(cls, dvcfile, name, stage_data, lock_data):
from . import PipelineStage, Stage, loads_from
def load_stage(cls, dvcfile, name, stage_data, lock_data=None):
assert all([name, dvcfile, dvcfile.repo, dvcfile.path])
assert stage_data and isinstance(stage_data, dict)

path, wdir = resolve_paths(
dvcfile.path, stage_data.get(Stage.PARAM_WDIR)
Expand All @@ -80,11 +68,9 @@ def load_stage(cls, dvcfile, name, stage_data, lock_data):
)

if lock_data:
stage.cmd_changed = lock_data.get(
Stage.PARAM_CMD
) != stage_data.get(Stage.PARAM_CMD)
cls.fill_from_lock(stage, lock_data)
stage.cmd_changed = lock_data.get(Stage.PARAM_CMD) != stage.cmd

cls.fill_from_lock(stage, lock_data)
return stage

def __getitem__(self, name):
Expand Down Expand Up @@ -137,8 +123,6 @@ def __getitem__(self, item):

@classmethod
def load_stage(cls, dvcfile, d, stage_text):
from dvc.stage import Stage, loads_from

path, wdir = resolve_paths(dvcfile.path, d.get(Stage.PARAM_WDIR))
stage = loads_from(Stage, dvcfile.repo, path, wdir, d)
stage._stage_text = stage_text
Expand Down
7 changes: 7 additions & 0 deletions dvc/stage/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,13 @@ def resolve_wdir(wdir, path):
return pathlib.PurePath(rel_wdir).as_posix() if rel_wdir != "." else None


def resolve_paths(path, wdir=None):
    """Return absolute versions of a stage file path and its working dir.

    `wdir` is interpreted relative to the directory that contains `path`;
    when it is falsy, that directory itself is used.

    Returns:
        A `(path, wdir)` tuple of absolute paths.
    """
    abs_path = os.path.abspath(path)
    base_dir = os.path.dirname(abs_path)
    rel_wdir = wdir if wdir else os.curdir
    return abs_path, os.path.abspath(os.path.join(base_dir, rel_wdir))


def get_dump(stage):
return {
key: value
Expand Down
229 changes: 229 additions & 0 deletions tests/unit/stage/test_loader_pipeline_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
import os
from copy import deepcopy
from itertools import chain

import pytest

from dvc.dvcfile import PIPELINE_FILE, Dvcfile
from dvc.serialize import get_params_deps
from dvc.stage import PipelineStage, create_stage
from dvc.stage.loader import StageLoader


@pytest.fixture
def stage_data():
    """Provide a stage definition with a command, one dep and one out."""
    return dict(cmd="command", deps=["foo"], outs=["bar"])


@pytest.fixture
def lock_data():
    """Provide lock data with a command and md5 entries for `foo`/`bar`."""
    locked_dep = {"path": "foo", "md5": "foo_checksum"}
    locked_out = {"path": "bar", "md5": "bar_checksum"}
    return {"cmd": "command", "deps": [locked_dep], "outs": [locked_out]}


def test_fill_from_lock_deps_outs(dvc, lock_data):
    """Check that fill_from_lock sets the info of both deps and outs."""
    stage = create_stage(
        PipelineStage, dvc, PIPELINE_FILE, deps=["foo"], outs=["bar"]
    )

    # Before the lock is applied, nothing carries checksum information.
    for item in chain(stage.deps, stage.outs):
        assert not item.checksum
        assert not item.info

    StageLoader.fill_from_lock(stage, lock_data)

    dep, out = stage.deps[0], stage.outs[0]
    assert dep.info == {"md5": "foo_checksum"}
    assert out.info == {"md5": "bar_checksum"}


def test_fill_from_lock_params(dvc, lock_data):
    """Check that params deps receive their values from the lock data."""
    tracked_params = [
        "lorem",
        "lorem.ipsum",
        {"myparams.yaml": ["ipsum", "foobar"]},
    ]
    stage = create_stage(
        PipelineStage,
        dvc,
        PIPELINE_FILE,
        deps=["foo"],
        outs=["bar"],
        params=tracked_params,
    )

    default_file_values = {
        "lorem": "lorem",
        "lorem.ipsum": ["i", "p", "s", "u", "m"],
    }
    # The lock deliberately has no value for the `foobar` param.
    custom_file_values = {"ipsum": "ipsum"}
    lock_data["params"] = {
        "params.yaml": default_file_values,
        "myparams.yaml": custom_file_values,
    }

    params_deps = get_params_deps(stage)[0]
    default_dep = params_deps[0]
    custom_dep = params_deps[1]
    assert set(default_dep.params) == {"lorem", "lorem.ipsum"}
    assert set(custom_dep.params) == {"ipsum", "foobar"}
    assert not default_dep.info
    assert not custom_dep.info

    StageLoader.fill_from_lock(stage, lock_data)

    assert default_dep.info == lock_data["params"]["params.yaml"]
    assert custom_dep.info == lock_data["params"]["myparams.yaml"]


def test_fill_from_lock_missing_params_section(dvc, lock_data):
    """Check that params deps stay empty when the lock has no `params`."""
    tracked_params = ["lorem", "lorem.ipsum", {"myparams.yaml": ["ipsum"]}]
    stage = create_stage(
        PipelineStage,
        dvc,
        PIPELINE_FILE,
        deps=["foo"],
        outs=["bar"],
        params=tracked_params,
    )
    params_deps = get_params_deps(stage)[0]

    StageLoader.fill_from_lock(stage, lock_data)

    assert not params_deps[0].info
    assert not params_deps[1].info


def test_fill_from_lock_missing_checksums(dvc, lock_data):
    """Check that items absent from the lock are left without a checksum."""
    stage = create_stage(
        PipelineStage,
        dvc,
        PIPELINE_FILE,
        deps=["foo", "foo1"],
        outs=["bar", "bar1"],
    )

    StageLoader.fill_from_lock(stage, lock_data)

    # Entries present in the lock data are filled in.
    assert stage.deps[0].info == {"md5": "foo_checksum"}
    assert stage.outs[0].info == {"md5": "bar_checksum"}

    # `foo1` and `bar1` have no lock entry.
    assert not stage.deps[1].checksum
    assert not stage.outs[1].checksum


def test_fill_from_lock_use_appropriate_checksum(dvc, lock_data):
    """Check that an s3 dep takes `etag`, not `md5`, from its lock entry."""
    s3_path = "s3://dvc-temp/foo"
    stage = create_stage(
        PipelineStage, dvc, PIPELINE_FILE, deps=[s3_path], outs=["bar"],
    )
    # The lock entry carries two checksum kinds for the same path.
    s3_entry = {"path": s3_path, "md5": "high five", "etag": "e-tag"}
    lock_data["deps"] = [s3_entry]

    StageLoader.fill_from_lock(stage, lock_data)

    assert stage.deps[0].checksum == "e-tag"
    assert stage.outs[0].checksum == "bar_checksum"


def test_fill_from_lock_with_missing_sections(dvc, lock_data):
    """Check fill_from_lock when the lock lacks a `deps` or `outs` section."""
    stage = create_stage(
        PipelineStage, dvc, PIPELINE_FILE, deps=["foo"], outs=["bar"]
    )

    # Lock without `deps`: only the output gets a checksum.
    without_deps = deepcopy(lock_data)
    without_deps.pop("deps")
    StageLoader.fill_from_lock(stage, without_deps)
    assert not stage.deps[0].checksum
    assert stage.outs[0].checksum == "bar_checksum"

    # Lock without `outs`, applied to the same stage object.
    without_outs = deepcopy(lock_data)
    without_outs.pop("outs")
    StageLoader.fill_from_lock(stage, without_outs)
    assert stage.deps[0].checksum == "foo_checksum"
    assert not stage.outs[0].checksum


def test_fill_from_lock_empty_data(dvc):
    """Check that `None` and `{}` lock data leave checksums unset."""
    stage = create_stage(
        PipelineStage, dvc, PIPELINE_FILE, deps=["foo"], outs=["bar"]
    )

    for empty_lock in (None, {}):
        StageLoader.fill_from_lock(stage, empty_lock)
        assert not stage.deps[0].checksum
        assert not stage.outs[0].checksum


def test_load_stage(dvc, stage_data, lock_data):
    """Check the attributes of a stage loaded from stage and lock data."""
    dvcfile = Dvcfile(dvc, PIPELINE_FILE)

    stage = StageLoader.load_stage(dvcfile, "stage-1", stage_data, lock_data)

    # Stage-level attributes.
    assert stage.wdir == os.path.abspath(os.curdir)
    assert stage.name == "stage-1"
    assert stage.cmd == "command"
    assert stage.path == os.path.abspath(PIPELINE_FILE)

    # Dependency and output, including checksums from the lock data.
    dep, out = stage.deps[0], stage.outs[0]
    assert dep.def_path == "foo"
    assert dep.checksum == "foo_checksum"
    assert out.def_path == "bar"
    assert out.checksum == "bar_checksum"


def test_load_stage_outs_with_flags(dvc, stage_data, lock_data):
    """Check that a `cache: False` flag on an out reaches `use_cache`."""
    uncached_out = {"foo": {"cache": False}}
    stage_data["outs"] = [uncached_out]

    stage = StageLoader.load_stage(
        Dvcfile(dvc, PIPELINE_FILE), "stage-1", stage_data, lock_data
    )

    assert stage.outs[0].use_cache is False


def test_load_stage_no_lock(dvc, stage_data):
    """Check that loading without lock data yields items with no checksum."""
    stage = StageLoader.load_stage(
        Dvcfile(dvc, PIPELINE_FILE), "stage-1", stage_data
    )

    assert stage.deps[0].def_path == "foo"
    assert stage.outs[0].def_path == "bar"
    assert not stage.deps[0].checksum
    assert not stage.outs[0].checksum


def test_load_stage_with_params(dvc, stage_data, lock_data):
    """Check that a loaded stage carries param values from the lock data."""
    stage_data["params"] = ["lorem"]
    lock_data["params"] = {"params.yaml": {"lorem": "ipsum"}}
    dvcfile = Dvcfile(dvc, PIPELINE_FILE)

    stage = StageLoader.load_stage(dvcfile, "stage-1", stage_data, lock_data)
    params, deps = get_params_deps(stage)

    assert deps[0].def_path == "foo"
    assert stage.outs[0].def_path == "bar"
    assert params[0].def_path == "params.yaml"
    assert params[0].info == {"lorem": "ipsum"}
    assert deps[0].checksum == "foo_checksum"
    assert stage.outs[0].checksum == "bar_checksum"


@pytest.mark.parametrize("typ", ["metrics", "plots"])
def test_load_stage_with_metrics_and_plots(dvc, stage_data, lock_data, typ):
    """Check that entries under `metrics`/`plots` are loaded as stage outs."""
    # Move the declared outs under the parametrized section name.
    declared_outs = stage_data.pop("outs")
    stage_data[typ] = declared_outs

    stage = StageLoader.load_stage(
        Dvcfile(dvc, PIPELINE_FILE), "stage-1", stage_data, lock_data
    )

    out = stage.outs[0]
    assert out.def_path == "bar"
    assert out.checksum == "bar_checksum"


def test_load_changed_command(dvc, stage_data, lock_data):
    """Check `cmd_changed` without a lock and with a differing locked cmd."""
    dvcfile = Dvcfile(dvc, PIPELINE_FILE)

    # No lock data: the command is not reported as changed.
    unlocked = StageLoader.load_stage(dvcfile, "stage-1", stage_data)
    assert not unlocked.cmd_changed
    assert unlocked.cmd == "command"

    # The locked command differs from the one in the stage definition.
    lock_data["cmd"] = "different-command"
    locked = StageLoader.load_stage(dvcfile, "stage-1", stage_data, lock_data)
    assert locked.cmd_changed
    assert locked.cmd == "command"


def test_load_stage_wdir_and_path_correctly(dvc, stage_data, lock_data):
    """Check the absolute `wdir` and `path` of a stage that sets `wdir`."""
    stage_data["wdir"] = "dir"

    stage = StageLoader.load_stage(
        Dvcfile(dvc, PIPELINE_FILE), "stage-1", stage_data, lock_data
    )

    expected_wdir = os.path.abspath("dir")
    expected_path = os.path.abspath(PIPELINE_FILE)
    assert stage.wdir == expected_wdir
    assert stage.path == expected_path


def test_load_stage_mapping(dvc, stage_data, lock_data):
    """Check len, membership, keys and item access on a StageLoader."""
    stages = {"stage": stage_data}
    locks = {"stage": lock_data}
    loader = StageLoader(Dvcfile(dvc, PIPELINE_FILE), stages, locks)

    assert len(loader) == 1
    assert "stage" in loader
    assert "stage1" not in loader
    assert loader.keys() == {"stage"}
    assert isinstance(loader["stage"], PipelineStage)
20 changes: 20 additions & 0 deletions tests/unit/stage/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import os

from dvc.stage.utils import resolve_paths


def test_resolve_paths():
    """Check resolve_paths with an explicit, a default and a parent wdir."""
    parent = os.path.join("dir", "subdir")
    file_path = os.path.join(parent, "dvc.yaml")
    expected_path = os.path.abspath(file_path)

    # Explicit wdir is resolved relative to the file's directory.
    path, wdir = resolve_paths(path=file_path, wdir="dir")
    assert path == expected_path
    assert wdir == os.path.abspath(os.path.join(parent, "dir"))

    # Without wdir, the file's own directory is used.
    path, wdir = resolve_paths(path=file_path)
    assert path == expected_path
    assert wdir == os.path.abspath(parent)

    # A wdir that climbs out of the file's directory is normalized.
    path, wdir = resolve_paths(path=file_path, wdir="../../some-dir")
    assert path == expected_path
    assert wdir == os.path.abspath("some-dir")