Skip to content

Commit 16740b1

Browse files
authored
Merge branch 'master' into py3_window
2 parents d3c884f + 9be7ad8 commit 16740b1

File tree

6 files changed

+144
-39
lines changed

6 files changed

+144
-39
lines changed

cwltool/builder.py

+21-11
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import absolute_import
22
import copy
33
import os
4+
import logging
45
from typing import Any, Callable, Dict, List, Text, Type, Union
56

67
import six
@@ -18,6 +19,8 @@
1819
from .stdfsaccess import StdFsAccess
1920
from .utils import aslist, get_feature, docker_windows_path_adjust, onWindows
2021

22+
_logger = logging.getLogger("cwltool")
23+
2124
AvroSchemaFromJSONData = avro.schema.make_avsc_object
2225

2326
CONTENT_LIMIT = 64 * 1024
@@ -147,18 +150,25 @@ def bind_input(self, schema, datum, lead_pos=None, tail_pos=None):
147150
datum["secondaryFiles"] = []
148151
for sf in aslist(schema["secondaryFiles"]):
149152
if isinstance(sf, dict) or "$(" in sf or "${" in sf:
150-
secondary_eval = self.do_eval(sf, context=datum)
151-
if isinstance(secondary_eval, string_types):
152-
sfpath = {"location": secondary_eval,
153-
"class": "File"}
154-
else:
155-
sfpath = secondary_eval
156-
else:
157-
sfpath = {"location": substitute(datum["location"], sf), "class": "File"}
158-
if isinstance(sfpath, list):
159-
datum["secondaryFiles"].extend(sfpath)
153+
sfpath = self.do_eval(sf, context=datum)
160154
else:
161-
datum["secondaryFiles"].append(sfpath)
155+
sfpath = substitute(datum["basename"], sf)
156+
for sfname in aslist(sfpath):
157+
found = False
158+
for d in datum["secondaryFiles"]:
159+
if not d.get("basename"):
160+
d["basename"] = d["location"][d["location"].rindex("/")+1:]
161+
if d["basename"] == sfname:
162+
found = True
163+
if not found:
164+
if isinstance(sfname, dict):
165+
datum["secondaryFiles"].append(sfname)
166+
else:
167+
datum["secondaryFiles"].append({
168+
"location": datum["location"][0:datum["location"].rindex("/")+1]+sfname,
169+
"basename": sfname,
170+
"class": "File"})
171+
162172
normalizeFilesDirs(datum["secondaryFiles"])
163173

164174
def _capture_files(f):

cwltool/draft2tool.py

+47-17
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*") # Accept anything
3636
ACCEPTLIST_RE = ACCEPTLIST_EN_STRICT_RE
3737
DEFAULT_CONTAINER_MSG="""We are on Microsoft Windows and not all components of this CWL description have a
38-
container specified. This means that these steps will be executed in the default container,
38+
container specified. This means that these steps will be executed in the default container,
3939
which is %s.
4040
4141
Note, this could affect portability if this CWL description relies on non-POSIX features
@@ -116,17 +116,26 @@ def revmap_file(builder, outdir, f):
116116
if not split.scheme:
117117
outdir = file_uri(str(outdir))
118118

119+
# builder.outdir is the inner (container/compute node) output directory
120+
# outdir is the outer (host/storage system) output directory
121+
119122
if "location" in f:
120123
if f["location"].startswith("file://"):
121124
path = convert_pathsep_to_unix(uri_file_path(f["location"]))
122125
revmap_f = builder.pathmapper.reversemap(path)
126+
123127
if revmap_f and not builder.pathmapper.mapper(revmap_f[0]).type.startswith("Writable"):
124128
f["basename"] = os.path.basename(path)
125-
f["location"] = revmap_f[0]
129+
f["location"] = revmap_f[1]
126130
elif path == builder.outdir:
127131
f["location"] = outdir
128132
elif path.startswith(builder.outdir):
129133
f["location"] = builder.fs_access.join(outdir, path[len(builder.outdir) + 1:])
134+
elif f["location"].startswith(outdir):
135+
revmap_f = builder.pathmapper.reversemap(builder.fs_access.join(builder.outdir, f["location"][len(outdir) + 1:]))
136+
if revmap_f and not builder.pathmapper.mapper(revmap_f[0]).type.startswith("Writable"):
137+
f["basename"] = os.path.basename(path)
138+
f["location"] = revmap_f[1]
130139
return f
131140

132141
if "path" in f:
@@ -190,7 +199,7 @@ def __init__(self, toolpath_object, **kwargs):
190199
super(CommandLineTool, self).__init__(toolpath_object, **kwargs)
191200
self.find_default_container = kwargs.get("find_default_container", None)
192201

193-
def makeJobRunner(self, use_container=True): # type: (Optional[bool]) -> JobBase
202+
def makeJobRunner(self, use_container=True, **kwargs): # type: (Optional[bool], **Any) -> JobBase
194203
dockerReq, _ = self.get_requirement("DockerRequirement")
195204
if not dockerReq and use_container:
196205
if self.find_default_container:
@@ -216,7 +225,8 @@ def makeJobRunner(self, use_container=True): # type: (Optional[bool]) -> JobBas
216225

217226
def makePathMapper(self, reffiles, stagedir, **kwargs):
218227
# type: (List[Any], Text, **Any) -> PathMapper
219-
return PathMapper(reffiles, kwargs["basedir"], stagedir)
228+
return PathMapper(reffiles, kwargs["basedir"], stagedir,
229+
separateDirs=kwargs.get("separateDirs", True))
220230

221231
def updatePathmap(self, outdir, pathmap, fn):
222232
# type: (Text, PathMapper, Dict) -> None
@@ -325,9 +335,10 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
325335

326336
reffiles = copy.deepcopy(builder.files)
327337

328-
j = self.makeJobRunner(kwargs.get("use_container"))
338+
j = self.makeJobRunner(**kwargs)
329339
j.builder = builder
330340
j.joborder = builder.job
341+
j.make_pathmapper = self.makePathMapper
331342
j.stdin = None
332343
j.stderr = None
333344
j.stdout = None
@@ -350,6 +361,7 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
350361
if "stagedir" in make_path_mapper_kwargs:
351362
make_path_mapper_kwargs = make_path_mapper_kwargs.copy()
352363
del make_path_mapper_kwargs["stagedir"]
364+
353365
builder.pathmapper = self.makePathMapper(reffiles, builder.stagedir, **make_path_mapper_kwargs)
354366
builder.requirements = j.requirements
355367

@@ -566,7 +578,12 @@ def collect_output(self, schema, builder, outdir, fs_access, compute_checksum=Tr
566578
elif gb.startswith("/"):
567579
raise WorkflowException("glob patterns must not start with '/'")
568580
try:
581+
prefix = fs_access.glob(outdir)
569582
r.extend([{"location": g,
583+
"path": fs_access.join(builder.outdir, g[len(prefix[0])+1:]),
584+
"basename": os.path.basename(g),
585+
"nameroot": os.path.splitext(os.path.basename(g))[0],
586+
"nameext": os.path.splitext(os.path.basename(g))[1],
570587
"class": "File" if fs_access.isfile(g) else "Directory"}
571588
for g in fs_access.glob(fs_access.join(outdir, gb))])
572589
except (OSError, IOError) as e:
@@ -576,12 +593,14 @@ def collect_output(self, schema, builder, outdir, fs_access, compute_checksum=Tr
576593
raise
577594

578595
for files in r:
596+
rfile = files.copy()
597+
revmap(rfile)
579598
if files["class"] == "Directory":
580599
ll = builder.loadListing or (binding and binding.get("loadListing"))
581600
if ll and ll != "no_listing":
582601
get_listing(fs_access, files, (ll == "deep_listing"))
583602
else:
584-
with fs_access.open(files["location"], "rb") as f:
603+
with fs_access.open(rfile["location"], "rb") as f:
585604
contents = b""
586605
if binding.get("loadContents") or compute_checksum:
587606
contents = f.read(CONTENT_LIMIT)
@@ -625,28 +644,39 @@ def collect_output(self, schema, builder, outdir, fs_access, compute_checksum=Tr
625644
else:
626645
r = r[0]
627646

628-
# Ensure files point to local references outside of the run environment
629-
adjustFileObjs(r, cast( # known bug in mypy
630-
# https://github.com/python/mypy/issues/797
631-
Callable[[Any], Any], revmap))
632-
633647
if "secondaryFiles" in schema:
634648
with SourceLine(schema, "secondaryFiles", WorkflowException):
635649
for primary in aslist(r):
636650
if isinstance(primary, dict):
637-
primary["secondaryFiles"] = []
651+
primary.setdefault("secondaryFiles", [])
652+
pathprefix = primary["path"][0:primary["path"].rindex("/")+1]
638653
for sf in aslist(schema["secondaryFiles"]):
639654
if isinstance(sf, dict) or "$(" in sf or "${" in sf:
640655
sfpath = builder.do_eval(sf, context=primary)
641-
if isinstance(sfpath, string_types):
642-
sfpath = revmap({"location": sfpath, "class": "File"})
656+
subst = False
643657
else:
644-
sfpath = {"location": substitute(primary["location"], sf), "class": "File"}
645-
658+
sfpath = sf
659+
subst = True
646660
for sfitem in aslist(sfpath):
647-
if fs_access.exists(sfitem["location"]):
661+
if isinstance(sfitem, string_types):
662+
if subst:
663+
sfitem = {"path": substitute(primary["path"], sfitem)}
664+
else:
665+
sfitem = {"path": pathprefix+sfitem}
666+
if "path" in sfitem and "location" not in sfitem:
667+
revmap(sfitem)
668+
if fs_access.isfile(sfitem["location"]):
669+
sfitem["class"] = "File"
670+
primary["secondaryFiles"].append(sfitem)
671+
elif fs_access.isdir(sfitem["location"]):
672+
sfitem["class"] = "Directory"
648673
primary["secondaryFiles"].append(sfitem)
649674

675+
# Ensure files point to local references outside of the run environment
676+
adjustFileObjs(r, cast( # known bug in mypy
677+
# https://github.com/python/mypy/issues/797
678+
Callable[[Any], Any], revmap))
679+
650680
if not r and optional:
651681
r = None
652682

cwltool/job.py

+10-5
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ def __init__(self): # type: () -> None
132132
self.name = None # type: Text
133133
self.command_line = None # type: List[Text]
134134
self.pathmapper = None # type: PathMapper
135+
self.make_pathmapper = None # type: Callable[..., PathMapper]
135136
self.generatemapper = None # type: PathMapper
136137
self.collect_outputs = None # type: Union[Callable[[Any], Any], functools.partial[Any]]
137138
self.output_callback = None # type: Callable[[Any, Any], Any]
@@ -142,7 +143,7 @@ def __init__(self): # type: () -> None
142143
self.stagedir = None # type: Text
143144
self.inplace_update = None # type: bool
144145

145-
def _setup(self): # type: () -> None
146+
def _setup(self, kwargs): # type: (Dict) -> None
146147
if not os.path.exists(self.outdir):
147148
os.makedirs(self.outdir)
148149

@@ -154,8 +155,12 @@ def _setup(self): # type: () -> None
154155
"file." % (knownfile, self.pathmapper.mapper(knownfile)[0]))
155156

156157
if self.generatefiles["listing"]:
157-
self.generatemapper = PathMapper(cast(List[Any], self.generatefiles["listing"]),
158-
self.outdir, self.outdir, separateDirs=False)
158+
make_path_mapper_kwargs = kwargs
159+
if "basedir" in make_path_mapper_kwargs:
160+
make_path_mapper_kwargs = make_path_mapper_kwargs.copy()
161+
del make_path_mapper_kwargs["basedir"]
162+
self.generatemapper = self.make_pathmapper(cast(List[Any], self.generatefiles["listing"]),
163+
self.outdir, basedir=self.outdir, separateDirs=False, **make_path_mapper_kwargs)
159164
_logger.debug(u"[job %s] initial work dir %s", self.name,
160165
json.dumps({p: self.generatemapper.mapper(p) for p in self.generatemapper.files()}, indent=4))
161166

@@ -275,7 +280,7 @@ def run(self, pull_image=True, rm_container=True,
275280
rm_tmpdir=True, move_outputs="move", **kwargs):
276281
# type: (bool, bool, bool, Text, **Any) -> None
277282

278-
self._setup()
283+
self._setup(kwargs)
279284

280285
env = self.environment
281286
if not os.path.exists(self.tmpdir):
@@ -371,7 +376,7 @@ def run(self, pull_image=True, rm_container=True,
371376
"Docker is not available for this tool, try --no-container"
372377
" to disable Docker: %s" % e)
373378

374-
self._setup()
379+
self._setup(kwargs)
375380

376381
runtime = [u"docker", u"run", u"-i"]
377382

cwltool/pathmapper.py

+38-6
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,11 @@
55
import stat
66
import uuid
77
from functools import partial
8+
from tempfile import NamedTemporaryFile
9+
10+
import requests
11+
from cachecontrol import CacheControl
12+
from cachecontrol.caches import FileCache
813
from typing import Any, Callable, Dict, Iterable, List, Set, Text, Tuple, Union
914

1015
import schema_salad.validate as validate
@@ -139,6 +144,29 @@ def trim_listing(obj):
139144
if obj.get("location", "").startswith("file://") and "listing" in obj:
140145
del obj["listing"]
141146

147+
# Download http Files
148+
def downloadHttpFile(httpurl):
149+
# type: (Text) -> Text
150+
cache_session = None
151+
if "XDG_CACHE_HOME" in os.environ:
152+
directory = os.environ["XDG_CACHE_HOME"]
153+
elif "HOME" in os.environ:
154+
directory = os.environ["HOME"]
155+
else:
156+
directory = os.path.expanduser('~')
157+
158+
cache_session = CacheControl(
159+
requests.Session(),
160+
cache=FileCache(
161+
os.path.join(directory, ".cache", "cwltool")))
162+
163+
r = cache_session.get(httpurl, stream=True)
164+
with NamedTemporaryFile(mode='wb', delete=False) as f:
165+
for chunk in r.iter_content(chunk_size=16384):
166+
if chunk: # filter out keep-alive new chunks
167+
f.write(chunk)
168+
r.close()
169+
return f.name
142170

143171
class PathMapper(object):
144172
"""Mapping of files from relative path provided in the file to a tuple of
@@ -208,14 +236,18 @@ def visit(self, obj, stagedir, basedir, copy=False, staged=False):
208236
self._pathmap[obj["location"]] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
209237
else:
210238
with SourceLine(obj, "location", validate.ValidationException):
211-
# Dereference symbolic links
212239
deref = ab
213-
st = os.lstat(deref)
214-
while stat.S_ISLNK(st.st_mode):
215-
rl = os.readlink(deref)
216-
deref = rl if os.path.isabs(rl) else os.path.join(
217-
os.path.dirname(deref), rl)
240+
if urllib.parse.urlsplit(deref).scheme in ['http','https']:
241+
deref = downloadHttpFile(path)
242+
else:
243+
# Dereference symbolic links
218244
st = os.lstat(deref)
245+
while stat.S_ISLNK(st.st_mode):
246+
rl = os.readlink(deref)
247+
deref = rl if os.path.isabs(rl) else os.path.join(
248+
os.path.dirname(deref), rl)
249+
st = os.lstat(deref)
250+
219251
self._pathmap[path] = MapperEnt(deref, tgt, "WritableFile" if copy else "File", staged)
220252
self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir, copy=copy, staged=staged)
221253

cwltool/stdfsaccess.py

+2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
def abspath(src, basedir): # type: (Text, Text) -> Text
1414
if src.startswith(u"file://"):
1515
ab = six.text_type(uri_file_path(str(src)))
16+
elif urllib.parse.urlsplit(src).scheme in ['http','https']:
17+
return src
1618
else:
1719
if basedir.startswith(u"file://"):
1820
ab = src if os.path.isabs(src) else basedir+ '/'+ src

tests/test_http_input.py

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
from __future__ import absolute_import
2+
import unittest
3+
import os
4+
import tempfile
5+
from cwltool.pathmapper import PathMapper
6+
7+
8+
class TestHttpInput(unittest.TestCase):
9+
def test_http_path_mapping(self):
10+
class SubPathMapper(PathMapper):
11+
def __init__(self, referenced_files, basedir, stagedir):
12+
super(SubPathMapper, self).__init__(referenced_files, basedir, stagedir)
13+
input_file_path = "https://github.com/raw/common-workflow-language/cwltool/master/tests/2.fasta"
14+
tempdir = tempfile.mkdtemp()
15+
base_file = [{
16+
"class": "File",
17+
"location": "https://github.com/raw/common-workflow-language/cwltool/master/tests/2.fasta",
18+
"basename": "chr20.fa"
19+
}]
20+
path_map_obj = SubPathMapper(base_file, os.getcwd(), tempdir)
21+
22+
self.assertIn(input_file_path,path_map_obj._pathmap)
23+
assert os.path.exists(path_map_obj._pathmap[input_file_path].resolved) == 1
24+
with open(path_map_obj._pathmap[input_file_path].resolved) as f:
25+
self.assertIn(">Sequence 561 BP; 135 A; 106 C; 98 G; 222 T; 0 other;",f.read())
26+
f.close()

0 commit comments

Comments
 (0)