From 2f9136d05d9fba95c305f7928fd950dea4d1be14 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Tue, 28 Feb 2017 18:09:37 -0500 Subject: [PATCH 01/27] Implement mutation manager to validate that files can be safely modified in place. --- cwltool/draft2tool.py | 2 ++ cwltool/main.py | 1 + cwltool/process.py | 1 + cwltool/workflow.py | 15 +++++++++++++++ 4 files changed, 19 insertions(+) diff --git a/cwltool/draft2tool.py b/cwltool/draft2tool.py index 3a8d6be58..61267573d 100644 --- a/cwltool/draft2tool.py +++ b/cwltool/draft2tool.py @@ -364,6 +364,7 @@ def rm_pending_output_callback(output_callbacks, jobcachepending, j.generatefiles[u"listing"] = ls normalizeFilesDirs(j.generatefiles) + adjustFileObjs(j.generatefiles, partial(builder.mutation_manager.register_mutation, j.name)) j.environment = {} evr = self.get_requirement("EnvVarRequirement")[0] @@ -427,6 +428,7 @@ def collect_output_ports(self, ports, builder, outdir, compute_checksum=True): adjustFileObjs(ret, remove_path) adjustDirObjs(ret, remove_path) normalizeFilesDirs(ret) + adjustFileObjs(ret, builder.mutation_manager.set_generation) if compute_checksum: adjustFileObjs(ret, partial(compute_checksums, fs_access)) diff --git a/cwltool/main.py b/cwltool/main.py index 54dc28239..e76156634 100755 --- a/cwltool/main.py +++ b/cwltool/main.py @@ -217,6 +217,7 @@ def output_callback(out, processStatus): kwargs["outdir"] = tempfile.mkdtemp(prefix=kwargs["tmp_outdir_prefix"]) if kwargs.get( "tmp_outdir_prefix") else tempfile.mkdtemp() output_dirs.add(kwargs["outdir"]) + kwargs["mutation_manager"] = workflow.MutationManager() jobReqs = None if "cwl:requirements" in job_order_object: diff --git a/cwltool/process.py b/cwltool/process.py index 15b633272..66c784f61 100644 --- a/cwltool/process.py +++ b/cwltool/process.py @@ -484,6 +484,7 @@ def _init_job(self, joborder, **kwargs): builder.resources = {} builder.timeout = kwargs.get("eval_timeout") builder.debug = kwargs.get("debug") + builder.mutation_manager = kwargs.get("mutation_manager") dockerReq, is_req = self.get_requirement("DockerRequirement") diff --git a/cwltool/workflow.py b/cwltool/workflow.py index 2b44a7ed6..0aa408e7f 100644 --- a/cwltool/workflow.py +++ b/cwltool/workflow.py @@ -760,3 +760,18 @@ def flat_crossproduct_scatter(process, joborder, scatter_keys, output_callback, return parallel_steps(steps, rc, kwargs) else: return steps + +class MutationManager(object): + def __init__(self): + self.generations = {} # type: Dict[Tuple[int, int]] + + def register_mutation(self, stepname, obj): + loc = obj["location"] + if obj.get("_generation", 0) == self.generations.get(loc, 0): + self.generations[loc] = obj.get("_generation", 0)+1 + else: + raise WorkflowException("[job %s] wants to modify %s on generation %s but input is generation %i" % ( + stepname, obj["location"], self.generations[loc], obj.get("_generation", 0))) + + def set_generation(self, obj): + obj["_generation"] = self.generations.get(obj["location"], 0) From ca0611b92d1518a06f657b42badba09759da9b01 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Tue, 28 Feb 2017 18:11:44 -0500 Subject: [PATCH 02/27] Test workflows for mutable inputs. --- tests/wf/cat_mut.cwl | 15 +++++++++++++++ tests/wf/mut.cwl | 16 ++++++++++++++++ tests/wf/mut2.cwl | 16 ++++++++++++++++ 3 files changed, 47 insertions(+) create mode 100644 tests/wf/cat_mut.cwl create mode 100644 tests/wf/mut.cwl create mode 100644 tests/wf/mut2.cwl diff --git a/tests/wf/cat_mut.cwl b/tests/wf/cat_mut.cwl new file mode 100644 index 000000000..cc5c06149 --- /dev/null +++ b/tests/wf/cat_mut.cwl @@ -0,0 +1,15 @@ +class: CommandLineTool +cwlVersion: v1.0 +requirements: + InitialWorkDirRequirement: + listing: + - entry: $(inputs.r) + writable: true +inputs: + r: File +outputs: + out: + type: File + outputBinding: + outputEval: $(inputs.r) +arguments: [cat, $(inputs.r.basename)] \ No newline at end of file diff --git a/tests/wf/mut.cwl b/tests/wf/mut.cwl new file mode 100644 index 000000000..996270387 --- /dev/null +++ b/tests/wf/mut.cwl @@ -0,0 +1,16 @@ +cwlVersion: v1.0 +class: Workflow +inputs: + a: File +outputs: [] +steps: + step1: + in: + r: a + out: [] + run: cat_mut.cwl + step2: + in: + r: a + out: [] + run: cat_mut.cwl diff --git a/tests/wf/mut2.cwl b/tests/wf/mut2.cwl new file mode 100644 index 000000000..01ae54d8a --- /dev/null +++ b/tests/wf/mut2.cwl @@ -0,0 +1,16 @@ +cwlVersion: v1.0 +class: Workflow +inputs: + a: File +outputs: [] +steps: + step1: + in: + r: a + out: [out] + run: cat_mut.cwl + step2: + in: + r: step1/out + out: [] + run: cat_mut.cwl From da1cea7b54b2bafb7b864738c9bd1d1c9f401e91 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Wed, 1 Mar 2017 08:22:04 -0500 Subject: [PATCH 03/27] Add in-place update tests. --- cwltool/draft2tool.py | 5 ++++- tests/wf/mut.cwl | 4 ++-- tests/wf/mut2.cwl | 11 +++++++---- tests/wf/{cat_mut.cwl => updateval.cwl} | 9 +++++++-- tests/wf/updateval.py | 6 ++++++ 5 files changed, 26 insertions(+), 9 deletions(-) rename tests/wf/{cat_mut.cwl => updateval.cwl} (54%) create mode 100644 tests/wf/updateval.py diff --git a/cwltool/draft2tool.py b/cwltool/draft2tool.py index 61267573d..58b0dffe3 100644 --- a/cwltool/draft2tool.py +++ b/cwltool/draft2tool.py @@ -364,7 +364,10 @@ def rm_pending_output_callback(output_callbacks, jobcachepending, j.generatefiles[u"listing"] = ls normalizeFilesDirs(j.generatefiles) - adjustFileObjs(j.generatefiles, partial(builder.mutation_manager.register_mutation, j.name)) + for i in j.generatefiles["listing"]: + if i.get("writable"): + adjustFileObjs(i, partial(builder.mutation_manager.register_mutation, j.name)) + adjustDirObjs(i, partial(builder.mutation_manager.register_mutation, j.name)) j.environment = {} evr = self.get_requirement("EnvVarRequirement")[0] diff --git a/tests/wf/mut.cwl b/tests/wf/mut.cwl index 996270387..2c400f181 100644 --- a/tests/wf/mut.cwl +++ b/tests/wf/mut.cwl @@ -8,9 +8,9 @@ steps: in: r: a out: [] - run: cat_mut.cwl + run: updateval.cwl step2: in: r: a out: [] - run: cat_mut.cwl + run: updateval.cwl diff --git a/tests/wf/mut2.cwl b/tests/wf/mut2.cwl index 01ae54d8a..154ec69fc 100644 --- a/tests/wf/mut2.cwl +++ b/tests/wf/mut2.cwl @@ -2,15 +2,18 @@ cwlVersion: v1.0 class: Workflow inputs: a: File -outputs: [] +outputs: + out: + type: File + outputSource: step2/out steps: step1: in: r: a out: [out] - run: cat_mut.cwl + run: updateval.cwl step2: in: r: step1/out - out: [] - run: cat_mut.cwl + out: [out] + run: updateval.cwl diff --git a/tests/wf/cat_mut.cwl b/tests/wf/updateval.cwl similarity index 54% rename from tests/wf/cat_mut.cwl rename to tests/wf/updateval.cwl index cc5c06149..63e28373a 100644 --- a/tests/wf/cat_mut.cwl +++ b/tests/wf/updateval.cwl @@ -7,9 +7,14 @@ requirements: writable: true inputs: r: File + script: + type: File + default: + class: File + location: updateval.py outputs: out: type: File outputBinding: - outputEval: $(inputs.r) -arguments: [cat, $(inputs.r.basename)] \ No newline at end of file + glob: $(inputs.r.basename) +arguments: [python, $(inputs.script), $(inputs.r.basename)] \ No newline at end of file diff --git a/tests/wf/updateval.py b/tests/wf/updateval.py new file mode 100644 index 000000000..abd9a4016 --- /dev/null +++ b/tests/wf/updateval.py @@ -0,0 +1,6 @@ +import sys +f = open(sys.argv[1], "r+") +val = int(f.read()) +f.seek(0) +f.write(str(val+1)) +f.close() From 33a29c84a3ebc54102f8c22bb6e8cc3e9d9155a3 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Wed, 1 Mar 2017 09:53:53 -0500 Subject: [PATCH 04/27] Fix elif --- cwltool/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cwltool/job.py b/cwltool/job.py index 84d6720f7..27aa95b89 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -159,7 +159,7 @@ def run(self, dry_run=False, pull_image=True, rm_container=True, continue if vol.type in ("File", "Directory"): runtime.append(u"--volume=%s:%s:ro" % (vol.resolved, vol.target)) - if vol.type == "CreateFile": + elif vol.type == "CreateFile": createtmp = os.path.join(self.stagedir, os.path.basename(vol.target)) with open(createtmp, "w") as f: f.write(vol.resolved.encode("utf-8")) From e454c20dbe4e8a5c65241ffa49b77ad39f2f1ceb Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Thu, 2 Mar 2017 17:19:14 -0500 Subject: [PATCH 05/27] Set up Docker binds to support writable items. --- cwltool/job.py | 80 +++++++++++++++++++++++++++++++++---------------- cwltool/main.py | 6 ++++ 2 files changed, 60 insertions(+), 26 deletions(-) diff --git a/cwltool/job.py b/cwltool/job.py index 27aa95b89..d038d519e 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -89,6 +89,36 @@ def deref_links(outputs): # type: (Any) -> None for v in outputs: deref_links(v) +def add_volumes(pathmapper, runtime, host_outdir=None, container_outdir=None, inplace_update=False): + for src, vol in pathmapper.items(): + if not vol.staged: + continue + if host_outdir: + containertgt = container_outdir + vol.target[len(host_outdir):] + else: + containertgt = vol.target + if vol.type in ("File", "Directory"): + if not vol.resolved.startswith("_:"): + runtime.append(u"--volume=%s:%s:ro" % (vol.resolved, containertgt)) + elif vol.type == "WritableFile": + if inplace_update: + runtime.append(u"--volume=%s:%s:rw" % (vol.resolved, containertgt)) + else: + shutil.copy(vol.resolved, vol.target) + elif vol.type == "WritableDirectory": + if vol.resolved.startswith("_:"): + os.makedirs(vol.target, 0o0755) + else: + if inplace_update: + runtime.append(u"--volume=%s:%s:rw" % (vol.resolved, containertgt)) + else: + shutil.copytree(vol.resolved, vol.target) + elif vol.type == "CreateFile": + createtmp = os.path.join(self.stagedir, os.path.basename(vol.target)) + with open(createtmp, "w") as f: + f.write(vol.resolved.encode("utf-8")) + runtime.append(u"--volume=%s:%s:ro" % (createtmp, vol.target)) + class CommandLineJob(object): def __init__(self): # type: () -> None @@ -152,20 +182,23 @@ def run(self, dry_run=False, pull_image=True, rm_container=True, else: raise WorkflowException("Docker is not available for this tool, try --no-container to disable Docker: %s" % e) + generatemapper = None + if self.generatefiles["listing"]: + generatemapper = PathMapper([self.generatefiles], self.outdir, + self.outdir, separateDirs=False) + _logger.debug(u"[job %s] initial work dir %s", self.name, + json.dumps({p: generatemapper.mapper(p) for p in generatemapper.files()}, indent=4)) + if img_id: runtime = ["docker", "run", "-i"] - for src, vol in self.pathmapper.items(): - if not vol.staged: - continue - if vol.type in ("File", "Directory"): - runtime.append(u"--volume=%s:%s:ro" % (vol.resolved, vol.target)) - elif vol.type == "CreateFile": - createtmp = os.path.join(self.stagedir, os.path.basename(vol.target)) - with open(createtmp, "w") as f: - f.write(vol.resolved.encode("utf-8")) - runtime.append(u"--volume=%s:%s:ro" % (createtmp, vol.target)) + runtime.append(u"--volume=%s:%s:rw" % (os.path.realpath(self.outdir), self.builder.outdir)) runtime.append(u"--volume=%s:%s:rw" % (os.path.realpath(self.tmpdir), "/tmp")) + + add_volumes(self.pathmapper, runtime) + if generatemapper: + add_volumes(generatemapper, runtime, self.outdir, self.builder.outdir) + runtime.append(u"--workdir=%s" % (self.builder.outdir)) runtime.append("--read-only=true") @@ -212,6 +245,17 @@ def run(self, dry_run=False, pull_image=True, rm_container=True, stageFiles(self.pathmapper, os.symlink, ignoreWritable=True) + if generatemapper: + def linkoutdir(src, tgt): + # Need to make the link to the staged file (may be inside + # the container) + for _, item in self.pathmapper.items(): + if src == item.resolved: + os.symlink(item.target, tgt) + break + + stageFiles(generatemapper, linkoutdir) + scr, _ = get_feature(self, "ShellCommandRequirement") shouldquote = None # type: Callable[[Any], Any] @@ -235,22 +279,6 @@ def run(self, dry_run=False, pull_image=True, rm_container=True, outputs = {} # type: Dict[Text,Text] try: - if self.generatefiles["listing"]: - generatemapper = PathMapper([self.generatefiles], self.outdir, - self.outdir, separateDirs=False) - _logger.debug(u"[job %s] initial work dir %s", self.name, - json.dumps({p: generatemapper.mapper(p) for p in generatemapper.files()}, indent=4)) - - def linkoutdir(src, tgt): - # Need to make the link to the staged file (may be inside - # the container) - for _, item in self.pathmapper.items(): - if src == item.resolved: - os.symlink(item.target, tgt) - break - - stageFiles(generatemapper, linkoutdir) - stdin_path = None if self.stdin: stdin_path = self.pathmapper.reversemap(self.stdin)[1] diff --git a/cwltool/main.py b/cwltool/main.py index e76156634..934bfa9b6 100755 --- a/cwltool/main.py +++ b/cwltool/main.py @@ -228,6 +228,12 @@ def output_callback(out, processStatus): for req in jobReqs: t.requirements.append(req) + if kwargs.get("default_container"): + t.requirements.insert(0, { + "class": "DockerRequirement", + "dockerPull": kwargs["default_container"] + }) + jobiter = t.job(job_order_object, output_callback, **kwargs) From a09cf8fac1b9faf3787ab8cb200755629bde82f6 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Fri, 3 Mar 2017 08:27:30 -0500 Subject: [PATCH 06/27] Didn't break anything. --- cwltool/job.py | 27 +++++++++++++++++---------- cwltool/pathmapper.py | 2 +- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/cwltool/job.py b/cwltool/job.py index d038d519e..56bf9e1d5 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -119,6 +119,14 @@ def add_volumes(pathmapper, runtime, host_outdir=None, container_outdir=None, in f.write(vol.resolved.encode("utf-8")) runtime.append(u"--volume=%s:%s:ro" % (createtmp, vol.target)) +def relink_initialworkdir(pathmapper, inplace_update=False): + for src, vol in pathmapper.items(): + if not vol.staged: + continue + if vol.type in ("File", "Directory") or (inplace_update and + vol.type in ("WritableFile", "WritableDirectory")): + os.remove(vol.target) + os.symlink(vol.resolved, vol.target) class CommandLineJob(object): def __init__(self): # type: () -> None @@ -142,6 +150,7 @@ def __init__(self): # type: () -> None self.environment = None # type: MutableMapping[Text, Text] self.generatefiles = None # type: Dict[Text, Union[List[Dict[Text, Text]], Dict[Text, Text], Text]] self.stagedir = None # type: Text + self.inplace_update = False # type: bool def run(self, dry_run=False, pull_image=True, rm_container=True, rm_tmpdir=True, move_outputs="move", **kwargs): @@ -184,7 +193,8 @@ def run(self, dry_run=False, pull_image=True, rm_container=True, generatemapper = None if self.generatefiles["listing"]: - generatemapper = PathMapper([self.generatefiles], self.outdir, + print self.generatefiles["listing"] + generatemapper = PathMapper(self.generatefiles["listing"], self.outdir, self.outdir, separateDirs=False) _logger.debug(u"[job %s] initial work dir %s", self.name, json.dumps({p: generatemapper.mapper(p) for p in generatemapper.files()}, indent=4)) @@ -197,7 +207,10 @@ def run(self, dry_run=False, pull_image=True, rm_container=True, add_volumes(self.pathmapper, runtime) if generatemapper: - add_volumes(generatemapper, runtime, self.outdir, self.builder.outdir) + add_volumes(generatemapper, runtime, + self.outdir, + self.builder.outdir, + inplace_update=self.inplace_update) runtime.append(u"--workdir=%s" % (self.builder.outdir)) runtime.append("--read-only=true") @@ -249,6 +262,7 @@ def run(self, dry_run=False, pull_image=True, rm_container=True, def linkoutdir(src, tgt): # Need to make the link to the staged file (may be inside # the container) + print "ABC", src, tgt for _, item in self.pathmapper.items(): if src == item.resolved: os.symlink(item.target, tgt) @@ -322,14 +336,7 @@ def linkoutdir(src, tgt): processStatus = "permanentFail" if self.generatefiles["listing"]: - def linkoutdir(src, tgt): - # Need to make the link to the staged file (may be inside - # the container) - if os.path.islink(tgt): - os.remove(tgt) - os.symlink(src, tgt) - - stageFiles(generatemapper, linkoutdir, ignoreWritable=True) + relink_initialworkdir(generatemapper, inplace_update=self.inplace_update) outputs = self.collect_outputs(self.outdir) diff --git a/cwltool/pathmapper.py b/cwltool/pathmapper.py index e5f75f1e2..7f19f1422 100644 --- a/cwltool/pathmapper.py +++ b/cwltool/pathmapper.py @@ -222,7 +222,7 @@ def setup(self, referenced_files, basedir): for fob in referenced_files: if self.separateDirs: stagedir = os.path.join(self.stagedir, "stg%s" % uuid.uuid4()) - self.visit(fob, stagedir, basedir, staged=True) + self.visit(fob, stagedir, basedir, copy=fob.get("writable"), staged=True) def mapper(self, src): # type: (Text) -> MapperEnt if u"#" in src: From 3c6bf89d1bd7b0287db8ed6f31401b0939e3805e Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Fri, 3 Mar 2017 08:48:59 -0500 Subject: [PATCH 07/27] Inplace update works with & without Docker. --- cwltool/job.py | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/cwltool/job.py b/cwltool/job.py index 56bf9e1d5..2a08f08d3 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -125,7 +125,8 @@ def relink_initialworkdir(pathmapper, inplace_update=False): continue if vol.type in ("File", "Directory") or (inplace_update and vol.type in ("WritableFile", "WritableDirectory")): - os.remove(vol.target) + if os.path.exists(vol.target): + os.remove(vol.target) os.symlink(vol.resolved, vol.target) class CommandLineJob(object): @@ -257,18 +258,8 @@ def run(self, dry_run=False, pull_image=True, rm_container=True, env["TMPDIR"] = self.tmpdir stageFiles(self.pathmapper, os.symlink, ignoreWritable=True) - if generatemapper: - def linkoutdir(src, tgt): - # Need to make the link to the staged file (may be inside - # the container) - print "ABC", src, tgt - for _, item in self.pathmapper.items(): - if src == item.resolved: - os.symlink(item.target, tgt) - break - - stageFiles(generatemapper, linkoutdir) + relink_initialworkdir(generatemapper, inplace_update=self.inplace_update) scr, _ = get_feature(self, "ShellCommandRequirement") From 887c6856c8650985768ee4f1ec6af235808c4177 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Thu, 16 Mar 2017 11:18:46 -0400 Subject: [PATCH 08/27] Add InplaceUpdateRequirement. Conflicts: cwltool/process.py --- cwltool/draft2tool.py | 6 +++++- cwltool/extensions.yml | 15 ++++++++++++++- cwltool/job.py | 3 ++- cwltool/process.py | 3 ++- tests/wf/updateval.cwl | 2 ++ 5 files changed, 25 insertions(+), 4 deletions(-) diff --git a/cwltool/draft2tool.py b/cwltool/draft2tool.py index 58b0dffe3..ca859f2a2 100644 --- a/cwltool/draft2tool.py +++ b/cwltool/draft2tool.py @@ -363,9 +363,13 @@ def rm_pending_output_callback(output_callbacks, jobcachepending, ls[i] = t["entry"] j.generatefiles[u"listing"] = ls + inplaceUpdateReq = self.get_requirement("InplaceUpdateRequirement")[0] + + if inplaceUpdateReq: + j.inplace_update = inplaceUpdateReq["inplaceUpdate"] normalizeFilesDirs(j.generatefiles) for i in j.generatefiles["listing"]: - if i.get("writable"): + if i.get("writable") and j.inplace_update: adjustFileObjs(i, partial(builder.mutation_manager.register_mutation, j.name)) adjustDirObjs(i, partial(builder.mutation_manager.register_mutation, j.name)) diff --git a/cwltool/extensions.yml b/cwltool/extensions.yml index 95af9b043..d568f3afe 100644 --- a/cwltool/extensions.yml +++ b/cwltool/extensions.yml @@ -20,4 +20,17 @@ $graph: - "null" - type: enum name: LoadListingEnum - symbols: [shallow, deep] \ No newline at end of file + symbols: [shallow, deep] + +- name: InplaceUpdateRequirement + type: record + extends: cwl:ProcessRequirement + fields: + class: + type: string + doc: "Always 'InplaceUpdateRequirement'" + jsonldPredicate: + "_id": "@type" + "_type": "@vocab" + inplaceUpdate: + type: boolean diff --git a/cwltool/job.py b/cwltool/job.py index 2a08f08d3..ed7fdba15 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -151,7 +151,7 @@ def __init__(self): # type: () -> None self.environment = None # type: MutableMapping[Text, Text] self.generatefiles = None # type: Dict[Text, Union[List[Dict[Text, Text]], Dict[Text, Text], Text]] self.stagedir = None # type: Text - self.inplace_update = False # type: bool + self.inplace_update = None # type: bool def run(self, dry_run=False, pull_image=True, rm_container=True, rm_tmpdir=True, move_outputs="move", **kwargs): @@ -259,6 +259,7 @@ def run(self, dry_run=False, pull_image=True, rm_container=True, stageFiles(self.pathmapper, os.symlink, ignoreWritable=True) if generatemapper: + stageFiles(generatemapper, os.symlink) relink_initialworkdir(generatemapper, inplace_update=self.inplace_update) scr, _ = get_feature(self, "ShellCommandRequirement") diff --git a/cwltool/process.py b/cwltool/process.py index 66c784f61..571fda1a4 100644 --- a/cwltool/process.py +++ b/cwltool/process.py @@ -46,7 +46,8 @@ "StepInputExpressionRequirement", "ResourceRequirement", "InitialWorkDirRequirement", - "http://commonwl.org/cwltool#LoadListingRequirement"] + "http://commonwl.org/cwltool#LoadListingRequirement", + "http://commonwl.org/cwltool#InplaceUpdateRequirement"] cwl_files = ( "Workflow.yml", diff --git a/tests/wf/updateval.cwl b/tests/wf/updateval.cwl index 63e28373a..000d0db05 100644 --- a/tests/wf/updateval.cwl +++ b/tests/wf/updateval.cwl @@ -5,6 +5,8 @@ requirements: listing: - entry: $(inputs.r) writable: true + InplaceUpdateRequirement: + inplaceUpdate: true inputs: r: File script: From 055efd903c00152ade8e62f09d9e403ccf9ef9d6 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Fri, 3 Mar 2017 18:21:33 -0500 Subject: [PATCH 09/27] Refactor job to split Docker and non-Docker execution into separate classes. --- cwltool/draft2tool.py | 9 +- cwltool/job.py | 269 ++++++++++++++++++++++-------------------- 2 files changed, 146 insertions(+), 132 deletions(-) diff --git a/cwltool/draft2tool.py b/cwltool/draft2tool.py index ca859f2a2..48a3211f8 100644 --- a/cwltool/draft2tool.py +++ b/cwltool/draft2tool.py @@ -19,7 +19,7 @@ from .builder import CONTENT_LIMIT, substitute, Builder, adjustFileObjs from .pathmapper import adjustDirObjs from .errors import WorkflowException -from .job import CommandLineJob +from .job import CommandLineJob, DockerCommandLineJob from .pathmapper import PathMapper, get_listing, trim_listing from .process import Process, shortname, uniquename, normalizeFilesDirs, compute_checksums from .stdfsaccess import StdFsAccess @@ -163,11 +163,14 @@ def __init__(self, toolpath_object, **kwargs): super(CommandLineTool, self).__init__(toolpath_object, **kwargs) def makeJobRunner(self): # type: () -> CommandLineJob - return CommandLineJob() + dockerReq, _ = self.get_requirement("DockerRequirement") + if dockerReq: + return DockerCommandLineJob() + else: + return CommandLineJob() def makePathMapper(self, reffiles, stagedir, **kwargs): # type: (List[Any], Text, **Any) -> PathMapper - dockerReq, _ = self.get_requirement("DockerRequirement") return PathMapper(reffiles, kwargs["basedir"], stagedir) def job(self, diff --git a/cwltool/job.py b/cwltool/job.py index ed7fdba15..68c48188b 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -89,36 +89,6 @@ def deref_links(outputs): # type: (Any) -> None for v in outputs: deref_links(v) -def add_volumes(pathmapper, runtime, host_outdir=None, container_outdir=None, inplace_update=False): - for src, vol in pathmapper.items(): - if not vol.staged: - continue - if host_outdir: - containertgt = container_outdir + vol.target[len(host_outdir):] - else: - containertgt = vol.target - if vol.type in ("File", "Directory"): - if not vol.resolved.startswith("_:"): - runtime.append(u"--volume=%s:%s:ro" % (vol.resolved, containertgt)) - elif vol.type == "WritableFile": - if inplace_update: - runtime.append(u"--volume=%s:%s:rw" % (vol.resolved, containertgt)) - else: - shutil.copy(vol.resolved, vol.target) - elif vol.type == "WritableDirectory": - if vol.resolved.startswith("_:"): - os.makedirs(vol.target, 0o0755) - else: - if inplace_update: - runtime.append(u"--volume=%s:%s:rw" % (vol.resolved, containertgt)) - else: - shutil.copytree(vol.resolved, vol.target) - elif vol.type == "CreateFile": - createtmp = os.path.join(self.stagedir, os.path.basename(vol.target)) - with open(createtmp, "w") as f: - f.write(vol.resolved.encode("utf-8")) - runtime.append(u"--volume=%s:%s:ro" % (createtmp, vol.target)) - def relink_initialworkdir(pathmapper, inplace_update=False): for src, vol in pathmapper.items(): if not vol.staged: @@ -129,7 +99,7 @@ def relink_initialworkdir(pathmapper, inplace_update=False): os.remove(vol.target) os.symlink(vol.resolved, vol.target) -class CommandLineJob(object): +class JobBase(): def __init__(self): # type: () -> None self.builder = None # type: Builder self.joborder = None # type: Dict[Text, Union[Dict[Text, Any], List, Text]] @@ -144,6 +114,7 @@ def __init__(self): # type: () -> None self.name = None # type: Text self.command_line = None # type: List[Text] self.pathmapper = None # type: PathMapper + self.generatemapper = None # type: PathMapper self.collect_outputs = None # type: Union[Callable[[Any], Any], functools.partial[Any]] self.output_callback = None # type: Callable[[Any, Any], Any] self.outdir = None # type: Text @@ -153,19 +124,10 @@ def __init__(self): # type: () -> None self.stagedir = None # type: Text self.inplace_update = None # type: bool - def run(self, dry_run=False, pull_image=True, rm_container=True, - rm_tmpdir=True, move_outputs="move", **kwargs): - # type: (bool, bool, bool, bool, Text, **Any) -> Union[Tuple[Text, Dict[None, None]], None] + def _setup(self): if not os.path.exists(self.outdir): os.makedirs(self.outdir) - # with open(os.path.join(outdir, "cwl.input.json"), "w") as fp: - # json.dump(self.joborder, fp) - - runtime = [] # type: List[Text] - - (docker_req, docker_is_req) = get_feature(self, "DockerRequirement") - for knownfile in self.pathmapper.files(): p = self.pathmapper.mapper(knownfile) if p.type == "File" and not os.path.isfile(p[0]): @@ -173,95 +135,14 @@ def run(self, dry_run=False, pull_image=True, rm_container=True, u"Input file %s (at %s) not found or is not a regular " "file." % (knownfile, self.pathmapper.mapper(knownfile)[0])) - img_id = None - env = None # type: Union[MutableMapping[Text, Text], MutableMapping[str, str]] - try: - if docker_req and kwargs.get("use_container") is not False: - env = os.environ - img_id = docker.get_from_requirements(docker_req, True, pull_image) - elif kwargs.get("default_container", None) is not None: - env = os.environ - img_id = kwargs.get("default_container") - - if docker_req and img_id is None and kwargs.get("use_container"): - raise Exception("Docker image not available") - except Exception as e: - _logger.debug("Docker error", exc_info=True) - if docker_is_req: - raise WorkflowException("Docker is required to run this tool: %s" % e) - else: - raise WorkflowException("Docker is not available for this tool, try --no-container to disable Docker: %s" % e) - - generatemapper = None if self.generatefiles["listing"]: - print self.generatefiles["listing"] - generatemapper = PathMapper(self.generatefiles["listing"], self.outdir, + self.generatemapper = PathMapper(self.generatefiles["listing"], self.outdir, self.outdir, separateDirs=False) _logger.debug(u"[job %s] initial work dir %s", self.name, - json.dumps({p: generatemapper.mapper(p) for p in generatemapper.files()}, indent=4)) - - if img_id: - runtime = ["docker", "run", "-i"] - - runtime.append(u"--volume=%s:%s:rw" % (os.path.realpath(self.outdir), self.builder.outdir)) - runtime.append(u"--volume=%s:%s:rw" % (os.path.realpath(self.tmpdir), "/tmp")) - - add_volumes(self.pathmapper, runtime) - if generatemapper: - add_volumes(generatemapper, runtime, - self.outdir, - self.builder.outdir, - inplace_update=self.inplace_update) + json.dumps({p: self.generatemapper.mapper(p) for p in self.generatemapper.files()}, indent=4)) - runtime.append(u"--workdir=%s" % (self.builder.outdir)) - runtime.append("--read-only=true") - - if kwargs.get("custom_net", None) is not None: - runtime.append("--net={0}".format(kwargs.get("custom_net"))) - elif kwargs.get("disable_net", None): - runtime.append("--net=none") - - if self.stdout: - runtime.append("--log-driver=none") - - euid = docker_vm_uid() or os.geteuid() - - if kwargs.get("no_match_user", None) is False: - runtime.append(u"--user=%s" % (euid)) - - if rm_container: - runtime.append("--rm") - - runtime.append("--env=TMPDIR=/tmp") - - # spec currently says "HOME must be set to the designated output - # directory." but spec might change to designated temp directory. - # runtime.append("--env=HOME=/tmp") - runtime.append("--env=HOME=%s" % self.builder.outdir) - - for t, v in self.environment.items(): - runtime.append(u"--env=%s=%s" % (t, v)) - - runtime.append(img_id) - else: - env = self.environment - if not os.path.exists(self.tmpdir): - os.makedirs(self.tmpdir) - vars_to_preserve = kwargs.get("preserve_environment") - if kwargs.get("preserve_entire_environment"): - vars_to_preserve = os.environ - if vars_to_preserve is not None: - for key, value in os.environ.items(): - if key in vars_to_preserve and key not in env: - env[key] = value - env["HOME"] = self.outdir - env["TMPDIR"] = self.tmpdir - - stageFiles(self.pathmapper, os.symlink, ignoreWritable=True) - if generatemapper: - stageFiles(generatemapper, os.symlink) - relink_initialworkdir(generatemapper, inplace_update=self.inplace_update) + def _execute(self, runtime, env, rm_tmpdir=True, move_outputs="move"): scr, _ = get_feature(self, "ShellCommandRequirement") shouldquote = None # type: Callable[[Any], Any] @@ -279,9 +160,6 @@ def run(self, dry_run=False, pull_image=True, rm_container=True, u' > %s' % os.path.join(self.outdir, self.stdout) if self.stdout else '', u' 2> %s' % os.path.join(self.outdir, self.stderr) if self.stderr else '') - if dry_run: - return (self.outdir, {}) - outputs = {} # type: Dict[Text,Text] try: @@ -328,7 +206,7 @@ def run(self, dry_run=False, pull_image=True, rm_container=True, processStatus = "permanentFail" if self.generatefiles["listing"]: - relink_initialworkdir(generatemapper, inplace_update=self.inplace_update) + relink_initialworkdir(self.generatemapper, inplace_update=self.inplace_update) outputs = self.collect_outputs(self.outdir) @@ -371,6 +249,139 @@ def run(self, dry_run=False, pull_image=True, rm_container=True, shutil.rmtree(self.outdir, True) +class CommandLineJob(JobBase): + + def run(self, pull_image=True, rm_container=True, + rm_tmpdir=True, move_outputs="move", **kwargs): + # type: (bool, bool, bool, bool, Text, **Any) -> Union[Tuple[Text, Dict[None, None]], None] + + self._setup() + + env = self.environment + if not os.path.exists(self.tmpdir): + os.makedirs(self.tmpdir) + vars_to_preserve = kwargs.get("preserve_environment") + if kwargs.get("preserve_entire_environment"): + vars_to_preserve = os.environ + if vars_to_preserve is not None: + for key, value in os.environ.items(): + if key in vars_to_preserve and key not in env: + env[key] = value + env["HOME"] = self.outdir + env["TMPDIR"] = self.tmpdir + + stageFiles(self.pathmapper, os.symlink, ignoreWritable=True) + if self.generatemapper: + stageFiles(self.generatemapper, os.symlink) + relink_initialworkdir(self.generatemapper, inplace_update=self.inplace_update) + + self._execute([], env, rm_tmpdir=rm_tmpdir, move_outputs=move_outputs) + + +class DockerCommandLineJob(JobBase): + + def add_volumes(self, pathmapper, runtime, stage_output): + host_outdir = self.outdir + container_outdir = self.builder.outdir + for src, vol in pathmapper.items(): + if not vol.staged: + continue + if stage_output: + containertgt = container_outdir + vol.target[len(host_outdir):] + else: + containertgt = vol.target + if vol.type in ("File", "Directory"): + if not vol.resolved.startswith("_:"): + runtime.append(u"--volume=%s:%s:ro" % (vol.resolved, containertgt)) + elif vol.type == "WritableFile": + if self.inplace_update: + runtime.append(u"--volume=%s:%s:rw" % (vol.resolved, containertgt)) + else: + shutil.copy(vol.resolved, vol.target) + elif vol.type == "WritableDirectory": + if vol.resolved.startswith("_:"): + os.makedirs(vol.target, 0o0755) + else: + if self.inplace_update: + runtime.append(u"--volume=%s:%s:rw" % (vol.resolved, containertgt)) + else: + shutil.copytree(vol.resolved, vol.target) + elif vol.type == "CreateFile": + createtmp = os.path.join(host_outdir, os.path.basename(vol.target)) + with open(createtmp, "w") as f: + f.write(vol.resolved.encode("utf-8")) + runtime.append(u"--volume=%s:%s:ro" % (createtmp, vol.target)) + + def run(self, pull_image=True, rm_container=True, + rm_tmpdir=True, move_outputs="move", **kwargs): + # type: (bool, bool, bool, bool, Text, **Any) -> Union[Tuple[Text, Dict[None, None]], None] + + (docker_req, docker_is_req) = get_feature(self, "DockerRequirement") + + img_id = None + env = None # type: Union[MutableMapping[Text, Text], MutableMapping[str, str]] + try: + if docker_req and kwargs.get("use_container") is not False: + env = os.environ + img_id = docker.get_from_requirements(docker_req, True, pull_image) + elif kwargs.get("default_container", None) is not None: + env = os.environ + img_id = kwargs.get("default_container") + + if docker_req and img_id is None and kwargs.get("use_container"): + raise Exception("Docker image not available") + except Exception as e: + _logger.debug("Docker error", exc_info=True) + if docker_is_req: + raise WorkflowException("Docker is required to run this tool: %s" % e) + else: + raise WorkflowException("Docker is not available for this tool, try --no-container to disable Docker: %s" % e) + + self._setup() + + runtime = ["docker", "run", "-i"] + + runtime.append(u"--volume=%s:%s:rw" % (os.path.realpath(self.outdir), self.builder.outdir)) + runtime.append(u"--volume=%s:%s:rw" % (os.path.realpath(self.tmpdir), "/tmp")) + + self.add_volumes(self.pathmapper, runtime, False) + if self.generatemapper: + self.add_volumes(self.generatemapper, runtime, True) + + runtime.append(u"--workdir=%s" % (self.builder.outdir)) + runtime.append("--read-only=true") + + if kwargs.get("custom_net", None) is not None: + runtime.append("--net={0}".format(kwargs.get("custom_net"))) + elif kwargs.get("disable_net", None): + runtime.append("--net=none") + + if self.stdout: + runtime.append("--log-driver=none") + + euid = docker_vm_uid() or os.geteuid() + + if kwargs.get("no_match_user", None) is False: + runtime.append(u"--user=%s" % (euid)) + + if rm_container: + runtime.append("--rm") + + runtime.append("--env=TMPDIR=/tmp") + + # spec currently says "HOME must be set to the designated output + # directory." but spec might change to designated temp directory. + # runtime.append("--env=HOME=/tmp") + runtime.append("--env=HOME=%s" % self.builder.outdir) + + for t, v in self.environment.items(): + runtime.append(u"--env=%s=%s" % (t, v)) + + runtime.append(img_id) + + self._execute(runtime, env, rm_tmpdir=rm_tmpdir, move_outputs=move_outputs) + + def _job_popen( commands, # type: List[str] stdin_path, # type: Text From d0168e9fba70e740ff96f794d9c1082a6157c11a Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Fri, 3 Mar 2017 22:47:44 -0500 Subject: [PATCH 10/27] Manage readers for inplace updates. --- cwltool/draft2tool.py | 39 +++++++++++++++++++++++++------- cwltool/workflow.py | 52 +++++++++++++++++++++++++++++++++++++------ 2 files changed, 76 insertions(+), 15 deletions(-) diff --git a/cwltool/draft2tool.py b/cwltool/draft2tool.py index 48a3211f8..8bf778d3b 100644 --- a/cwltool/draft2tool.py +++ b/cwltool/draft2tool.py @@ -146,8 +146,9 @@ def run(self, **kwargs): # map files to assigned path inside a container. We need to also explicitly # walk over input as implicit reassignment doesn't reach everything in builder.bindings -def check_adjust(builder, f): +def check_adjust(builder, stepname, f): # type: (Builder, Dict[Text, Any]) -> Dict[Text, Any] + f["path"] = builder.pathmapper.mapper(f["location"])[1] f["dirname"], f["basename"] = os.path.split(f["path"]) if f["class"] == "File": @@ -192,7 +193,7 @@ def job(self, kwargs["basedir"], cachebuilder.stagedir, separateDirs=False) - _check_adjust = partial(check_adjust, cachebuilder) + _check_adjust = partial(check_adjust, cachebuilder, jobname) adjustFileObjs(cachebuilder.files, _check_adjust) adjustFileObjs(cachebuilder.bindings, _check_adjust) adjustDirObjs(cachebuilder.files, _check_adjust) @@ -291,7 +292,7 @@ def rm_pending_output_callback(output_callbacks, jobcachepending, _logger.debug(u"[job %s] path mappings is %s", j.name, json.dumps({p: builder.pathmapper.mapper(p) for p in builder.pathmapper.files()}, indent=4)) - _check_adjust = partial(check_adjust, builder) + _check_adjust = partial(check_adjust, builder, jobname) adjustFileObjs(builder.files, _check_adjust) adjustFileObjs(builder.bindings, _check_adjust) @@ -371,10 +372,30 @@ def rm_pending_output_callback(output_callbacks, jobcachepending, if inplaceUpdateReq: j.inplace_update = inplaceUpdateReq["inplaceUpdate"] normalizeFilesDirs(j.generatefiles) + + readers = {} + muts = set() + def register_mut(f): + muts.add(f["location"]) + builder.mutation_manager.register_mutation(j.name, f) + + def register_reader(f): + if f["location"] not in muts: + builder.mutation_manager.register_reader(j.name, f) + readers[f["location"]] = f + for i in j.generatefiles["listing"]: if i.get("writable") and j.inplace_update: - adjustFileObjs(i, partial(builder.mutation_manager.register_mutation, j.name)) - adjustDirObjs(i, partial(builder.mutation_manager.register_mutation, j.name)) + adjustFileObjs(i, register_mut) + adjustDirObjs(i, register_mut) + else: + adjustFileObjs(i, register_reader) + adjustDirObjs(i, register_reader) + + adjustFileObjs(builder.files, register_reader) + adjustFileObjs(builder.bindings, register_reader) + adjustDirObjs(builder.files, register_reader) + adjustDirObjs(builder.bindings, register_reader) j.environment = {} evr = self.get_requirement("EnvVarRequirement")[0] @@ -397,16 +418,15 @@ def rm_pending_output_callback(output_callbacks, jobcachepending, j.pathmapper = builder.pathmapper j.collect_outputs = partial( self.collect_output_ports, self.tool["outputs"], builder, - compute_checksum=kwargs.get("compute_checksum", True)) + compute_checksum=kwargs.get("compute_checksum", True), readers=readers) j.output_callback = output_callbacks yield j - def collect_output_ports(self, ports, builder, outdir, compute_checksum=True): + def collect_output_ports(self, ports, builder, outdir, compute_checksum=True, readers=None): # type: (Set[Dict[Text, Any]], Builder, Text, bool) -> Dict[Text, Union[Text, List[Any], Dict[Text, Any]]] ret = {} # type: Dict[Text, Union[Text, List[Any], Dict[Text, Any]]] try: - fs_access = builder.make_fs_access(outdir) custom_output = fs_access.join(outdir, "cwl.output.json") if fs_access.exists(custom_output): @@ -446,6 +466,9 @@ def collect_output_ports(self, ports, builder, outdir, compute_checksum=True): return ret if ret is not None else {} except validate.ValidationException as e: raise WorkflowException("Error validating output record, " + Text(e) + "\n in " + json.dumps(ret, indent=4)) + finally: + for r in readers.values(): + builder.mutation_manager.release_reader(r) def collect_output(self, schema, builder, outdir, fs_access, compute_checksum=True): # type: (Dict[Text, Any], Builder, Text, StdFsAccess, bool) -> Union[Dict[Text, Any], List[Union[Dict[Text, Any], Text]]] diff --git a/cwltool/workflow.py b/cwltool/workflow.py index 0aa408e7f..7884cd966 100644 --- a/cwltool/workflow.py +++ b/cwltool/workflow.py @@ -761,17 +761,55 @@ def flat_crossproduct_scatter(process, joborder, scatter_keys, output_callback, else: return steps +MutationState = namedtuple("MutationTracker", ["generation", "readers"]) + class MutationManager(object): def __init__(self): - self.generations = {} # type: Dict[Tuple[int, int]] + self.generations = {} # type: Dict[Text, MutationTracking] + + def register_reader(self, stepname, obj): + # type: (Text, Dict[Text, Any]) -> None + loc = obj["location"] + current = self.generations.get(loc, MutationState(0,0)) + obj_generation = obj.get("_generation", 0) + + if obj_generation != current.generation: + raise WorkflowException("[job %s] wants to read %s from generation %i but current generation is %s" % ( + stepname, loc, obj_generation, current.generation)) + + self.generations[loc] = MutationState(current.generation, current.readers+1) + + def release_reader(self, obj): + # type: (Text, Dict[Text, Any]) -> None + loc = obj["location"] + current = self.generations.get(loc, MutationState(0,0)) + obj_generation = obj.get("_generation", 0) + + if obj_generation != current.generation: + raise WorkflowException("wants to release reader on %s from generation %i but current generation is %s" % ( + loc, obj_generation, current.generation)) + + self.generations[loc] = MutationState(current.generation, current.readers-1) def register_mutation(self, stepname, obj): + # type: (Text, Dict[Text, Any]) -> None loc = obj["location"] - if obj.get("_generation", 0) == self.generations.get(loc, 0): - self.generations[loc] = obj.get("_generation", 0)+1 - else: - raise WorkflowException("[job %s] wants to modify %s on generation %s but input is generation %i" % ( - stepname, obj["location"], self.generations[loc], obj.get("_generation", 0))) + current = self.generations.get(loc, MutationState(0,0)) + obj_generation = obj.get("_generation", 0) + + if current.readers > 0: + raise WorkflowException("[job %s] wants to modify %s but has %i reader%s" % ( + stepname, loc, current.readers, + "s" if current.readers > 1 else "")) + + if obj_generation != current.generation: + raise WorkflowException("[job %s] wants to modify %s from generation %i but current generation is %s" % ( + stepname, loc, obj_generation, current.generation)) + + self.generations[loc] = MutationState(current.generation+1, current.readers) def set_generation(self, obj): - obj["_generation"] = self.generations.get(obj["location"], 0) + # type: (Dict) -> None + loc = obj["location"] + current = self.generations.get(loc, MutationState(0,0)) + obj["_generation"] = current.generation From 8c3e6d0f230a4ecff6d01c92c3d5325a61685fb0 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Fri, 3 Mar 2017 23:04:19 -0500 Subject: [PATCH 11/27] Record readers, last update step. --- cwltool/draft2tool.py | 8 +++++--- cwltool/workflow.py | 38 +++++++++++++++++++------------------- 2 files changed, 24 insertions(+), 22 deletions(-) diff --git a/cwltool/draft2tool.py b/cwltool/draft2tool.py index 8bf778d3b..debd5c739 100644 --- a/cwltool/draft2tool.py +++ b/cwltool/draft2tool.py @@ -418,12 +418,14 @@ def register_reader(f): j.pathmapper = builder.pathmapper j.collect_outputs = partial( self.collect_output_ports, self.tool["outputs"], builder, - compute_checksum=kwargs.get("compute_checksum", True), readers=readers) + compute_checksum=kwargs.get("compute_checksum", True), + jobname=jobname, + readers=readers) j.output_callback = output_callbacks yield j - def collect_output_ports(self, ports, builder, outdir, compute_checksum=True, readers=None): + def collect_output_ports(self, ports, builder, outdir, compute_checksum=True, jobname="", readers=None): # type: (Set[Dict[Text, Any]], Builder, Text, bool) -> Dict[Text, Union[Text, List[Any], Dict[Text, Any]]] ret = {} # type: Dict[Text, Union[Text, List[Any], Dict[Text, Any]]] try: @@ -468,7 +470,7 @@ def collect_output_ports(self, ports, builder, outdir, compute_checksum=True, re raise WorkflowException("Error validating output record, " + Text(e) + "\n in " + json.dumps(ret, indent=4)) finally: for r in readers.values(): - builder.mutation_manager.release_reader(r) + builder.mutation_manager.release_reader(jobname, r) def collect_output(self, schema, builder, outdir, fs_access, compute_checksum=True): # type: (Dict[Text, Any], Builder, Text, StdFsAccess, bool) -> Union[Dict[Text, Any], List[Union[Dict[Text, Any], Text]]] diff --git a/cwltool/workflow.py b/cwltool/workflow.py index 7884cd966..4a88de1fb 100644 --- a/cwltool/workflow.py +++ b/cwltool/workflow.py @@ -761,7 +761,7 @@ def flat_crossproduct_scatter(process, joborder, scatter_keys, output_callback, else: return steps -MutationState = namedtuple("MutationTracker", ["generation", "readers"]) +MutationState = namedtuple("MutationTracker", ["generation", "readers", "stepname"]) class MutationManager(object): def __init__(self): @@ -770,46 +770,46 @@ def __init__(self): def register_reader(self, stepname, obj): # type: (Text, Dict[Text, Any]) -> None loc = obj["location"] - current = self.generations.get(loc, MutationState(0,0)) + current = self.generations.get(loc, MutationState(0, [], "")) obj_generation = obj.get("_generation", 0) if obj_generation != current.generation: - raise WorkflowException("[job %s] wants to read %s from generation %i but current generation is %s" % ( - stepname, loc, obj_generation, current.generation)) + raise WorkflowException("[job %s] wants to read %s from generation %i but current generation is %s (last updated by %s)" % ( + stepname, loc, obj_generation, current.generation, current.stepname)) - self.generations[loc] = MutationState(current.generation, current.readers+1) + current.readers.append(stepname) + self.generations[loc] = current - def release_reader(self, obj): + def release_reader(self, stepname, obj): # type: (Text, Dict[Text, Any]) -> None loc = obj["location"] - current = self.generations.get(loc, MutationState(0,0)) + current = self.generations.get(loc, MutationState(0, [], "")) obj_generation = obj.get("_generation", 0) if obj_generation != current.generation: - raise WorkflowException("wants to release reader on %s from generation %i but current generation is %s" % ( - loc, obj_generation, current.generation)) + raise WorkflowException("[job %s] wants to release reader on %s from generation %i but current generation is %s (last updated by %s)" % ( + stepname, loc, obj_generation, current.generation, current.stepname)) - self.generations[loc] = MutationState(current.generation, current.readers-1) + self.generations[loc].readers.remove(stepname) def register_mutation(self, stepname, obj): # type: (Text, Dict[Text, Any]) -> None loc = obj["location"] - current = self.generations.get(loc, MutationState(0,0)) + current = self.generations.get(loc, MutationState(0,[], "")) obj_generation = obj.get("_generation", 0) - if current.readers > 0: - raise WorkflowException("[job %s] wants to modify %s but has %i reader%s" % ( - stepname, loc, current.readers, - "s" if current.readers > 1 else "")) + if len(current.readers) > 0: + raise WorkflowException("[job %s] wants to modify %s but has readers: %s" % ( + stepname, loc, current.readers)) if obj_generation != current.generation: - raise WorkflowException("[job %s] wants to modify %s from generation %i but current generation is %s" % ( - stepname, loc, obj_generation, current.generation)) + raise WorkflowException("[job %s] wants to modify %s from generation %i but current generation is %s (last updated by %s)" % ( + stepname, loc, obj_generation, current.generation, current.stepname)) - self.generations[loc] = MutationState(current.generation+1, current.readers) + self.generations[loc] = MutationState(current.generation+1, current.readers, stepname) def set_generation(self, obj): # type: (Dict) -> None loc = obj["location"] - current = self.generations.get(loc, MutationState(0,0)) + current = self.generations.get(loc, MutationState(0,[], "")) obj["_generation"] = current.generation From 5cf82f9fccb52ac04dcebb9cf25e4c51143900ba Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Mon, 6 Mar 2017 09:35:31 -0500 Subject: [PATCH 12/27] Add testing. --- tests/test_ext.py | 66 ++++++++++++++++++++++++++++++++++ tests/wf/mut3.cwl | 21 +++++++++++ tests/wf/updateval.cwl | 2 -- tests/wf/updateval_inplace.cwl | 22 ++++++++++++ 4 files changed, 109 insertions(+), 2 deletions(-) create mode 100644 tests/wf/mut3.cwl create mode 100644 tests/wf/updateval_inplace.cwl diff --git a/tests/test_ext.py b/tests/test_ext.py index ccc7ca2c4..2f0b936bc 100644 --- a/tests/test_ext.py +++ b/tests/test_ext.py @@ -1,4 +1,7 @@ import unittest +import tempfile +import os +import shutil import cwltool.expression as expr import cwltool.factory @@ -33,3 +36,66 @@ def test_listing_v1_0(self): # def test_listing_v1_1(self): # # Default behavior in 1.1 will be no expansion # self.assertEquals(main([get_data('tests/wf/listing_v1_1.cwl'), get_data('tests/listing-job.yml')]), 1) + +class TestInplaceUpdate(unittest.TestCase): + + def test_updateval(self): + try: + tmp = tempfile.mkdtemp() + with open(os.path.join(tmp, "value"), "w") as f: + f.write("1") + out = tempfile.mkdtemp() + self.assertEquals(main(["--outdir", out, get_data('tests/wf/updateval.cwl'), "-r", os.path.join(tmp, "value")]), 0) + + with open(os.path.join(tmp, "value"), "r") as f: + self.assertEquals("1", f.read()) + with open(os.path.join(out, "value"), "r") as f: + self.assertEquals("2", f.read()) + finally: + shutil.rmtree(tmp) + shutil.rmtree(out) + + def test_updateval_inplace(self): + try: + tmp = tempfile.mkdtemp() + with open(os.path.join(tmp, "value"), "w") as f: + f.write("1") + out = tempfile.mkdtemp() + self.assertEquals(main(["--enable-ext", "--leave-outputs", "--outdir", out, get_data('tests/wf/updateval_inplace.cwl'), "-r", os.path.join(tmp, "value")]), 0) + + with open(os.path.join(tmp, "value"), "r") as f: + self.assertEquals("2", f.read()) + self.assertFalse(os.path.exists(os.path.join(out, "value"))) + finally: + shutil.rmtree(tmp) + shutil.rmtree(out) + + def test_write_write_conflict(self): + try: + tmp = tempfile.mkdtemp() + with open(os.path.join(tmp, "value"), "w") as f: + f.write("1") + + self.assertEquals(main(["--enable-ext", get_data('tests/wf/mut.cwl'), "-a", os.path.join(tmp, "value")]), 1) + finally: + shutil.rmtree(tmp) + + def test_sequencing(self): + try: + tmp = tempfile.mkdtemp() + with open(os.path.join(tmp, "value"), "w") as f: + f.write("1") + + self.assertEquals(main(["--enable-ext", get_data('tests/wf/mut.cwl'), "-a", os.path.join(tmp, "value")]), 0) + finally: + shutil.rmtree(tmp) + + # def test_read_write_conflict(self): + # try: + # tmp = tempfile.mkdtemp() + # with open(os.path.join(tmp, "value"), "w") as f: + # f.write("1") + + # self.assertEquals(main(["--enable-ext", get_data('tests/wf/mut3.cwl'), "-a", os.path.join(tmp, "value")]), 0) + # finally: + # shutil.rmtree(tmp) diff --git a/tests/wf/mut3.cwl b/tests/wf/mut3.cwl new file mode 100644 index 000000000..3b5b245a6 --- /dev/null +++ b/tests/wf/mut3.cwl @@ -0,0 +1,21 @@ +cwlVersion: v1.0 +class: Workflow +inputs: + a: File +outputs: [] +steps: + step1: + in: + r: a + out: [] + run: cat.cwl + step2: + in: + r: a + out: [] + run: cat.cwl + step3: + in: + r: a + out: [] + run: updateval.cwl diff --git a/tests/wf/updateval.cwl b/tests/wf/updateval.cwl index 000d0db05..63e28373a 100644 --- a/tests/wf/updateval.cwl +++ b/tests/wf/updateval.cwl @@ -5,8 +5,6 @@ requirements: listing: - entry: $(inputs.r) writable: true - InplaceUpdateRequirement: - inplaceUpdate: true inputs: r: File script: diff --git a/tests/wf/updateval_inplace.cwl b/tests/wf/updateval_inplace.cwl new file mode 100644 index 000000000..000d0db05 --- /dev/null +++ b/tests/wf/updateval_inplace.cwl @@ -0,0 +1,22 @@ +class: CommandLineTool +cwlVersion: v1.0 +requirements: + InitialWorkDirRequirement: + listing: + - entry: $(inputs.r) + writable: true + InplaceUpdateRequirement: + inplaceUpdate: true +inputs: + r: File + script: + type: File + default: + class: File + location: updateval.py +outputs: + out: + type: File + outputBinding: + glob: $(inputs.r.basename) +arguments: [python, $(inputs.script), $(inputs.r.basename)] \ No newline at end of file From a8d2a9725e1a22aeef5662a6c6024a5606f79073 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Mon, 6 Mar 2017 21:46:43 -0500 Subject: [PATCH 13/27] Test & mypy fixes WIP. --- cwltool/builder.py | 1 + cwltool/draft2tool.py | 5 +++-- cwltool/job.py | 2 +- cwltool/workflow.py | 1 + tests/test_ext.py | 6 +++++- tests/wf/mut.cwl | 4 ++-- tests/wf/mut2.cwl | 4 ++-- tests/wf/mut3.cwl | 2 +- 8 files changed, 16 insertions(+), 9 deletions(-) diff --git a/cwltool/builder.py b/cwltool/builder.py index 88d5d42ed..5221a2aac 100644 --- a/cwltool/builder.py +++ b/cwltool/builder.py @@ -41,6 +41,7 @@ def __init__(self): # type: () -> None self.make_fs_access = None # type: Type[StdFsAccess] self.build_job_script = None # type: Callable[[List[str]], Text] self.debug = False # type: bool + self.mutation_manager = None # type: MutationManager # One of None, "shallow", "deep" # Will be default None for CWL v1.1 diff --git a/cwltool/draft2tool.py b/cwltool/draft2tool.py index debd5c739..1192ee5a5 100644 --- a/cwltool/draft2tool.py +++ b/cwltool/draft2tool.py @@ -147,7 +147,7 @@ def run(self, **kwargs): # map files to assigned path inside a container. We need to also explicitly # walk over input as implicit reassignment doesn't reach everything in builder.bindings def check_adjust(builder, stepname, f): - # type: (Builder, Dict[Text, Any]) -> Dict[Text, Any] + # type: (Builder, Text, Dict[Text, Any]) -> Dict[Text, Any] f["path"] = builder.pathmapper.mapper(f["location"])[1] f["dirname"], f["basename"] = os.path.split(f["path"]) @@ -375,6 +375,7 @@ def rm_pending_output_callback(output_callbacks, jobcachepending, readers = {} muts = set() + def register_mut(f): muts.add(f["location"]) builder.mutation_manager.register_mutation(j.name, f) @@ -426,7 +427,7 @@ def register_reader(f): yield j def collect_output_ports(self, ports, builder, outdir, compute_checksum=True, jobname="", readers=None): - # type: (Set[Dict[Text, Any]], Builder, Text, bool) -> Dict[Text, Union[Text, List[Any], Dict[Text, Any]]] + # type: (Set[Dict[Text, Any]], Builder, Text, bool, Text, Dict[Text, Any]) -> Dict[Text, Union[Text, List[Any], Dict[Text, Any]]] ret = {} # type: Dict[Text, Union[Text, List[Any], Dict[Text, Any]]] try: fs_access = builder.make_fs_access(outdir) diff --git a/cwltool/job.py b/cwltool/job.py index 68c48188b..b06d5ae84 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -253,7 +253,7 @@ class CommandLineJob(JobBase): def run(self, pull_image=True, rm_container=True, rm_tmpdir=True, move_outputs="move", **kwargs): - # type: (bool, bool, bool, bool, Text, **Any) -> Union[Tuple[Text, Dict[None, None]], None] + # type: (bool, bool, bool, Text, **Any) -> Union[Tuple[Text, Dict[None, None]], None] self._setup() diff --git a/cwltool/workflow.py b/cwltool/workflow.py index 4a88de1fb..f5a742df2 100644 --- a/cwltool/workflow.py +++ b/cwltool/workflow.py @@ -761,6 +761,7 @@ def flat_crossproduct_scatter(process, joborder, scatter_keys, output_callback, else: return steps + MutationState = namedtuple("MutationTracker", ["generation", "readers", "stepname"]) class MutationManager(object): diff --git a/tests/test_ext.py b/tests/test_ext.py index 2f0b936bc..dfd54b8cf 100644 --- a/tests/test_ext.py +++ b/tests/test_ext.py @@ -77,6 +77,8 @@ def test_write_write_conflict(self): f.write("1") self.assertEquals(main(["--enable-ext", get_data('tests/wf/mut.cwl'), "-a", os.path.join(tmp, "value")]), 1) + with open(os.path.join(tmp, "value"), "r") as f: + self.assertEquals("2", f.read()) finally: shutil.rmtree(tmp) @@ -86,7 +88,9 @@ def test_sequencing(self): with open(os.path.join(tmp, "value"), "w") as f: f.write("1") - self.assertEquals(main(["--enable-ext", get_data('tests/wf/mut.cwl'), "-a", os.path.join(tmp, "value")]), 0) + self.assertEquals(main(["--enable-ext", get_data('tests/wf/mut2.cwl'), "-a", os.path.join(tmp, "value")]), 0) + with open(os.path.join(tmp, "value"), "r") as f: + self.assertEquals("3", f.read()) finally: shutil.rmtree(tmp) diff --git a/tests/wf/mut.cwl b/tests/wf/mut.cwl index 2c400f181..6c3a4732e 100644 --- a/tests/wf/mut.cwl +++ b/tests/wf/mut.cwl @@ -8,9 +8,9 @@ steps: in: r: a out: [] - run: updateval.cwl + run: updateval_inplace.cwl step2: in: r: a out: [] - run: updateval.cwl + run: updateval_inplace.cwl diff --git a/tests/wf/mut2.cwl b/tests/wf/mut2.cwl index 154ec69fc..6f7fd94d5 100644 --- a/tests/wf/mut2.cwl +++ b/tests/wf/mut2.cwl @@ -11,9 +11,9 @@ steps: in: r: a out: [out] - run: updateval.cwl + run: updateval_inplace.cwl step2: in: r: step1/out out: [out] - run: updateval.cwl + run: updateval_inplace.cwl diff --git a/tests/wf/mut3.cwl b/tests/wf/mut3.cwl index 3b5b245a6..cf19c72fb 100644 --- a/tests/wf/mut3.cwl +++ b/tests/wf/mut3.cwl @@ -18,4 +18,4 @@ steps: in: r: a out: [] - run: updateval.cwl + run: updateval_inplace.cwl From 35966c99a05bf818c0b0e19093419e160dff32e0 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Tue, 7 Mar 2017 12:59:05 -0500 Subject: [PATCH 14/27] Make mypy happying. --- cwltool/builder.py | 1 + cwltool/draft2tool.py | 19 ++++++------ cwltool/job.py | 36 ++++++++++++----------- cwltool/main.py | 3 +- cwltool/mutation.py | 67 +++++++++++++++++++++++++++++++++++++++++++ cwltool/workflow.py | 54 ---------------------------------- 6 files changed, 100 insertions(+), 80 deletions(-) create mode 100644 cwltool/mutation.py diff --git a/cwltool/builder.py b/cwltool/builder.py index 5221a2aac..aab7e7bc3 100644 --- a/cwltool/builder.py +++ b/cwltool/builder.py @@ -11,6 +11,7 @@ from .pathmapper import PathMapper, adjustFileObjs, normalizeFilesDirs, get_listing from .stdfsaccess import StdFsAccess from .utils import aslist +from .mutation import MutationManager CONTENT_LIMIT = 64 * 1024 diff --git a/cwltool/draft2tool.py b/cwltool/draft2tool.py index 1192ee5a5..abe65e079 100644 --- a/cwltool/draft2tool.py +++ b/cwltool/draft2tool.py @@ -19,7 +19,7 @@ from .builder import CONTENT_LIMIT, substitute, Builder, adjustFileObjs from .pathmapper import adjustDirObjs from .errors import WorkflowException -from .job import CommandLineJob, DockerCommandLineJob +from .job import JobBase, CommandLineJob, DockerCommandLineJob from .pathmapper import PathMapper, get_listing, trim_listing from .process import Process, shortname, uniquename, normalizeFilesDirs, compute_checksums from .stdfsaccess import StdFsAccess @@ -163,7 +163,7 @@ def __init__(self, toolpath_object, **kwargs): # type: (Dict[Text, Any], **Any) -> None super(CommandLineTool, self).__init__(toolpath_object, **kwargs) - def makeJobRunner(self): # type: () -> CommandLineJob + def makeJobRunner(self): # type: () -> JobBase dockerReq, _ = self.get_requirement("DockerRequirement") if dockerReq: return DockerCommandLineJob() @@ -179,7 +179,7 @@ def job(self, output_callbacks, # type: Callable[[Any, Any], Any] **kwargs # type: Any ): - # type: (...) -> Generator[Union[CommandLineJob, CallbackJob], None, None] + # type: (...) -> Generator[Union[JobBase, CallbackJob], None, None] jobname = uniquename(kwargs.get("name", shortname(self.tool.get("id", "job")))) @@ -385,13 +385,14 @@ def register_reader(f): builder.mutation_manager.register_reader(j.name, f) readers[f["location"]] = f - for i in j.generatefiles["listing"]: - if i.get("writable") and j.inplace_update: - adjustFileObjs(i, register_mut) - adjustDirObjs(i, register_mut) + for li in j.generatefiles["listing"]: + li = cast(Dict[Text, Any], li) + if li.get("writable") and j.inplace_update: + adjustFileObjs(li, register_mut) + adjustDirObjs(li, register_mut) else: - adjustFileObjs(i, register_reader) - adjustDirObjs(i, register_reader) + adjustFileObjs(li, register_reader) + adjustDirObjs(li, register_reader) adjustFileObjs(builder.files, register_reader) adjustFileObjs(builder.bindings, register_reader) diff --git a/cwltool/job.py b/cwltool/job.py index b06d5ae84..a9e2f8640 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -11,7 +11,7 @@ import shellescape from typing import (Any, Callable, Union, Iterable, MutableMapping, - IO, Text, Tuple) + IO, Text, Tuple, cast) from . import docker from .builder import Builder @@ -90,6 +90,7 @@ def deref_links(outputs): # type: (Any) -> None deref_links(v) def relink_initialworkdir(pathmapper, inplace_update=False): + # type: (PathMapper, bool) -> None for src, vol in pathmapper.items(): if not vol.staged: continue @@ -99,7 +100,7 @@ def relink_initialworkdir(pathmapper, inplace_update=False): os.remove(vol.target) os.symlink(vol.resolved, vol.target) -class JobBase(): +class JobBase(object): def __init__(self): # type: () -> None self.builder = None # type: Builder self.joborder = None # type: Dict[Text, Union[Dict[Text, Any], List, Text]] @@ -124,7 +125,7 @@ def __init__(self): # type: () -> None self.stagedir = None # type: Text self.inplace_update = None # type: bool - def _setup(self): + def _setup(self): # type: () -> None if not os.path.exists(self.outdir): os.makedirs(self.outdir) @@ -136,13 +137,15 @@ def _setup(self): "file." % (knownfile, self.pathmapper.mapper(knownfile)[0])) if self.generatefiles["listing"]: - self.generatemapper = PathMapper(self.generatefiles["listing"], self.outdir, - self.outdir, separateDirs=False) + self.generatemapper = PathMapper(cast(List[Any], self.generatefiles["listing"]), + self.outdir, self.outdir, separateDirs=False) _logger.debug(u"[job %s] initial work dir %s", self.name, json.dumps({p: self.generatemapper.mapper(p) for p in self.generatemapper.files()}, indent=4)) def _execute(self, runtime, env, rm_tmpdir=True, move_outputs="move"): + # type: (List[Text], MutableMapping[Text, Text], bool, Text) -> None + scr, _ = get_feature(self, "ShellCommandRequirement") shouldquote = None # type: Callable[[Any], Any] @@ -281,6 +284,8 @@ def run(self, pull_image=True, rm_container=True, class DockerCommandLineJob(JobBase): def add_volumes(self, pathmapper, runtime, stage_output): + # type: (PathMapper, List[Text], bool) -> None + host_outdir = self.outdir container_outdir = self.builder.outdir for src, vol in pathmapper.items(): @@ -314,18 +319,17 @@ def add_volumes(self, pathmapper, runtime, stage_output): def run(self, pull_image=True, rm_container=True, rm_tmpdir=True, move_outputs="move", **kwargs): - # type: (bool, bool, bool, bool, Text, **Any) -> Union[Tuple[Text, Dict[None, None]], None] + # type: (bool, bool, bool, Text, **Any) -> Union[Tuple[Text, Dict[None, None]], None] (docker_req, docker_is_req) = get_feature(self, "DockerRequirement") img_id = None - env = None # type: Union[MutableMapping[Text, Text], MutableMapping[str, str]] + env = None # type: MutableMapping[Text, Text] try: + env = cast(MutableMapping[Text, Text], os.environ) if docker_req and kwargs.get("use_container") is not False: - env = os.environ img_id = docker.get_from_requirements(docker_req, True, pull_image) elif kwargs.get("default_container", None) is not None: - env = os.environ img_id = kwargs.get("default_container") if docker_req and img_id is None and kwargs.get("use_container"): @@ -339,7 +343,7 @@ def run(self, pull_image=True, rm_container=True, self._setup() - runtime = ["docker", "run", "-i"] + runtime = [u"docker", u"run", u"-i"] runtime.append(u"--volume=%s:%s:rw" % (os.path.realpath(self.outdir), self.builder.outdir)) runtime.append(u"--volume=%s:%s:rw" % (os.path.realpath(self.tmpdir), "/tmp")) @@ -349,12 +353,12 @@ def run(self, pull_image=True, rm_container=True, self.add_volumes(self.generatemapper, runtime, True) runtime.append(u"--workdir=%s" % (self.builder.outdir)) - runtime.append("--read-only=true") + runtime.append(u"--read-only=true") if kwargs.get("custom_net", None) is not None: - runtime.append("--net={0}".format(kwargs.get("custom_net"))) + runtime.append(u"--net={0}".format(kwargs.get("custom_net"))) elif kwargs.get("disable_net", None): - runtime.append("--net=none") + runtime.append(u"--net=none") if self.stdout: runtime.append("--log-driver=none") @@ -365,14 +369,14 @@ def run(self, pull_image=True, rm_container=True, runtime.append(u"--user=%s" % (euid)) if rm_container: - runtime.append("--rm") + runtime.append(u"--rm") - runtime.append("--env=TMPDIR=/tmp") + runtime.append(u"--env=TMPDIR=/tmp") # spec currently says "HOME must be set to the designated output # directory." but spec might change to designated temp directory. # runtime.append("--env=HOME=/tmp") - runtime.append("--env=HOME=%s" % self.builder.outdir) + runtime.append(u"--env=HOME=%s" % self.builder.outdir) for t, v in self.environment.items(): runtime.append(u"--env=%s=%s" % (t, v)) diff --git a/cwltool/main.py b/cwltool/main.py index 934bfa9b6..197b141bc 100755 --- a/cwltool/main.py +++ b/cwltool/main.py @@ -29,6 +29,7 @@ scandeps, normalizeFilesDirs, use_custom_schema, use_standard_schema) from .resolver import tool_resolver, ga4gh_tool_registries from .stdfsaccess import StdFsAccess +from .mutation import MutationManager _logger = logging.getLogger("cwltool") @@ -217,7 +218,7 @@ def output_callback(out, processStatus): kwargs["outdir"] = tempfile.mkdtemp(prefix=kwargs["tmp_outdir_prefix"]) if kwargs.get( "tmp_outdir_prefix") else tempfile.mkdtemp() output_dirs.add(kwargs["outdir"]) - kwargs["mutation_manager"] = workflow.MutationManager() + kwargs["mutation_manager"] = MutationManager() jobReqs = None if "cwl:requirements" in job_order_object: diff --git a/cwltool/mutation.py b/cwltool/mutation.py new file mode 100644 index 000000000..98a2d1f28 --- /dev/null +++ b/cwltool/mutation.py @@ -0,0 +1,67 @@ +from collections import namedtuple + +from typing import Any, Callable, cast, Generator, Iterable, List, Text, Union + +from .errors import WorkflowException + +MutationState = namedtuple("MutationTracker", ["generation", "readers", "stepname"]) + +class MutationManager(object): + """Lock manager for checking correctness of in-place update of files. + + Used to validate that in-place file updates happen sequentially, and that a + file which is registered for in-place update cannot be read or updated by + any other steps. + + """ + + def __init__(self): + # type: () -> None + self.generations = {} # type: Dict[Text, MutationState] + + def register_reader(self, stepname, obj): + # type: (Text, Dict[Text, Any]) -> None + loc = obj["location"] + current = self.generations.get(loc, MutationState(0, [], "")) + obj_generation = obj.get("_generation", 0) + + if obj_generation != current.generation: + raise WorkflowException("[job %s] wants to read %s from generation %i but current generation is %s (last updated by %s)" % ( + stepname, loc, obj_generation, current.generation, current.stepname)) + + current.readers.append(stepname) + self.generations[loc] = current + + def release_reader(self, stepname, obj): + # type: (Text, Dict[Text, Any]) -> None + loc = obj["location"] + current = self.generations.get(loc, MutationState(0, [], "")) + obj_generation = obj.get("_generation", 0) + + if obj_generation != current.generation: + raise WorkflowException("[job %s] wants to release reader on %s from generation %i but current generation is %s (last updated by %s)" % ( + stepname, loc, obj_generation, current.generation, current.stepname)) + + self.generations[loc].readers.remove(stepname) + + def register_mutation(self, stepname, obj): + # type: (Text, Dict[Text, Any]) -> None + loc = obj["location"] + current = self.generations.get(loc, MutationState(0,[], "")) + obj_generation = obj.get("_generation", 0) + + if len(current.readers) > 0: + raise WorkflowException("[job %s] wants to modify %s but has readers: %s" % ( + stepname, loc, current.readers)) + + if obj_generation != current.generation: + raise WorkflowException("[job %s] wants to modify %s from generation %i but current generation is %s (last updated by %s)" % ( + stepname, loc, obj_generation, current.generation, current.stepname)) + + self.generations[loc] = MutationState(current.generation+1, current.readers, stepname) + + def set_generation(self, obj): + # type: (Dict) -> None + loc = obj["location"] + current = self.generations.get(loc, MutationState(0,[], "")) + obj["_generation"] = current.generation diff --git a/cwltool/workflow.py b/cwltool/workflow.py index f5a742df2..2b44a7ed6 100644 --- a/cwltool/workflow.py +++ b/cwltool/workflow.py @@ -760,57 +760,3 @@ def flat_crossproduct_scatter(process, joborder, scatter_keys, output_callback, return parallel_steps(steps, rc, kwargs) else: return steps - - -MutationState = namedtuple("MutationTracker", ["generation", "readers", "stepname"]) - -class MutationManager(object): - def __init__(self): - self.generations = {} # type: Dict[Text, MutationTracking] - - def register_reader(self, stepname, obj): - # type: (Text, Dict[Text, Any]) -> None - loc = obj["location"] - current = self.generations.get(loc, MutationState(0, [], "")) - obj_generation = obj.get("_generation", 0) - - if obj_generation != current.generation: - raise WorkflowException("[job %s] wants to read %s from generation %i but current generation is %s (last updated by %s)" % ( - stepname, loc, obj_generation, current.generation, current.stepname)) - - current.readers.append(stepname) - self.generations[loc] = current - - def release_reader(self, stepname, obj): - # type: (Text, Dict[Text, Any]) -> None - loc = obj["location"] - current = self.generations.get(loc, MutationState(0, [], "")) - obj_generation = obj.get("_generation", 0) - - if obj_generation != current.generation: - raise WorkflowException("[job %s] wants to release reader on %s from generation %i but current generation is %s (last updated by %s)" % ( - stepname, loc, obj_generation, current.generation, current.stepname)) - - self.generations[loc].readers.remove(stepname) - - def register_mutation(self, stepname, obj): - # type: (Text, Dict[Text, Any]) -> None - loc = obj["location"] - current = self.generations.get(loc, MutationState(0,[], "")) - obj_generation = obj.get("_generation", 0) - - if len(current.readers) > 0: - raise WorkflowException("[job %s] wants to modify %s but has readers: %s" % ( - stepname, loc, current.readers)) - - if obj_generation != current.generation: - raise WorkflowException("[job %s] wants to modify %s from generation %i but current generation is %s (last updated by %s)" % ( - stepname, loc, obj_generation, current.generation, current.stepname)) - - self.generations[loc] = MutationState(current.generation+1, current.readers, stepname) - - def set_generation(self, obj): - # type: (Dict) -> None - loc = obj["location"] - current = self.generations.get(loc, MutationState(0,[], "")) - obj["_generation"] = current.generation From 0dfe86afc3fe7f8400c487e7aa06b473551e2b45 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Tue, 7 Mar 2017 15:42:48 -0500 Subject: [PATCH 15/27] Fix imports for casts. --- cwltool/draft2tool.py | 2 +- cwltool/job.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cwltool/draft2tool.py b/cwltool/draft2tool.py index abe65e079..511583195 100644 --- a/cwltool/draft2tool.py +++ b/cwltool/draft2tool.py @@ -14,7 +14,7 @@ import shellescape from schema_salad.ref_resolver import file_uri, uri_file_path from schema_salad.sourceline import SourceLine, indent -from typing import Any, Callable, cast, Generator, Text, Union +from typing import Any, Callable, cast, Generator, Text, Union, Dict from .builder import CONTENT_LIMIT, substitute, Builder, adjustFileObjs from .pathmapper import adjustDirObjs diff --git a/cwltool/job.py b/cwltool/job.py index a9e2f8640..ba36e35ef 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -11,7 +11,7 @@ import shellescape from typing import (Any, Callable, Union, Iterable, MutableMapping, - IO, Text, Tuple, cast) + IO, Text, Tuple, cast, List) from . import docker from .builder import Builder From 0043494c8c3714a47baa1c7760109a11fc3ae230 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Fri, 10 Mar 2017 18:30:05 -0500 Subject: [PATCH 16/27] Fix WritableDirectory. Add tests. --- cwltool/job.py | 6 ++++-- tests/test_ext.py | 36 ++++++++++++++++++++++++++++++++++ tests/wf/updatedir.cwl | 16 +++++++++++++++ tests/wf/updatedir_inplace.cwl | 18 +++++++++++++++++ 4 files changed, 74 insertions(+), 2 deletions(-) create mode 100644 tests/wf/updatedir.cwl create mode 100644 tests/wf/updatedir_inplace.cwl diff --git a/cwltool/job.py b/cwltool/job.py index ba36e35ef..75dfc3eac 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -96,8 +96,10 @@ def relink_initialworkdir(pathmapper, inplace_update=False): continue if vol.type in ("File", "Directory") or (inplace_update and vol.type in ("WritableFile", "WritableDirectory")): - if os.path.exists(vol.target): + if os.path.islink(vol.target) or os.path.isfile(vol.target): os.remove(vol.target) + elif os.path.isdir(vol.target): + os.rmdir(vol.target) os.symlink(vol.resolved, vol.target) class JobBase(object): @@ -275,7 +277,7 @@ def run(self, pull_image=True, rm_container=True, stageFiles(self.pathmapper, os.symlink, ignoreWritable=True) if self.generatemapper: - stageFiles(self.generatemapper, os.symlink) + stageFiles(self.generatemapper, os.symlink, ignoreWritable=self.inplace_update) relink_initialworkdir(self.generatemapper, inplace_update=self.inplace_update) self._execute([], env, rm_tmpdir=rm_tmpdir, move_outputs=move_outputs) diff --git a/tests/test_ext.py b/tests/test_ext.py index dfd54b8cf..d2d47fcba 100644 --- a/tests/test_ext.py +++ b/tests/test_ext.py @@ -103,3 +103,39 @@ def test_sequencing(self): # self.assertEquals(main(["--enable-ext", get_data('tests/wf/mut3.cwl'), "-a", os.path.join(tmp, "value")]), 0) # finally: # shutil.rmtree(tmp) + + def test_updatedir(self): + try: + tmp = tempfile.mkdtemp() + with open(os.path.join(tmp, "value"), "w") as f: + f.write("1") + out = tempfile.mkdtemp() + + self.assertFalse(os.path.exists(os.path.join(tmp, "blurb"))) + self.assertFalse(os.path.exists(os.path.join(out, "blurb"))) + + self.assertEquals(main(["--outdir", out, get_data('tests/wf/updatedir.cwl'), "-r", tmp]), 0) + + self.assertFalse(os.path.exists(os.path.join(tmp, "blurb"))) + self.assertTrue(os.path.exists(os.path.join(out, "inp/blurb"))) + finally: + shutil.rmtree(tmp) + shutil.rmtree(out) + + def test_updateval_inplace(self): + try: + tmp = tempfile.mkdtemp() + with open(os.path.join(tmp, "value"), "w") as f: + f.write("1") + out = tempfile.mkdtemp() + + self.assertFalse(os.path.exists(os.path.join(tmp, "blurb"))) + self.assertFalse(os.path.exists(os.path.join(out, "blurb"))) + + self.assertEquals(main(["--enable-ext", "--leave-outputs", "--outdir", out, get_data('tests/wf/updatedir_inplace.cwl'), "-r", tmp]), 0) + + self.assertTrue(os.path.exists(os.path.join(tmp, "blurb"))) + self.assertFalse(os.path.exists(os.path.join(out, "inp/blurb"))) + finally: + shutil.rmtree(tmp) + shutil.rmtree(out) diff --git a/tests/wf/updatedir.cwl b/tests/wf/updatedir.cwl new file mode 100644 index 000000000..3a3222123 --- /dev/null +++ b/tests/wf/updatedir.cwl @@ -0,0 +1,16 @@ +class: CommandLineTool +cwlVersion: v1.0 +requirements: + InitialWorkDirRequirement: + listing: + - entry: $(inputs.r) + entryname: inp + writable: true +inputs: + r: Directory +outputs: + out: + type: Directory + outputBinding: + glob: inp +arguments: [touch, inp/blurb] \ No newline at end of file diff --git a/tests/wf/updatedir_inplace.cwl b/tests/wf/updatedir_inplace.cwl new file mode 100644 index 000000000..d15b95789 --- /dev/null +++ b/tests/wf/updatedir_inplace.cwl @@ -0,0 +1,18 @@ +class: CommandLineTool +cwlVersion: v1.0 +requirements: + InitialWorkDirRequirement: + listing: + - entry: $(inputs.r) + entryname: inp + writable: true + InplaceUpdateRequirement: + inplaceUpdate: true +inputs: + r: Directory +outputs: + out: + type: Directory + outputBinding: + glob: inp +arguments: [touch, inp/blurb] \ No newline at end of file From 648490f427851a728701a544597b99fd9d197e05 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Thu, 16 Mar 2017 14:29:17 -0400 Subject: [PATCH 17/27] Rebase on master. Fix tests. --- cwltool/draft2tool.py | 2 +- cwltool/extensions.yml | 1 + tests/wf/updatedir_inplace.cwl | 4 +++- tests/wf/updateval_inplace.cwl | 4 +++- 4 files changed, 8 insertions(+), 3 deletions(-) diff --git a/cwltool/draft2tool.py b/cwltool/draft2tool.py index 511583195..9b6fe75a7 100644 --- a/cwltool/draft2tool.py +++ b/cwltool/draft2tool.py @@ -367,7 +367,7 @@ def rm_pending_output_callback(output_callbacks, jobcachepending, ls[i] = t["entry"] j.generatefiles[u"listing"] = ls - inplaceUpdateReq = self.get_requirement("InplaceUpdateRequirement")[0] + inplaceUpdateReq = self.get_requirement("http://commonwl.org/cwltool#InplaceUpdateRequirement")[0] if inplaceUpdateReq: j.inplace_update = inplaceUpdateReq["inplaceUpdate"] diff --git a/cwltool/extensions.yml b/cwltool/extensions.yml index d568f3afe..b2ad43617 100644 --- a/cwltool/extensions.yml +++ b/cwltool/extensions.yml @@ -24,6 +24,7 @@ $graph: - name: InplaceUpdateRequirement type: record + inVocab: false extends: cwl:ProcessRequirement fields: class: diff --git a/tests/wf/updatedir_inplace.cwl b/tests/wf/updatedir_inplace.cwl index d15b95789..9c98a5dd5 100644 --- a/tests/wf/updatedir_inplace.cwl +++ b/tests/wf/updatedir_inplace.cwl @@ -1,12 +1,14 @@ class: CommandLineTool cwlVersion: v1.0 +$namespaces: + cwltool: http://commonwl.org/cwltool# requirements: InitialWorkDirRequirement: listing: - entry: $(inputs.r) entryname: inp writable: true - InplaceUpdateRequirement: + cwltool:InplaceUpdateRequirement: inplaceUpdate: true inputs: r: Directory diff --git a/tests/wf/updateval_inplace.cwl b/tests/wf/updateval_inplace.cwl index 000d0db05..6c032fbd1 100644 --- a/tests/wf/updateval_inplace.cwl +++ b/tests/wf/updateval_inplace.cwl @@ -1,11 +1,13 @@ class: CommandLineTool cwlVersion: v1.0 +$namespaces: + cwltool: "http://commonwl.org/cwltool#" requirements: InitialWorkDirRequirement: listing: - entry: $(inputs.r) writable: true - InplaceUpdateRequirement: + cwltool:InplaceUpdateRequirement: inplaceUpdate: true inputs: r: File From 973d8556e6b6f3ba5d5a6997805717616d68ff50 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Thu, 16 Mar 2017 14:31:51 -0400 Subject: [PATCH 18/27] Check that readers is not falsy. --- cwltool/draft2tool.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cwltool/draft2tool.py b/cwltool/draft2tool.py index 9b6fe75a7..4cd384977 100644 --- a/cwltool/draft2tool.py +++ b/cwltool/draft2tool.py @@ -471,8 +471,9 @@ def collect_output_ports(self, ports, builder, outdir, compute_checksum=True, jo except validate.ValidationException as e: raise WorkflowException("Error validating output record, " + Text(e) + "\n in " + json.dumps(ret, indent=4)) finally: - for r in readers.values(): - builder.mutation_manager.release_reader(jobname, r) + if readers: + for r in readers.values(): + builder.mutation_manager.release_reader(jobname, r) def collect_output(self, schema, builder, outdir, fs_access, compute_checksum=True): # type: (Dict[Text, Any], Builder, Text, StdFsAccess, bool) -> Union[Dict[Text, Any], List[Union[Dict[Text, Any], Text]]] From 3e0caa2c38fb7f2aa7d33d9f69e931eb22469dd4 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Fri, 24 Mar 2017 17:09:27 -0400 Subject: [PATCH 19/27] Downgrade validation warnings when checking input & output objects. --- cwltool/draft2tool.py | 5 +++-- cwltool/process.py | 21 ++++++++++++++++++--- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/cwltool/draft2tool.py b/cwltool/draft2tool.py index 4cd384977..d8b1f80df 100644 --- a/cwltool/draft2tool.py +++ b/cwltool/draft2tool.py @@ -21,7 +21,7 @@ from .errors import WorkflowException from .job import JobBase, CommandLineJob, DockerCommandLineJob from .pathmapper import PathMapper, get_listing, trim_listing -from .process import Process, shortname, uniquename, normalizeFilesDirs, compute_checksums +from .process import Process, shortname, uniquename, normalizeFilesDirs, compute_checksums, DebugLogger from .stdfsaccess import StdFsAccess from .utils import aslist @@ -466,7 +466,8 @@ def collect_output_ports(self, ports, builder, outdir, compute_checksum=True, jo if compute_checksum: adjustFileObjs(ret, partial(compute_checksums, fs_access)) - validate.validate_ex(self.names.get_name("outputs_record_schema", ""), ret) + validate.validate_ex(self.names.get_name("outputs_record_schema", ""), ret, + strict=False, logger=DebugLogger()) return ret if ret is not None else {} except validate.ValidationException as e: raise WorkflowException("Error validating output record, " + Text(e) + "\n in " + json.dumps(ret, indent=4)) diff --git a/cwltool/process.py b/cwltool/process.py index 571fda1a4..371eebbdb 100644 --- a/cwltool/process.py +++ b/cwltool/process.py @@ -234,8 +234,9 @@ def moveIt(src, dst): _logger.debug("Moving %s to %s", src, dst) shutil.move(src, dst) return - _logger.debug("Copying %s to %s", src, dst) - shutil.copy(src, dst) + if src != dst: + _logger.debug("Copying %s to %s", src, dst) + shutil.copy(src, dst) outfiles = [] # type: List[Dict[Text, Any]] collectFilesAndDirs(outputObj, outfiles) @@ -342,6 +343,19 @@ def avroize_type(field_type, name_prefix=""): avroize_type(field_type["items"], name_prefix) return field_type +class DebugLogger(object): + def debug(*args, **kwargs): + _logger.debug(*args, **kwargs) + + def info(*args, **kwargs): + _logger.debug(*args, **kwargs) + + def warn(*args, **kwargs): + _logger.debug(*args, **kwargs) + + def error(*args, **kwargs): + _logger.debug(*args, **kwargs) + class Process(object): __metaclass__ = abc.ABCMeta @@ -472,7 +486,8 @@ def _init_job(self, joborder, **kwargs): try: fillInDefaults(self.tool[u"inputs"], builder.job) normalizeFilesDirs(builder.job) - validate.validate_ex(self.names.get_name("input_record_schema", ""), builder.job) + validate.validate_ex(self.names.get_name("input_record_schema", ""), builder.job, + strict=False, logger=DebugLogger()) except (validate.ValidationException, WorkflowException) as e: raise WorkflowException("Invalid job input record:\n" + Text(e)) From bef5afcc20ef76473e39fc68803e5eedaa180f25 Mon Sep 17 00:00:00 2001 From: Gijs Molenaar Date: Thu, 30 Mar 2017 16:15:48 +0200 Subject: [PATCH 20/27] Recursive copy (#356) --- cwltool/process.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/cwltool/process.py b/cwltool/process.py index 371eebbdb..3a8117349 100644 --- a/cwltool/process.py +++ b/cwltool/process.py @@ -236,7 +236,15 @@ def moveIt(src, dst): return if src != dst: _logger.debug("Copying %s to %s", src, dst) - shutil.copy(src, dst) + try: + # first try a tree copy + shutil.copytree(src, dst) + except OSError as exc: + # check for exception is faster + if exc.errno == errno.ENOTDIR: + # if no dir copy file + shutil.copy(src, dst) + else: raise outfiles = [] # type: List[Dict[Text, Any]] collectFilesAndDirs(outputObj, outfiles) From 5c9fc205229bc92108b9187c51ed6b886f34a193 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Thu, 30 Mar 2017 13:15:07 -0400 Subject: [PATCH 21/27] Fix lint --- cwltool/process.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cwltool/process.py b/cwltool/process.py index 3a8117349..836b53788 100644 --- a/cwltool/process.py +++ b/cwltool/process.py @@ -244,7 +244,8 @@ def moveIt(src, dst): if exc.errno == errno.ENOTDIR: # if no dir copy file shutil.copy(src, dst) - else: raise + else: + raise outfiles = [] # type: List[Dict[Text, Any]] collectFilesAndDirs(outputObj, outfiles) From 07423ec7a5034d8235db0aeebd6b2389897dd35b Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Wed, 10 May 2017 09:52:05 -0400 Subject: [PATCH 22/27] Use logger filter, less hacky and should make tox happier. --- cwltool/draft2tool.py | 4 ++-- cwltool/mutation.py | 10 ++++++---- cwltool/process.py | 27 ++++++++++++--------------- 3 files changed, 20 insertions(+), 21 deletions(-) diff --git a/cwltool/draft2tool.py b/cwltool/draft2tool.py index 84e16cde1..4729e0ce8 100644 --- a/cwltool/draft2tool.py +++ b/cwltool/draft2tool.py @@ -21,7 +21,7 @@ from .errors import WorkflowException from .job import JobBase, CommandLineJob, DockerCommandLineJob from .pathmapper import PathMapper, get_listing, trim_listing -from .process import Process, shortname, uniquename, normalizeFilesDirs, compute_checksums, DebugLogger +from .process import Process, shortname, uniquename, normalizeFilesDirs, compute_checksums, _logger_validation_warnings from .stdfsaccess import StdFsAccess from .utils import aslist @@ -471,7 +471,7 @@ def collect_output_ports(self, ports, builder, outdir, compute_checksum=True, jo adjustFileObjs(ret, partial(compute_checksums, fs_access)) validate.validate_ex(self.names.get_name("outputs_record_schema", ""), ret, - strict=False, logger=DebugLogger()) + strict=False, logger=_logger_validation_warnings) return ret if ret is not None else {} except validate.ValidationException as e: raise WorkflowException("Error validating output record. " + Text(e) + "\n in " + json.dumps(ret, indent=4)) diff --git a/cwltool/mutation.py b/cwltool/mutation.py index 98a2d1f28..a91e3cd31 100644 --- a/cwltool/mutation.py +++ b/cwltool/mutation.py @@ -6,6 +6,8 @@ MutationState = namedtuple("MutationTracker", ["generation", "readers", "stepname"]) +_generation = "http://commonwl.org/cwltool#generation" + class MutationManager(object): """Lock manager for checking correctness of in-place update of files. @@ -23,7 +25,7 @@ def register_reader(self, stepname, obj): # type: (Text, Dict[Text, Any]) -> None loc = obj["location"] current = self.generations.get(loc, MutationState(0, [], "")) - obj_generation = obj.get("_generation", 0) + obj_generation = obj.get(_generation, 0) if obj_generation != current.generation: raise WorkflowException("[job %s] wants to read %s from generation %i but current generation is %s (last updated by %s)" % ( @@ -36,7 +38,7 @@ def release_reader(self, stepname, obj): # type: (Text, Dict[Text, Any]) -> None loc = obj["location"] current = self.generations.get(loc, MutationState(0, [], "")) - obj_generation = obj.get("_generation", 0) + obj_generation = obj.get(_generation, 0) if obj_generation != current.generation: raise WorkflowException("[job %s] wants to release reader on %s from generation %i but current generation is %s (last updated by %s)" % ( @@ -48,7 +50,7 @@ def register_mutation(self, stepname, obj): # type: (Text, Dict[Text, Any]) -> None loc = obj["location"] current = self.generations.get(loc, MutationState(0,[], "")) - obj_generation = obj.get("_generation", 0) + obj_generation = obj.get(_generation, 0) if len(current.readers) > 0: raise WorkflowException("[job %s] wants to modify %s but has readers: %s" % ( @@ -64,4 +66,4 @@ def set_generation(self, obj): # type: (Dict) -> None loc = obj["location"] current = self.generations.get(loc, MutationState(0,[], "")) - obj["_generation"] = current.generation + obj[_generation] = current.generation diff --git a/cwltool/process.py b/cwltool/process.py index c825283b1..9dc39afe5 100644 --- a/cwltool/process.py +++ b/cwltool/process.py @@ -33,7 +33,18 @@ from .stdfsaccess import StdFsAccess from .utils import aslist, get_feature +class LogAsDebugFilter(logging.Filter): + def __init__(self, name, parent): + super(LogAsDebugFilter, self).__init__(name) + self.parent = parent + + def filter(self, record): + return self.parent.isEnabledFor(logging.DEBUG) + _logger = logging.getLogger("cwltool") +_logger_validation_warnings = logging.getLogger("cwltool.validation_warnings") +_logger_validation_warnings.setLevel(_logger.getEffectiveLevel()) +_logger_validation_warnings.addFilter(LogAsDebugFilter("cwltool.validation_warnings", _logger)) supportedProcessRequirements = ["DockerRequirement", "SchemaDefRequirement", @@ -345,20 +356,6 @@ def avroize_type(field_type, name_prefix=""): avroize_type(field_type["items"], name_prefix) return field_type -class DebugLogger(object): - def debug(*args, **kwargs): - _logger.debug(*args, **kwargs) - - def info(*args, **kwargs): - _logger.debug(*args, **kwargs) - - def warn(*args, **kwargs): - _logger.debug(*args, **kwargs) - - def error(*args, **kwargs): - _logger.debug(*args, **kwargs) - - class Process(object): __metaclass__ = abc.ABCMeta @@ -489,7 +486,7 @@ def _init_job(self, joborder, **kwargs): fillInDefaults(self.tool[u"inputs"], builder.job) normalizeFilesDirs(builder.job) validate.validate_ex(self.names.get_name("input_record_schema", ""), builder.job, - strict=False, logger=DebugLogger()) + strict=False, logger=_logger_validation_warnings) except (validate.ValidationException, WorkflowException) as e: raise WorkflowException("Invalid job input record:\n" + Text(e)) From ca81c56ab50d50f5400833368c190c98288b4b1f Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Wed, 10 May 2017 10:42:57 -0400 Subject: [PATCH 23/27] Better handling for relocating directory outputs to final output. --- cwltool/main.py | 2 +- cwltool/process.py | 31 +++++++++++++++++++++++++++++-- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/cwltool/main.py b/cwltool/main.py index 113418643..094537c77 100755 --- a/cwltool/main.py +++ b/cwltool/main.py @@ -214,7 +214,7 @@ def output_callback(out, processStatus): raise WorkflowException("Must provide 'basedir' in kwargs") output_dirs = set() - finaloutdir = kwargs.get("outdir") + finaloutdir = os.path.abspath(kwargs.get("outdir")) kwargs["outdir"] = tempfile.mkdtemp(prefix=kwargs["tmp_outdir_prefix"]) if kwargs.get( "tmp_outdir_prefix") else tempfile.mkdtemp() output_dirs.add(kwargs["outdir"]) diff --git a/cwltool/process.py b/cwltool/process.py index 9dc39afe5..7c8fd52de 100644 --- a/cwltool/process.py +++ b/cwltool/process.py @@ -241,9 +241,15 @@ def relocateOutputs(outputObj, outdir, output_dirs, action, fs_access): def moveIt(src, dst): if action == "move": for a in output_dirs: - if src.startswith(a): + if src.startswith(a+"/"): _logger.debug("Moving %s to %s", src, dst) - shutil.move(src, dst) + if os.path.isdir(src) and os.path.isdir(dst): + # merge directories + for root, dirs, files in os.walk(src): + for f in dirs+files: + moveIt(os.path.join(root, f), os.path.join(dst, f)) + else: + shutil.move(src, dst) return if src != dst: _logger.debug("Copying %s to %s", src, dst) @@ -267,6 +273,27 @@ def _check_adjust(f): visit_class(outputObj, ("File",), functools.partial(compute_checksums, fs_access)) + # If there are symlinks to intermediate output directories, we want to move + # the real files into the final output location. If a file is linked more than once, + # make an internal relative symlink. + if action == "move": + relinked = {} + for root, dirs, files in os.walk(outdir): + for f in dirs+files: + path = os.path.join(root, f) + rp = os.path.realpath(path) + if path != rp: + if rp in relinked: + os.unlink(path) + os.symlink(os.path.relpath(relinked[rp], path), path) + else: + for od in output_dirs: + if rp.startswith(od+"/"): + os.unlink(path) + os.rename(rp, path) + relinked[rp] = path + break + return outputObj From 328d871cc319de38d38572d5a0e5224f3d5ffecb Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Wed, 10 May 2017 10:44:42 -0400 Subject: [PATCH 24/27] Tox fix --- cwltool/process.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cwltool/process.py b/cwltool/process.py index 7c8fd52de..e3b81817d 100644 --- a/cwltool/process.py +++ b/cwltool/process.py @@ -34,13 +34,14 @@ from .utils import aslist, get_feature class LogAsDebugFilter(logging.Filter): - def __init__(self, name, parent): + def __init__(self, name, parent): # type: (Text, logging.Logger) -> None super(LogAsDebugFilter, self).__init__(name) self.parent = parent def filter(self, record): return self.parent.isEnabledFor(logging.DEBUG) + _logger = logging.getLogger("cwltool") _logger_validation_warnings = logging.getLogger("cwltool.validation_warnings") _logger_validation_warnings.setLevel(_logger.getEffectiveLevel()) From d86c9239d47e9b0a65c3afb9281bc21b68ae0426 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Wed, 10 May 2017 10:50:45 -0400 Subject: [PATCH 25/27] Fix outputdir --- cwltool/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cwltool/main.py b/cwltool/main.py index 094537c77..e90018a3f 100755 --- a/cwltool/main.py +++ b/cwltool/main.py @@ -214,7 +214,7 @@ def output_callback(out, processStatus): raise WorkflowException("Must provide 'basedir' in kwargs") output_dirs = set() - finaloutdir = os.path.abspath(kwargs.get("outdir")) + finaloutdir = os.path.abspath(kwargs.get("outdir")) if kwargs.get("outdir") else None kwargs["outdir"] = tempfile.mkdtemp(prefix=kwargs["tmp_outdir_prefix"]) if kwargs.get( "tmp_outdir_prefix") else tempfile.mkdtemp() output_dirs.add(kwargs["outdir"]) From cf5cad3043db68a0f354e9335c42aace59c5757d Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Wed, 10 May 2017 10:54:09 -0400 Subject: [PATCH 26/27] Tox 2 --- cwltool/process.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cwltool/process.py b/cwltool/process.py index e3b81817d..2ef0db49e 100644 --- a/cwltool/process.py +++ b/cwltool/process.py @@ -34,7 +34,7 @@ from .utils import aslist, get_feature class LogAsDebugFilter(logging.Filter): - def __init__(self, name, parent): # type: (Text, logging.Logger) -> None + def __init__(self, name, parent): # type: (str, logging.Logger) -> None super(LogAsDebugFilter, self).__init__(name) self.parent = parent @@ -278,7 +278,7 @@ def _check_adjust(f): # the real files into the final output location. If a file is linked more than once, # make an internal relative symlink. if action == "move": - relinked = {} + relinked = {} # type: Dict[str, str] for root, dirs, files in os.walk(outdir): for f in dirs+files: path = os.path.join(root, f) From 8ff477c416e361e5990699abd24ac4c29e61a7eb Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Wed, 10 May 2017 10:57:03 -0400 Subject: [PATCH 27/27] Tox 3 --- cwltool/process.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cwltool/process.py b/cwltool/process.py index 2ef0db49e..02c5684c1 100644 --- a/cwltool/process.py +++ b/cwltool/process.py @@ -278,7 +278,7 @@ def _check_adjust(f): # the real files into the final output location. If a file is linked more than once, # make an internal relative symlink. if action == "move": - relinked = {} # type: Dict[str, str] + relinked = {} # type: Dict[Text, Text] for root, dirs, files in os.walk(outdir): for f in dirs+files: path = os.path.join(root, f)