checkpoints: `exp run` and `exp res[ume]` refactor #4855
Conversation
* Since the executor has to move to threadpool execution anyway, we no longer have to work around the pickle repro result limitation.
* Regular outputs may not be written at the time a checkpoint is generated, but subsequent checkouts of that stage should not fail due to the missing outs.
* Refactor signaling (use `threading.Event` instead of `Condition`).
* Fix Windows bug (kill all child processes via `taskkill`).
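The `Event`-based signaling mentioned above can be sketched roughly as follows; this is a minimal illustration of the pattern (the names `checkpoint_ready`, `done`, and `worker` are hypothetical, not DVC's actual internals). A worker thread sets an event whenever a checkpoint is written, and a monitor thread waits on it without needing to hold a lock, which a `Condition` would require:

```python
import threading
import time

# Hypothetical sketch: signal checkpoint generation with threading.Event
# instead of threading.Condition (no lock acquisition needed to wait/set).
checkpoint_ready = threading.Event()
done = threading.Event()

def worker():
    for _step in range(3):
        time.sleep(0.01)        # pretend to run one training step
        checkpoint_ready.set()  # signal: a checkpoint was written
    done.set()                  # signal: the stage finished

t = threading.Thread(target=worker)
t.start()
while not done.is_set():
    # wait() blocks until set (or timeout) - no Condition/lock dance
    if checkpoint_ready.wait(timeout=0.1):
        checkpoint_ready.clear()
        print("checkpoint observed")
t.join()
```

Unlike `Condition`, an `Event` never misses a notification sent before the waiter arrives, which simplifies the producer/consumer handshake here.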
import networkx as nx

if single_item:
    all_pipelines = stages
else:
    all_pipelines = []
    for stage in stages:
        if downstream:
            # NOTE (py3 only):
            # Python's `deepcopy` defaults to pickle/unpickle the object.
            # Stages are complex objects (with references to `repo`,
            # `outs`, and `deps`) that cause struggles when you try
            # to serialize them. We need to create a copy of the graph
            # itself, and then reverse it, instead of using
            # graph.reverse() directly because it calls `deepcopy`
            # underneath -- unless copy=False is specified.
            nodes = nx.dfs_postorder_nodes(
                G.copy().reverse(copy=False), stage
            )
            all_pipelines += reversed(list(nodes))
        else:
            all_pipelines += nx.dfs_postorder_nodes(G, stage)

pipeline = []
for stage in all_pipelines:
    if stage not in pipeline:
        pipeline.append(stage)
This code was just moved into `_get_pipeline()` to fix a flake8 complexity error.
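A minimal standalone demo of the graph trick used above (the stage names are made up for illustration): `G.reverse()` alone deepcopies every node, whereas `G.copy()` makes a shallow copy (node objects are shared, not pickled) and `reverse(copy=False)` then returns a reversed view of that copy:

```python
import networkx as nx

# Hypothetical three-stage pipeline: prepare -> train -> evaluate.
G = nx.DiGraph([("prepare", "train"), ("train", "evaluate")])

# Shallow copy, then reversed *view*: no deepcopy/pickle of nodes ever
# happens, unlike a bare G.reverse().
rev = G.copy().reverse(copy=False)

# Walking the reversed graph from "train" yields train plus everything
# upstream of it, which is what the downstream=True branch relies on.
order = list(reversed(list(nx.dfs_postorder_nodes(rev, "train"))))
print(order)
```

The shallow copy is safe here because only the edge structure needs to be reversed; the stage objects themselves are shared with the original graph.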
import subprocess

def _kill_nt(proc):
    # windows stages are spawned with shell=True, proc is the shell process and
    # not the actual stage process - we have to kill the entire tree
    subprocess.call(["taskkill", "/F", "/T", "/PID", str(proc.pid)])
This is something that we could consider doing via psutil, which has the logic needed to actually walk Windows process trees and find each child process individually (native Python does not provide an easy way to do that). psutil has Windows wheels for <3.9 but still does not have any wheels for Linux/Mac, so if we were going to re-introduce it as a dependency it would have to be Windows-only. But calling `taskkill` here also works for our purposes.
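For comparison, a cross-platform tree-kill can be sketched with the stdlib alone (the helper name `kill_proc_tree` is hypothetical): on Windows delegate to `taskkill /T` as in the diff above, and on POSIX start the child in its own process group so the whole group can be signaled at once:

```python
import os
import signal
import subprocess
import sys

def kill_proc_tree(proc):
    """Hypothetical helper: terminate proc and all of its children."""
    if os.name == "nt":
        # /T kills the whole tree, /F forces termination.
        subprocess.call(["taskkill", "/F", "/T", "/PID", str(proc.pid)])
    else:
        # POSIX: signal the whole process group. Requires the child to
        # have been spawned with start_new_session=True.
        os.killpg(os.getpgid(proc.pid), signal.SIGTERM)

# Spawn a long-running child in its own session/process group, then
# kill its entire tree.
proc = subprocess.Popen(
    [sys.executable, "-c", "import time; time.sleep(60)"],
    start_new_session=True,
)
kill_proc_tree(proc)
proc.wait()
print("child terminated, return code:", proc.returncode)
```

On POSIX a negative return code indicates death by signal; psutil's `Process.children(recursive=True)` would instead enumerate and terminate each descendant individually, which is the approach the comment above refers to.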
…limit
* 'master' of github.com:iterative/dvc:
  * dag: add --outs option (iterative#4739)
  * Add test server and tests for webdav (iterative#4827)
  * Simpler param updates with python-benedict (iterative#4780)
  * checkpoints: set DVC_ROOT environment variable (iterative#4877)
  * api: add support for simple wildcards (iterative#4864)
  * tests: mark azure test as flaky (iterative#4881)
  * setup.py: limit responses version for moto (iterative#4879)
  * remote: avoid chunking on webdav. Fixes iterative#4796 (iterative#4828)
  * checkpoints: `exp run` and `exp res[ume]` refactor (iterative#4855)
❗ I have followed the Contributing to DVC checklist.
📖 If this PR requires documentation updates, I have created a separate PR (or issue, at least) in dvc.org and linked it here.
Thank you for the contribution - we'll try to review it as soon as possible. 🙏
Related to #4821.
For stages with `checkpoint` outputs:
* `dvc exp run --checkpoint` is replaced with `dvc exp run` / `dvc exp run -f`.
* `dvc exp run --continue` is replaced with `dvc exp res[ume]`.