From a17eec607af8e700fd30fbb0a306aa0dbd8ba524 Mon Sep 17 00:00:00 2001 From: "Michael R. Crusoe" Date: Thu, 21 Apr 2016 01:27:32 -0700 Subject: [PATCH] more type improvements --- cwltool/__main__.py | 2 +- cwltool/builder.py | 16 ++++---- cwltool/cwlrdf.py | 19 ++++++---- cwltool/cwltest.py | 9 +++-- cwltool/draft2tool.py | 8 ++-- cwltool/expression.py | 8 ++-- cwltool/job.py | 8 ++-- cwltool/main.py | 23 +++++++----- cwltool/pathmapper.py | 40 ++++++++++---------- cwltool/process.py | 79 +++++++++++++++++++++++----------------- cwltool/sandboxjs.py | 13 ++++--- cwltool/update.py | 48 ++++++++++++++++-------- cwltool/workflow.py | 11 +++--- tests/test_pathmapper.py | 2 +- 14 files changed, 164 insertions(+), 122 deletions(-) diff --git a/cwltool/__main__.py b/cwltool/__main__.py index ae4ff8a78..2b15c84f5 100644 --- a/cwltool/__main__.py +++ b/cwltool/__main__.py @@ -1,4 +1,4 @@ -import main +from . import main import sys sys.exit(main.main()) diff --git a/cwltool/builder.py b/cwltool/builder.py index 8f3af934a..36d4663b4 100644 --- a/cwltool/builder.py +++ b/cwltool/builder.py @@ -33,20 +33,20 @@ class Builder(object): def __init__(self): # type: () -> None self.names = None # type: avro.schema.Names - self.schemaDefs = None # type: Dict[str,Dict[str,str]] - self.files = None # type: List[str] + self.schemaDefs = None # type: Dict[str,Dict[unicode, Any]] + self.files = None # type: List[Dict[str, str]] self.fs_access = None # type: StdFsAccess - self.job = None # type: Dict[str,str] + self.job = None # type: Dict[str, Any] self.requirements = None # type: List[Dict[str,Any]] self.outdir = None # type: str self.tmpdir = None # type: str - self.resources = None # type: Dict[str,str] - self.bindings = [] # type: List[Dict[str,str]] + self.resources = None # type: Dict[str, Union[int, str]] + self.bindings = [] # type: List[Dict[str, Any]] self.timeout = None # type: int self.pathmapper = None # type: PathMapper def bind_input(self, schema, datum, lead_pos=[], tail_pos=[]): - # type: (Dict[str,Any], Any, List[int], List[int]) -> List[Dict[str,str]] + # type: (Dict[unicode, Any], Any, List[int], List[int]) -> List[Dict[str, Any]] bindings = [] # type: List[Dict[str,str]] binding = None # type: Dict[str,Any] if "inputBinding" in schema and isinstance(schema["inputBinding"], dict): @@ -64,7 +64,7 @@ def bind_input(self, schema, datum, lead_pos=[], tail_pos=[]): # Handle union types if isinstance(schema["type"], list): for t in schema["type"]: - if isinstance(t, basestring) and self.names.has_name(t, ""): + if isinstance(t, (str, unicode)) and self.names.has_name(t, ""): avsc = self.names.get_name(t, "") elif isinstance(t, dict) and "name" in t and self.names.has_name(t["name"], ""): avsc = self.names.get_name(t["name"], "") @@ -148,7 +148,7 @@ def _capture_files(f): return bindings - def tostr(self, value): # type(Any) -> str + def tostr(self, value): # type: (Any) -> str if isinstance(value, dict) and value.get("class") == "File": if "path" not in value: raise WorkflowException(u"File object must have \"path\": %s" % (value)) diff --git a/cwltool/cwlrdf.py b/cwltool/cwlrdf.py index c6e46230d..8e603b7e5 100644 --- a/cwltool/cwlrdf.py +++ b/cwltool/cwlrdf.py @@ -1,16 +1,19 @@ import json import urlparse +from schema_salad.ref_resolver import Loader from rdflib import Graph, plugin, URIRef from rdflib.serializer import Serializer from typing import Any, Union, Dict, IO def makerdf(workflow, wf, ctx): - # type: (str, Dict[str,Any], Dict[str,Union[str, Dict[str,str]]]) -> Graph + # type: (str, 
Dict[str,Any], Loader.ContextType) -> Graph prefixes = {} for k,v in ctx.iteritems(): if isinstance(v, dict): - v = v["@id"] - doc_url, frg = urlparse.urldefrag(v) + url = v["@id"] + else: + url = v + doc_url, frg = urlparse.urldefrag(url) if "/" in frg: p, _ = frg.split("/") prefixes[p] = u"%s#%s/" % (doc_url, p) @@ -22,13 +25,13 @@ def makerdf(workflow, wf, ctx): for s,p,o in g.triples((None, URIRef("@id"), None)): g.remove((s, p, o)) - for k,v in prefixes.iteritems(): - g.namespace_manager.bind(k, v) + for k2,v2 in prefixes.iteritems(): + g.namespace_manager.bind(k2, v2) return g def printrdf(workflow, wf, ctx, sr, stdout): - # type: (str, Dict[str,Any], Dict[str,Union[str, Dict[str,str]]], str, IO[Any]) -> None + # type: (str, Dict[str,Any], Loader.ContextType, str, IO[Any]) -> None stdout.write(makerdf(workflow, wf, ctx).serialize(format=sr)) def lastpart(uri): # type: (Any) -> str @@ -158,7 +161,7 @@ def dot_without_parameters(g, stdout): # type: (Graph, IO[Any]) -> None }""") for src, sink, srcrun, sinkrun in qres: - attr = "" + attr = u"" if srcrun in clusternode: attr += u'ltail="%s"' % dotname[srcrun] src = clusternode[srcrun] @@ -169,7 +172,7 @@ def dot_without_parameters(g, stdout): # type: (Graph, IO[Any]) -> None def printdot(workflow, wf, ctx, stdout, include_parameters=False): - # type: (str, Dict[str,Any], Dict[str,Union[str, Dict[str,str]]], Any, bool) -> None + # type: (str, Dict[str,Any], Loader.ContextType, Any, bool) -> None g = makerdf(workflow, wf, ctx) stdout.write("digraph {") diff --git a/cwltool/cwltest.py b/cwltool/cwltest.py index c4441bb49..cb6b6f72a 100755 --- a/cwltool/cwltest.py +++ b/cwltool/cwltest.py @@ -165,9 +165,12 @@ def main(): # type: () -> int tests = [] for t in alltests: loader = schema_salad.ref_resolver.Loader({"id": "@id"}) - cwl, _ = loader.resolve_ref(t["tool"]) - if cwl["class"] == "CommandLineTool": - tests.append(t) + cwl = loader.resolve_ref(t["tool"])[0] + if isinstance(cwl, dict): + if cwl["class"] == "CommandLineTool": + tests.append(t) + else: + raise Exception("Unexpected code path.") if args.l: for i, t in enumerate(tests): diff --git a/cwltool/draft2tool.py b/cwltool/draft2tool.py index ab9cb12d6..b318f5df0 100644 --- a/cwltool/draft2tool.py +++ b/cwltool/draft2tool.py @@ -100,7 +100,7 @@ def __init__(self, toolpath_object, **kwargs): # type: (Dict[str,Any], **Any) -> None super(CommandLineTool, self).__init__(toolpath_object, **kwargs) - def makeJobRunner(self): + def makeJobRunner(self): # type: () -> CommandLineJob return CommandLineJob() def makePathMapper(self, reffiles, input_basedir, **kwargs): @@ -267,7 +267,7 @@ def collect_output_ports(self, ports, builder, outdir): raise WorkflowException("Error validating output record, " + str(e) + "\n in " + json.dumps(ret, indent=4)) def collect_output(self, schema, builder, outdir): - # type: (Dict[str,Any],Builder,str) -> Union[Dict[str,Any],List[Union[Dict[str,Any],str]]] + # type: (Dict[str,Any], Builder, str) -> Union[Dict[str, Any], List[Union[Dict[str, Any], str]]] r = [] # type: List[Any] if "outputBinding" in schema: binding = schema["outputBinding"] @@ -372,9 +372,9 @@ def collect_output(self, schema, builder, outdir): if (not r and isinstance(schema["type"], dict) and schema["type"]["type"] == "record"): - out = {} # type: Dict[str, Any] + out = {} for f in schema["type"]["fields"]: - out[shortname(f["name"])] = self.collect_output( + out[shortname(f["name"])] = self.collect_output( # type: ignore f, builder, outdir) return out return r diff --git 
a/cwltool/expression.py b/cwltool/expression.py index 312801589..bea675812 100644 --- a/cwltool/expression.py +++ b/cwltool/expression.py @@ -15,14 +15,14 @@ _logger = logging.getLogger("cwltool") def jshead(engineConfig, rootvars): - # type: (List[str],Dict[str,str]) -> str - return "\n".join(engineConfig + [u"var %s = %s;" % (k, json.dumps(v, indent=4)) for k, v in rootvars.items()]) + # type: (List[unicode],Dict[str,str]) -> unicode + return u"\n".join(engineConfig + [u"var %s = %s;" % (k, json.dumps(v, indent=4)) for k, v in rootvars.items()]) def exeval(ex, jobinput, requirements, outdir, tmpdir, context, pull_image): # type: (Dict[str,Any], Dict[str,str], List[Dict[str, Any]], str, str, Any, bool) -> sandboxjs.JSON if ex["engine"] == "https://w3id.org/cwl/cwl#JavascriptEngine": - engineConfig = [] # type: List[str] + engineConfig = [] # type: List[unicode] for r in reversed(requirements): if r["class"] == "ExpressionEngineRequirement" and r["id"] == "https://w3id.org/cwl/cwl#JavascriptEngine": engineConfig = r.get("engineConfig", []) @@ -126,7 +126,7 @@ def param_interpolate(ex, obj, strip=True): def do_eval(ex, jobinput, requirements, outdir, tmpdir, resources, context=None, pull_image=True, timeout=None): - # type: (Any, Dict[str,str], List[Dict[str,Any]], str, str, Dict[str,str], Any, bool, int) -> Any + # type: (Any, Dict[str,str], List[Dict[str,Any]], str, str, Dict[str, Union[int, str]], Any, bool, int) -> Any runtime = resources.copy() runtime["tmpdir"] = tmpdir diff --git a/cwltool/job.py b/cwltool/job.py index d96ab5418..9a3f78bf3 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -48,10 +48,10 @@ def __init__(self): # type: () -> None self.successCodes = None # type: Iterable[int] self.temporaryFailCodes = None # type: Iterable[int] self.permanentFailCodes = None # type: Iterable[int] - self.requirements = None # type: Dict[str,str] + self.requirements = None # type: List[Dict[str, str]] self.hints = None # type: Dict[str,str] - self.name = None # type: str - self.command_line = None # type: List[str] + self.name = None # type: unicode + self.command_line = None # type: List[unicode] self.pathmapper = None # type: PathMapper self.collect_outputs = None # type: Union[Callable[[Any], Any],functools.partial[Any]] self.output_callback = None # type: Callable[[Any, Any], Any] @@ -69,7 +69,7 @@ def run(self, dry_run=False, pull_image=True, rm_container=True, #with open(os.path.join(outdir, "cwl.input.json"), "w") as fp: # json.dump(self.joborder, fp) - runtime = [] # type: List[str] + runtime = [] # type: List[unicode] env = {"TMPDIR": self.tmpdir} # type: Mapping[str,str] (docker_req, docker_is_req) = get_feature(self, "DockerRequirement") diff --git a/cwltool/main.py b/cwltool/main.py index 9d44400ff..84014eb17 100755 --- a/cwltool/main.py +++ b/cwltool/main.py @@ -22,7 +22,7 @@ from .process import shortname, Process import rdflib from .utils import aslist -from typing import Union, Any, cast, Callable, Tuple, IO +from typing import Union, Any, cast, Callable, Dict, Tuple, IO _logger = logging.getLogger("cwltool") @@ -294,7 +294,7 @@ def load_tool(argsworkflow, updateonly, strict, makeTool, debug, rdf_serializer=None, stdout=sys.stdout, urifrag=None): - # type: (Union[str,unicode,dict[str,Any]], bool, bool, Callable[...,Process], bool, bool, bool, bool, bool, bool, Any, Any, Any) -> Any + # type: (Union[str,unicode,dict[unicode,Any]], bool, bool, Callable[...,Process], bool, bool, bool, bool, bool, bool, Any, Any, Any) -> Any (document_loader, avsc_names, schema_metadata) = 
process.get_schema() if isinstance(avsc_names, Exception): @@ -302,7 +302,7 @@ def load_tool(argsworkflow, updateonly, strict, makeTool, debug, jobobj = None uri = None # type: str - workflowobj = None # type: Dict[str, Any] + workflowobj = None # type: Dict[unicode, Any] if isinstance(argsworkflow, (basestring)): split = urlparse.urlsplit(cast(str, argsworkflow)) if split.scheme: @@ -343,9 +343,11 @@ def load_tool(argsworkflow, updateonly, strict, makeTool, debug, return 0 try: - processobj, metadata = schema_salad.schema.load_and_validate(document_loader, avsc_names, workflowobj, strict) + processobj, metadata = schema_salad.schema.load_and_validate( + document_loader, avsc_names, workflowobj, strict) except (schema_salad.validate.ValidationException, RuntimeError) as e: - _logger.error(u"Tool definition failed validation:\n%s", e, exc_info=(e if debug else False)) + _logger.error(u"Tool definition failed validation:\n%s", e, + exc_info=(e if debug else False)) return 1 if print_pre: @@ -401,7 +403,10 @@ def load_job_order(args, t, parser, stdin, print_input_deps=False, relative_deps if args.conformance_test: loader = Loader({}) else: - jobloaderctx = {"path": {"@type": "@id"}, "format": {"@type": "@id"}, "id": "@id"} + jobloaderctx = { + "path": {"@type": "@id"}, + "format": {"@type": "@id"}, + "id": "@id"} jobloaderctx.update(t.metadata.get("$namespaces", {})) loader = Loader(jobloaderctx) @@ -478,7 +483,7 @@ def load_job_order(args, t, parser, stdin, print_input_deps=False, relative_deps def printdeps(obj, document_loader, stdout, relative_deps, basedir=None): - # type: (Dict[str,Any], Loader, IO[Any], bool, str) -> None + # type: (Dict[unicode, Any], Loader, IO[Any], bool, str) -> None deps = {"class": "File", "path": obj.get("id", "#")} @@ -507,7 +512,7 @@ def makeRelative(u): stdout.write(json.dumps(deps, indent=4)) def versionstring(): - # type: () -> str + # type: () -> unicode pkg = pkg_resources.require("cwltool") if pkg: return u"%s %s" % (sys.argv[0], pkg[0].version) @@ -524,7 +529,7 @@ def main(argsl=None, stdout=sys.stdout, stderr=sys.stderr, versionfunc=versionstring): - # type: (List[str],Callable[...,Union[str,Dict[str,str]]],Callable[...,Process],Callable[[Dict[str,int]],Dict[str,int]],argparse.ArgumentParser,IO[Any],IO[Any],IO[Any],Callable[[],str]) -> int + # type: (List[str],Callable[...,Union[str,Dict[str,str]]],Callable[...,Process],Callable[[Dict[str,int]],Dict[str,int]],argparse.ArgumentParser,IO[Any],IO[Any],IO[Any],Callable[[],unicode]) -> int _logger.removeHandler(defaultStreamHandler) _logger.addHandler(logging.StreamHandler(stderr)) diff --git a/cwltool/pathmapper.py b/cwltool/pathmapper.py index c310a0163..005cc3f8d 100644 --- a/cwltool/pathmapper.py +++ b/cwltool/pathmapper.py @@ -20,13 +20,12 @@ class PathMapper(object): """Mapping of files from relative path provided in the file to a tuple of (absolute local path, absolute container path)""" - def __new__(cls, referenced_files, basedir, *args, **kwargs): - # type: (Set[str], str) -> Any - instance = super(PathMapper,cls).__new__(cls) - instance._pathmap = {} # type: Dict[str, Tuple[str, str]] - return instance - def __init__(self, referenced_files, basedir): + # type: (Set[str], str) -> None + self._pathmap = {} # type: Dict[str, Tuple[str, str]] + self.setup(referenced_files, basedir) + + def setup(self, referenced_files, basedir): # type: (Set[str], str) -> None for src in referenced_files: ab = abspath(src, basedir) @@ -52,30 +51,28 @@ def reversemap(self, target): # type: (str) -> Tuple[str, str] 
class DockerPathMapper(PathMapper): - def __new__(cls, referenced_files, basedir): - # type: (Set[str], str) -> None - instance = super(DockerPathMapper,cls).__new__(cls, referenced_files, basedir) - instance.dirs = {} # type: Dict[str, Union[bool, str]] - return instance - def __init__(self, referenced_files, basedir): # type: (Set[str], str) -> None + self.dirs = {} # type: Dict[str, Union[bool, str]] + super(DockerPathMapper, self).__init__(referenced_files, basedir) + + def setup(self, referenced_files, basedir): for src in referenced_files: ab = abspath(src, basedir) - dir, fn = os.path.split(ab) + dirn, fn = os.path.split(ab) subdir = False for d in self.dirs: - if dir.startswith(d): + if dirn.startswith(d): subdir = True break if not subdir: for d in list(self.dirs): - if d.startswith(dir): - # 'dir' is a parent of 'd' + if d.startswith(dirn): + # 'dirn' is a parent of 'd' del self.dirs[d] - self.dirs[dir] = True + self.dirs[dirn] = True prefix = "job" + str(random.randint(1, 1000000000)) + "_" @@ -85,7 +82,8 @@ def __init__(self, referenced_files, basedir): i = 1 while name in names: i += 1 - name = os.path.join("/var/lib/cwl", prefix + os.path.basename(d) + str(i)) + name = os.path.join("/var/lib/cwl", + prefix + os.path.basename(d) + str(i)) names.add(name) self.dirs[d] = name @@ -96,9 +94,11 @@ def __init__(self, referenced_files, basedir): st = os.lstat(deref) while stat.S_ISLNK(st.st_mode): rl = os.readlink(deref) - deref = rl if os.path.isabs(rl) else os.path.join(os.path.dirname(deref), rl) + deref = rl if os.path.isabs(rl) else os.path.join( + os.path.dirname(deref), rl) st = os.lstat(deref) for d in self.dirs: if ab.startswith(d): - self._pathmap[src] = (deref, os.path.join(self.dirs[d], ab[len(d)+1:])) + self._pathmap[src] = (deref, os.path.join( + self.dirs[d], ab[len(d)+1:])) diff --git a/cwltool/process.py b/cwltool/process.py index 758ac1199..54479c9ce 100644 --- a/cwltool/process.py +++ b/cwltool/process.py @@ -8,7 +8,7 @@ import copy import logging import pprint -from .utils import aslist +from .utils import aslist, get_feature import schema_salad.schema from schema_salad.ref_resolver import Loader import urlparse @@ -20,11 +20,13 @@ import glob from .errors import WorkflowException from .pathmapper import abspath -from typing import Any, Union, IO, AnyStr, Tuple +from typing import Any, Callable, Generator, Union, IO, AnyStr, Tuple +from collections import Iterable from rdflib import URIRef from rdflib.namespace import RDFS, OWL from .stdfsaccess import StdFsAccess import errno +from rdflib import Graph _logger = logging.getLogger("cwltool") @@ -70,7 +72,8 @@ 'vocab_res_schema.yml', 'vocab_res_src.yml', 'vocab_res_proc.yml') -def get_schema(): # type: () -> Tuple[Loader,avro.schema.Names,List[Dict[str,Any]]] +def get_schema(): + # type: () -> Tuple[Loader, Union[avro.schema.Names, avro.schema.SchemaParseException], Dict[unicode,Any]] cache = {} for f in cwl_files: rs = resource_stream(__name__, 'schemas/draft-3/' + f) @@ -84,16 +87,6 @@ def get_schema(): # type: () -> Tuple[Loader,avro.schema.Names,List[Dict[str,An return schema_salad.schema.load_schema("https://w3id.org/cwl/CommonWorkflowLanguage.yml", cache=cache) -def get_feature(self, feature): # type: (Any, Any) -> Tuple[Any, bool] - for t in reversed(self.requirements): - if t["class"] == feature: - return (t, True) - for t in reversed(self.hints): - if t["class"] == feature: - return (t, False) - return (None, None) - - def shortname(inputid): # type: (str) -> str d = urlparse.urlparse(inputid) @@ -107,6 
+100,7 @@ class UnsupportedRequirement(Exception): pass def checkRequirements(rec, supportedProcessRequirements): + # type: (Any, Iterable[Any]) -> None if isinstance(rec, dict): if "requirements" in rec: for r in rec["requirements"]: @@ -118,7 +112,7 @@ def checkRequirements(rec, supportedProcessRequirements): for d in rec: checkRequirements(d, supportedProcessRequirements) -def adjustFiles(rec, op): +def adjustFiles(rec, op): # type: (Any, Callable[..., Any]) -> None """Apply a mapping function to each File path in the object `rec`.""" if isinstance(rec, dict): @@ -148,6 +142,7 @@ def adjustFilesWithSecondary(rec, op, primary=None): adjustFilesWithSecondary(d, op, primary) def formatSubclassOf(fmt, cls, ontology, visited): + # type: (str, str, Graph, Set[str]) -> bool """Determine if `fmt` is a subclass of `cls`.""" if URIRef(fmt) == URIRef(cls): @@ -157,23 +152,23 @@ def formatSubclassOf(fmt, cls, ontology, visited): return False if fmt in visited: - return + return False visited.add(fmt) - fmt = URIRef(fmt) + uriRefFmt = URIRef(fmt) - for s,p,o in ontology.triples( (fmt, RDFS.subClassOf, None) ): + for s,p,o in ontology.triples( (uriRefFmt, RDFS.subClassOf, None) ): # Find parent classes of `fmt` and search upward if formatSubclassOf(o, cls, ontology, visited): return True - for s,p,o in ontology.triples( (fmt, OWL.equivalentClass, None) ): + for s,p,o in ontology.triples( (uriRefFmt, OWL.equivalentClass, None) ): # Find equivalent classes of `fmt` and search horizontally if formatSubclassOf(o, cls, ontology, visited): return True - for s,p,o in ontology.triples( (None, OWL.equivalentClass, fmt) ): + for s,p,o in ontology.triples( (None, OWL.equivalentClass, uriRefFmt) ): # Find equivalent classes of `fmt` and search horizontally if formatSubclassOf(s, cls, ontology, visited): return True @@ -181,7 +176,8 @@ def formatSubclassOf(fmt, cls, ontology, visited): return False -def checkFormat(actualFile, inputFormats, requirements, ontology): +def checkFormat(actualFile, inputFormats, ontology): + # type: (Union[Dict[str, Any], List[Dict[str, Any]]], Any, Graph) -> None for af in aslist(actualFile): if "format" not in af: raise validate.ValidationException(u"Missing required 'format' for File %s" % af) @@ -191,6 +187,7 @@ def checkFormat(actualFile, inputFormats, requirements, ontology): raise validate.ValidationException(u"Incompatible file format %s required format(s) %s" % (af["format"], inputFormats)) def fillInDefaults(inputs, job): + # type: (List[Dict[str, str]], Dict[str, str]) -> None for inp in inputs: if shortname(inp["id"]) in job: pass @@ -207,19 +204,23 @@ class Process(object): def __init__(self, toolpath_object, **kwargs): # type: (Dict[str,Any], **Any) -> None self.metadata = None # type: Dict[str,Any] - self.names = get_schema()[1] + self.names = None # type: avro.schema.Names + n = get_schema()[1] + if isinstance(n, avro.schema.SchemaParseException): + raise n + else: + self.names = n self.tool = toolpath_object self.requirements = kwargs.get("requirements", []) + self.tool.get("requirements", []) self.hints = kwargs.get("hints", []) + self.tool.get("hints", []) + self.formatgraph = None # type: Graph if "loader" in kwargs: self.formatgraph = kwargs["loader"].graph - else: - self.formatgraph = None checkRequirements(self.tool, supportedProcessRequirements) self.validate_hints(self.tool.get("hints", []), strict=kwargs.get("strict")) - self.schemaDefs = {} # type: Dict[str,Dict[str,str]] + self.schemaDefs = {} # type: Dict[str,Dict[unicode, Any]] sd, _ = 
self.get_requirement("SchemaDefRequirement") @@ -231,8 +232,12 @@ def __init__(self, toolpath_object, **kwargs): avro.schema.make_avsc_object(av, self.names) # Build record schema from inputs - self.inputs_record_schema = {"name": "input_record_schema", "type": "record", "fields": []} - self.outputs_record_schema = {"name": "outputs_record_schema", "type": "record", "fields": []} + self.inputs_record_schema = { + "name": "input_record_schema", "type": "record", + "fields": []} # type: Dict[unicode, Any] + self.outputs_record_schema = { + "name": "outputs_record_schema", "type": "record", + "fields": []} # type: Dict[unicode, Any] for key in ("inputs", "outputs"): for i in self.tool[key]: @@ -267,6 +272,7 @@ def __init__(self, toolpath_object, **kwargs): def _init_job(self, joborder, input_basedir, **kwargs): + # type: (Dict[str, str], str, **Any) -> Builder builder = Builder() builder.job = copy.deepcopy(joborder) @@ -300,7 +306,7 @@ def _init_job(self, joborder, input_basedir, **kwargs): for i in self.tool["inputs"]: d = shortname(i["id"]) if d in builder.job and i.get("format"): - checkFormat(builder.job[d], builder.do_eval(i["format"]), self.requirements, self.formatgraph) + checkFormat(builder.job[d], builder.do_eval(i["format"]), self.formatgraph) builder.bindings.extend(builder.bind_input(self.inputs_record_schema, builder.job)) @@ -309,6 +315,7 @@ def _init_job(self, joborder, input_basedir, **kwargs): return builder def evalResources(self, builder, kwargs): + # type: (Builder, Dict[str, Any]) -> Dict[str, Union[int, str]] resourceReq, _ = self.get_requirement("ResourceRequirement") if resourceReq is None: resourceReq = {} @@ -349,16 +356,18 @@ def evalResources(self, builder, kwargs): } def validate_hints(self, hints, strict): + # type: (List[Dict[str, Any]], bool) -> None for r in hints: try: if self.names.get_name(r["class"], "") is not None: validate.validate_ex(self.names.get_name(r["class"], ""), r, strict=strict) else: - _logger.info(validate.ValidationException(u"Unknown hint %s" % (r["class"]))) + _logger.info(str(validate.ValidationException( + u"Unknown hint %s" % (r["class"])))) except validate.ValidationException as v: raise validate.ValidationException(u"Validating hint `%s`: %s" % (r["class"], str(v))) - def get_requirement(self, feature): + def get_requirement(self, feature): # type: (Any) -> Tuple[Any, bool] return get_feature(self, feature) def visit(self, op): @@ -366,9 +375,10 @@ def visit(self, op): @abc.abstractmethod def job(self, job_order, input_basedir, output_callbacks, **kwargs): - return + # type: (Dict[str, str], str, Callable[[Any, Any], Any], **Any) -> Generator[Any, None, None] + return None -def empty_subtree(dirpath): +def empty_subtree(dirpath): # type: (AnyStr) -> bool # Test if a directory tree contains any files (does not count empty # subdirectories) for d in os.listdir(dirpath): @@ -386,10 +396,10 @@ def empty_subtree(dirpath): raise return True -_names = set() # type: Set[str] +_names = set() # type: Set[unicode] -def uniquename(stem): +def uniquename(stem): # type: (unicode) -> unicode c = 1 u = stem while u in _names: @@ -399,6 +409,7 @@ def uniquename(stem): return u def scandeps(base, doc, reffields, urlfields, loadref): + # type: (str, Any, Set[str], Set[str], Callable[[str, str], Any]) -> List[Dict[str, str]] r = [] if isinstance(doc, dict): if "id" in doc: @@ -422,7 +433,7 @@ def scandeps(base, doc, reffields, urlfields, loadref): deps = { "class": "File", "path": subid - } + } # type: Dict[str, Any] sf = scandeps(subid, sub, 
reffields, urlfields, loadref) if sf: deps["secondaryFiles"] = sf diff --git a/cwltool/sandboxjs.py b/cwltool/sandboxjs.py index 0a7069f38..4144cb209 100644 --- a/cwltool/sandboxjs.py +++ b/cwltool/sandboxjs.py @@ -15,10 +15,10 @@ class JavascriptException(Exception): def execjs(js, jslib, timeout=None): # type: (Union[Mapping,str], Any, int) -> JSON nodejs = None - trynodes = (["nodejs"], ["node"]) + trynodes = ("nodejs", "node") for n in trynodes: try: - nodejs = subprocess.Popen(n, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + nodejs = subprocess.Popen([n], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) break except OSError as e: if e.errno == errno.ENOENT: @@ -47,7 +47,10 @@ def execjs(js, jslib, timeout=None): # type: (Union[Mapping,str], Any, int) -> pass if nodejs is None: - raise JavascriptException(u"cwltool requires Node.js engine to evaluate Javascript expressions, but couldn't find it. Tried %s, docker run node:slim" % ", ".join(trynodes)) + raise JavascriptException( + u"cwltool requires Node.js engine to evaluate Javascript " + "expressions, but couldn't find it. Tried %s, docker run " + "node:slim" % u", ".join(trynodes)) fn = u"\"use strict\";\n%s\n(function()%s)()" % (jslib, js if isinstance(js, basestring) and len(js) > 1 and js[0] == '{' else ("{return (%s);}" % js)) script = u"console.log(JSON.stringify(require(\"vm\").runInNewContext(%s, {})));\n" % json.dumps(fn) @@ -69,7 +72,7 @@ def term(): stdoutdata, stderrdata = nodejs.communicate(script) tm.cancel() - def fn_linenum(): + def fn_linenum(): # type: () -> unicode return u"\n".join(u"%04i %s" % (i+1, b) for i, b in enumerate(fn.split("\n"))) if killed: @@ -159,7 +162,7 @@ def scanner(scan): # type: (str) -> List[int] return None -def interpolate(scan, jslib, timeout=None): # type: (str, str, int) -> JSON +def interpolate(scan, jslib, timeout=None): # type: (str, Union[str, unicode], int) -> JSON scan = scan.strip() parts = [] w = scanner(scan) diff --git a/cwltool/update.py b/cwltool/update.py index 700dbe4d2..d63fd716c 100644 --- a/cwltool/update.py +++ b/cwltool/update.py @@ -3,7 +3,7 @@ import json import re from .utils import aslist -from typing import Any, Dict, Callable, Tuple +from typing import Any, Dict, Callable, List, Tuple, Union import traceback from schema_salad.ref_resolver import Loader @@ -27,7 +27,7 @@ def fixType(doc): # type: (Any) -> Any if isinstance(doc, list): return [fixType(f) for f in doc] - if isinstance(doc, (basestring)): + if isinstance(doc, (str, unicode)): if doc not in ("null", "boolean", "int", "long", "float", "double", "string", "File", "record", "enum", "array", "Any") and "#" not in doc: return "#" + doc return doc @@ -37,9 +37,14 @@ def _draft2toDraft3dev1(doc, loader, baseuri): # type: (Any, Loader, str) -> An if isinstance(doc, dict): if "import" in doc: imp = urlparse.urljoin(baseuri, doc["import"]) - r = loader.fetch(imp) - if isinstance(r, list): + impLoaded = loader.fetch(imp) + r = None # type: Dict[str, Any] + if isinstance(impLoaded, list): r = {"@graph": r} + elif isinstance(impLoaded, dict): + r = impLoaded + else: + raise Exception("Unexpected code path.") r["id"] = imp _, frag = urlparse.urldefrag(imp) if frag: @@ -131,8 +136,13 @@ def _draftDraft3dev1toDev2(doc, loader, baseuri): # Convert expressions if isinstance(doc, dict): if "@import" in doc: - r, _ = loader.resolve_ref(doc["@import"], base_url=baseuri) - return _draftDraft3dev1toDev2(r, loader, r["id"]) + resolved_doc = loader.resolve_ref( + doc["@import"], 
base_url=baseuri)[0] + if isinstance(resolved_doc, dict): + return _draftDraft3dev1toDev2( + resolved_doc, loader, resolved_doc["id"]) + else: + raise Exception("Unexpected codepath") for a in doc: doc[a] = _draftDraft3dev1toDev2(doc[a], loader, baseuri) @@ -174,11 +184,15 @@ def _draftDraft3dev2toDev3(doc, loader, baseuri): return doc["@import"] else: imp = urlparse.urljoin(baseuri, doc["@import"]) - r = loader.fetch(imp) - if isinstance(r, list): - r = {"@graph": r} + impLoaded = loader.fetch(imp) + if isinstance(impLoaded, list): + r = {"@graph": impLoaded} + elif isinstance(impLoaded, dict): + r = impLoaded + else: + raise Exception("Unexpected code path.") r["id"] = imp - _, frag = urlparse.urldefrag(imp) + frag = urlparse.urldefrag(imp)[1] if frag: frag = "#" + frag r = findId(r, frag) @@ -215,9 +229,13 @@ def traverseImport(doc, loader, baseuri, func): return doc["$import"] else: imp = urlparse.urljoin(baseuri, doc["$import"]) - r = loader.fetch(imp) - if isinstance(r, list): - r = {"$graph": r} + impLoaded = loader.fetch(imp) + if isinstance(impLoaded, list): + r = {"$graph": impLoaded} + elif isinstance(impLoaded, dict): + r = impLoaded + else: + raise Exception("Unexpected code path.") r["id"] = imp _, frag = urlparse.urldefrag(imp) if frag: @@ -307,10 +325,10 @@ def update(doc, loader, baseuri): "https://w3id.org/cwl/cwl#draft-3.dev4": draftDraft3dev4toDev5, "https://w3id.org/cwl/cwl#draft-3.dev5": draftDraft3dev5toFinal, "https://w3id.org/cwl/cwl#draft-3": None - } + } # type: Dict[unicode, Any] def identity(doc, loader, baseuri): - # type: (Any, Loader, str) -> Tuple[Any, str] + # type: (Any, Loader, str) -> Tuple[Any, Union[str, unicode]] v = doc.get("cwlVersion") if v: return (doc, loader.expand_url(v, "")) diff --git a/cwltool/workflow.py b/cwltool/workflow.py index c67919c8d..6b369c823 100644 --- a/cwltool/workflow.py +++ b/cwltool/workflow.py @@ -222,7 +222,7 @@ def try_make_job(self, step, basedir, **kwargs): raise WorkflowException("Workflow step contains valueFrom but StepInputExpressionRequirement not in requirements") vfinputs = {shortname(k): v for k,v in inputobj.iteritems()} - def valueFromFunc(k, v): + def valueFromFunc(k, v): # type: (Any, Any) -> Any if k in valueFrom: return expression.do_eval(valueFrom[k], vfinputs, self.workflow.requirements, None, None, {}, context=v) @@ -415,11 +415,10 @@ def __init__(self, toolpath_object, pos, **kwargs): try: makeTool = kwargs.get("makeTool") runobj = None - if isinstance(toolpath_object["run"], basestring): - runobj, _ = schema_salad.schema.load_and_validate(kwargs["loader"], - kwargs["avsc_names"], - toolpath_object["run"], - True) + if isinstance(toolpath_object["run"], (str, unicode)): + runobj = schema_salad.schema.load_and_validate( + kwargs["loader"], kwargs["avsc_names"], + toolpath_object["run"], True)[0] else: runobj = toolpath_object["run"] self.embedded_tool = makeTool(runobj, **kwargs) diff --git a/tests/test_pathmapper.py b/tests/test_pathmapper.py index 9da9e823a..7e4af9e5b 100644 --- a/tests/test_pathmapper.py +++ b/tests/test_pathmapper.py @@ -13,7 +13,7 @@ def test_subclass(self): class SubPathMapper(PathMapper): def __init__(self, referenced_files, basedir, new): - super(PathMapper, self).__init__(referenced_files, basedir) + super(SubPathMapper, self).__init__(referenced_files, basedir) self.new = new a = SubPathMapper([], '', "new")
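
Note (illustrative, not part of the patch above): the commit consistently tightens PEP 484 comment-style annotations, the only annotation form usable in Python 2 source, where real function annotations are a syntax error. A minimal sketch of the pattern, using a hypothetical ExampleBuilder class rather than cwltool's actual classes:

    from typing import Any, Dict, List, Union

    class ExampleBuilder(object):
        def __init__(self):  # type: () -> None
            # Attribute types are declared with trailing "# type:" comments,
            # mirroring the style used throughout the patch.
            self.job = None        # type: Dict[str, Any]
            self.resources = None  # type: Dict[str, Union[int, str]]
            self.bindings = []     # type: List[Dict[str, Any]]

        def tostr(self, value):  # type: (Any) -> str
            # The signature comment lists one type per parameter (self is
            # omitted), followed by "->" and the return type.
            return str(value)

These comments are ignored at runtime and only matter to a static checker such as mypy run in Python 2 mode (the exact invocation, e.g. a --py2 flag, depends on the mypy version in use).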