Skip to content

dvc: implement params support for pipeline file #3694

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dvc/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
LOCK_FILE_STAGE_SCHEMA = {
Required(StageParams.PARAM_CMD): str,
StageParams.PARAM_DEPS: [DATA_SCHEMA],
StageParams.PARAM_PARAMS: {str: {str: object}},
StageParams.PARAM_OUTS: [DATA_SCHEMA],
}
LOCKFILE_SCHEMA = {str: LOCK_FILE_STAGE_SCHEMA}
Expand All @@ -30,6 +31,7 @@
StageParams.PARAM_CMD: str,
Optional(StageParams.PARAM_WDIR): str,
Optional(StageParams.PARAM_DEPS): [str],
Optional(StageParams.PARAM_PARAMS): [Any(str, {str: [str]})],
Optional(StageParams.PARAM_LOCKED): bool,
Optional(StageParams.PARAM_META): object,
Optional(StageParams.PARAM_ALWAYS_CHANGED): bool,
Expand Down
55 changes: 51 additions & 4 deletions dvc/serialize.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
from typing import TYPE_CHECKING

from funcy import rpartial, lsplit

from dvc.dependency import ParamsDependency
from dvc.utils.collections import apply_diff
from dvc.utils.stage import parse_stage_for_update
from typing import List

if TYPE_CHECKING:
from dvc.stage import PipelineStage, Stage

# Module-level shorthands for the `ParamsDependency` constants that the
# serializers below use to read a dependency's dump and to recognise the
# default params file.
PARAM_PATH = ParamsDependency.PARAM_PATH
PARAM_PARAMS = ParamsDependency.PARAM_PARAMS
DEFAULT_PARAMS_FILE = ParamsDependency.DEFAULT_PARAMS_FILE


def _get_outs(stage: "PipelineStage"):
outs_bucket = {}
Expand All @@ -21,14 +29,50 @@ def _get_outs(stage: "PipelineStage"):
return outs_bucket


def get_params_deps(stage: "PipelineStage"):
    """Partition `stage.deps` into params dependencies and everything else.

    Returns a pair: the `ParamsDependency` instances first, the remaining
    dependencies second.
    """
    is_params_dep = rpartial(isinstance, ParamsDependency)
    return lsplit(is_params_dep, stage.deps)


def _serialize_params(params: List[ParamsDependency]):
    """Serialize params dependencies for both the pipeline file and lockfile.

    Returns a `(keys, key_vals)` pair:

    `keys` - list of param names without values, used in a pipeline file.
        Names coming from the default params file are listed bare, names
        from any other file are grouped under that file's path:
            ['lr', 'train', {'params2.yaml': ['lr']}]
    `key_vals` - mapping of params file to its params with values, used in
        a lockfile:
            {'params.yaml': {'lr': '1', 'train': 2},
             'params2.yaml': {'lr': '1'}}

    A dependency whose dump carries no params is left out of both results.
    """
    names_only = []
    with_values = {}

    for dep in params:
        dumped = dep.dumpd()
        file_path = dumped[PARAM_PATH]
        file_params = dumped[PARAM_PARAMS]
        names = list(file_params.keys())
        if names:
            if file_path == DEFAULT_PARAMS_FILE:
                # default file: names are written flat
                names_only.extend(names)
            else:
                # any other file: switch to the {path: names} shape
                names_only.append({file_path: names})
            with_values[file_path] = file_params

    return names_only, with_values


def to_pipeline_file(stage: "PipelineStage"):
params, deps = get_params_deps(stage)
serialized_params, _ = _serialize_params(params)

return {
stage.name: {
key: value
for key, value in {
stage.PARAM_CMD: stage.cmd,
stage.PARAM_WDIR: stage.resolve_wdir(),
stage.PARAM_DEPS: [d.def_path for d in stage.deps],
stage.PARAM_DEPS: [d.def_path for d in deps],
stage.PARAM_PARAMS: serialized_params,
**_get_outs(stage),
stage.PARAM_LOCKED: stage.locked,
stage.PARAM_ALWAYS_CHANGED: stage.always_changed,
Expand All @@ -43,17 +87,20 @@ def to_lockfile(stage: "PipelineStage") -> dict:
assert stage.name

res = {"cmd": stage.cmd}
params, deps = get_params_deps(stage)
deps = [
{"path": dep.def_path, dep.checksum_type: dep.get_checksum()}
for dep in stage.deps
for dep in deps
]
outs = [
{"path": out.def_path, out.checksum_type: out.get_checksum()}
for out in stage.outs
]
if stage.deps:
if deps:
res["deps"] = deps
if stage.outs:
if params:
_, res["params"] = _serialize_params(params)
if outs:
res["outs"] = outs

return {stage.name: res}
Expand Down
72 changes: 69 additions & 3 deletions dvc/stage/loader.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,30 @@
import collections
import logging
import os

from copy import deepcopy
from collections import defaultdict

# The ABC aliases in `collections` were deprecated since Python 3.3 and
# removed in 3.10; `Mapping` has to come from `collections.abc`.
from collections.abc import Mapping
from itertools import chain

from funcy import first

from dvc import dependency, output
from .exceptions import StageNameUnspecified, StageNotFound
from ..dependency import ParamsDependency

logger = logging.getLogger(__name__)


# Params file that bare (string) entries of a stage's `params` list are
# attributed to when loading a pipeline file.
DEFAULT_PARAMS_FILE = ParamsDependency.DEFAULT_PARAMS_FILE


def resolve_paths(path, wdir=None):
    """Return absolute `(path, wdir)` for a stage file.

    `wdir` is interpreted relative to the directory containing `path`;
    when it is empty or None, the current-directory marker is used, so the
    result is the directory of `path` itself.
    """
    abs_path = os.path.abspath(path)
    base_dir = os.path.dirname(abs_path)
    rel_wdir = os.curdir if not wdir else wdir
    return abs_path, os.path.abspath(os.path.join(base_dir, rel_wdir))


class StageLoader(collections.abc.Mapping):
class StageLoader(Mapping):
def __init__(self, dvcfile, stages_data, lockfile_data=None):
self.dvcfile = dvcfile
self.stages_data = stages_data or {}
Expand Down Expand Up @@ -54,6 +60,64 @@ def _fill_lock_checksums(stage, lock_data):
.get(item.checksum_type)
)

@classmethod
def _load_params(cls, stage, pipeline_params, lock_params=None):
    """
    Load `ParamsDependency` objects for `stage` and append them to
    `stage.deps`.

    Params in the pipeline file are expected to be in the following format:
    ```
    params:
        - lr
        - train.epochs
        - params2.yaml:  # notice the filename
            - process.threshold
            - process.bow
    ```

    and, in the lockfile, we keep them in the following format:
    ```
    params:
        params.yaml:
            lr: 0.0041
            train.epochs: 100
        params2.yaml:
            process.threshold: 0.98
            process.bow:
                - 15000
                - 123
    ```

    Here we merge these two formats into one (ignoring entries that are
    only specified in the lockfile but missing from the pipeline file).

    In the list of `params` inside the pipeline file, if an item is
    dict-like, each of its keys is treated as a separate params file and
    the corresponding values as params of that file; otherwise the item
    is considered part of `params.yaml`, which is the default file.

    (From the example above: `lr` is considered to be part of
    `params.yaml` whereas `process.bow` is part of `params2.yaml`.)

    Args:
        stage: stage whose `deps` list is extended.
        pipeline_params: iterable of entries from the pipeline file
            (strings and/or `{file: [names]}` dicts).
        lock_params: optional `{file: {name: value}}` mapping from the
            lockfile; a param missing there gets the value `None`.
    """
    lock_params = lock_params or {}
    # file path -> {param name: locked value}. Plain dicts are used for the
    # inner level so that nothing auto-creating is handed to the dependency.
    res = defaultdict(dict)

    for entry in pipeline_params:
        if isinstance(entry, str):
            per_file = {DEFAULT_PARAMS_FILE: [entry]}
        elif isinstance(entry, dict):
            # a single dict entry may name several params files; take
            # all of them rather than only the first key
            per_file = entry
        else:
            continue

        for path, names in per_file.items():
            locked = lock_params.get(path, {})
            for name in names:
                res[path][name] = locked.get(name)

    stage.deps += dependency.loadd_from(
        stage,
        [{"path": key, "params": params} for key, params in res.items()],
    )

@classmethod
def load_stage(cls, dvcfile, name, stage_data, lock_data):
from . import PipelineStage, Stage, loads_from
Expand All @@ -63,8 +127,10 @@ def load_stage(cls, dvcfile, name, stage_data, lock_data):
)
stage = loads_from(PipelineStage, dvcfile.repo, path, wdir, stage_data)
stage.name = name
params = stage_data.pop("params", {})
stage._fill_stage_dependencies(**stage_data)
stage._fill_stage_outputs(**stage_data)
cls._load_params(stage, params, lock_data.get("params"))
if lock_data:
stage.cmd_changed = lock_data.get(
Stage.PARAM_CMD
Expand Down Expand Up @@ -102,7 +168,7 @@ def __contains__(self, name):
return name in self.stages_data


class SingleStageLoader(collections.abc.Mapping):
class SingleStageLoader(Mapping):
def __init__(self, dvcfile, stage_data, stage_text=None, tag=None):
self.dvcfile = dvcfile
self.stage_data = stage_data or {}
Expand Down
1 change: 1 addition & 0 deletions dvc/stage/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class StageParams:
PARAM_LOCKED = "locked"
PARAM_META = "meta"
PARAM_ALWAYS_CHANGED = "always_changed"
PARAM_PARAMS = "params"


class OutputParams(Enum):
Expand Down
17 changes: 17 additions & 0 deletions tests/func/params/test_diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,20 @@ def test_diff_with_unchanged(tmp_dir, scm, dvc):
"xyz": {"old": "val", "new": "val"},
}
}


def test_pipeline_tracked_params(tmp_dir, scm, dvc, run_copy):
    """`params diff` reports only the changed param tracked by a pipeline stage."""
    from dvc.dvcfile import PIPELINE_FILE

    initial_files = {"foo": "foo", "params.yaml": "foo: bar\nxyz: val"}
    tmp_dir.gen(initial_files)
    run_copy("foo", "bar", name="copy-foo-bar", params=["foo,xyz"])

    tracked = ["params.yaml", PIPELINE_FILE]
    scm.add(tracked)
    scm.commit("add stage")

    # two follow-up commits, each changing only `foo`
    tmp_dir.scm_gen("params.yaml", "foo: baz\nxyz: val", commit="baz")
    tmp_dir.scm_gen("params.yaml", "foo: qux\nxyz: val", commit="qux")

    expected = {"params.yaml": {"foo": {"old": "bar", "new": "qux"}}}
    actual = dvc.params.diff(a_rev="HEAD~2")
    assert actual == expected
16 changes: 16 additions & 0 deletions tests/func/params/test_show.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,19 @@ def test_show_branch(tmp_dir, scm, dvc):
"working tree": {"params.yaml": {"foo": "bar"}},
"branch": {"params.yaml": {"foo": "baz"}},
}


def test_pipeline_tracked_params(tmp_dir, scm, dvc, run_copy):
    """`params show` lists the params tracked by a pipeline stage on a rev."""
    from dvc.dvcfile import PIPELINE_FILE

    initial_files = {"foo": "foo", "params.yaml": "foo: bar\nxyz: val"}
    tmp_dir.gen(initial_files)
    run_copy("foo", "bar", name="copy-foo-bar", params=["foo,xyz"])
    tracked = ["params.yaml", PIPELINE_FILE]
    scm.add(tracked)
    scm.commit("add stage")

    # two follow-up commits, each changing only `foo`
    tmp_dir.scm_gen("params.yaml", "foo: baz\nxyz: val", commit="baz")
    tmp_dir.scm_gen("params.yaml", "foo: qux\nxyz: val", commit="qux")

    expected = {"master": {"params.yaml": {"foo": "qux", "xyz": "val"}}}
    actual = dvc.params.show(revs=["master"])
    assert actual == expected
60 changes: 60 additions & 0 deletions tests/func/test_repro_multistage.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import os
from copy import deepcopy
from textwrap import dedent

import pytest
import yaml
from funcy import lsplit

from dvc.dvcfile import PIPELINE_FILE, PIPELINE_LOCK
from dvc.exceptions import CyclicGraphError
Expand Down Expand Up @@ -485,3 +488,60 @@ def test_cyclic_graph_error(tmp_dir, dvc, run_copy):
dump_stage_file(PIPELINE_FILE, data)
with pytest.raises(CyclicGraphError):
dvc.reproduce(":copy-baz-foo")


def test_repro_multiple_params(tmp_dir, dvc):
    """A stage with params from two files is serialized and reproduced correctly."""
    from tests.func.test_run_multistage import supported_params

    from dvc.serialize import get_params_deps

    # same content in both params files; params2.yaml is written first
    for params_file in ("params2.yaml", "params.yaml"):
        with (tmp_dir / params_file).open("w+") as fobj:
            yaml.dump(supported_params, fobj)

    (tmp_dir / "foo").write_text("foo")
    stage = dvc.run(
        name="read_params",
        deps=["foo"],
        outs=["bar"],
        params=[
            "params2.yaml:lists,floats,name",
            "answer,floats,nested.nested1",
        ],
        cmd="cat params2.yaml params.yaml > bar",
    )

    params_deps, other_deps = get_params_deps(stage)
    assert len(params_deps) == 2
    assert len(other_deps) == 1
    assert len(stage.outs) == 1

    # lockfile keeps param values grouped per params file
    expected_lock_params = {
        "params2.yaml": {
            "lists": [42, 42.0, "42"],
            "floats": 42.0,
            "name": "Answer",
        },
        "params.yaml": {
            "answer": 42,
            "floats": 42.0,
            "nested.nested1": {"nested2": "42", "nested2-2": 41.99999},
        },
    }
    lockfile = stage.dvcfile._lockfile
    assert lockfile.load()["read_params"]["params"] == expected_lock_params

    # pipeline file keeps only the names: bare for the default file,
    # {path: names} for the custom one
    data, _ = stage.dvcfile._load()
    stage_params = data["stages"]["read_params"]["params"]

    custom, defaults = lsplit(lambda v: isinstance(v, dict), stage_params)
    assert set(custom[0]["params2.yaml"]) == {"name", "lists", "floats"}
    assert set(defaults) == {"answer", "floats", "nested.nested1"}

    # nothing changed yet, so nothing is reproduced
    assert not dvc.reproduce(stage.addressing)

    changed_params = deepcopy(supported_params)
    changed_params["answer"] = 43
    with (tmp_dir / "params.yaml").open("w+") as fobj:
        yaml.dump(changed_params, fobj)

    assert dvc.reproduce(stage.addressing) == [stage]
Loading