Skip to content

Commit 1937527

Browse files
skshetry and efiop authored
dvc: implement multistage dvcfile (#3676)
* dvc: use lockfile as yaml instead of json
* refactor Dvcfile into Pipeline file and Single stage file
* fix tests
* dvc: fix outputs
* add more tests for collection of outputs
* add tests for data cloud/get/import/ls
* tests: test for checkouts
* Allow other checksums other than md5
* tests: use iterdir instead of os.listdir
* cleanup errors reported by cc and ds
* utils: throw DvcException instead of plain Exception
* tests: use yaml.load instead of json.load
* run: split assignments
* Update dvc/stage/exceptions.py

Co-authored-by: Ruslan Kuprieiev <[email protected]>
1 parent e03128a commit 1937527

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+1159
-663
lines changed

dvc/command/pipeline.py

Lines changed: 9 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -8,25 +8,15 @@
88
logger = logging.getLogger(__name__)
99

1010

11-
def _stage_repr(stage):
12-
from dvc.stage import PipelineStage
13-
14-
return (
15-
"{}:{}".format(stage.relpath, stage.name)
16-
if isinstance(stage, PipelineStage)
17-
else stage.relpath
18-
)
19-
20-
2111
class CmdPipelineShow(CmdBase):
2212
def _show(self, target, commands, outs, locked):
2313
import networkx
2414
from dvc import dvcfile
2515
from dvc.utils import parse_target
2616

27-
path, name = parse_target(target)
28-
stage = dvcfile.Dvcfile(self.repo, path).stages[name]
29-
G = self.repo.pipeline_graph
17+
path, name, tag = parse_target(target)
18+
stage = dvcfile.Dvcfile(self.repo, path, tag=tag).stages[name]
19+
G = self.repo.graph
3020
stages = networkx.dfs_postorder_nodes(G, stage)
3121
if locked:
3222
stages = [s for s in stages if s.locked]
@@ -40,16 +30,16 @@ def _show(self, target, commands, outs, locked):
4030
for out in stage.outs:
4131
logger.info(str(out))
4232
else:
43-
logger.info(_stage_repr(stage))
33+
logger.info(stage.addressing)
4434

4535
def _build_graph(self, target, commands=False, outs=False):
4636
import networkx
4737
from dvc import dvcfile
4838
from dvc.repo.graph import get_pipeline
4939
from dvc.utils import parse_target
5040

51-
path, name = parse_target(target)
52-
target_stage = dvcfile.Dvcfile(self.repo, path).stages[name]
41+
path, name, tag = parse_target(target)
42+
target_stage = dvcfile.Dvcfile(self.repo, path, tag=tag).stages[name]
5343
G = get_pipeline(self.repo.pipelines, target_stage)
5444

5545
nodes = set()
@@ -62,7 +52,7 @@ def _build_graph(self, target, commands=False, outs=False):
6252
for out in stage.outs:
6353
nodes.add(str(out))
6454
else:
65-
nodes.add(_stage_repr(stage))
55+
nodes.add(stage.addressing)
6656

6757
edges = []
6858
for from_stage, to_stage in networkx.edge_dfs(G, target_stage):
@@ -75,7 +65,7 @@ def _build_graph(self, target, commands=False, outs=False):
7565
for to_out in to_stage.outs:
7666
edges.append((str(from_out), str(to_out)))
7767
else:
78-
edges.append((_stage_repr(from_stage), _stage_repr(to_stage)))
68+
edges.append((from_stage.addressing, to_stage.addressing))
7969

8070
return list(nodes), edges, networkx.is_tree(G)
8171

@@ -163,7 +153,7 @@ def run(self):
163153
pipelines = self.repo.pipelines
164154
for pipeline in pipelines:
165155
for stage in pipeline:
166-
logger.info(_stage_repr(stage))
156+
logger.info(stage.addressing)
167157
if len(pipeline) != 0:
168158
logger.info("=" * 80)
169159
logger.info("{} pipelines total".format(len(pipelines)))

0 commit comments

Comments
 (0)