Skip to content

Commit a91c16c

Browse files
authored
Updatable inputs (#317)
* Implement mutation manager to validate that files can be safely modified in place. * Test workflows for mutable inputs. * Add in-place update tests. * Set up Docker binds to support writable items. * Inplace update works with & without Docker. * Add InplaceUpdateRequirement. * Refactor job to split Docker and non-Docker execution into separate classes. * Manage readers for inplace updates. * Record readers, last update step. * Downgrade validation warnings when checking input & output objects. * Recursive copy (#356) * Better handling for relocating directory outputs to final output.
1 parent 41a80e3 commit a91c16c

17 files changed

+614
-145
lines changed

cwltool/builder.py

+2
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from .pathmapper import PathMapper, normalizeFilesDirs, get_listing, visit_class
1212
from .stdfsaccess import StdFsAccess
1313
from .utils import aslist
14+
from .mutation import MutationManager
1415

1516
CONTENT_LIMIT = 64 * 1024
1617

@@ -41,6 +42,7 @@ def __init__(self): # type: () -> None
4142
self.make_fs_access = None # type: Type[StdFsAccess]
4243
self.build_job_script = None # type: Callable[[List[str]], Text]
4344
self.debug = False # type: bool
45+
self.mutation_manager = None # type: MutationManager
4446

4547
# One of "no_listing", "shallow_listing", "deep_listing"
4648
# Will be default "no_listing" for CWL v1.1

cwltool/draft2tool.py

+58-16
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,14 @@
1414
import shellescape
1515
from schema_salad.ref_resolver import file_uri, uri_file_path
1616
from schema_salad.sourceline import SourceLine, indent
17-
from typing import Any, Callable, cast, Generator, Text, Union
17+
from typing import Any, Callable, cast, Generator, Text, Union, Dict
1818

1919
from .builder import CONTENT_LIMIT, substitute, Builder
2020
from .pathmapper import adjustFileObjs, adjustDirObjs, visit_class
2121
from .errors import WorkflowException
22-
from .job import CommandLineJob
22+
from .job import JobBase, CommandLineJob, DockerCommandLineJob
2323
from .pathmapper import PathMapper, get_listing, trim_listing
24-
from .process import Process, shortname, uniquename, normalizeFilesDirs, compute_checksums
24+
from .process import Process, shortname, uniquename, normalizeFilesDirs, compute_checksums, _logger_validation_warnings
2525
from .stdfsaccess import StdFsAccess
2626
from .utils import aslist
2727

@@ -148,8 +148,9 @@ def run(self, **kwargs):
148148

149149
# map files to assigned path inside a container. We need to also explicitly
150150
# walk over input as implicit reassignment doesn't reach everything in builder.bindings
151-
def check_adjust(builder, f):
152-
# type: (Builder, Dict[Text, Any]) -> Dict[Text, Any]
151+
def check_adjust(builder, stepname, f):
152+
# type: (Builder, Text, Dict[Text, Any]) -> Dict[Text, Any]
153+
153154
f["path"] = builder.pathmapper.mapper(f["location"])[1]
154155
f["dirname"], f["basename"] = os.path.split(f["path"])
155156
if f["class"] == "File":
@@ -171,20 +172,23 @@ def __init__(self, toolpath_object, **kwargs):
171172
# type: (Dict[Text, Any], **Any) -> None
172173
super(CommandLineTool, self).__init__(toolpath_object, **kwargs)
173174

174-
def makeJobRunner(self): # type: () -> CommandLineJob
175-
return CommandLineJob()
175+
def makeJobRunner(self): # type: () -> JobBase
176+
dockerReq, _ = self.get_requirement("DockerRequirement")
177+
if dockerReq:
178+
return DockerCommandLineJob()
179+
else:
180+
return CommandLineJob()
176181

177182
def makePathMapper(self, reffiles, stagedir, **kwargs):
178183
# type: (List[Any], Text, **Any) -> PathMapper
179-
dockerReq, _ = self.get_requirement("DockerRequirement")
180184
return PathMapper(reffiles, kwargs["basedir"], stagedir)
181185

182186
def job(self,
183187
job_order, # type: Dict[Text, Text]
184188
output_callbacks, # type: Callable[[Any, Any], Any]
185189
**kwargs # type: Any
186190
):
187-
# type: (...) -> Generator[Union[CommandLineJob, CallbackJob], None, None]
191+
# type: (...) -> Generator[Union[JobBase, CallbackJob], None, None]
188192

189193
jobname = uniquename(kwargs.get("name", shortname(self.tool.get("id", "job"))))
190194

@@ -199,9 +203,9 @@ def job(self,
199203
cachebuilder.stagedir,
200204
separateDirs=False)
201205
_check_adjust = partial(check_adjust, cachebuilder)
202-
203206
visit_class([cachebuilder.files, cachebuilder.bindings],
204207
("File", "Directory"), _check_adjust)
208+
205209
cmdline = flatten(map(cachebuilder.generate_arg, cachebuilder.bindings))
206210
(docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
207211
if docker_req and kwargs.get("use_container") is not False:
@@ -296,7 +300,7 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
296300
_logger.debug(u"[job %s] path mappings is %s", j.name,
297301
json.dumps({p: builder.pathmapper.mapper(p) for p in builder.pathmapper.files()}, indent=4))
298302

299-
_check_adjust = partial(check_adjust, builder)
303+
_check_adjust = partial(check_adjust, builder, jobname)
300304

301305
visit_class([builder.files, builder.bindings], ("File", "Directory"), _check_adjust)
302306

@@ -368,8 +372,38 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
368372
ls[i] = t["entry"]
369373
j.generatefiles[u"listing"] = ls
370374

375+
inplaceUpdateReq = self.get_requirement("http://commonwl.org/cwltool#InplaceUpdateRequirement")[0]
376+
377+
if inplaceUpdateReq:
378+
j.inplace_update = inplaceUpdateReq["inplaceUpdate"]
371379
normalizeFilesDirs(j.generatefiles)
372380

381+
readers = {}
382+
muts = set()
383+
384+
def register_mut(f):
385+
muts.add(f["location"])
386+
builder.mutation_manager.register_mutation(j.name, f)
387+
388+
def register_reader(f):
389+
if f["location"] not in muts:
390+
builder.mutation_manager.register_reader(j.name, f)
391+
readers[f["location"]] = f
392+
393+
for li in j.generatefiles["listing"]:
394+
li = cast(Dict[Text, Any], li)
395+
if li.get("writable") and j.inplace_update:
396+
adjustFileObjs(li, register_mut)
397+
adjustDirObjs(li, register_mut)
398+
else:
399+
adjustFileObjs(li, register_reader)
400+
adjustDirObjs(li, register_reader)
401+
402+
adjustFileObjs(builder.files, register_reader)
403+
adjustFileObjs(builder.bindings, register_reader)
404+
adjustDirObjs(builder.files, register_reader)
405+
adjustDirObjs(builder.bindings, register_reader)
406+
373407
j.environment = {}
374408
evr = self.get_requirement("EnvVarRequirement")[0]
375409
if evr:
@@ -391,16 +425,17 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
391425
j.pathmapper = builder.pathmapper
392426
j.collect_outputs = partial(
393427
self.collect_output_ports, self.tool["outputs"], builder,
394-
compute_checksum=kwargs.get("compute_checksum", True))
428+
compute_checksum=kwargs.get("compute_checksum", True),
429+
jobname=jobname,
430+
readers=readers)
395431
j.output_callback = output_callbacks
396432

397433
yield j
398434

399-
def collect_output_ports(self, ports, builder, outdir, compute_checksum=True):
400-
# type: (Set[Dict[Text, Any]], Builder, Text, bool) -> Dict[Text, Union[Text, List[Any], Dict[Text, Any]]]
435+
def collect_output_ports(self, ports, builder, outdir, compute_checksum=True, jobname="", readers=None):
436+
# type: (Set[Dict[Text, Any]], Builder, Text, bool, Text, Dict[Text, Any]) -> Dict[Text, Union[Text, List[Any], Dict[Text, Any]]]
401437
ret = {} # type: Dict[Text, Union[Text, List[Any], Dict[Text, Any]]]
402438
try:
403-
404439
fs_access = builder.make_fs_access(outdir)
405440
custom_output = fs_access.join(outdir, "cwl.output.json")
406441
if fs_access.exists(custom_output):
@@ -429,14 +464,21 @@ def collect_output_ports(self, ports, builder, outdir, compute_checksum=True):
429464
visit_class(ret, ("File", "Directory"), cast(Callable[[Any], Any], revmap))
430465
visit_class(ret, ("File", "Directory"), remove_path)
431466
normalizeFilesDirs(ret)
467+
adjustFileObjs(ret, builder.mutation_manager.set_generation)
432468
visit_class(ret, ("File", "Directory"), partial(check_valid_locations, fs_access))
469+
433470
if compute_checksum:
434471
adjustFileObjs(ret, partial(compute_checksums, fs_access))
435472

436-
validate.validate_ex(self.names.get_name("outputs_record_schema", ""), ret)
473+
validate.validate_ex(self.names.get_name("outputs_record_schema", ""), ret,
474+
strict=False, logger=_logger_validation_warnings)
437475
return ret if ret is not None else {}
438476
except validate.ValidationException as e:
439477
raise WorkflowException("Error validating output record. " + Text(e) + "\n in " + json.dumps(ret, indent=4))
478+
finally:
479+
if readers:
480+
for r in readers.values():
481+
builder.mutation_manager.release_reader(jobname, r)
440482

441483
def collect_output(self, schema, builder, outdir, fs_access, compute_checksum=True):
442484
# type: (Dict[Text, Any], Builder, Text, StdFsAccess, bool) -> Union[Dict[Text, Any], List[Union[Dict[Text, Any], Text]]]

cwltool/extensions.yml

+15-1
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,18 @@ $graph:
1919
type:
2020
- type: enum
2121
name: LoadListingEnum
22-
symbols: [no_listing, shallow_listing, deep_listing]
22+
symbols: [no_listing, shallow_listing, deep_listing]
23+
24+
- name: InplaceUpdateRequirement
25+
type: record
26+
inVocab: false
27+
extends: cwl:ProcessRequirement
28+
fields:
29+
class:
30+
type: string
31+
doc: "Always 'InplaceUpdateRequirement'"
32+
jsonldPredicate:
33+
"_id": "@type"
34+
"_type": "@vocab"
35+
inplaceUpdate:
36+
type: boolean

0 commit comments

Comments
 (0)