diff --git a/cwltool/load_tool.py b/cwltool/load_tool.py index 59fa62eaa..59cb324f6 100644 --- a/cwltool/load_tool.py +++ b/cwltool/load_tool.py @@ -1,6 +1,6 @@ +"""Loads a CWL document.""" from __future__ import absolute_import # pylint: disable=unused-import -"""Loads a CWL document.""" import logging import os @@ -9,18 +9,19 @@ import hashlib import json import copy -from typing import Any, Callable, Dict, List, Text, Tuple, Union, cast, Iterable +from typing import (Any, Callable, Dict, Iterable, List, Mapping, Optional, + Text, Tuple, Union, cast) import requests.sessions from six import itervalues, string_types +from six.moves import urllib import schema_salad.schema as schema from avro.schema import Names from ruamel.yaml.comments import CommentedMap, CommentedSeq -from schema_salad.ref_resolver import Fetcher, Loader, file_uri +from schema_salad.ref_resolver import ContextType, Fetcher, Loader, file_uri from schema_salad.sourceline import cmap from schema_salad.validate import ValidationException -from six.moves import urllib from . import process, update from .errors import WorkflowException @@ -28,7 +29,6 @@ from .update import ALLUPDATES _logger = logging.getLogger("cwltool") - jobloaderctx = { u"cwl": "https://w3id.org/cwl/cwl#", u"cwltool": "http://commonwl.org/cwltool#", @@ -36,7 +36,7 @@ u"location": {u"@type": u"@id"}, u"format": {u"@type": u"@id"}, u"id": u"@id" -} +} # type: ContextType overrides_ctx = { @@ -51,26 +51,39 @@ "@id": "cwltool:override", "mapSubject": "class" } -} # type: Dict[Text, Union[Dict[Any, Any], Text, Iterable[Text]]] +} # type: ContextType + + +FetcherConstructorType = Callable[[Dict[Text, Union[Text, bool]], + requests.sessions.Session], Fetcher] + +loaders = {} # type: Dict[FetcherConstructorType, Loader] + +def default_loader(fetcher_constructor): + # type: (Optional[FetcherConstructorType]) -> Loader + if fetcher_constructor in loaders: + return loaders[fetcher_constructor] + else: + loader = Loader(jobloaderctx, fetcher_constructor=fetcher_constructor) + loaders[fetcher_constructor] = loader + return loader def resolve_tool_uri(argsworkflow, # type: Text resolver=None, # type: Callable[[Loader, Union[Text, Dict[Text, Any]]], Text] - fetcher_constructor=None, - # type: Callable[[Dict[Text, Text], requests.sessions.Session], Fetcher] + fetcher_constructor=None, # type: FetcherConstructorType document_loader=None # type: Loader -): - # type: (...) -> Tuple[Text, Text] + ): # type: (...) -> Tuple[Text, Text] uri = None # type: Text split = urllib.parse.urlsplit(argsworkflow) # In case of Windows path, urlsplit misjudge Drive letters as scheme, here we are skipping that - if split.scheme and split.scheme in [u'http',u'https',u'file']: + if split.scheme and split.scheme in [u'http', u'https', u'file']: uri = argsworkflow elif os.path.exists(os.path.abspath(argsworkflow)): uri = file_uri(str(os.path.abspath(argsworkflow))) elif resolver: if document_loader is None: - document_loader = Loader(jobloaderctx, fetcher_constructor=fetcher_constructor) # type: ignore + document_loader = default_loader(fetcher_constructor) # type: ignore uri = resolver(document_loader, argsworkflow) if uri is None: @@ -85,18 +98,17 @@ def resolve_tool_uri(argsworkflow, # type: Text def fetch_document(argsworkflow, # type: Union[Text, Dict[Text, Any]] resolver=None, # type: Callable[[Loader, Union[Text, Dict[Text, Any]]], Text] - fetcher_constructor=None - # type: Callable[[Dict[Text, Text], requests.sessions.Session], Fetcher] - ): - # type: (...) -> Tuple[Loader, CommentedMap, Text] + fetcher_constructor=None # type: FetcherConstructorType + ): # type: (...) -> Tuple[Loader, CommentedMap, Text] """Retrieve a CWL document.""" - document_loader = Loader(jobloaderctx, fetcher_constructor=fetcher_constructor) # type: ignore + document_loader = default_loader(fetcher_constructor) # type: ignore uri = None # type: Text workflowobj = None # type: CommentedMap if isinstance(argsworkflow, string_types): - uri, fileuri = resolve_tool_uri(argsworkflow, resolver=resolver, document_loader=document_loader) + uri, fileuri = resolve_tool_uri(argsworkflow, resolver=resolver, + document_loader=document_loader) workflowobj = document_loader.fetch(fileuri) elif isinstance(argsworkflow, dict): uri = "#" + Text(id(argsworkflow)) @@ -126,7 +138,7 @@ def _convert_stdstreams_to_files(workflowobj): sort_keys=True).encode('utf-8')).hexdigest()) workflowobj[streamtype] = filename out['type'] = 'File' - out['outputBinding'] = {'glob': filename} + out['outputBinding'] = cmap({'glob': filename}) for inp in workflowobj.get('inputs', []): if inp.get('type') == 'stdin': if 'inputBinding' in inp: @@ -170,25 +182,25 @@ def validate_document(document_loader, # type: Loader enable_dev=False, # type: bool strict=True, # type: bool preprocess_only=False, # type: bool - fetcher_constructor=None, - skip_schemas=None, - # type: Callable[[Dict[Text, Text], requests.sessions.Session], Fetcher] - overrides=None # type: List[Dict] + fetcher_constructor=None, # type: FetcherConstructorType + skip_schemas=None, # type: bool + overrides=None, # type: List[Dict] + metadata=None, # type: Optional[Dict] ): # type: (...) -> Tuple[Loader, Names, Union[Dict[Text, Any], List[Dict[Text, Any]]], Dict[Text, Any], Text] """Validate a CWL document.""" if isinstance(workflowobj, list): - workflowobj = { + workflowobj = cmap({ "$graph": workflowobj - } + }, fn=uri) if not isinstance(workflowobj, dict): raise ValueError("workflowjobj must be a dict, got '%s': %s" % (type(workflowobj), workflowobj)) jobobj = None if "cwl:tool" in workflowobj: - job_loader = Loader(jobloaderctx, fetcher_constructor=fetcher_constructor) # type: ignore + job_loader = default_loader(fetcher_constructor) # type: ignore jobobj, _ = job_loader.resolve_all(workflowobj, uri) uri = urllib.parse.urljoin(uri, workflowobj["https://w3id.org/cwl/cwl#tool"]) del cast(dict, jobobj)["https://w3id.org/cwl/cwl#tool"] @@ -200,22 +212,25 @@ def validate_document(document_loader, # type: Loader workflowobj = fetch_document(uri, fetcher_constructor=fetcher_constructor)[1] fileuri = urllib.parse.urldefrag(uri)[0] - - if "cwlVersion" in workflowobj: - if not isinstance(workflowobj["cwlVersion"], (str, Text)): - raise Exception("'cwlVersion' must be a string, got %s" % type(workflowobj["cwlVersion"])) - # strip out version - workflowobj["cwlVersion"] = re.sub( - r"^(?:cwl:|https://w3id.org/cwl/cwl#)", "", - workflowobj["cwlVersion"]) - if workflowobj["cwlVersion"] not in list(ALLUPDATES): - # print out all the Supported Versions of cwlVersion - versions = list(ALLUPDATES) # ALLUPDATES is a dict - versions.sort() - raise ValidationException("'cwlVersion' not valid. Supported CWL versions are: \n{}".format("\n".join(versions))) - else: - raise ValidationException("No cwlVersion found." - "Use the following syntax in your CWL workflow to declare version: cwlVersion: ") + if "cwlVersion" not in workflowobj: + if metadata and 'cwlVersion' in metadata: + workflowobj['cwlVersion'] = metadata['cwlVersion'] + else: + raise ValidationException("No cwlVersion found." + "Use the following syntax in your CWL document to declare " + "the version: cwlVersion: ") + + if not isinstance(workflowobj["cwlVersion"], (str, Text)): + raise Exception("'cwlVersion' must be a string, got %s" % type(workflowobj["cwlVersion"])) + # strip out version + workflowobj["cwlVersion"] = re.sub( + r"^(?:cwl:|https://w3id.org/cwl/cwl#)", "", + workflowobj["cwlVersion"]) + if workflowobj["cwlVersion"] not in list(ALLUPDATES): + # print out all the Supported Versions of cwlVersion + versions = list(ALLUPDATES) # ALLUPDATES is a dict + versions.sort() + raise ValidationException("'cwlVersion' not valid. Supported CWL versions are: \n{}".format("\n".join(versions))) if workflowobj["cwlVersion"] == "draft-2": workflowobj = cast(CommentedMap, cmap(update._draft2toDraft3dev1( @@ -238,36 +253,36 @@ def validate_document(document_loader, # type: Loader _add_blank_ids(workflowobj) workflowobj["id"] = fileuri - processobj, metadata = document_loader.resolve_all(workflowobj, fileuri) + processobj, new_metadata = document_loader.resolve_all(workflowobj, fileuri) if not isinstance(processobj, (CommentedMap, CommentedSeq)): raise ValidationException("Workflow must be a dict or list.") - if not metadata: + if not new_metadata: if not isinstance(processobj, dict): raise ValidationException("Draft-2 workflows must be a dict.") - metadata = cast(CommentedMap, cmap({"$namespaces": processobj.get("$namespaces", {}), - "$schemas": processobj.get("$schemas", []), - "cwlVersion": processobj["cwlVersion"]}, - fn=fileuri)) + new_metadata = cast(CommentedMap, cmap( + {"$namespaces": processobj.get("$namespaces", {}), + "$schemas": processobj.get("$schemas", []), + "cwlVersion": processobj["cwlVersion"]}, fn=fileuri)) _convert_stdstreams_to_files(workflowobj) if preprocess_only: - return document_loader, avsc_names, processobj, metadata, uri + return document_loader, avsc_names, processobj, new_metadata, uri schema.validate_doc(avsc_names, processobj, document_loader, strict) - if metadata.get("cwlVersion") != update.LATEST: + if new_metadata.get("cwlVersion") != update.LATEST: processobj = cast(CommentedMap, cmap(update.update( - processobj, document_loader, fileuri, enable_dev, metadata))) + processobj, document_loader, fileuri, enable_dev, new_metadata))) if jobobj: - metadata[u"cwl:defaults"] = jobobj + new_metadata[u"cwl:defaults"] = jobobj if overrides: - metadata[u"cwltool:overrides"] = overrides + new_metadata[u"cwltool:overrides"] = overrides - return document_loader, avsc_names, processobj, metadata, uri + return document_loader, avsc_names, processobj, new_metadata, uri def make_tool(document_loader, # type: Loader @@ -322,7 +337,7 @@ def load_tool(argsworkflow, # type: Union[Text, Dict[Text, Any]] enable_dev=False, # type: bool strict=True, # type: bool resolver=None, # type: Callable[[Loader, Union[Text, Dict[Text, Any]]], Text] - fetcher_constructor=None, # type: Callable[[Dict[Text, Text], requests.sessions.Session], Fetcher] + fetcher_constructor=None, # type: FetcherConstructorType overrides=None ): # type: (...) -> Process @@ -332,7 +347,8 @@ def load_tool(argsworkflow, # type: Union[Text, Dict[Text, Any]] document_loader, avsc_names, processobj, metadata, uri = validate_document( document_loader, workflowobj, uri, enable_dev=enable_dev, strict=strict, fetcher_constructor=fetcher_constructor, - overrides=overrides) + overrides=overrides, metadata=kwargs.get('metadata', None) + if kwargs else None) return make_tool(document_loader, avsc_names, metadata, uri, makeTool, kwargs if kwargs else {}) diff --git a/cwltool/main.py b/cwltool/main.py index 8f58cb117..2be90f598 100755 --- a/cwltool/main.py +++ b/cwltool/main.py @@ -27,8 +27,9 @@ from .builder import Builder from .cwlrdf import printdot, printrdf from .errors import UnsupportedRequirement, WorkflowException -from .load_tool import (resolve_tool_uri, fetch_document, make_tool, validate_document, - jobloaderctx, resolve_overrides, load_overrides) +from .load_tool import (FetcherConstructorType, resolve_tool_uri, + fetch_document, make_tool, validate_document, jobloaderctx, + resolve_overrides, load_overrides) from .mutation import MutationManager from .pack import pack from .pathmapper import (adjustDirObjs, adjustFileObjs, get_listing, @@ -544,8 +545,8 @@ def load_job_order(args, # type: argparse.Namespace job_order_object, _ = loader.resolve_ref(job_order_file, checklinks=False) if job_order_object and "http://commonwl.org/cwltool#overrides" in job_order_object: - overrides.extend(resolve_overrides(job_order_object, file_uri(job_order_file), tool_file_uri)) - del job_order_object["http://commonwl.org/cwltool#overrides"] + overrides.extend(resolve_overrides(job_order_object, file_uri(job_order_file), tool_file_uri)) + del job_order_object["http://commonwl.org/cwltool#overrides"] if not job_order_object: input_basedir = args.basedir if args.basedir else os.getcwd() @@ -641,6 +642,7 @@ def addSizes(p): ns = {} # type: Dict[Text, Union[Dict[Any, Any], Text, Iterable[Text]]] ns.update(t.metadata.get("$namespaces", {})) ld = Loader(ns) + def expand_formats(p): if "format" in p: p["format"] = ld.expand_url(p["format"], "") @@ -734,7 +736,7 @@ def main(argsl=None, # type: List[str] versionfunc=versionstring, # type: Callable[[], Text] job_order_object=None, # type: MutableMapping[Text, Any] make_fs_access=StdFsAccess, # type: Callable[[Text], StdFsAccess] - fetcher_constructor=None, # type: Callable[[Dict[Text, Text], requests.sessions.Session], Fetcher] + fetcher_constructor=None, # type: FetcherConstructorType resolver=tool_resolver, logger_handler=None, custom_schema_callback=None # type: Callable[[], None] diff --git a/cwltool/workflow.py b/cwltool/workflow.py index ea9def27c..ffe000dce 100644 --- a/cwltool/workflow.py +++ b/cwltool/workflow.py @@ -70,11 +70,15 @@ def match_types(sinktype, src, iid, inputobj, linkMerge, valueFrom): elif isinstance(src.parameter["type"], list): # Source is union type # Check that at least one source type is compatible with the sink. - for st in src.parameter["type"]: - srccopy = copy.deepcopy(src) - srccopy.parameter["type"] = st - if match_types(sinktype, srccopy, iid, inputobj, linkMerge, valueFrom): + original_types = src.parameter["type"] + for source_type in original_types: + src.parameter["type"] = source_type + match = match_types( + sinktype, src, iid, inputobj, linkMerge, valueFrom) + if match: + src.parameter["type"] = original_types return True + src.parameter["type"] = original_types return False elif linkMerge: if iid not in inputobj: diff --git a/tests/test_pack.py b/tests/test_pack.py index 728a1e0bc..e846cdd22 100644 --- a/tests/test_pack.py +++ b/tests/test_pack.py @@ -12,6 +12,8 @@ import cwltool.pack import cwltool.workflow +from cwltool.resolver import tool_resolver +from cwltool import load_tool from cwltool.load_tool import fetch_document, validate_document from cwltool.main import makeRelative, main, print_pack from cwltool.pathmapper import adjustDirObjs, adjustFileObjs @@ -23,6 +25,7 @@ class TestPack(unittest.TestCase): maxDiff = None def test_pack(self): + load_tool.loaders = {} document_loader, workflowobj, uri = fetch_document( get_data("tests/wf/revsort.cwl")) @@ -97,10 +100,11 @@ def _pack_idempotently(self, document): reason="Instance of cwltool is used, on Windows it invokes a default docker container" "which is not supported on AppVeyor") def test_packed_workflow_execution(self): + load_tool.loaders = {} test_wf = "tests/wf/count-lines1-wf.cwl" test_wf_job = "tests/wf/wc-job.json" document_loader, workflowobj, uri = fetch_document( - get_data(test_wf)) + get_data(test_wf), resolver=tool_resolver) document_loader, avsc_names, processobj, metadata, uri = validate_document( document_loader, workflowobj, uri) packed = json.loads(print_pack(document_loader, processobj, uri, metadata))