From 2dc536d8641642c5c65e4721b7a11c173dcf2b27 Mon Sep 17 00:00:00 2001 From: James Taylor Date: Thu, 13 Jul 2017 17:56:01 -0400 Subject: [PATCH 1/5] Retain "outdir" in WorkflowJob.job. Otherwise, the `--tmp-outdir-prefix` option passed on the command line is not respected, and outputs are instead written to the system temp directory. --- cwltool/workflow.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/cwltool/workflow.py b/cwltool/workflow.py index 2a34a757e..8133ffa2d 100644 --- a/cwltool/workflow.py +++ b/cwltool/workflow.py @@ -361,9 +361,6 @@ def job(self, joborder, output_callback, **kwargs): self.state = {} self.processStatus = "success" - if "outdir" in kwargs: - del kwargs["outdir"] - for e, i in enumerate(self.tool["inputs"]): with SourceLine(self.tool["inputs"], e, WorkflowException, _logger.isEnabledFor(logging.DEBUG)): iid = shortname(i["id"]) From 53354ecce6aa693ab973e03fd2feeac368ecef07 Mon Sep 17 00:00:00 2001 From: "Michael R. Crusoe" Date: Fri, 11 May 2018 07:51:37 -0700 Subject: [PATCH 2/5] Revert "Retain "outdir" in WorkflowJob.job." This reverts commit dd77756bbb6d59abc55d956c9ef947ce4b8282a7. --- cwltool/workflow.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cwltool/workflow.py b/cwltool/workflow.py index 8133ffa2d..2a34a757e 100644 --- a/cwltool/workflow.py +++ b/cwltool/workflow.py @@ -361,6 +361,9 @@ def job(self, joborder, output_callback, **kwargs): self.state = {} self.processStatus = "success" + if "outdir" in kwargs: + del kwargs["outdir"] + for e, i in enumerate(self.tool["inputs"]): with SourceLine(self.tool["inputs"], e, WorkflowException, _logger.isEnabledFor(logging.DEBUG)): iid = shortname(i["id"]) From 257c102ab5463334cd64c562d49af68283ebb6bf Mon Sep 17 00:00:00 2001 From: "Michael R. Crusoe" Date: Fri, 11 May 2018 07:52:13 -0700 Subject: [PATCH 3/5] respect tmp-outdir-prefix when containerless --- cwltool/process.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cwltool/process.py b/cwltool/process.py index 374679402..d427e95a7 100644 --- a/cwltool/process.py +++ b/cwltool/process.py @@ -566,6 +566,7 @@ def _init_job(self, joborder, **kwargs): select_resources: callback to select compute resources debug: enable debugging output js_console: enable javascript console output + tmp_outdir_prefix: Path prefix for intermediate output directories """ builder = Builder() @@ -622,7 +623,8 @@ def _init_job(self, joborder, **kwargs): builder.tmpdir = builder.fs_access.docker_compatible_realpath(kwargs.get("docker_tmpdir") or "/tmp") builder.stagedir = builder.fs_access.docker_compatible_realpath(kwargs.get("docker_stagedir") or "/var/lib/cwl") else: - builder.outdir = builder.fs_access.realpath(kwargs.get("outdir") or tempfile.mkdtemp()) + builder.outdir = builder.fs_access.realpath(kwargs.get("outdir") + or tempfile.mkdtemp(prefix=kwargs["tmp_outdir_prefix"])) if self.tool[u"class"] != 'Workflow': builder.tmpdir = builder.fs_access.realpath(kwargs.get("tmpdir") or tempfile.mkdtemp()) builder.stagedir = builder.fs_access.realpath(kwargs.get("stagedir") or tempfile.mkdtemp()) From 58bc0d4156e9ad70973af67032037a103de0e596 Mon Sep 17 00:00:00 2001 From: "Michael R. Crusoe" Date: Fri, 11 May 2018 08:09:49 -0700 Subject: [PATCH 4/5] report docker version --- jenkins.bat | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jenkins.bat b/jenkins.bat index 2b7a84cba..fa6439835 100644 --- a/jenkins.bat +++ b/jenkins.bat @@ -2,7 +2,7 @@ set PATH=%PATH%;"C:\\Program Files\\Docker Toolbox\\" docker-machine start default REM Set the environment variables to use docker-machine and docker commands FOR /f "tokens=*" %%i IN ('docker-machine env --shell cmd default') DO %%i - +docker version python setup.py test --addopts "--junit-xml=tests.xml --cov-report xml --cov cwltool" pip install codecov codecov From 1cdc519bf711af37c21825a295fb509046da4e3f Mon Sep 17 00:00:00 2001 From: "Michael R. Crusoe" Date: Mon, 14 May 2018 23:54:08 -0700 Subject: [PATCH 5/5] remove dry-run, add tmp-outdir-prefix to docker --- cwltool/docker.py | 72 ++++++++++++++++++++++-------------------- cwltool/executors.py | 11 +++---- cwltool/job.py | 60 ++++++++++++++++++++--------------- cwltool/singularity.py | 21 ++++++------ cwltool/workflow.py | 10 ++---- tox.ini | 1 + 6 files changed, 89 insertions(+), 86 deletions(-) diff --git a/cwltool/docker.py b/cwltool/docker.py index c07282089..acea09bd6 100644 --- a/cwltool/docker.py +++ b/cwltool/docker.py @@ -67,8 +67,11 @@ def _check_docker_machine_path(path): # type: (Text) -> None class DockerCommandLineJob(ContainerCommandLineJob): @staticmethod - def get_image(dockerRequirement, pull_image, dry_run=False, force_pull=False): - # type: (Dict[Text, Text], bool, bool, bool) -> bool + def get_image(dockerRequirement, # type: Dict[Text, Text] + pull_image, # type: bool + force_pull=False, # type: bool + tmp_outdir_prefix=None # type: Text + ): # type: (...) -> bool found = False if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement: @@ -108,48 +111,44 @@ def get_image(dockerRequirement, pull_image, dry_run=False, force_pull=False): if "dockerPull" in dockerRequirement: cmd = ["docker", "pull", str(dockerRequirement["dockerPull"])] _logger.info(Text(cmd)) - if not dry_run: - subprocess.check_call(cmd, stdout=sys.stderr) - found = True + subprocess.check_call(cmd, stdout=sys.stderr) + found = True elif "dockerFile" in dockerRequirement: - dockerfile_dir = str(tempfile.mkdtemp()) + dockerfile_dir = str(tempfile.mkdtemp(prefix=tmp_outdir_prefix)) with open(os.path.join(dockerfile_dir, "Dockerfile"), "wb") as df: df.write(dockerRequirement["dockerFile"].encode('utf-8')) cmd = ["docker", "build", "--tag=%s" % str(dockerRequirement["dockerImageId"]), dockerfile_dir] _logger.info(Text(cmd)) - if not dry_run: - subprocess.check_call(cmd, stdout=sys.stderr) - found = True + subprocess.check_call(cmd, stdout=sys.stderr) + found = True elif "dockerLoad" in dockerRequirement: cmd = ["docker", "load"] _logger.info(Text(cmd)) - if not dry_run: - if os.path.exists(dockerRequirement["dockerLoad"]): - _logger.info(u"Loading docker image from %s", dockerRequirement["dockerLoad"]) - with open(dockerRequirement["dockerLoad"], "rb") as f: - loadproc = subprocess.Popen(cmd, stdin=f, stdout=sys.stderr) - else: - loadproc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=sys.stderr) - _logger.info(u"Sending GET request to %s", dockerRequirement["dockerLoad"]) - req = requests.get(dockerRequirement["dockerLoad"], stream=True) - n = 0 - for chunk in req.iter_content(1024 * 1024): - n += len(chunk) - _logger.info("\r%i bytes" % (n)) - loadproc.stdin.write(chunk) - loadproc.stdin.close() - rcode = loadproc.wait() - if rcode != 0: - raise WorkflowException("Docker load returned non-zero exit status %i" % (rcode)) - found = True + if os.path.exists(dockerRequirement["dockerLoad"]): + _logger.info(u"Loading docker image from %s", dockerRequirement["dockerLoad"]) + with open(dockerRequirement["dockerLoad"], "rb") as f: + loadproc = subprocess.Popen(cmd, stdin=f, stdout=sys.stderr) + else: + loadproc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=sys.stderr) + _logger.info(u"Sending GET request to %s", dockerRequirement["dockerLoad"]) + req = requests.get(dockerRequirement["dockerLoad"], stream=True) + n = 0 + for chunk in req.iter_content(1024 * 1024): + n += len(chunk) + _logger.info("\r%i bytes" % (n)) + loadproc.stdin.write(chunk) + loadproc.stdin.close() + rcode = loadproc.wait() + if rcode != 0: + raise WorkflowException("Docker load returned non-zero exit status %i" % (rcode)) + found = True elif "dockerImport" in dockerRequirement: cmd = ["docker", "import", str(dockerRequirement["dockerImport"]), str(dockerRequirement["dockerImageId"])] _logger.info(Text(cmd)) - if not dry_run: - subprocess.check_call(cmd, stdout=sys.stderr) - found = True + subprocess.check_call(cmd, stdout=sys.stderr) + found = True if found: with found_images_lock: @@ -157,8 +156,13 @@ def get_image(dockerRequirement, pull_image, dry_run=False, force_pull=False): return found - def get_from_requirements(self, r, req, pull_image, dry_run=False, force_pull=False): - # type: (Dict[Text, Text], bool, bool, bool, bool) -> Text + def get_from_requirements(self, + r, # type: Dict[Text, Text] + req, # type: bool + pull_image, # type: bool + force_pull=False, # type: bool + tmp_outdir_prefix=None # type: Text + ): # type: (...) -> Text if r: errmsg = None try: @@ -174,7 +178,7 @@ def get_from_requirements(self, r, req, pull_image, dry_run=False, force_pull=Fa else: return None - if self.get_image(r, pull_image, dry_run, force_pull=force_pull): + if self.get_image(r, pull_image, force_pull, tmp_outdir_prefix): return r["dockerImageId"] else: if req: diff --git a/cwltool/executors.py b/cwltool/executors.py index 8727ab244..d291875a6 100644 --- a/cwltool/executors.py +++ b/cwltool/executors.py @@ -43,19 +43,18 @@ def run_jobs(self, ): pass - def execute(self, t, # type: Process + def execute(self, + t, # type: Process job_order_object, # type: Dict[Text, Any] logger=_logger, - **kwargs # type: Any - ): - # type: (...) -> Tuple[Dict[Text, Any], Text] + **kwargs # type: Any + ): # type: (...) -> Tuple[Dict[Text, Any], Text] if "basedir" not in kwargs: raise WorkflowException("Must provide 'basedir' in kwargs") finaloutdir = os.path.abspath(kwargs.get("outdir")) if kwargs.get("outdir") else None - kwargs["outdir"] = tempfile.mkdtemp(prefix=kwargs["tmp_outdir_prefix"]) if kwargs.get( - "tmp_outdir_prefix") else tempfile.mkdtemp() + kwargs["outdir"] = tempfile.mkdtemp(prefix=kwargs.get("tmp_outdir_prefix")) self.output_dirs.add(kwargs["outdir"]) kwargs["mutation_manager"] = MutationManager() kwargs["toplevel"] = True diff --git a/cwltool/job.py b/cwltool/job.py index 4ede40278..5afd42621 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -175,8 +175,14 @@ def _setup(self, kwargs): # type: (Dict) -> None _logger.debug(u"[job %s] initial work dir %s", self.name, json.dumps({p: self.generatemapper.mapper(p) for p in self.generatemapper.files()}, indent=4)) - def _execute(self, runtime, env, rm_tmpdir=True, move_outputs="move", secret_store=None): - # type: (List[Text], MutableMapping[Text, Text], bool, Text, SecretStore) -> None + def _execute(self, + runtime, # type:List[Text] + env, # type: MutableMapping[Text, Text] + rm_tmpdir=True, # type: bool + move_outputs="move", # type: Text + secret_store=None, # type: SecretStore + tmp_outdir_prefix=None # type: Text + ): # type: (...) -> None scr, _ = get_feature(self, "ShellCommandRequirement") @@ -228,14 +234,9 @@ def _execute(self, runtime, env, rm_tmpdir=True, move_outputs="move", secret_sto if builder is not None: job_script_contents = builder.build_job_script(commands) rcode = _job_popen( - commands, - stdin_path=stdin_path, - stdout_path=stdout_path, - stderr_path=stderr_path, - env=env, - cwd=self.outdir, - job_script_contents=job_script_contents, - ) + commands, stdin_path, stdout_path, stderr_path, env, + self.outdir, tempfile.mkdtemp(prefix=tmp_outdir_prefix), + job_script_contents) if self.successCodes and rcode in self.successCodes: processStatus = "success" @@ -334,15 +335,22 @@ def run(self, pull_image=True, rm_container=True, stageFiles(self.generatemapper, ignoreWritable=self.inplace_update, symLink=True, secret_store=kwargs.get("secret_store")) relink_initialworkdir(self.generatemapper, self.outdir, self.builder.outdir, inplace_update=self.inplace_update) - self._execute([], env, rm_tmpdir=rm_tmpdir, move_outputs=move_outputs, secret_store=kwargs.get("secret_store")) + self._execute( + [], env, rm_tmpdir, move_outputs, kwargs.get("secret_store"), + kwargs.get("tmp_outdir_prefix")) class ContainerCommandLineJob(JobBase): __metaclass__ = ABCMeta @abstractmethod - def get_from_requirements(self, r, req, pull_image, dry_run=False, force_pull=False): - # type: (Dict[Text, Text], bool, bool, bool, bool) -> Text + def get_from_requirements(self, + r, # type: Dict[Text, Text] + req, # type: bool + pull_image, # type: bool + force_pull=False, # type: bool + tmp_outdir_prefix=None # type: Text + ): # type: (...) -> Text pass @abstractmethod @@ -377,7 +385,9 @@ def run(self, pull_image=True, rm_container=True, try: env = cast(MutableMapping[Text, Text], os.environ) if docker_req and kwargs.get("use_container"): - img_id = str(self.get_from_requirements(docker_req, True, pull_image, force_pull=kwargs.get("force_docker_pull"))) + img_id = str(self.get_from_requirements(docker_req, True, + pull_image, kwargs.get("force_docker_pull"), + kwargs.get('tmp_outdir_prefix'))) if img_id is None: if self.builder.find_default_container: default_container = self.builder.find_default_container() @@ -404,20 +414,21 @@ def run(self, pull_image=True, rm_container=True, runtime = self.create_runtime(env, rm_container, record_container_id, cidfile_dir, cidfile_prefix, **kwargs) runtime.append(img_id) - self._execute(runtime, env, rm_tmpdir=rm_tmpdir, move_outputs=move_outputs, secret_store=kwargs.get("secret_store")) + self._execute( + runtime, env, rm_tmpdir, move_outputs, kwargs.get("secret_store"), + kwargs.get("tmp_outdir_prefix")) def _job_popen( - commands, # type: List[Text] - stdin_path, # type: Text - stdout_path, # type: Text - stderr_path, # type: Text + commands, # type: List[Text] + stdin_path, # type: Text + stdout_path, # type: Text + stderr_path, # type: Text env, # type: Union[MutableMapping[Text, Text], MutableMapping[str, str]] - cwd, # type: Text - job_dir=None, # type: Text + cwd, # type: Text + job_dir, # type: Text job_script_contents=None, # type: Text -): - # type: (...) -> int + ): # type: (...) -> int if not job_script_contents and not FORCE_SHELLED_POPEN: stdin = None # type: Union[IO[Any], int] @@ -464,9 +475,6 @@ def _job_popen( return rcode else: - if job_dir is None: - job_dir = tempfile.mkdtemp(prefix="cwltooljob") - if not job_script_contents: job_script_contents = SHELL_COMMAND_TEMPLATE diff --git a/cwltool/singularity.py b/cwltool/singularity.py index 47a6ca872..253d587de 100644 --- a/cwltool/singularity.py +++ b/cwltool/singularity.py @@ -50,7 +50,6 @@ class SingularityCommandLineJob(ContainerCommandLineJob): @staticmethod def get_image(dockerRequirement, # type: Dict[Text, Text] pull_image, # type: bool - dry_run=False, # type: bool force_pull=False # type: bool ): # type: (...) -> bool @@ -95,9 +94,8 @@ def get_image(dockerRequirement, # type: Dict[Text, Text] str(dockerRequirement["dockerImageId"]), str(dockerRequirement["dockerPull"])] _logger.info(Text(cmd)) - if not dry_run: - check_call(cmd, stdout=sys.stderr) - found = True + check_call(cmd, stdout=sys.stderr) + found = True elif "dockerFile" in dockerRequirement: raise WorkflowException(SourceLine( dockerRequirement, 'dockerFile').makeError( @@ -117,13 +115,12 @@ def get_image(dockerRequirement, # type: Dict[Text, Text] return found def get_from_requirements(self, - r, # type: Optional[Dict[Text, Text]] - req, # type: bool - pull_image, # type: bool - dry_run=False, # type: bool - force_pull=False # type: bool - ): - # type: (...) -> Text + r, # type: Optional[Dict[Text, Text]] + req, # type: bool + pull_image, # type: bool + force_pull=False, # type: bool + tmp_outdir_prefix=None # type: Text + ): # type: (...) -> Text """ Returns the filename of the Singularity image (e.g. hello-world-latest.img). @@ -144,7 +141,7 @@ def get_from_requirements(self, else: return None - if self.get_image(r, pull_image, dry_run, force_pull): + if self.get_image(r, pull_image, force_pull): return os.path.abspath(r["dockerImageId"]) else: if req: diff --git a/cwltool/workflow.py b/cwltool/workflow.py index 2a34a757e..57d04ec6f 100644 --- a/cwltool/workflow.py +++ b/cwltool/workflow.py @@ -179,14 +179,8 @@ def __init__(self, workflow, **kwargs): self.state = None # type: Dict[Text, WorkflowStateItem] self.processStatus = None # type: Text self.did_callback = False - - if "outdir" in kwargs: - self.outdir = kwargs["outdir"] - elif "tmp_outdir_prefix" in kwargs: - self.outdir = tempfile.mkdtemp(prefix=kwargs["tmp_outdir_prefix"]) - else: - # tmp_outdir_prefix defaults to tmp, so this is unlikely to be used - self.outdir = tempfile.mkdtemp() + self.outdir = kwargs.get("outdir") + self.outdir = tempfile.mkdtemp(prefix=kwargs.get("tmp_outdir_prefix")) self.name = uniquename(u"workflow %s" % kwargs.get("name", shortname(self.workflow.tool.get("id", "embedded")))) diff --git a/tox.ini b/tox.ini index fb4a9b530..50ae4e80b 100644 --- a/tox.ini +++ b/tox.ini @@ -21,6 +21,7 @@ passenv = CI TRAVIS TRAVIS_* deps = -rrequirements.txt py{27,34,35,36}-unit: codecov + py{27,34,35,36}-unit: pytest-xdist py{27,34,35,36}-lint: flake8 commands =