8
8
import uuid
9
9
import hashlib
10
10
import json
11
- from typing import Any , Callable , Dict , List , Text , Tuple , Union , cast
11
+ import copy
12
+ from typing import Any , Callable , Dict , List , Text , Tuple , Union , cast , Iterable
12
13
13
14
import requests .sessions
14
15
from six import itervalues , string_types
23
24
24
25
from . import process , update
25
26
from .errors import WorkflowException
26
- from .process import Process , shortname
27
+ from .process import Process , shortname , get_schema
27
28
from .update import ALLUPDATES
28
29
29
30
_logger = logging .getLogger ("cwltool" )
30
31
31
32
jobloaderctx = {
32
33
u"cwl" : "https://w3id.org/cwl/cwl#" ,
34
+ u"cwltool" : "http://commonwl.org/cwltool#" ,
33
35
u"path" : {u"@type" : u"@id" },
34
36
u"location" : {u"@type" : u"@id" },
35
37
u"format" : {u"@type" : u"@id" },
36
38
u"id" : u"@id"
37
39
}
38
40
41
+
42
+ overrides_ctx = {
43
+ u"overrideTarget" : {u"@type" : u"@id" },
44
+ u"cwltool" : "http://commonwl.org/cwltool#" ,
45
+ u"overrides" : {
46
+ "@id" : "cwltool:overrides" ,
47
+ "mapSubject" : "overrideTarget" ,
48
+ "mapPredicate" : "override"
49
+ },
50
+ u"override" : {
51
+ "@id" : "cwltool:override" ,
52
+ "mapSubject" : "class"
53
+ }
54
+ } # type: Dict[Text, Union[Dict[Any, Any], Text, Iterable[Text]]]
55
+
56
+ def resolve_tool_uri (argsworkflow , # type: Text
57
+ resolver = None , # type: Callable[[Loader, Union[Text, Dict[Text, Any]]], Text]
58
+ fetcher_constructor = None ,
59
+ # type: Callable[[Dict[Text, Text], requests.sessions.Session], Fetcher]
60
+ document_loader = None # type: Loader
61
+ ):
62
+ # type: (...) -> Tuple[Text, Text]
63
+
64
+ uri = None # type: Text
65
+ split = urllib .parse .urlsplit (argsworkflow )
66
+ # In case of Windows path, urlsplit misjudge Drive letters as scheme, here we are skipping that
67
+ if split .scheme and split .scheme in [u'http' ,u'https' ,u'file' ]:
68
+ uri = argsworkflow
69
+ elif os .path .exists (os .path .abspath (argsworkflow )):
70
+ uri = file_uri (str (os .path .abspath (argsworkflow )))
71
+ elif resolver :
72
+ if document_loader is None :
73
+ document_loader = Loader (jobloaderctx , fetcher_constructor = fetcher_constructor ) # type: ignore
74
+ uri = resolver (document_loader , argsworkflow )
75
+
76
+ if uri is None :
77
+ raise ValidationException ("Not found: '%s'" % argsworkflow )
78
+
79
+ if argsworkflow != uri :
80
+ _logger .info ("Resolved '%s' to '%s'" , argsworkflow , uri )
81
+
82
+ fileuri = urllib .parse .urldefrag (uri )[0 ]
83
+ return uri , fileuri
84
+
85
+
39
86
def fetch_document (argsworkflow , # type: Union[Text, Dict[Text, Any]]
40
87
resolver = None , # type: Callable[[Loader, Union[Text, Dict[Text, Any]]], Text]
41
88
fetcher_constructor = None
@@ -49,22 +96,7 @@ def fetch_document(argsworkflow, # type: Union[Text, Dict[Text, Any]]
49
96
uri = None # type: Text
50
97
workflowobj = None # type: CommentedMap
51
98
if isinstance (argsworkflow , string_types ):
52
- split = urllib .parse .urlsplit (argsworkflow )
53
- # In case of Windows path, urlsplit misjudge Drive letters as scheme, here we are skipping that
54
- if split .scheme and split .scheme in [u'http' ,u'https' ,u'file' ]:
55
- uri = argsworkflow
56
- elif os .path .exists (os .path .abspath (argsworkflow )):
57
- uri = file_uri (str (os .path .abspath (argsworkflow )))
58
- elif resolver :
59
- uri = resolver (document_loader , argsworkflow )
60
-
61
- if uri is None :
62
- raise ValidationException ("Not found: '%s'" % argsworkflow )
63
-
64
- if argsworkflow != uri :
65
- _logger .info ("Resolved '%s' to '%s'" , argsworkflow , uri )
66
-
67
- fileuri = urllib .parse .urldefrag (uri )[0 ]
99
+ uri , fileuri = resolve_tool_uri (argsworkflow , resolver = resolver , document_loader = document_loader )
68
100
workflowobj = document_loader .fetch (fileuri )
69
101
elif isinstance (argsworkflow , dict ):
70
102
uri = "#" + Text (id (argsworkflow ))
@@ -139,8 +171,9 @@ def validate_document(document_loader, # type: Loader
139
171
strict = True , # type: bool
140
172
preprocess_only = False , # type: bool
141
173
fetcher_constructor = None ,
142
- skip_schemas = None
174
+ skip_schemas = None ,
143
175
# type: Callable[[Dict[Text, Text], requests.sessions.Session], Fetcher]
176
+ overrides = None # type: List[Dict]
144
177
):
145
178
# type: (...) -> Tuple[Loader, Names, Union[Dict[Text, Any], List[Dict[Text, Any]]], Dict[Text, Any], Text]
146
179
"""Validate a CWL document."""
@@ -155,9 +188,15 @@ def validate_document(document_loader, # type: Loader
155
188
156
189
jobobj = None
157
190
if "cwl:tool" in workflowobj :
158
- jobobj , _ = document_loader .resolve_all (workflowobj , uri )
191
+ job_loader = Loader (jobloaderctx , fetcher_constructor = fetcher_constructor ) # type: ignore
192
+ jobobj , _ = job_loader .resolve_all (workflowobj , uri )
159
193
uri = urllib .parse .urljoin (uri , workflowobj ["https://w3id.org/cwl/cwl#tool" ])
160
194
del cast (dict , jobobj )["https://w3id.org/cwl/cwl#tool" ]
195
+
196
+ if "http://commonwl.org/cwltool#overrides" in jobobj :
197
+ overrides .extend (resolve_overrides (jobobj , uri , uri ))
198
+ del jobobj ["http://commonwl.org/cwltool#overrides" ]
199
+
161
200
workflowobj = fetch_document (uri , fetcher_constructor = fetcher_constructor )[1 ]
162
201
163
202
fileuri = urllib .parse .urldefrag (uri )[0 ]
@@ -225,6 +264,9 @@ def validate_document(document_loader, # type: Loader
225
264
if jobobj :
226
265
metadata [u"cwl:defaults" ] = jobobj
227
266
267
+ if overrides :
268
+ metadata [u"cwltool:overrides" ] = overrides
269
+
228
270
return document_loader , avsc_names , processobj , metadata , uri
229
271
230
272
@@ -277,14 +319,29 @@ def load_tool(argsworkflow, # type: Union[Text, Dict[Text, Any]]
277
319
enable_dev = False , # type: bool
278
320
strict = True , # type: bool
279
321
resolver = None , # type: Callable[[Loader, Union[Text, Dict[Text, Any]]], Text]
280
- fetcher_constructor = None # type: Callable[[Dict[Text, Text], requests.sessions.Session], Fetcher]
322
+ fetcher_constructor = None , # type: Callable[[Dict[Text, Text], requests.sessions.Session], Fetcher]
323
+ overrides = None
281
324
):
282
325
# type: (...) -> Process
283
326
284
327
document_loader , workflowobj , uri = fetch_document (argsworkflow , resolver = resolver ,
285
328
fetcher_constructor = fetcher_constructor )
286
329
document_loader , avsc_names , processobj , metadata , uri = validate_document (
287
330
document_loader , workflowobj , uri , enable_dev = enable_dev ,
288
- strict = strict , fetcher_constructor = fetcher_constructor )
331
+ strict = strict , fetcher_constructor = fetcher_constructor ,
332
+ overrides = overrides )
289
333
return make_tool (document_loader , avsc_names , metadata , uri ,
290
334
makeTool , kwargs if kwargs else {})
335
+
336
+ def resolve_overrides (ov , ov_uri , baseurl ): # type: (CommentedMap, Text, Text) -> List[Dict[Text, Any]]
337
+ ovloader = Loader (overrides_ctx )
338
+ ret , _ = ovloader .resolve_all (ov , baseurl )
339
+ if not isinstance (ret , CommentedMap ):
340
+ raise Exception ("Expected CommentedMap, got %s" % type (ret ))
341
+ cwl_docloader = get_schema ("v1.0" )[0 ]
342
+ cwl_docloader .resolve_all (ret , ov_uri )
343
+ return ret ["overrides" ]
344
+
345
+ def load_overrides (ov , base_url ): # type: (Text, Text) -> List[Dict[Text, Any]]
346
+ ovloader = Loader (overrides_ctx )
347
+ return resolve_overrides (ovloader .fetch (ov ), ov , base_url )
0 commit comments