From c988bb6f4d556a4636801c1692077bfbfed12e25 Mon Sep 17 00:00:00 2001
From: John Chilton
Date: Tue, 31 May 2016 09:22:34 -0400
Subject: [PATCH 1/2] Refactor job.py to allow modification of job environment w/shell commands.

CommandLineJob.run() no longer opens the stdin/stdout/stderr files and
calls subprocess.Popen itself; it passes the paths to a new module-level
helper, shelled_popen().

shelled_popen() has two modes:

* Direct: when no ``prefix`` is given and CWLTOOL_FORCE_SHELL_POPEN is
  not "1", the command is run with subprocess.Popen as before.
* Shelled: otherwise job.json (the Popen description), run_job.bash and
  run_job.py are written to a temporary directory.  The bash script runs
  the ``prefix`` shell commands and then run_job.py, which performs the
  real Popen with the requested redirections and exits with the job's
  return code.  The temporary directory is removed afterwards.

CWLTOOL_FORCE_SHELL_POPEN defaults to "1" in this commit, so the shelled
mode is always used.
---
Review fixes folded into this revision:
 - PYTHON_RUN_SCRIPT: close the unterminated "wb string; the generated
   run_job.py could not be parsed.
 - PYTHON_RUN_SCRIPT: pass stderr=stderr to Popen; the stderr file was
   opened but never used.
 - Open stdin with "rb" instead of the invalid mode "rd" (both paths).
 - Shelled mode: give the bash wrapper sys.stderr instead of pipes that
   nobody reads (output was lost and a chatty job could block).
 - Shelled mode: write job.json inside the try block so the temporary
   directory is removed even when that write fails.

 cwltool/job.py | 196 ++++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 168 insertions(+), 28 deletions(-)

diff --git a/cwltool/job.py b/cwltool/job.py
index 1a0b51548..8923e4c74 100644
--- a/cwltool/job.py
+++ b/cwltool/job.py
@@ -6,6 +6,7 @@
 import json
 import logging
 import sys
+import string
 import requests
 from . import docker
 from .process import get_feature, empty_subtree, stageFiles
@@ -24,6 +25,9 @@
 
 needs_shell_quoting_re = re.compile(r"""(^$|[\s|&;()<>\'"$@])""")
 
+FORCE_SHELLED_POPEN = os.getenv("CWLTOOL_FORCE_SHELL_POPEN", "1") == "1"
+
+
 def deref_links(outputs):  # type: (Any) -> None
     if isinstance(outputs, dict):
         if outputs.get("class") == "File":
@@ -188,50 +192,36 @@ def linkoutdir(src, tgt):
                 stageFiles(generatemapper, linkoutdir)
 
             if self.stdin:
-                stdin = open(self.pathmapper.reversemap(self.stdin)[1], "rb")
+                stdin_path = self.pathmapper.reversemap(self.stdin)[1]
             else:
-                stdin = subprocess.PIPE
+                stdin_path = None
 
             if self.stderr:
                 abserr = os.path.join(self.outdir, self.stderr)
                 dnerr = os.path.dirname(abserr)
                 if dnerr and not os.path.exists(dnerr):
                     os.makedirs(dnerr)
-                stderr = open(abserr, "wb")
+                stderr_path = abserr
             else:
-                stderr = sys.stderr
+                stderr_path = None
 
             if self.stdout:
                 absout = os.path.join(self.outdir, self.stdout)
                 dn = os.path.dirname(absout)
                 if dn and not os.path.exists(dn):
                     os.makedirs(dn)
-                stdout = open(absout, "wb")
+                stdout_path = absout
             else:
-                stdout = sys.stderr
-
-            sp = subprocess.Popen([unicode(x).encode('utf-8') for x in runtime + self.command_line],
-                                  shell=False,
-                                  close_fds=True,
-                                  stdin=stdin,
-                                  stderr=stderr,
-                                  stdout=stdout,
-                                  env=env,
-                                  cwd=self.outdir)
-
-            if sp.stdin:
-                sp.stdin.close()
-
-            rcode = sp.wait()
-
-            if isinstance(stdin, file):
-                stdin.close()
+                stdout_path = None
 
-            if stderr is not sys.stderr:
-                stderr.close()
-
-            if stdout is not sys.stderr:
-                stdout.close()
+            rcode = shelled_popen(
+                [unicode(x).encode('utf-8') for x in runtime + self.command_line],
+                stdin_path=stdin_path,
+                stdout_path=stdout_path,
+                stderr_path=stderr_path,
+                env=env,
+                cwd=self.outdir,
+            )
 
             if self.successCodes and rcode in self.successCodes:
                 processStatus = "success"
@@ -290,3 +280,153 @@ def linkoutdir(src, tgt):
         if move_outputs == "move" and empty_subtree(self.outdir):
             _logger.debug(u"[job %s] Removing empty output directory %s", self.name, self.outdir)
             shutil.rmtree(self.outdir, True)
+
+
+SHELL_COMMAND_TEMPLATE = string.Template("""#!/bin/bash
+$prefix
+python "run_job.py" "job.json"
+""")
+PYTHON_RUN_SCRIPT = """
+import json
+import sys
+import subprocess
+
+with open(sys.argv[1], "r") as f:
+    popen_description = json.load(f)
+    commands = popen_description["commands"]
+    cwd = popen_description["cwd"]
+    env = popen_description["env"]
+    stdin_path = popen_description["stdin_path"]
+    stdout_path = popen_description["stdout_path"]
+    stderr_path = popen_description["stderr_path"]
+
+    if stdin_path is not None:
+        stdin = open(stdin_path, "rb")
+    else:
+        stdin = subprocess.PIPE
+
+    if stdout_path is not None:
+        stdout = open(stdout_path, "wb")
+    else:
+        stdout = sys.stderr
+
+    if stderr_path is not None:
+        stderr = open(stderr_path, "wb")
+    else:
+        stderr = sys.stderr
+
+    sp = subprocess.Popen(commands,
+                          shell=False,
+                          close_fds=True,
+                          stdin=stdin,
+                          stdout=stdout,
+                          stderr=stderr,
+                          env=env,
+                          cwd=cwd)
+
+    if sp.stdin:
+        sp.stdin.close()
+
+    rcode = sp.wait()
+
+    if isinstance(stdin, file):
+        stdin.close()
+
+    if stdout is not sys.stderr:
+        stdout.close()
+
+    if stderr is not sys.stderr:
+        stderr.close()
+
+    sys.exit(rcode)
+"""
+
+
+def shelled_popen(commands,
+                  stdin_path,
+                  stdout_path,
+                  stderr_path,
+                  env,
+                  cwd,
+                  prefix=None):
+    if prefix is None and not FORCE_SHELLED_POPEN:
+        if stdin_path is not None:
+            stdin = open(stdin_path, "rb")
+        else:
+            stdin = subprocess.PIPE
+
+        if stdout_path is not None:
+            stdout = open(stdout_path, "wb")
+        else:
+            stdout = sys.stderr
+
+        if stderr_path is not None:
+            stderr = open(stderr_path, "wb")
+        else:
+            stderr = sys.stderr
+
+        sp = subprocess.Popen(commands,
+                              shell=False,
+                              close_fds=True,
+                              stdin=stdin,
+                              stdout=stdout,
+                              stderr=stderr,
+                              env=env,
+                              cwd=cwd)
+
+        if sp.stdin:
+            sp.stdin.close()
+
+        rcode = sp.wait()
+
+        if isinstance(stdin, file):
+            stdin.close()
+
+        if stdout is not sys.stderr:
+            stdout.close()
+
+        if stderr is not sys.stderr:
+            stderr.close()
+
+        return rcode
+    else:
+        template_kwds = dict(
+            prefix=prefix or '',
+        )
+        job_script_contents = SHELL_COMMAND_TEMPLATE.substitute(
+            **template_kwds
+        )
+        job_dir = tempfile.mkdtemp(prefix="cwltooljob")
+        job_description = dict(
+            commands=commands,
+            cwd=cwd,
+            env=env,
+            stdout_path=stdout_path,
+            stderr_path=stderr_path,
+            stdin_path=stdin_path,
+        )
+        try:
+            with open(os.path.join(job_dir, "job.json"), "w") as f:
+                json.dump(job_description, f)
+            job_script = os.path.join(job_dir, "run_job.bash")
+            with open(job_script, "w") as f:
+                f.write(job_script_contents)
+            job_run = os.path.join(job_dir, "run_job.py")
+            with open(job_run, "w") as f:
+                f.write(PYTHON_RUN_SCRIPT)
+            sp = subprocess.Popen(
+                ["bash", job_script],
+                shell=False,
+                cwd=job_dir,
+                stdout=sys.stderr,
+                stderr=sys.stderr,
+                stdin=subprocess.PIPE,
+            )
+            if sp.stdin:
+                sp.stdin.close()
+
+            rcode = sp.wait()
+
+            return rcode
+        finally:
+            shutil.rmtree(job_dir)

From 4ab6082a28118e969ce1af72e1dcbd23d20df3c8 Mon Sep 17 00:00:00 2001
From: John Chilton
Date: Mon, 6 Jun 2016 12:30:39 -0400
Subject: [PATCH 2/2] Implement ``Dependency`` hint to describe tool dependencies.

Tools can declare software requirements with a ``Dependency`` hint
(``name`` plus an optional ``version``).  When the optional
galaxy.tools.deps package is importable, main() builds a dependency
manager from a DependenciesConfigruation and passes it to the tool.  Each
job converts its ``Dependency`` hints into ToolRequirement objects, asks
the manager for the matching shell commands and runs them as the
``prefix`` of the job script introduced by the previous commit.

Also:

* add the hidden command line options
  --beta-dependency-resolvers-configuration and
  --beta-dependencies-directory;
* let shelled_popen() reuse a ``job_dir`` created by the caller;
* default CWLTOOL_FORCE_SHELL_POPEN to "0", so jobs without dependency
  commands use the direct Popen path again.
---
Review fixes folded into this revision:
 - run(): pass job_dir and prefix to shelled_popen().  They were computed
   but never used, so the Dependency hint had no effect and the
   mkdtemp() directory was never removed.
 - CommandLineJob.__init__: initialise tool_dependency_manager, the
   attribute run() actually reads, instead of the unused
   dependency_manager.
 - main.py: fix the spelling of a local variable and of a comment.  The
   class name DependenciesConfigruation is left untouched.

 cwltool/draft2tool.py |  1 +
 cwltool/job.py        | 44 +++++++++++++++++++++++++++++++++++++++++---
 cwltool/main.py       | 38 +++++++++++++++++++++++++++++++++++++-
 cwltool/process.py    |  2 +-
 4 files changed, 80 insertions(+), 5 deletions(-)

diff --git a/cwltool/draft2tool.py b/cwltool/draft2tool.py
index e5a8658ec..973a09817 100644
--- a/cwltool/draft2tool.py
+++ b/cwltool/draft2tool.py
@@ -220,6 +220,7 @@ def rm_pending_output_callback(output_callback, jobcachepending,
         reffiles = copy.deepcopy(builder.files)
 
         j = self.makeJobRunner()
+        j.tool_dependency_manager = self.tool_dependency_manager
         j.builder = builder
         j.joborder = builder.job
         j.stdin = None
diff --git a/cwltool/job.py b/cwltool/job.py
index 8923e4c74..1145ab1dc 100644
--- a/cwltool/job.py
+++ b/cwltool/job.py
@@ -20,12 +20,16 @@
 from typing import Union, Iterable, Callable, Any, Mapping, IO, cast, Tuple
 from .pathmapper import PathMapper
 import functools
+try:
+    from galaxy.tools.deps.requirements import ToolRequirement
+except ImportError:
+    ToolRequirement = None
 
 _logger = logging.getLogger("cwltool")
 
 needs_shell_quoting_re = re.compile(r"""(^$|[\s|&;()<>\'"$@])""")
 
-FORCE_SHELLED_POPEN = os.getenv("CWLTOOL_FORCE_SHELL_POPEN", "1") == "1"
+FORCE_SHELLED_POPEN = os.getenv("CWLTOOL_FORCE_SHELL_POPEN", "0") == "1"
 
 
 def deref_links(outputs):  # type: (Any) -> None
@@ -64,6 +68,7 @@ def __init__(self):  # type: () -> None
         self.environment = None  # type: Dict[str,str]
         self.generatefiles = None  # type: Dict[unicode, Union[List[Dict[str, str]], Dict[str,str], str]]
         self.stagedir = None  # type: unicode
+        self.tool_dependency_manager = None  # type: DependencyManager
 
     def run(self, dry_run=False, pull_image=True, rm_container=True,
             rm_tmpdir=True, move_outputs="move", **kwargs):
@@ -214,13 +219,26 @@ def linkoutdir(src, tgt):
             else:
                 stdout_path = None
 
+            prefix = None
+            job_dir = None
+            if self.tool_dependency_manager is not None:
+                dependencies = self._find_tool_dependencies()
+                job_dir = tempfile.mkdtemp(prefix="cwltooljob")
+                shell_commands = self.tool_dependency_manager.dependency_shell_commands(
+                    dependencies,
+                    job_directory=job_dir,
+                )
+                prefix = "\n".join(shell_commands)
+
             rcode = shelled_popen(
                 [unicode(x).encode('utf-8') for x in runtime + self.command_line],
                 stdin_path=stdin_path,
                 stdout_path=stdout_path,
                 stderr_path=stderr_path,
                 env=env,
                 cwd=self.outdir,
+                job_dir=job_dir,
+                prefix=prefix,
             )
 
             if self.successCodes and rcode in self.successCodes:
@@ -281,6 +299,23 @@ def linkoutdir(src, tgt):
             _logger.debug(u"[job %s] Removing empty output directory %s", self.name, self.outdir)
             shutil.rmtree(self.outdir, True)
 
+    def _find_tool_dependencies(self):
+        dependencies = []
+        for hint in self.hints:
+            hint_class = hint.get("class", "")
+            if not hint_class:
+                continue
+            base_name = hint["class"].rsplit("/", 1)[-1]
+            if base_name == "Dependency":
+                requirement_desc = {}
+                requirement_desc["type"] = "package"
+                name = hint["name"].rsplit("#", 1)[-1]
+                version = hint.get("version", "").rsplit("#", 1)[-1]
+                requirement_desc["name"] = name
+                requirement_desc["version"] = version or None
+                dependencies.append(ToolRequirement.from_dict(requirement_desc))
+        return dependencies
+
 
 SHELL_COMMAND_TEMPLATE = string.Template("""#!/bin/bash
 $prefix
@@ -348,6 +383,7 @@ def shelled_popen(commands,
                   stderr_path,
                   env,
                   cwd,
+                  job_dir=None,
                   prefix=None):
     if prefix is None and not FORCE_SHELLED_POPEN:
         if stdin_path is not None:
@@ -390,17 +426,19 @@ def shelled_popen(commands,
 
         return rcode
     else:
+        if job_dir is None:
+            job_dir = tempfile.mkdtemp(prefix="cwltooljob")
+
         template_kwds = dict(
             prefix=prefix or '',
         )
         job_script_contents = SHELL_COMMAND_TEMPLATE.substitute(
             **template_kwds
         )
-        job_dir = tempfile.mkdtemp(prefix="cwltooljob")
         job_description = dict(
             commands=commands,
             cwd=cwd,
-            env=env,
+            env=env.copy(),
             stdout_path=stdout_path,
             stderr_path=stderr_path,
             stdin_path=stdin_path,
diff --git a/cwltool/main.py b/cwltool/main.py
index 4cb71f3eb..2873b4604 100755
--- a/cwltool/main.py
+++ b/cwltool/main.py
@@ -16,6 +16,10 @@
 
 import rdflib
 from typing import Union, Any, cast, Callable, Dict, Tuple, IO
+try:
+    from galaxy.tools import deps
+except ImportError:
+    deps = None
 
 from schema_salad.ref_resolver import Loader
 import schema_salad.validate as validate
@@ -37,6 +41,9 @@
 _logger.addHandler(defaultStreamHandler)
 _logger.setLevel(logging.INFO)
 
+if deps is not None:
+    deps.log = _logger
+
 
 def arg_parser():  # type: () -> argparse.ArgumentParser
     parser = argparse.ArgumentParser(description='Reference executor for Common Workflow Language')
@@ -138,6 +145,11 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
 
+    # help="Dependency resolver configuration file describing how to adapt 'Dependency' hints to current system."
+    parser.add_argument("--beta-dependency-resolvers-configuration", default=None, help=argparse.SUPPRESS)
+    # help="Default root directory used by dependency resolvers configuration."
+    parser.add_argument("--beta-dependencies-directory", default=None, help=argparse.SUPPRESS)
+
     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
 
     parser.add_argument("--relative-deps", choices=['primary', 'cwd'], default="primary",
@@ -601,6 +613,12 @@ def main(argsl=None,
             if not hasattr(args, k):
                 setattr(args, k, v)
 
+        if deps is not None:
+            tool_dependencies_configuration = DependenciesConfigruation(args)
+            tool_dependency_manager = deps.build_dependency_manager(tool_dependencies_configuration)
+        else:
+            tool_dependency_manager = None
+
         if args.quiet:
             _logger.setLevel(logging.WARN)
         if args.debug:
@@ -645,8 +663,11 @@ def main(argsl=None,
                 printdot(uri, processobj, document_loader.ctx, stdout)
                 return 0
 
+            make_tool_kwargs = {
+                'tool_dependency_manager': tool_dependency_manager,
+            }
             tool = make_tool(document_loader, avsc_names, metadata, uri,
-                             makeTool, {})
+                             makeTool, make_tool_kwargs)
         except (validate.ValidationException) as exc:
             _logger.error(u"Tool definition failed validation:\n%s", exc,
                           exc_info=(exc if args.debug else False))
@@ -749,5 +770,20 @@ def locToPath(p):
         _logger.removeHandler(stderr_handler)
         _logger.addHandler(defaultStreamHandler)
 
+
+class DependenciesConfigruation(object):
+
+    def __init__(self, args):
+        conf_file = getattr(args, "beta_dependency_resolvers_configuration", None)
+        tool_dependency_dir = getattr(args, "beta_dependencies_directory", None)
+        if conf_file is not None and os.path.exists(conf_file):
+            self.use_tool_dependencies = True
+            if not tool_dependency_dir:
+                tool_dependency_dir = os.path.abspath(os.path.dirname(conf_file))
+            self.tool_dependency_dir = tool_dependency_dir
+            self.dependency_resolvers_config_file = conf_file
+        else:
+            self.use_tool_dependencies = False
+
 if __name__ == "__main__":
     sys.exit(main(sys.argv[1:]))
diff --git a/cwltool/process.py b/cwltool/process.py
index a2b63115c..a0d3cb04c 100644
--- a/cwltool/process.py
+++ b/cwltool/process.py
@@ -389,7 +389,7 @@ def __init__(self, toolpath_object, **kwargs):
             avro.schema.make_avsc_object(self.outputs_record_schema, self.names)
         except avro.schema.SchemaParseException as e:
             raise validate.ValidationException(u"Got error `%s` while prcoessing outputs of %s:\n%s" % (str(e), self.tool["id"], json.dumps(self.outputs_record_schema, indent=4)))
-
+        self.tool_dependency_manager = kwargs.get("tool_dependency_manager", None)
 
     def _init_job(self, joborder, **kwargs):
         # type: (Dict[unicode, unicode], **Any) -> Builder