Skip to content

Commit 34bef35

Browse files
author
Peter Amstutz
committed
Refactor wip
1 parent ef0bce9 commit 34bef35

File tree

6 files changed

+104
-98
lines changed

6 files changed

+104
-98
lines changed

cwltool/builder.py

-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ def __init__(self): # type: () -> None
4545
self.build_job_script = None # type: Callable[[List[str]], Text]
4646
self.debug = False # type: bool
4747
self.mutation_manager = None # type: MutationManager
48-
self.tool_id = None # type: Text
4948

5049
# One of "no_listing", "shallow_listing", "deep_listing"
5150
# Will be default "no_listing" for CWL v1.1

cwltool/draft2tool.py

+10-13
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@ def job(self,
8080
j.hints = self.hints
8181
j.outdir = None
8282
j.tmpdir = None
83-
j.tool_id = self.tool["id"]
8483

8584
yield j
8685

@@ -176,9 +175,9 @@ def __init__(self, toolpath_object, **kwargs):
176175
# type: (Dict[Text, Any], **Any) -> None
177176
super(CommandLineTool, self).__init__(toolpath_object, **kwargs)
178177

179-
def makeJobRunner(self, **kwargs): # type: (Optional[bool]) -> JobBase
180-
dockerReq, _ = self.get_requirement("DockerRequirement", kwargs)
181-
if dockerReq and kwargs.get("use_container", True):
178+
def makeJobRunner(self, use_container=True): # type: (Optional[bool]) -> JobBase
179+
dockerReq, _ = self.get_requirement("DockerRequirement")
180+
if dockerReq and use_container:
182181
return DockerCommandLineJob()
183182
else:
184183
for t in reversed(self.requirements):
@@ -216,7 +215,7 @@ def job(self,
216215
("File", "Directory"), _check_adjust)
217216

218217
cmdline = flatten(map(cachebuilder.generate_arg, cachebuilder.bindings))
219-
(docker_req, docker_is_req) = self.get_requirement("DockerRequirement", kwargs)
218+
(docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
220219
if docker_req and kwargs.get("use_container") is not False:
221220
dockerimg = docker_req.get("dockerImageId") or docker_req.get("dockerPull")
222221
cmdline = ["docker", "run", dockerimg] + cmdline
@@ -277,7 +276,7 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
277276

278277
reffiles = copy.deepcopy(builder.files)
279278

280-
j = self.makeJobRunner(**kwargs)
279+
j = self.makeJobRunner(kwargs.get("use_container"))
281280
j.builder = builder
282281
j.joborder = builder.job
283282
j.stdin = None
@@ -289,8 +288,6 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
289288
j.requirements = self.requirements
290289
j.hints = self.hints
291290
j.name = jobname
292-
j.tool_id = self.tool["id"]
293-
j.overrides = kwargs.get("overrides", [])
294291

295292
if _logger.isEnabledFor(logging.DEBUG):
296293
_logger.debug(u"[job %s] initializing from %s%s",
@@ -335,7 +332,7 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
335332
if _logger.isEnabledFor(logging.DEBUG):
336333
_logger.debug(u"[job %s] command line bindings is %s", j.name, json.dumps(builder.bindings, indent=4))
337334

338-
dockerReq = self.get_requirement("DockerRequirement", kwargs)[0]
335+
dockerReq = self.get_requirement("DockerRequirement")[0]
339336
if dockerReq and kwargs.get("use_container"):
340337
out_prefix = kwargs.get("tmp_outdir_prefix")
341338
j.outdir = kwargs.get("outdir") or tempfile.mkdtemp(prefix=out_prefix)
@@ -347,7 +344,7 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
347344
j.tmpdir = builder.tmpdir
348345
j.stagedir = builder.stagedir
349346

350-
initialWorkdir = self.get_requirement("InitialWorkDirRequirement", kwargs)[0]
347+
initialWorkdir = self.get_requirement("InitialWorkDirRequirement")[0]
351348
j.generatefiles = {"class": "Directory", "listing": [], "basename": ""}
352349
if initialWorkdir:
353350
ls = [] # type: List[Dict[Text, Any]]
@@ -383,7 +380,7 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
383380
ls[i] = t["entry"]
384381
j.generatefiles[u"listing"] = ls
385382

386-
inplaceUpdateReq = self.get_requirement("http://commonwl.org/cwltool#InplaceUpdateRequirement", kwargs)[0]
383+
inplaceUpdateReq = self.get_requirement("http://commonwl.org/cwltool#InplaceUpdateRequirement")[0]
387384

388385
if inplaceUpdateReq:
389386
j.inplace_update = inplaceUpdateReq["inplaceUpdate"]
@@ -417,12 +414,12 @@ def register_reader(f):
417414
adjustDirObjs(builder.bindings, register_reader)
418415

419416
j.environment = {}
420-
evr = self.get_requirement("EnvVarRequirement", kwargs)[0]
417+
evr = self.get_requirement("EnvVarRequirement")[0]
421418
if evr:
422419
for t in evr["envDef"]:
423420
j.environment[t["envName"]] = builder.do_eval(t["envValue"])
424421

425-
shellcmd = self.get_requirement("ShellCommandRequirement", kwargs)[0]
422+
shellcmd = self.get_requirement("ShellCommandRequirement")[0]
426423
if shellcmd:
427424
cmd = [] # type: List[Text]
428425
for b in builder.bindings:

cwltool/job.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,6 @@ def __init__(self): # type: () -> None
127127
self.generatefiles = None # type: Dict[Text, Union[List[Dict[Text, Text]], Dict[Text, Text], Text]]
128128
self.stagedir = None # type: Text
129129
self.inplace_update = None # type: bool
130-
self.overrides = None # type: List[Dict[Text,Dict[Text, Text]]]
131130

132131
def _setup(self): # type: () -> None
133132
if not os.path.exists(self.outdir):
@@ -150,7 +149,7 @@ def _setup(self): # type: () -> None
150149
def _execute(self, runtime, env, rm_tmpdir=True, move_outputs="move"):
151150
# type: (List[Text], MutableMapping[Text, Text], bool, Text) -> None
152151

153-
scr, _ = get_feature(self, "ShellCommandRequirement", {})
152+
scr, _ = get_feature(self, "ShellCommandRequirement")
154153

155154
shouldquote = None # type: Callable[[Any], Any]
156155
if scr:
@@ -321,7 +320,7 @@ def run(self, pull_image=True, rm_container=True,
321320
rm_tmpdir=True, move_outputs="move", **kwargs):
322321
# type: (bool, bool, bool, Text, **Any) -> Union[Tuple[Text, Dict[None, None]], None]
323322

324-
(docker_req, docker_is_req) = get_feature(self, "DockerRequirement", kwargs)
323+
(docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
325324

326325
img_id = None
327326
env = None # type: MutableMapping[Text, Text]

cwltool/main.py

+82-68
Original file line numberDiff line numberDiff line change
@@ -230,19 +230,20 @@ def output_callback(out, processStatus):
230230
output_dirs.add(kwargs["outdir"])
231231
kwargs["mutation_manager"] = MutationManager()
232232

233-
jobReqs = []
233+
jobReqs = None
234234
if "cwl:requirements" in job_order_object:
235235
jobReqs = job_order_object["cwl:requirements"]
236236
del job_order_object["cwl:requirements"]
237237
elif ("cwl:defaults" in t.metadata and "cwl:requirements" in t.metadata["cwl:defaults"]):
238238
jobReqs = t.metadata["cwl:defaults"]["cwl:requirements"]
239+
if jobReqs:
240+
for req in jobReqs:
241+
t.requirements.append(req)
239242

240243
if "http://commonwl.org/cwltool#overrides" in job_order_object:
241244
kwargs["overrides"] = resolve_overrides(job_order_object, t.tool["id"])
242245
del job_order_object["http://commonwl.org/cwltool#overrides"]
243246

244-
kwargs["requirements"] = jobReqs
245-
246247
if kwargs.get("default_container"):
247248
t.requirements.insert(0, {
248249
"class": "DockerRequirement",
@@ -428,15 +429,12 @@ def generate_parser(toolparser, tool, namemap, records):
428429
return toolparser
429430

430431

431-
def load_job_order(args, t, stdin, print_input_deps=False, relative_deps=False,
432-
stdout=sys.stdout, make_fs_access=None, fetcher_constructor=None):
432+
def load_job_order(args, stdin, fetcher_constructor=None):
433433
# type: (argparse.Namespace, Process, IO[Any], bool, bool, IO[Any], Callable[[Text], StdFsAccess], Callable[[Dict[unicode, unicode], requests.sessions.Session], Fetcher]) -> Union[int, Tuple[Dict[Text, Any], Text]]
434434

435-
job_order_object = None
435+
job_order_object = {}
436436

437-
_jobloaderctx = jobloaderctx.copy()
438-
_jobloaderctx.update(t.metadata.get("$namespaces", {}))
439-
loader = Loader(_jobloaderctx, fetcher_constructor=fetcher_constructor)
437+
loader = Loader(jobloaderctx, fetcher_constructor=fetcher_constructor)
440438

441439
if len(args.job_order) == 1 and args.job_order[0][0] != "-":
442440
job_order_file = args.job_order[0]
@@ -454,48 +452,53 @@ def load_job_order(args, t, stdin, print_input_deps=False, relative_deps=False,
454452
job_order_object, _ = loader.resolve_ref(job_order_file, checklinks=False)
455453
except Exception as e:
456454
_logger.error(Text(e), exc_info=args.debug)
457-
return 1
455+
return (1, None)
458456
toolparser = None
459-
else:
460-
input_basedir = args.basedir if args.basedir else os.getcwd()
461-
namemap = {} # type: Dict[Text, Text]
462-
records = [] # type: List[Text]
463-
toolparser = generate_parser(
464-
argparse.ArgumentParser(prog=args.workflow), t, namemap, records)
465-
if toolparser:
466-
if args.tool_help:
467-
toolparser.print_help()
468-
return 0
469-
cmd_line = vars(toolparser.parse_args(args.job_order))
470-
for record_name in records:
471-
record = {}
472-
record_items = {
473-
k: v for k, v in cmd_line.iteritems()
474-
if k.startswith(record_name)}
475-
for key, value in record_items.iteritems():
476-
record[key[len(record_name) + 1:]] = value
477-
del cmd_line[key]
478-
cmd_line[str(record_name)] = record
479-
480-
if cmd_line["job_order"]:
481-
try:
482-
input_basedir = args.basedir if args.basedir else os.path.abspath(
483-
os.path.dirname(cmd_line["job_order"]))
484-
job_order_object = loader.resolve_ref(cmd_line["job_order"])
485-
except Exception as e:
486-
_logger.error(Text(e), exc_info=args.debug)
487-
return 1
488-
else:
489-
job_order_object = {"id": args.workflow}
490457

491-
del cmd_line["job_order"]
458+
return (job_order_object, loader)
459+
460+
def job_order_from_command_line(args, t, loader, input_basedir,
461+
print_input_deps=False,
462+
relative_deps=False,
463+
stdout=sys.stdout):
464+
namemap = {} # type: Dict[Text, Text]
465+
records = [] # type: List[Text]
466+
toolparser = generate_parser(
467+
argparse.ArgumentParser(prog=args.workflow), t, namemap, records)
468+
if toolparser:
469+
if args.tool_help:
470+
toolparser.print_help()
471+
return 0
472+
cmd_line = vars(toolparser.parse_args(args.job_order))
473+
for record_name in records:
474+
record = {}
475+
record_items = {
476+
k: v for k, v in cmd_line.iteritems()
477+
if k.startswith(record_name)}
478+
for key, value in record_items.iteritems():
479+
record[key[len(record_name) + 1:]] = value
480+
del cmd_line[key]
481+
cmd_line[str(record_name)] = record
482+
483+
if cmd_line["job_order"]:
484+
try:
485+
input_basedir = args.basedir if args.basedir else os.path.abspath(
486+
os.path.dirname(cmd_line["job_order"]))
487+
job_order_object, _ = loader.resolve_ref(cmd_line["job_order"])
488+
except Exception as e:
489+
_logger.error(Text(e), exc_info=args.debug)
490+
return 1
491+
else:
492+
job_order_object = {"id": args.workflow}
492493

493-
job_order_object.update({namemap[k]: v for k, v in cmd_line.items()})
494+
del cmd_line["job_order"]
494495

495-
if _logger.isEnabledFor(logging.DEBUG):
496-
_logger.debug(u"Parsed job order from command line: %s", json.dumps(job_order_object, indent=4))
497-
else:
498-
job_order_object = None
496+
job_order_object.update({namemap[k]: v for k, v in cmd_line.items()})
497+
498+
if _logger.isEnabledFor(logging.DEBUG):
499+
_logger.debug(u"Parsed job order from command line: %s", json.dumps(job_order_object, indent=4))
500+
else:
501+
job_order_object = {}
499502

500503
for inp in t.tool["inputs"]:
501504
if "default" in inp and (not job_order_object or shortname(inp["id"]) not in job_order_object):
@@ -509,13 +512,16 @@ def load_job_order(args, t, stdin, print_input_deps=False, relative_deps=False,
509512
toolparser.print_help()
510513
_logger.error("")
511514
_logger.error("Input object required, use --help for details")
512-
return 1
515+
return (1, None)
513516

514517
if print_input_deps:
515518
printdeps(job_order_object, loader, stdout, relative_deps, "",
516519
basedir=file_uri(input_basedir + "/"))
517-
return 0
520+
return (0, None)
518521

522+
return (job_order_object, input_basedir)
523+
524+
def normalize_job_order(job_order_object):
519525
def pathToLoc(p):
520526
if "location" not in p and "path" in p:
521527
p["location"] = p["path"]
@@ -530,8 +536,6 @@ def pathToLoc(p):
530536
if "id" in job_order_object:
531537
del job_order_object["id"]
532538

533-
return (job_order_object, input_basedir)
534-
535539

536540
def makeRelative(base, ob):
537541
u = ob.get("location", ob.get("path"))
@@ -701,6 +705,21 @@ def main(argsl=None, # type: List[str]
701705
else:
702706
use_standard_schema("v1.0")
703707

708+
try:
709+
if job_order_object is None:
710+
job_order_object, job_order_loader = load_job_order(args, stdin,
711+
fetcher_constructor=fetcher_constructor)
712+
else:
713+
job_order_loader = None
714+
except SystemExit as e:
715+
return e.code
716+
717+
if isinstance(job_order_object, int):
718+
return job_order_object
719+
720+
if args.overrides:
721+
args.overrides = load_overrides(file_uri(os.path.abspath(args.overrides)), args.workflow)
722+
704723
try:
705724
document_loader, workflowobj, uri = fetch_document(args.workflow, resolver=resolver,
706725
fetcher_constructor=fetcher_constructor)
@@ -774,28 +793,23 @@ def main(argsl=None, # type: List[str]
774793
setattr(args, 'move_outputs', "copy")
775794
setattr(args, "tmp_outdir_prefix", args.cachedir)
776795

777-
try:
778-
if job_order_object is None:
779-
job_order_object = load_job_order(args, tool, stdin,
780-
print_input_deps=args.print_input_deps,
781-
relative_deps=args.relative_deps,
782-
stdout=stdout,
783-
make_fs_access=make_fs_access,
784-
fetcher_constructor=fetcher_constructor)
785-
except SystemExit as e:
786-
return e.code
787-
788-
if isinstance(job_order_object, int):
789-
return job_order_object
796+
input_basedir = args.basedir if args.basedir else os.getcwd()
797+
if job_order_loader:
798+
jofc, input_basedir = job_order_from_command_line(args, tool, job_order_loader, input_basedir,
799+
print_input_deps=args.print_input_deps,
800+
relative_deps=args.relative_deps,
801+
stdout=stdout)
802+
if isinstance(jofc, int):
803+
return jofc
804+
job_order_object.update(jofc)
790805

791-
if args.overrides:
792-
args.overrides = load_overrides(file_uri(os.path.abspath(args.overrides)), tool.tool["id"])
806+
normalize_job_order(job_order_object)
793807

794808
try:
795-
setattr(args, 'basedir', job_order_object[1])
809+
setattr(args, 'basedir', input_basedir)
796810
del args.workflow
797811
del args.job_order
798-
(out, status) = executor(tool, job_order_object[0],
812+
(out, status) = executor(tool, job_order_object,
799813
makeTool=makeTool,
800814
select_resources=selectResources,
801815
make_fs_access=make_fs_access,

cwltool/process.py

+9-7
Original file line numberDiff line numberDiff line change
@@ -424,10 +424,12 @@ def __init__(self, toolpath_object, **kwargs):
424424
self.names = names
425425
self.tool = toolpath_object
426426
self.requirements = kwargs.get("requirements", []) + self.tool.get("requirements", [])
427+
for ov in reversed(kwargs.get("overrides", [])):
428+
if ov.get("overrideTarget") == self.tool["id"]:
429+
self.requirements.extend(ov.get("override", []))
430+
427431
self.hints = kwargs.get("hints", []) + self.tool.get("hints", [])
428432
self.formatgraph = None # type: Graph
429-
self.tool_id = self.tool["id"]
430-
431433
if "loader" in kwargs:
432434
self.formatgraph = kwargs["loader"].graph
433435

@@ -534,11 +536,11 @@ def _init_job(self, joborder, **kwargs):
534536
builder.debug = kwargs.get("debug")
535537
builder.mutation_manager = kwargs.get("mutation_manager")
536538

537-
dockerReq, is_req = self.get_requirement("DockerRequirement", kwargs)
539+
dockerReq, is_req = self.get_requirement("DockerRequirement")
538540
builder.make_fs_access = kwargs.get("make_fs_access") or StdFsAccess
539541
builder.fs_access = builder.make_fs_access(kwargs["basedir"])
540542

541-
loadListingReq, _ = self.get_requirement("http://commonwl.org/cwltool#LoadListingRequirement", kwargs)
543+
loadListingReq, _ = self.get_requirement("http://commonwl.org/cwltool#LoadListingRequirement")
542544
if loadListingReq:
543545
builder.loadListing = loadListingReq.get("loadListing")
544546

@@ -604,7 +606,7 @@ def _init_job(self, joborder, **kwargs):
604606

605607
def evalResources(self, builder, kwargs):
606608
# type: (Builder, Dict[AnyStr, Any]) -> Dict[Text, Union[int, Text]]
607-
resourceReq, _ = self.get_requirement("ResourceRequirement", kwargs)
609+
resourceReq, _ = self.get_requirement("ResourceRequirement")
608610
if resourceReq is None:
609611
resourceReq = {}
610612
request = {
@@ -657,8 +659,8 @@ def validate_hints(self, avsc_names, hints, strict):
657659
else:
658660
_logger.info(sl.makeError(u"Unknown hint %s" % (r["class"])))
659661

660-
def get_requirement(self, feature, kwargs={}): # type: (Any) -> Tuple[Any, bool]
661-
return get_feature(self, feature, kwargs)
662+
def get_requirement(self, feature): # type: (Any) -> Tuple[Any, bool]
663+
return get_feature(self, feature)
662664

663665
def visit(self, op): # type: (Callable[[Dict[Text, Any]], None]) -> None
664666
op(self.tool)

0 commit comments

Comments
 (0)