Skip to content

Commit f691bf1

Browse files
authored
Merge pull request #589 from common-workflow-language/faster_parse
reduce loading time for packed documents
2 parents 0a5f13d + 9956e0e commit f691bf1

File tree

4 files changed

+93
-67
lines changed

4 files changed

+93
-67
lines changed

cwltool/load_tool.py

+73-57
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1+
"""Loads a CWL document."""
12
from __future__ import absolute_import
23
# pylint: disable=unused-import
3-
"""Loads a CWL document."""
44

55
import logging
66
import os
@@ -9,34 +9,34 @@
99
import hashlib
1010
import json
1111
import copy
12-
from typing import Any, Callable, Dict, List, Text, Tuple, Union, cast, Iterable
12+
from typing import (Any, Callable, Dict, Iterable, List, Mapping, Optional,
13+
Text, Tuple, Union, cast)
1314

1415
import requests.sessions
1516
from six import itervalues, string_types
17+
from six.moves import urllib
1618

1719
import schema_salad.schema as schema
1820
from avro.schema import Names
1921
from ruamel.yaml.comments import CommentedMap, CommentedSeq
20-
from schema_salad.ref_resolver import Fetcher, Loader, file_uri
22+
from schema_salad.ref_resolver import ContextType, Fetcher, Loader, file_uri
2123
from schema_salad.sourceline import cmap
2224
from schema_salad.validate import ValidationException
23-
from six.moves import urllib
2425

2526
from . import process, update
2627
from .errors import WorkflowException
2728
from .process import Process, shortname, get_schema
2829
from .update import ALLUPDATES
2930

3031
_logger = logging.getLogger("cwltool")
31-
3232
jobloaderctx = {
3333
u"cwl": "https://w3id.org/cwl/cwl#",
3434
u"cwltool": "http://commonwl.org/cwltool#",
3535
u"path": {u"@type": u"@id"},
3636
u"location": {u"@type": u"@id"},
3737
u"format": {u"@type": u"@id"},
3838
u"id": u"@id"
39-
}
39+
} # type: ContextType
4040

4141

4242
overrides_ctx = {
@@ -51,26 +51,39 @@
5151
"@id": "cwltool:override",
5252
"mapSubject": "class"
5353
}
54-
} # type: Dict[Text, Union[Dict[Any, Any], Text, Iterable[Text]]]
54+
} # type: ContextType
55+
56+
57+
FetcherConstructorType = Callable[[Dict[Text, Union[Text, bool]],
58+
requests.sessions.Session], Fetcher]
59+
60+
loaders = {} # type: Dict[FetcherConstructorType, Loader]
61+
62+
def default_loader(fetcher_constructor):
63+
# type: (Optional[FetcherConstructorType]) -> Loader
64+
if fetcher_constructor in loaders:
65+
return loaders[fetcher_constructor]
66+
else:
67+
loader = Loader(jobloaderctx, fetcher_constructor=fetcher_constructor)
68+
loaders[fetcher_constructor] = loader
69+
return loader
5570

5671
def resolve_tool_uri(argsworkflow, # type: Text
5772
resolver=None, # type: Callable[[Loader, Union[Text, Dict[Text, Any]]], Text]
58-
fetcher_constructor=None,
59-
# type: Callable[[Dict[Text, Text], requests.sessions.Session], Fetcher]
73+
fetcher_constructor=None, # type: FetcherConstructorType
6074
document_loader=None # type: Loader
61-
):
62-
# type: (...) -> Tuple[Text, Text]
75+
): # type: (...) -> Tuple[Text, Text]
6376

6477
uri = None # type: Text
6578
split = urllib.parse.urlsplit(argsworkflow)
6679
# In case of Windows path, urlsplit misjudge Drive letters as scheme, here we are skipping that
67-
if split.scheme and split.scheme in [u'http',u'https',u'file']:
80+
if split.scheme and split.scheme in [u'http', u'https', u'file']:
6881
uri = argsworkflow
6982
elif os.path.exists(os.path.abspath(argsworkflow)):
7083
uri = file_uri(str(os.path.abspath(argsworkflow)))
7184
elif resolver:
7285
if document_loader is None:
73-
document_loader = Loader(jobloaderctx, fetcher_constructor=fetcher_constructor) # type: ignore
86+
document_loader = default_loader(fetcher_constructor) # type: ignore
7487
uri = resolver(document_loader, argsworkflow)
7588

7689
if uri is None:
@@ -85,18 +98,17 @@ def resolve_tool_uri(argsworkflow, # type: Text
8598

8699
def fetch_document(argsworkflow, # type: Union[Text, Dict[Text, Any]]
87100
resolver=None, # type: Callable[[Loader, Union[Text, Dict[Text, Any]]], Text]
88-
fetcher_constructor=None
89-
# type: Callable[[Dict[Text, Text], requests.sessions.Session], Fetcher]
90-
):
91-
# type: (...) -> Tuple[Loader, CommentedMap, Text]
101+
fetcher_constructor=None # type: FetcherConstructorType
102+
): # type: (...) -> Tuple[Loader, CommentedMap, Text]
92103
"""Retrieve a CWL document."""
93104

94-
document_loader = Loader(jobloaderctx, fetcher_constructor=fetcher_constructor) # type: ignore
105+
document_loader = default_loader(fetcher_constructor) # type: ignore
95106

96107
uri = None # type: Text
97108
workflowobj = None # type: CommentedMap
98109
if isinstance(argsworkflow, string_types):
99-
uri, fileuri = resolve_tool_uri(argsworkflow, resolver=resolver, document_loader=document_loader)
110+
uri, fileuri = resolve_tool_uri(argsworkflow, resolver=resolver,
111+
document_loader=document_loader)
100112
workflowobj = document_loader.fetch(fileuri)
101113
elif isinstance(argsworkflow, dict):
102114
uri = "#" + Text(id(argsworkflow))
@@ -126,7 +138,7 @@ def _convert_stdstreams_to_files(workflowobj):
126138
sort_keys=True).encode('utf-8')).hexdigest())
127139
workflowobj[streamtype] = filename
128140
out['type'] = 'File'
129-
out['outputBinding'] = {'glob': filename}
141+
out['outputBinding'] = cmap({'glob': filename})
130142
for inp in workflowobj.get('inputs', []):
131143
if inp.get('type') == 'stdin':
132144
if 'inputBinding' in inp:
@@ -170,25 +182,25 @@ def validate_document(document_loader, # type: Loader
170182
enable_dev=False, # type: bool
171183
strict=True, # type: bool
172184
preprocess_only=False, # type: bool
173-
fetcher_constructor=None,
174-
skip_schemas=None,
175-
# type: Callable[[Dict[Text, Text], requests.sessions.Session], Fetcher]
176-
overrides=None # type: List[Dict]
185+
fetcher_constructor=None, # type: FetcherConstructorType
186+
skip_schemas=None, # type: bool
187+
overrides=None, # type: List[Dict]
188+
metadata=None, # type: Optional[Dict]
177189
):
178190
# type: (...) -> Tuple[Loader, Names, Union[Dict[Text, Any], List[Dict[Text, Any]]], Dict[Text, Any], Text]
179191
"""Validate a CWL document."""
180192

181193
if isinstance(workflowobj, list):
182-
workflowobj = {
194+
workflowobj = cmap({
183195
"$graph": workflowobj
184-
}
196+
}, fn=uri)
185197

186198
if not isinstance(workflowobj, dict):
187199
raise ValueError("workflowjobj must be a dict, got '%s': %s" % (type(workflowobj), workflowobj))
188200

189201
jobobj = None
190202
if "cwl:tool" in workflowobj:
191-
job_loader = Loader(jobloaderctx, fetcher_constructor=fetcher_constructor) # type: ignore
203+
job_loader = default_loader(fetcher_constructor) # type: ignore
192204
jobobj, _ = job_loader.resolve_all(workflowobj, uri)
193205
uri = urllib.parse.urljoin(uri, workflowobj["https://w3id.org/cwl/cwl#tool"])
194206
del cast(dict, jobobj)["https://w3id.org/cwl/cwl#tool"]
@@ -200,22 +212,25 @@ def validate_document(document_loader, # type: Loader
200212
workflowobj = fetch_document(uri, fetcher_constructor=fetcher_constructor)[1]
201213

202214
fileuri = urllib.parse.urldefrag(uri)[0]
203-
204-
if "cwlVersion" in workflowobj:
205-
if not isinstance(workflowobj["cwlVersion"], (str, Text)):
206-
raise Exception("'cwlVersion' must be a string, got %s" % type(workflowobj["cwlVersion"]))
207-
# strip out version
208-
workflowobj["cwlVersion"] = re.sub(
209-
r"^(?:cwl:|https://w3id.org/cwl/cwl#)", "",
210-
workflowobj["cwlVersion"])
211-
if workflowobj["cwlVersion"] not in list(ALLUPDATES):
212-
# print out all the Supported Versions of cwlVersion
213-
versions = list(ALLUPDATES) # ALLUPDATES is a dict
214-
versions.sort()
215-
raise ValidationException("'cwlVersion' not valid. Supported CWL versions are: \n{}".format("\n".join(versions)))
216-
else:
217-
raise ValidationException("No cwlVersion found."
218-
"Use the following syntax in your CWL workflow to declare version: cwlVersion: <version>")
215+
if "cwlVersion" not in workflowobj:
216+
if metadata and 'cwlVersion' in metadata:
217+
workflowobj['cwlVersion'] = metadata['cwlVersion']
218+
else:
219+
raise ValidationException("No cwlVersion found."
220+
"Use the following syntax in your CWL document to declare "
221+
"the version: cwlVersion: <version>")
222+
223+
if not isinstance(workflowobj["cwlVersion"], (str, Text)):
224+
raise Exception("'cwlVersion' must be a string, got %s" % type(workflowobj["cwlVersion"]))
225+
# strip out version
226+
workflowobj["cwlVersion"] = re.sub(
227+
r"^(?:cwl:|https://w3id.org/cwl/cwl#)", "",
228+
workflowobj["cwlVersion"])
229+
if workflowobj["cwlVersion"] not in list(ALLUPDATES):
230+
# print out all the Supported Versions of cwlVersion
231+
versions = list(ALLUPDATES) # ALLUPDATES is a dict
232+
versions.sort()
233+
raise ValidationException("'cwlVersion' not valid. Supported CWL versions are: \n{}".format("\n".join(versions)))
219234

220235
if workflowobj["cwlVersion"] == "draft-2":
221236
workflowobj = cast(CommentedMap, cmap(update._draft2toDraft3dev1(
@@ -238,36 +253,36 @@ def validate_document(document_loader, # type: Loader
238253
_add_blank_ids(workflowobj)
239254

240255
workflowobj["id"] = fileuri
241-
processobj, metadata = document_loader.resolve_all(workflowobj, fileuri)
256+
processobj, new_metadata = document_loader.resolve_all(workflowobj, fileuri)
242257
if not isinstance(processobj, (CommentedMap, CommentedSeq)):
243258
raise ValidationException("Workflow must be a dict or list.")
244259

245-
if not metadata:
260+
if not new_metadata:
246261
if not isinstance(processobj, dict):
247262
raise ValidationException("Draft-2 workflows must be a dict.")
248-
metadata = cast(CommentedMap, cmap({"$namespaces": processobj.get("$namespaces", {}),
249-
"$schemas": processobj.get("$schemas", []),
250-
"cwlVersion": processobj["cwlVersion"]},
251-
fn=fileuri))
263+
new_metadata = cast(CommentedMap, cmap(
264+
{"$namespaces": processobj.get("$namespaces", {}),
265+
"$schemas": processobj.get("$schemas", []),
266+
"cwlVersion": processobj["cwlVersion"]}, fn=fileuri))
252267

253268
_convert_stdstreams_to_files(workflowobj)
254269

255270
if preprocess_only:
256-
return document_loader, avsc_names, processobj, metadata, uri
271+
return document_loader, avsc_names, processobj, new_metadata, uri
257272

258273
schema.validate_doc(avsc_names, processobj, document_loader, strict)
259274

260-
if metadata.get("cwlVersion") != update.LATEST:
275+
if new_metadata.get("cwlVersion") != update.LATEST:
261276
processobj = cast(CommentedMap, cmap(update.update(
262-
processobj, document_loader, fileuri, enable_dev, metadata)))
277+
processobj, document_loader, fileuri, enable_dev, new_metadata)))
263278

264279
if jobobj:
265-
metadata[u"cwl:defaults"] = jobobj
280+
new_metadata[u"cwl:defaults"] = jobobj
266281

267282
if overrides:
268-
metadata[u"cwltool:overrides"] = overrides
283+
new_metadata[u"cwltool:overrides"] = overrides
269284

270-
return document_loader, avsc_names, processobj, metadata, uri
285+
return document_loader, avsc_names, processobj, new_metadata, uri
271286

272287

273288
def make_tool(document_loader, # type: Loader
@@ -322,7 +337,7 @@ def load_tool(argsworkflow, # type: Union[Text, Dict[Text, Any]]
322337
enable_dev=False, # type: bool
323338
strict=True, # type: bool
324339
resolver=None, # type: Callable[[Loader, Union[Text, Dict[Text, Any]]], Text]
325-
fetcher_constructor=None, # type: Callable[[Dict[Text, Text], requests.sessions.Session], Fetcher]
340+
fetcher_constructor=None, # type: FetcherConstructorType
326341
overrides=None
327342
):
328343
# type: (...) -> Process
@@ -332,7 +347,8 @@ def load_tool(argsworkflow, # type: Union[Text, Dict[Text, Any]]
332347
document_loader, avsc_names, processobj, metadata, uri = validate_document(
333348
document_loader, workflowobj, uri, enable_dev=enable_dev,
334349
strict=strict, fetcher_constructor=fetcher_constructor,
335-
overrides=overrides)
350+
overrides=overrides, metadata=kwargs.get('metadata', None)
351+
if kwargs else None)
336352
return make_tool(document_loader, avsc_names, metadata, uri,
337353
makeTool, kwargs if kwargs else {})
338354

cwltool/main.py

+7-5
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@
2727
from .builder import Builder
2828
from .cwlrdf import printdot, printrdf
2929
from .errors import UnsupportedRequirement, WorkflowException
30-
from .load_tool import (resolve_tool_uri, fetch_document, make_tool, validate_document,
31-
jobloaderctx, resolve_overrides, load_overrides)
30+
from .load_tool import (FetcherConstructorType, resolve_tool_uri,
31+
fetch_document, make_tool, validate_document, jobloaderctx,
32+
resolve_overrides, load_overrides)
3233
from .mutation import MutationManager
3334
from .pack import pack
3435
from .pathmapper import (adjustDirObjs, adjustFileObjs, get_listing,
@@ -544,8 +545,8 @@ def load_job_order(args, # type: argparse.Namespace
544545
job_order_object, _ = loader.resolve_ref(job_order_file, checklinks=False)
545546

546547
if job_order_object and "http://commonwl.org/cwltool#overrides" in job_order_object:
547-
overrides.extend(resolve_overrides(job_order_object, file_uri(job_order_file), tool_file_uri))
548-
del job_order_object["http://commonwl.org/cwltool#overrides"]
548+
overrides.extend(resolve_overrides(job_order_object, file_uri(job_order_file), tool_file_uri))
549+
del job_order_object["http://commonwl.org/cwltool#overrides"]
549550

550551
if not job_order_object:
551552
input_basedir = args.basedir if args.basedir else os.getcwd()
@@ -641,6 +642,7 @@ def addSizes(p):
641642
ns = {} # type: Dict[Text, Union[Dict[Any, Any], Text, Iterable[Text]]]
642643
ns.update(t.metadata.get("$namespaces", {}))
643644
ld = Loader(ns)
645+
644646
def expand_formats(p):
645647
if "format" in p:
646648
p["format"] = ld.expand_url(p["format"], "")
@@ -734,7 +736,7 @@ def main(argsl=None, # type: List[str]
734736
versionfunc=versionstring, # type: Callable[[], Text]
735737
job_order_object=None, # type: MutableMapping[Text, Any]
736738
make_fs_access=StdFsAccess, # type: Callable[[Text], StdFsAccess]
737-
fetcher_constructor=None, # type: Callable[[Dict[Text, Text], requests.sessions.Session], Fetcher]
739+
fetcher_constructor=None, # type: FetcherConstructorType
738740
resolver=tool_resolver,
739741
logger_handler=None,
740742
custom_schema_callback=None # type: Callable[[], None]

cwltool/workflow.py

+8-4
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,15 @@ def match_types(sinktype, src, iid, inputobj, linkMerge, valueFrom):
7070
elif isinstance(src.parameter["type"], list):
7171
# Source is union type
7272
# Check that at least one source type is compatible with the sink.
73-
for st in src.parameter["type"]:
74-
srccopy = copy.deepcopy(src)
75-
srccopy.parameter["type"] = st
76-
if match_types(sinktype, srccopy, iid, inputobj, linkMerge, valueFrom):
73+
original_types = src.parameter["type"]
74+
for source_type in original_types:
75+
src.parameter["type"] = source_type
76+
match = match_types(
77+
sinktype, src, iid, inputobj, linkMerge, valueFrom)
78+
if match:
79+
src.parameter["type"] = original_types
7780
return True
81+
src.parameter["type"] = original_types
7882
return False
7983
elif linkMerge:
8084
if iid not in inputobj:

tests/test_pack.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212

1313
import cwltool.pack
1414
import cwltool.workflow
15+
from cwltool.resolver import tool_resolver
16+
from cwltool import load_tool
1517
from cwltool.load_tool import fetch_document, validate_document
1618
from cwltool.main import makeRelative, main, print_pack
1719
from cwltool.pathmapper import adjustDirObjs, adjustFileObjs
@@ -23,6 +25,7 @@ class TestPack(unittest.TestCase):
2325
maxDiff = None
2426

2527
def test_pack(self):
28+
load_tool.loaders = {}
2629

2730
document_loader, workflowobj, uri = fetch_document(
2831
get_data("tests/wf/revsort.cwl"))
@@ -97,10 +100,11 @@ def _pack_idempotently(self, document):
97100
reason="Instance of cwltool is used, on Windows it invokes a default docker container"
98101
"which is not supported on AppVeyor")
99102
def test_packed_workflow_execution(self):
103+
load_tool.loaders = {}
100104
test_wf = "tests/wf/count-lines1-wf.cwl"
101105
test_wf_job = "tests/wf/wc-job.json"
102106
document_loader, workflowobj, uri = fetch_document(
103-
get_data(test_wf))
107+
get_data(test_wf), resolver=tool_resolver)
104108
document_loader, avsc_names, processobj, metadata, uri = validate_document(
105109
document_loader, workflowobj, uri)
106110
packed = json.loads(print_pack(document_loader, processobj, uri, metadata))

0 commit comments

Comments
 (0)