diff --git a/cwltool/builder.py b/cwltool/builder.py index 4b2a5a13a..658c4d383 100644 --- a/cwltool/builder.py +++ b/cwltool/builder.py @@ -11,6 +11,7 @@ from .pathmapper import PathMapper, normalizeFilesDirs, get_listing, visit_class from .stdfsaccess import StdFsAccess from .utils import aslist +from .mutation import MutationManager CONTENT_LIMIT = 64 * 1024 @@ -41,6 +42,7 @@ def __init__(self): # type: () -> None self.make_fs_access = None # type: Type[StdFsAccess] self.build_job_script = None # type: Callable[[List[str]], Text] self.debug = False # type: bool + self.mutation_manager = None # type: MutationManager # One of "no_listing", "shallow_listing", "deep_listing" # Will be default "no_listing" for CWL v1.1 diff --git a/cwltool/draft2tool.py b/cwltool/draft2tool.py index 2b793ea95..4729e0ce8 100644 --- a/cwltool/draft2tool.py +++ b/cwltool/draft2tool.py @@ -14,14 +14,14 @@ import shellescape from schema_salad.ref_resolver import file_uri, uri_file_path from schema_salad.sourceline import SourceLine, indent -from typing import Any, Callable, cast, Generator, Text, Union +from typing import Any, Callable, cast, Generator, Text, Union, Dict from .builder import CONTENT_LIMIT, substitute, Builder from .pathmapper import adjustFileObjs, adjustDirObjs, visit_class from .errors import WorkflowException -from .job import CommandLineJob +from .job import JobBase, CommandLineJob, DockerCommandLineJob from .pathmapper import PathMapper, get_listing, trim_listing -from .process import Process, shortname, uniquename, normalizeFilesDirs, compute_checksums +from .process import Process, shortname, uniquename, normalizeFilesDirs, compute_checksums, _logger_validation_warnings from .stdfsaccess import StdFsAccess from .utils import aslist @@ -148,8 +148,9 @@ def run(self, **kwargs): # map files to assigned path inside a container. We need to also explicitly # walk over input as implicit reassignment doesn't reach everything in builder.bindings -def check_adjust(builder, f): - # type: (Builder, Dict[Text, Any]) -> Dict[Text, Any] +def check_adjust(builder, stepname, f): + # type: (Builder, Text, Dict[Text, Any]) -> Dict[Text, Any] + f["path"] = builder.pathmapper.mapper(f["location"])[1] f["dirname"], f["basename"] = os.path.split(f["path"]) if f["class"] == "File": @@ -171,12 +172,15 @@ def __init__(self, toolpath_object, **kwargs): # type: (Dict[Text, Any], **Any) -> None super(CommandLineTool, self).__init__(toolpath_object, **kwargs) - def makeJobRunner(self): # type: () -> CommandLineJob - return CommandLineJob() + def makeJobRunner(self): # type: () -> JobBase + dockerReq, _ = self.get_requirement("DockerRequirement") + if dockerReq: + return DockerCommandLineJob() + else: + return CommandLineJob() def makePathMapper(self, reffiles, stagedir, **kwargs): # type: (List[Any], Text, **Any) -> PathMapper - dockerReq, _ = self.get_requirement("DockerRequirement") return PathMapper(reffiles, kwargs["basedir"], stagedir) def job(self, @@ -184,7 +188,7 @@ def job(self, output_callbacks, # type: Callable[[Any, Any], Any] **kwargs # type: Any ): - # type: (...) -> Generator[Union[CommandLineJob, CallbackJob], None, None] + # type: (...) -> Generator[Union[JobBase, CallbackJob], None, None] jobname = uniquename(kwargs.get("name", shortname(self.tool.get("id", "job")))) @@ -199,9 +203,9 @@ def job(self, cachebuilder.stagedir, separateDirs=False) _check_adjust = partial(check_adjust, cachebuilder) - visit_class([cachebuilder.files, cachebuilder.bindings], ("File", "Directory"), _check_adjust) + cmdline = flatten(map(cachebuilder.generate_arg, cachebuilder.bindings)) (docker_req, docker_is_req) = self.get_requirement("DockerRequirement") if docker_req and kwargs.get("use_container") is not False: @@ -296,7 +300,7 @@ def rm_pending_output_callback(output_callbacks, jobcachepending, _logger.debug(u"[job %s] path mappings is %s", j.name, json.dumps({p: builder.pathmapper.mapper(p) for p in builder.pathmapper.files()}, indent=4)) - _check_adjust = partial(check_adjust, builder) + _check_adjust = partial(check_adjust, builder, jobname) visit_class([builder.files, builder.bindings], ("File", "Directory"), _check_adjust) @@ -368,8 +372,38 @@ def rm_pending_output_callback(output_callbacks, jobcachepending, ls[i] = t["entry"] j.generatefiles[u"listing"] = ls + inplaceUpdateReq = self.get_requirement("http://commonwl.org/cwltool#InplaceUpdateRequirement")[0] + + if inplaceUpdateReq: + j.inplace_update = inplaceUpdateReq["inplaceUpdate"] normalizeFilesDirs(j.generatefiles) + readers = {} + muts = set() + + def register_mut(f): + muts.add(f["location"]) + builder.mutation_manager.register_mutation(j.name, f) + + def register_reader(f): + if f["location"] not in muts: + builder.mutation_manager.register_reader(j.name, f) + readers[f["location"]] = f + + for li in j.generatefiles["listing"]: + li = cast(Dict[Text, Any], li) + if li.get("writable") and j.inplace_update: + adjustFileObjs(li, register_mut) + adjustDirObjs(li, register_mut) + else: + adjustFileObjs(li, register_reader) + adjustDirObjs(li, register_reader) + + adjustFileObjs(builder.files, register_reader) + adjustFileObjs(builder.bindings, register_reader) + adjustDirObjs(builder.files, register_reader) + adjustDirObjs(builder.bindings, register_reader) + j.environment = {} evr = self.get_requirement("EnvVarRequirement")[0] if evr: @@ -391,16 +425,17 @@ def rm_pending_output_callback(output_callbacks, jobcachepending, j.pathmapper = builder.pathmapper j.collect_outputs = partial( self.collect_output_ports, self.tool["outputs"], builder, - compute_checksum=kwargs.get("compute_checksum", True)) + compute_checksum=kwargs.get("compute_checksum", True), + jobname=jobname, + readers=readers) j.output_callback = output_callbacks yield j - def collect_output_ports(self, ports, builder, outdir, compute_checksum=True): - # type: (Set[Dict[Text, Any]], Builder, Text, bool) -> Dict[Text, Union[Text, List[Any], Dict[Text, Any]]] + def collect_output_ports(self, ports, builder, outdir, compute_checksum=True, jobname="", readers=None): + # type: (Set[Dict[Text, Any]], Builder, Text, bool, Text, Dict[Text, Any]) -> Dict[Text, Union[Text, List[Any], Dict[Text, Any]]] ret = {} # type: Dict[Text, Union[Text, List[Any], Dict[Text, Any]]] try: - fs_access = builder.make_fs_access(outdir) custom_output = fs_access.join(outdir, "cwl.output.json") if fs_access.exists(custom_output): @@ -429,14 +464,21 @@ def collect_output_ports(self, ports, builder, outdir, compute_checksum=True): visit_class(ret, ("File", "Directory"), cast(Callable[[Any], Any], revmap)) visit_class(ret, ("File", "Directory"), remove_path) normalizeFilesDirs(ret) + adjustFileObjs(ret, builder.mutation_manager.set_generation) visit_class(ret, ("File", "Directory"), partial(check_valid_locations, fs_access)) + if compute_checksum: adjustFileObjs(ret, partial(compute_checksums, fs_access)) - validate.validate_ex(self.names.get_name("outputs_record_schema", ""), ret) + validate.validate_ex(self.names.get_name("outputs_record_schema", ""), ret, + strict=False, logger=_logger_validation_warnings) return ret if ret is not None else {} except validate.ValidationException as e: raise WorkflowException("Error validating output record. " + Text(e) + "\n in " + json.dumps(ret, indent=4)) + finally: + if readers: + for r in readers.values(): + builder.mutation_manager.release_reader(jobname, r) def collect_output(self, schema, builder, outdir, fs_access, compute_checksum=True): # type: (Dict[Text, Any], Builder, Text, StdFsAccess, bool) -> Union[Dict[Text, Any], List[Union[Dict[Text, Any], Text]]] diff --git a/cwltool/extensions.yml b/cwltool/extensions.yml index e0b0dadf7..2cfd1ea44 100644 --- a/cwltool/extensions.yml +++ b/cwltool/extensions.yml @@ -19,4 +19,18 @@ $graph: type: - type: enum name: LoadListingEnum - symbols: [no_listing, shallow_listing, deep_listing] \ No newline at end of file + symbols: [no_listing, shallow_listing, deep_listing] + +- name: InplaceUpdateRequirement + type: record + inVocab: false + extends: cwl:ProcessRequirement + fields: + class: + type: string + doc: "Always 'InplaceUpdateRequirement'" + jsonldPredicate: + "_id": "@type" + "_type": "@vocab" + inplaceUpdate: + type: boolean diff --git a/cwltool/job.py b/cwltool/job.py index 441e19ee3..8cc279508 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -11,7 +11,7 @@ import shellescape from typing import (Any, Callable, Union, Iterable, MutableMapping, - IO, Text, Tuple) + IO, Text, Tuple, cast, List) from . import docker from .builder import Builder @@ -89,8 +89,20 @@ def deref_links(outputs): # type: (Any) -> None for v in outputs: deref_links(v) - -class CommandLineJob(object): +def relink_initialworkdir(pathmapper, inplace_update=False): + # type: (PathMapper, bool) -> None + for src, vol in pathmapper.items(): + if not vol.staged: + continue + if vol.type in ("File", "Directory") or (inplace_update and + vol.type in ("WritableFile", "WritableDirectory")): + if os.path.islink(vol.target) or os.path.isfile(vol.target): + os.remove(vol.target) + elif os.path.isdir(vol.target): + os.rmdir(vol.target) + os.symlink(vol.resolved, vol.target) + +class JobBase(object): def __init__(self): # type: () -> None self.builder = None # type: Builder self.joborder = None # type: Dict[Text, Union[Dict[Text, Any], List, Text]] @@ -105,6 +117,7 @@ def __init__(self): # type: () -> None self.name = None # type: Text self.command_line = None # type: List[Text] self.pathmapper = None # type: PathMapper + self.generatemapper = None # type: PathMapper self.collect_outputs = None # type: Union[Callable[[Any], Any], functools.partial[Any]] self.output_callback = None # type: Callable[[Any, Any], Any] self.outdir = None # type: Text @@ -112,20 +125,12 @@ def __init__(self): # type: () -> None self.environment = None # type: MutableMapping[Text, Text] self.generatefiles = None # type: Dict[Text, Union[List[Dict[Text, Text]], Dict[Text, Text], Text]] self.stagedir = None # type: Text + self.inplace_update = None # type: bool - def run(self, dry_run=False, pull_image=True, rm_container=True, - rm_tmpdir=True, move_outputs="move", **kwargs): - # type: (bool, bool, bool, bool, Text, **Any) -> Union[Tuple[Text, Dict[None, None]], None] + def _setup(self): # type: () -> None if not os.path.exists(self.outdir): os.makedirs(self.outdir) - # with open(os.path.join(outdir, "cwl.input.json"), "w") as fp: - # json.dump(self.joborder, fp) - - runtime = [] # type: List[Text] - - (docker_req, docker_is_req) = get_feature(self, "DockerRequirement") - for knownfile in self.pathmapper.files(): p = self.pathmapper.mapper(knownfile) if p.type == "File" and not os.path.isfile(p[0]): @@ -133,84 +138,15 @@ def run(self, dry_run=False, pull_image=True, rm_container=True, u"Input file %s (at %s) not found or is not a regular " "file." % (knownfile, self.pathmapper.mapper(knownfile)[0])) - img_id = None - env = None # type: Union[MutableMapping[Text, Text], MutableMapping[str, str]] - try: - if docker_req and kwargs.get("use_container") is not False: - env = os.environ - img_id = docker.get_from_requirements(docker_req, True, pull_image) - elif kwargs.get("default_container", None) is not None: - env = os.environ - img_id = kwargs.get("default_container") + if self.generatefiles["listing"]: + self.generatemapper = PathMapper(cast(List[Any], self.generatefiles["listing"]), + self.outdir, self.outdir, separateDirs=False) + _logger.debug(u"[job %s] initial work dir %s", self.name, + json.dumps({p: self.generatemapper.mapper(p) for p in self.generatemapper.files()}, indent=4)) - if docker_req and img_id is None and kwargs.get("use_container"): - raise Exception("Docker image not available") - except Exception as e: - _logger.debug("Docker error", exc_info=True) - if docker_is_req: - raise WorkflowException("Docker is required to run this tool: %s" % e) - else: - raise WorkflowException("Docker is not available for this tool, try --no-container to disable Docker: %s" % e) - if img_id: - runtime = ["docker", "run", "-i"] - for src, vol in self.pathmapper.items(): - if not vol.staged: - continue - if vol.type in ("File", "Directory"): - runtime.append(u"--volume=%s:%s:ro" % (vol.resolved, vol.target)) - if vol.type == "CreateFile": - createtmp = os.path.join(self.stagedir, os.path.basename(vol.target)) - with open(createtmp, "w") as f: - f.write(vol.resolved.encode("utf-8")) - runtime.append(u"--volume=%s:%s:ro" % (createtmp, vol.target)) - runtime.append(u"--volume=%s:%s:rw" % (os.path.realpath(self.outdir), self.builder.outdir)) - runtime.append(u"--volume=%s:%s:rw" % (os.path.realpath(self.tmpdir), "/tmp")) - runtime.append(u"--workdir=%s" % (self.builder.outdir)) - runtime.append("--read-only=true") - - if kwargs.get("custom_net", None) is not None: - runtime.append("--net={0}".format(kwargs.get("custom_net"))) - elif kwargs.get("disable_net", None): - runtime.append("--net=none") - - if self.stdout: - runtime.append("--log-driver=none") - - euid = docker_vm_uid() or os.geteuid() - - if kwargs.get("no_match_user", None) is False: - runtime.append(u"--user=%s" % (euid)) - - if rm_container: - runtime.append("--rm") - - runtime.append("--env=TMPDIR=/tmp") - - # spec currently says "HOME must be set to the designated output - # directory." but spec might change to designated temp directory. - # runtime.append("--env=HOME=/tmp") - runtime.append("--env=HOME=%s" % self.builder.outdir) - - for t, v in self.environment.items(): - runtime.append(u"--env=%s=%s" % (t, v)) - - runtime.append(img_id) - else: - env = self.environment - if not os.path.exists(self.tmpdir): - os.makedirs(self.tmpdir) - vars_to_preserve = kwargs.get("preserve_environment") - if kwargs.get("preserve_entire_environment"): - vars_to_preserve = os.environ - if vars_to_preserve is not None: - for key, value in os.environ.items(): - if key in vars_to_preserve and key not in env: - env[key] = value - env["HOME"] = self.outdir - env["TMPDIR"] = self.tmpdir - - stageFiles(self.pathmapper, os.symlink, ignoreWritable=True) + def _execute(self, runtime, env, rm_tmpdir=True, move_outputs="move"): + # type: (List[Text], MutableMapping[Text, Text], bool, Text) -> None scr, _ = get_feature(self, "ShellCommandRequirement") @@ -229,28 +165,9 @@ def run(self, dry_run=False, pull_image=True, rm_container=True, u' > %s' % os.path.join(self.outdir, self.stdout) if self.stdout else '', u' 2> %s' % os.path.join(self.outdir, self.stderr) if self.stderr else '') - if dry_run: - return (self.outdir, {}) - outputs = {} # type: Dict[Text,Text] try: - if self.generatefiles["listing"]: - generatemapper = PathMapper([self.generatefiles], self.outdir, - self.outdir, separateDirs=False) - _logger.debug(u"[job %s] initial work dir %s", self.name, - json.dumps({p: generatemapper.mapper(p) for p in generatemapper.files()}, indent=4)) - - def linkoutdir(src, tgt): - # Need to make the link to the staged file (may be inside - # the container) - for _, item in self.pathmapper.items(): - if src == item.resolved: - os.symlink(item.target, tgt) - break - - stageFiles(generatemapper, linkoutdir) - stdin_path = None if self.stdin: stdin_path = self.pathmapper.reversemap(self.stdin)[1] @@ -294,14 +211,7 @@ def linkoutdir(src, tgt): processStatus = "permanentFail" if self.generatefiles["listing"]: - def linkoutdir(src, tgt): - # Need to make the link to the staged file (may be inside - # the container) - if os.path.islink(tgt): - os.remove(tgt) - os.symlink(src, tgt) - - stageFiles(generatemapper, linkoutdir, ignoreWritable=True) + relink_initialworkdir(self.generatemapper, inplace_update=self.inplace_update) outputs = self.collect_outputs(self.outdir) @@ -340,6 +250,140 @@ def linkoutdir(src, tgt): shutil.rmtree(self.tmpdir, True) +class CommandLineJob(JobBase): + + def run(self, pull_image=True, rm_container=True, + rm_tmpdir=True, move_outputs="move", **kwargs): + # type: (bool, bool, bool, Text, **Any) -> Union[Tuple[Text, Dict[None, None]], None] + + self._setup() + + env = self.environment + if not os.path.exists(self.tmpdir): + os.makedirs(self.tmpdir) + vars_to_preserve = kwargs.get("preserve_environment") + if kwargs.get("preserve_entire_environment"): + vars_to_preserve = os.environ + if vars_to_preserve is not None: + for key, value in os.environ.items(): + if key in vars_to_preserve and key not in env: + env[key] = value + env["HOME"] = self.outdir + env["TMPDIR"] = self.tmpdir + + stageFiles(self.pathmapper, os.symlink, ignoreWritable=True) + if self.generatemapper: + stageFiles(self.generatemapper, os.symlink, ignoreWritable=self.inplace_update) + relink_initialworkdir(self.generatemapper, inplace_update=self.inplace_update) + + self._execute([], env, rm_tmpdir=rm_tmpdir, move_outputs=move_outputs) + + +class DockerCommandLineJob(JobBase): + + def add_volumes(self, pathmapper, runtime, stage_output): + # type: (PathMapper, List[Text], bool) -> None + + host_outdir = self.outdir + container_outdir = self.builder.outdir + for src, vol in pathmapper.items(): + if not vol.staged: + continue + if stage_output: + containertgt = container_outdir + vol.target[len(host_outdir):] + else: + containertgt = vol.target + if vol.type in ("File", "Directory"): + if not vol.resolved.startswith("_:"): + runtime.append(u"--volume=%s:%s:ro" % (vol.resolved, containertgt)) + elif vol.type == "WritableFile": + if self.inplace_update: + runtime.append(u"--volume=%s:%s:rw" % (vol.resolved, containertgt)) + else: + shutil.copy(vol.resolved, vol.target) + elif vol.type == "WritableDirectory": + if vol.resolved.startswith("_:"): + os.makedirs(vol.target, 0o0755) + else: + if self.inplace_update: + runtime.append(u"--volume=%s:%s:rw" % (vol.resolved, containertgt)) + else: + shutil.copytree(vol.resolved, vol.target) + elif vol.type == "CreateFile": + createtmp = os.path.join(host_outdir, os.path.basename(vol.target)) + with open(createtmp, "w") as f: + f.write(vol.resolved.encode("utf-8")) + runtime.append(u"--volume=%s:%s:ro" % (createtmp, vol.target)) + + def run(self, pull_image=True, rm_container=True, + rm_tmpdir=True, move_outputs="move", **kwargs): + # type: (bool, bool, bool, Text, **Any) -> Union[Tuple[Text, Dict[None, None]], None] + + (docker_req, docker_is_req) = get_feature(self, "DockerRequirement") + + img_id = None + env = None # type: MutableMapping[Text, Text] + try: + env = cast(MutableMapping[Text, Text], os.environ) + if docker_req and kwargs.get("use_container") is not False: + img_id = docker.get_from_requirements(docker_req, True, pull_image) + elif kwargs.get("default_container", None) is not None: + img_id = kwargs.get("default_container") + + if docker_req and img_id is None and kwargs.get("use_container"): + raise Exception("Docker image not available") + except Exception as e: + _logger.debug("Docker error", exc_info=True) + if docker_is_req: + raise WorkflowException("Docker is required to run this tool: %s" % e) + else: + raise WorkflowException("Docker is not available for this tool, try --no-container to disable Docker: %s" % e) + + self._setup() + + runtime = [u"docker", u"run", u"-i"] + + runtime.append(u"--volume=%s:%s:rw" % (os.path.realpath(self.outdir), self.builder.outdir)) + runtime.append(u"--volume=%s:%s:rw" % (os.path.realpath(self.tmpdir), "/tmp")) + + self.add_volumes(self.pathmapper, runtime, False) + if self.generatemapper: + self.add_volumes(self.generatemapper, runtime, True) + + runtime.append(u"--workdir=%s" % (self.builder.outdir)) + runtime.append(u"--read-only=true") + + if kwargs.get("custom_net", None) is not None: + runtime.append(u"--net={0}".format(kwargs.get("custom_net"))) + elif kwargs.get("disable_net", None): + runtime.append(u"--net=none") + + if self.stdout: + runtime.append("--log-driver=none") + + euid = docker_vm_uid() or os.geteuid() + + if kwargs.get("no_match_user", None) is False: + runtime.append(u"--user=%s" % (euid)) + + if rm_container: + runtime.append(u"--rm") + + runtime.append(u"--env=TMPDIR=/tmp") + + # spec currently says "HOME must be set to the designated output + # directory." but spec might change to designated temp directory. + # runtime.append("--env=HOME=/tmp") + runtime.append(u"--env=HOME=%s" % self.builder.outdir) + + for t, v in self.environment.items(): + runtime.append(u"--env=%s=%s" % (t, v)) + + runtime.append(img_id) + + self._execute(runtime, env, rm_tmpdir=rm_tmpdir, move_outputs=move_outputs) + + def _job_popen( commands, # type: List[str] stdin_path, # type: Text diff --git a/cwltool/main.py b/cwltool/main.py index 9b13172be..e90018a3f 100755 --- a/cwltool/main.py +++ b/cwltool/main.py @@ -29,6 +29,7 @@ scandeps, normalizeFilesDirs, use_custom_schema, use_standard_schema) from .resolver import tool_resolver, ga4gh_tool_registries from .stdfsaccess import StdFsAccess +from .mutation import MutationManager _logger = logging.getLogger("cwltool") @@ -213,10 +214,11 @@ def output_callback(out, processStatus): raise WorkflowException("Must provide 'basedir' in kwargs") output_dirs = set() - finaloutdir = kwargs.get("outdir") + finaloutdir = os.path.abspath(kwargs.get("outdir")) if kwargs.get("outdir") else None kwargs["outdir"] = tempfile.mkdtemp(prefix=kwargs["tmp_outdir_prefix"]) if kwargs.get( "tmp_outdir_prefix") else tempfile.mkdtemp() output_dirs.add(kwargs["outdir"]) + kwargs["mutation_manager"] = MutationManager() jobReqs = None if "cwl:requirements" in job_order_object: @@ -227,6 +229,12 @@ def output_callback(out, processStatus): for req in jobReqs: t.requirements.append(req) + if kwargs.get("default_container"): + t.requirements.insert(0, { + "class": "DockerRequirement", + "dockerPull": kwargs["default_container"] + }) + jobiter = t.job(job_order_object, output_callback, **kwargs) diff --git a/cwltool/mutation.py b/cwltool/mutation.py new file mode 100644 index 000000000..a91e3cd31 --- /dev/null +++ b/cwltool/mutation.py @@ -0,0 +1,69 @@ +from collections import namedtuple + +from typing import Any, Callable, cast, Generator, Iterable, List, Text, Union + +from .errors import WorkflowException + +MutationState = namedtuple("MutationTracker", ["generation", "readers", "stepname"]) + +_generation = "http://commonwl.org/cwltool#generation" + +class MutationManager(object): + """Lock manager for checking correctness of in-place update of files. + + Used to validate that in-place file updates happen sequentially, and that a + file which is registered for in-place update cannot be read or updated by + any other steps. + + """ + + def __init__(self): + # type: () -> None + self.generations = {} # type: Dict[Text, MutationState] + + def register_reader(self, stepname, obj): + # type: (Text, Dict[Text, Any]) -> None + loc = obj["location"] + current = self.generations.get(loc, MutationState(0, [], "")) + obj_generation = obj.get(_generation, 0) + + if obj_generation != current.generation: + raise WorkflowException("[job %s] wants to read %s from generation %i but current generation is %s (last updated by %s)" % ( + stepname, loc, obj_generation, current.generation, current.stepname)) + + current.readers.append(stepname) + self.generations[loc] = current + + def release_reader(self, stepname, obj): + # type: (Text, Dict[Text, Any]) -> None + loc = obj["location"] + current = self.generations.get(loc, MutationState(0, [], "")) + obj_generation = obj.get(_generation, 0) + + if obj_generation != current.generation: + raise WorkflowException("[job %s] wants to release reader on %s from generation %i but current generation is %s (last updated by %s)" % ( + stepname, loc, obj_generation, current.generation, current.stepname)) + + self.generations[loc].readers.remove(stepname) + + def register_mutation(self, stepname, obj): + # type: (Text, Dict[Text, Any]) -> None + loc = obj["location"] + current = self.generations.get(loc, MutationState(0,[], "")) + obj_generation = obj.get(_generation, 0) + + if len(current.readers) > 0: + raise WorkflowException("[job %s] wants to modify %s but has readers: %s" % ( + stepname, loc, current.readers)) + + if obj_generation != current.generation: + raise WorkflowException("[job %s] wants to modify %s from generation %i but current generation is %s (last updated by %s)" % ( + stepname, loc, obj_generation, current.generation, current.stepname)) + + self.generations[loc] = MutationState(current.generation+1, current.readers, stepname) + + def set_generation(self, obj): + # type: (Dict) -> None + loc = obj["location"] + current = self.generations.get(loc, MutationState(0,[], "")) + obj[_generation] = current.generation diff --git a/cwltool/pathmapper.py b/cwltool/pathmapper.py index 9399518a1..3faef33e8 100644 --- a/cwltool/pathmapper.py +++ b/cwltool/pathmapper.py @@ -221,7 +221,7 @@ def setup(self, referenced_files, basedir): for fob in referenced_files: if self.separateDirs: stagedir = os.path.join(self.stagedir, "stg%s" % uuid.uuid4()) - self.visit(fob, stagedir, basedir, staged=True) + self.visit(fob, stagedir, basedir, copy=fob.get("writable"), staged=True) def mapper(self, src): # type: (Text) -> MapperEnt if u"#" in src: diff --git a/cwltool/process.py b/cwltool/process.py index 4188104e2..02c5684c1 100644 --- a/cwltool/process.py +++ b/cwltool/process.py @@ -33,7 +33,19 @@ from .stdfsaccess import StdFsAccess from .utils import aslist, get_feature +class LogAsDebugFilter(logging.Filter): + def __init__(self, name, parent): # type: (str, logging.Logger) -> None + super(LogAsDebugFilter, self).__init__(name) + self.parent = parent + + def filter(self, record): + return self.parent.isEnabledFor(logging.DEBUG) + + _logger = logging.getLogger("cwltool") +_logger_validation_warnings = logging.getLogger("cwltool.validation_warnings") +_logger_validation_warnings.setLevel(_logger.getEffectiveLevel()) +_logger_validation_warnings.addFilter(LogAsDebugFilter("cwltool.validation_warnings", _logger)) supportedProcessRequirements = ["DockerRequirement", "SchemaDefRequirement", @@ -46,7 +58,8 @@ "StepInputExpressionRequirement", "ResourceRequirement", "InitialWorkDirRequirement", - "http://commonwl.org/cwltool#LoadListingRequirement"] + "http://commonwl.org/cwltool#LoadListingRequirement", + "http://commonwl.org/cwltool#InplaceUpdateRequirement"] cwl_files = ( "Workflow.yml", @@ -229,15 +242,22 @@ def relocateOutputs(outputObj, outdir, output_dirs, action, fs_access): def moveIt(src, dst): if action == "move": for a in output_dirs: - if src.startswith(a): + if src.startswith(a+"/"): _logger.debug("Moving %s to %s", src, dst) - shutil.move(src, dst) + if os.path.isdir(src) and os.path.isdir(dst): + # merge directories + for root, dirs, files in os.walk(src): + for f in dirs+files: + moveIt(os.path.join(root, f), os.path.join(dst, f)) + else: + shutil.move(src, dst) return - _logger.debug("Copying %s to %s", src, dst) - if os.path.isdir(src): - shutil.copytree(src, dst) - else: - shutil.copy(src, dst) + if src != dst: + _logger.debug("Copying %s to %s", src, dst) + if os.path.isdir(src): + shutil.copytree(src, dst) + else: + shutil.copy(src, dst) outfiles = [] # type: List[Dict[Text, Any]] collectFilesAndDirs(outputObj, outfiles) @@ -254,6 +274,27 @@ def _check_adjust(f): visit_class(outputObj, ("File",), functools.partial(compute_checksums, fs_access)) + # If there are symlinks to intermediate output directories, we want to move + # the real files into the final output location. If a file is linked more than once, + # make an internal relative symlink. + if action == "move": + relinked = {} # type: Dict[Text, Text] + for root, dirs, files in os.walk(outdir): + for f in dirs+files: + path = os.path.join(root, f) + rp = os.path.realpath(path) + if path != rp: + if rp in relinked: + os.unlink(path) + os.symlink(os.path.relpath(relinked[rp], path), path) + else: + for od in output_dirs: + if rp.startswith(od+"/"): + os.unlink(path) + os.rename(rp, path) + relinked[rp] = path + break + return outputObj @@ -343,7 +384,6 @@ def avroize_type(field_type, name_prefix=""): avroize_type(field_type["items"], name_prefix) return field_type - class Process(object): __metaclass__ = abc.ABCMeta @@ -473,7 +513,8 @@ def _init_job(self, joborder, **kwargs): try: fillInDefaults(self.tool[u"inputs"], builder.job) normalizeFilesDirs(builder.job) - validate.validate_ex(self.names.get_name("input_record_schema", ""), builder.job) + validate.validate_ex(self.names.get_name("input_record_schema", ""), builder.job, + strict=False, logger=_logger_validation_warnings) except (validate.ValidationException, WorkflowException) as e: raise WorkflowException("Invalid job input record:\n" + Text(e)) @@ -486,6 +527,7 @@ def _init_job(self, joborder, **kwargs): builder.resources = {} builder.timeout = kwargs.get("eval_timeout") builder.debug = kwargs.get("debug") + builder.mutation_manager = kwargs.get("mutation_manager") dockerReq, is_req = self.get_requirement("DockerRequirement") diff --git a/tests/test_ext.py b/tests/test_ext.py index ccc7ca2c4..d2d47fcba 100644 --- a/tests/test_ext.py +++ b/tests/test_ext.py @@ -1,4 +1,7 @@ import unittest +import tempfile +import os +import shutil import cwltool.expression as expr import cwltool.factory @@ -33,3 +36,106 @@ def test_listing_v1_0(self): # def test_listing_v1_1(self): # # Default behavior in 1.1 will be no expansion # self.assertEquals(main([get_data('tests/wf/listing_v1_1.cwl'), get_data('tests/listing-job.yml')]), 1) + +class TestInplaceUpdate(unittest.TestCase): + + def test_updateval(self): + try: + tmp = tempfile.mkdtemp() + with open(os.path.join(tmp, "value"), "w") as f: + f.write("1") + out = tempfile.mkdtemp() + self.assertEquals(main(["--outdir", out, get_data('tests/wf/updateval.cwl'), "-r", os.path.join(tmp, "value")]), 0) + + with open(os.path.join(tmp, "value"), "r") as f: + self.assertEquals("1", f.read()) + with open(os.path.join(out, "value"), "r") as f: + self.assertEquals("2", f.read()) + finally: + shutil.rmtree(tmp) + shutil.rmtree(out) + + def test_updateval_inplace(self): + try: + tmp = tempfile.mkdtemp() + with open(os.path.join(tmp, "value"), "w") as f: + f.write("1") + out = tempfile.mkdtemp() + self.assertEquals(main(["--enable-ext", "--leave-outputs", "--outdir", out, get_data('tests/wf/updateval_inplace.cwl'), "-r", os.path.join(tmp, "value")]), 0) + + with open(os.path.join(tmp, "value"), "r") as f: + self.assertEquals("2", f.read()) + self.assertFalse(os.path.exists(os.path.join(out, "value"))) + finally: + shutil.rmtree(tmp) + shutil.rmtree(out) + + def test_write_write_conflict(self): + try: + tmp = tempfile.mkdtemp() + with open(os.path.join(tmp, "value"), "w") as f: + f.write("1") + + self.assertEquals(main(["--enable-ext", get_data('tests/wf/mut.cwl'), "-a", os.path.join(tmp, "value")]), 1) + with open(os.path.join(tmp, "value"), "r") as f: + self.assertEquals("2", f.read()) + finally: + shutil.rmtree(tmp) + + def test_sequencing(self): + try: + tmp = tempfile.mkdtemp() + with open(os.path.join(tmp, "value"), "w") as f: + f.write("1") + + self.assertEquals(main(["--enable-ext", get_data('tests/wf/mut2.cwl'), "-a", os.path.join(tmp, "value")]), 0) + with open(os.path.join(tmp, "value"), "r") as f: + self.assertEquals("3", f.read()) + finally: + shutil.rmtree(tmp) + + # def test_read_write_conflict(self): + # try: + # tmp = tempfile.mkdtemp() + # with open(os.path.join(tmp, "value"), "w") as f: + # f.write("1") + + # self.assertEquals(main(["--enable-ext", get_data('tests/wf/mut3.cwl'), "-a", os.path.join(tmp, "value")]), 0) + # finally: + # shutil.rmtree(tmp) + + def test_updatedir(self): + try: + tmp = tempfile.mkdtemp() + with open(os.path.join(tmp, "value"), "w") as f: + f.write("1") + out = tempfile.mkdtemp() + + self.assertFalse(os.path.exists(os.path.join(tmp, "blurb"))) + self.assertFalse(os.path.exists(os.path.join(out, "blurb"))) + + self.assertEquals(main(["--outdir", out, get_data('tests/wf/updatedir.cwl'), "-r", tmp]), 0) + + self.assertFalse(os.path.exists(os.path.join(tmp, "blurb"))) + self.assertTrue(os.path.exists(os.path.join(out, "inp/blurb"))) + finally: + shutil.rmtree(tmp) + shutil.rmtree(out) + + def test_updateval_inplace(self): + try: + tmp = tempfile.mkdtemp() + with open(os.path.join(tmp, "value"), "w") as f: + f.write("1") + out = tempfile.mkdtemp() + + self.assertFalse(os.path.exists(os.path.join(tmp, "blurb"))) + self.assertFalse(os.path.exists(os.path.join(out, "blurb"))) + + self.assertEquals(main(["--enable-ext", "--leave-outputs", "--outdir", out, get_data('tests/wf/updatedir_inplace.cwl'), "-r", tmp]), 0) + + self.assertTrue(os.path.exists(os.path.join(tmp, "blurb"))) + self.assertFalse(os.path.exists(os.path.join(out, "inp/blurb"))) + finally: + shutil.rmtree(tmp) + shutil.rmtree(out) diff --git a/tests/wf/mut.cwl b/tests/wf/mut.cwl new file mode 100644 index 000000000..6c3a4732e --- /dev/null +++ b/tests/wf/mut.cwl @@ -0,0 +1,16 @@ +cwlVersion: v1.0 +class: Workflow +inputs: + a: File +outputs: [] +steps: + step1: + in: + r: a + out: [] + run: updateval_inplace.cwl + step2: + in: + r: a + out: [] + run: updateval_inplace.cwl diff --git a/tests/wf/mut2.cwl b/tests/wf/mut2.cwl new file mode 100644 index 000000000..6f7fd94d5 --- /dev/null +++ b/tests/wf/mut2.cwl @@ -0,0 +1,19 @@ +cwlVersion: v1.0 +class: Workflow +inputs: + a: File +outputs: + out: + type: File + outputSource: step2/out +steps: + step1: + in: + r: a + out: [out] + run: updateval_inplace.cwl + step2: + in: + r: step1/out + out: [out] + run: updateval_inplace.cwl diff --git a/tests/wf/mut3.cwl b/tests/wf/mut3.cwl new file mode 100644 index 000000000..cf19c72fb --- /dev/null +++ b/tests/wf/mut3.cwl @@ -0,0 +1,21 @@ +cwlVersion: v1.0 +class: Workflow +inputs: + a: File +outputs: [] +steps: + step1: + in: + r: a + out: [] + run: cat.cwl + step2: + in: + r: a + out: [] + run: cat.cwl + step3: + in: + r: a + out: [] + run: updateval_inplace.cwl diff --git a/tests/wf/updatedir.cwl b/tests/wf/updatedir.cwl new file mode 100644 index 000000000..3a3222123 --- /dev/null +++ b/tests/wf/updatedir.cwl @@ -0,0 +1,16 @@ +class: CommandLineTool +cwlVersion: v1.0 +requirements: + InitialWorkDirRequirement: + listing: + - entry: $(inputs.r) + entryname: inp + writable: true +inputs: + r: Directory +outputs: + out: + type: Directory + outputBinding: + glob: inp +arguments: [touch, inp/blurb] \ No newline at end of file diff --git a/tests/wf/updatedir_inplace.cwl b/tests/wf/updatedir_inplace.cwl new file mode 100644 index 000000000..9c98a5dd5 --- /dev/null +++ b/tests/wf/updatedir_inplace.cwl @@ -0,0 +1,20 @@ +class: CommandLineTool +cwlVersion: v1.0 +$namespaces: + cwltool: http://commonwl.org/cwltool# +requirements: + InitialWorkDirRequirement: + listing: + - entry: $(inputs.r) + entryname: inp + writable: true + cwltool:InplaceUpdateRequirement: + inplaceUpdate: true +inputs: + r: Directory +outputs: + out: + type: Directory + outputBinding: + glob: inp +arguments: [touch, inp/blurb] \ No newline at end of file diff --git a/tests/wf/updateval.cwl b/tests/wf/updateval.cwl new file mode 100644 index 000000000..63e28373a --- /dev/null +++ b/tests/wf/updateval.cwl @@ -0,0 +1,20 @@ +class: CommandLineTool +cwlVersion: v1.0 +requirements: + InitialWorkDirRequirement: + listing: + - entry: $(inputs.r) + writable: true +inputs: + r: File + script: + type: File + default: + class: File + location: updateval.py +outputs: + out: + type: File + outputBinding: + glob: $(inputs.r.basename) +arguments: [python, $(inputs.script), $(inputs.r.basename)] \ No newline at end of file diff --git a/tests/wf/updateval.py b/tests/wf/updateval.py new file mode 100644 index 000000000..abd9a4016 --- /dev/null +++ b/tests/wf/updateval.py @@ -0,0 +1,6 @@ +import sys +f = open(sys.argv[1], "r+") +val = int(f.read()) +f.seek(0) +f.write(str(val+1)) +f.close() diff --git a/tests/wf/updateval_inplace.cwl b/tests/wf/updateval_inplace.cwl new file mode 100644 index 000000000..6c032fbd1 --- /dev/null +++ b/tests/wf/updateval_inplace.cwl @@ -0,0 +1,24 @@ +class: CommandLineTool +cwlVersion: v1.0 +$namespaces: + cwltool: "http://commonwl.org/cwltool#" +requirements: + InitialWorkDirRequirement: + listing: + - entry: $(inputs.r) + writable: true + cwltool:InplaceUpdateRequirement: + inplaceUpdate: true +inputs: + r: File + script: + type: File + default: + class: File + location: updateval.py +outputs: + out: + type: File + outputBinding: + glob: $(inputs.r.basename) +arguments: [python, $(inputs.script), $(inputs.r.basename)] \ No newline at end of file