diff --git a/appveyor.yml b/appveyor.yml new file mode 100644 index 000000000..ecc711256 --- /dev/null +++ b/appveyor.yml @@ -0,0 +1,28 @@ +version: .{build}-{branch} + +environment: + + matrix: + - PYTHON: "C:\\Python27" + PYTHON_VERSION: "2.7.x" + PYTHON_ARCH: "32" + + - PYTHON: "C:\\Python27-x64" + PYTHON_VERSION: "2.7.x" + PYTHON_ARCH: "64" + +install: + - "SET PATH=%PYTHON%;%PYTHON%\\Scripts;%PATH%" + - "python --version" + - "python -c \"import struct; print(struct.calcsize('P') * 8)\"" + - "pip install --disable-pip-version-check --user --upgrade pip" + - "pip install --upgrade --no-deps --force-reinstall git+git://github.com/kapilkd13/schema_salad@windows#egg=schema_salad-2.4.201706261942" + +build_script: + - "%CMD_IN_ENV% python setup.py install" + + +test_script: + + - "%CMD_IN_ENV% python setup.py test" + diff --git a/cwltool/builder.py b/cwltool/builder.py index 5b98ea40e..36159f326 100644 --- a/cwltool/builder.py +++ b/cwltool/builder.py @@ -1,5 +1,6 @@ from __future__ import absolute_import import copy +import os from typing import Any, Callable, Dict, List, Text, Type, Union import six @@ -15,7 +16,7 @@ from .pathmapper import (PathMapper, get_listing, normalizeFilesDirs, visit_class) from .stdfsaccess import StdFsAccess -from .utils import aslist +from .utils import aslist, get_feature, docker_windows_path_adjust, onWindows # if six.PY3: # AvroSchemaFromJSONData = avro.schema.SchemaFromJSONData @@ -185,6 +186,11 @@ def tostr(self, value): # type: (Any) -> Text if isinstance(value, dict) and value.get("class") in ("File", "Directory"): if "path" not in value: raise WorkflowException(u"%s object missing \"path\": %s" % (value["class"], value)) + + # Path adjust for windows file path when passing to docker, docker accepts unix like path only + (docker_req, docker_is_req) = get_feature(self, "DockerRequirement") + if onWindows() and docker_req is not None: # docker_req is none only when there is no dockerRequirement mentioned in hints and Requirement + return docker_windows_path_adjust(value["path"]) return value["path"] else: return Text(value) diff --git a/cwltool/draft2tool.py b/cwltool/draft2tool.py index 4e4442c50..0e1e8f634 100644 --- a/cwltool/draft2tool.py +++ b/cwltool/draft2tool.py @@ -28,7 +28,7 @@ _logger_validation_warnings, compute_checksums, normalizeFilesDirs, shortname, uniquename) from .stdfsaccess import StdFsAccess -from .utils import aslist +from .utils import aslist, docker_windows_path_adjust, convert_pathsep_to_unix from six.moves import map ACCEPTLIST_EN_STRICT_RE = re.compile(r"^[a-zA-Z0-9._+-]+$") @@ -107,7 +107,7 @@ def revmap_file(builder, outdir, f): if "location" in f: if f["location"].startswith("file://"): - path = uri_file_path(f["location"]) + path = convert_pathsep_to_unix(uri_file_path(f["location"])) revmap_f = builder.pathmapper.reversemap(path) if revmap_f: f["location"] = revmap_f[1] @@ -156,7 +156,7 @@ def run(self, **kwargs): def check_adjust(builder, f): # type: (Builder, Dict[Text, Any]) -> Dict[Text, Any] - f["path"] = builder.pathmapper.mapper(f["location"])[1] + f["path"] = docker_windows_path_adjust(builder.pathmapper.mapper(f["location"])[1]) f["dirname"], f["basename"] = os.path.split(f["path"]) if f["class"] == "File": f["nameroot"], f["nameext"] = os.path.splitext(f["basename"]) @@ -230,6 +230,10 @@ def job(self, (docker_req, docker_is_req) = self.get_requirement("DockerRequirement") if docker_req and kwargs.get("use_container") is not False: dockerimg = docker_req.get("dockerImageId") or docker_req.get("dockerPull") + elif kwargs.get("default_container", None) is not None and kwargs.get("use_container") is not False: + dockerimg = kwargs.get("default_container") + + if dockerimg: cmdline = ["docker", "run", dockerimg] + cmdline keydict = {u"cmdline": cmdline} diff --git a/cwltool/expression.py b/cwltool/expression.py index 492eba618..777e9f9b8 100644 --- a/cwltool/expression.py +++ b/cwltool/expression.py @@ -4,6 +4,7 @@ import logging import re from typing import Any, AnyStr, Dict, List, Text, Union +from .utils import docker_windows_path_adjust import six from six import u @@ -203,8 +204,8 @@ def do_eval(ex, jobinput, requirements, outdir, tmpdir, resources, # type: (Union[dict, AnyStr], Dict[Text, Union[Dict, List, Text]], List[Dict[Text, Any]], Text, Text, Dict[Text, Union[int, Text]], Any, bool, int, bool) -> Any runtime = copy.copy(resources) - runtime["tmpdir"] = tmpdir - runtime["outdir"] = outdir + runtime["tmpdir"] = docker_windows_path_adjust(tmpdir) + runtime["outdir"] = docker_windows_path_adjust(outdir) rootvars = { u"inputs": jobinput, diff --git a/cwltool/job.py b/cwltool/job.py index 407991df7..22b9e1bff 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -16,6 +16,7 @@ import shellescape +from .utils import copytree_with_merge, docker_windows_path_adjust, onWindows from . import docker from .builder import Builder from .docker_uid import docker_vm_uid @@ -106,8 +107,14 @@ def relink_initialworkdir(pathmapper, inplace_update=False): if os.path.islink(vol.target) or os.path.isfile(vol.target): os.remove(vol.target) elif os.path.isdir(vol.target): - os.rmdir(vol.target) - os.symlink(vol.resolved, vol.target) + shutil.rmtree(vol.target) + if onWindows(): + if vol.type in ("File", "WritableFile"): + shutil.copy(vol.resolved,vol.target) + elif vol.type in ("Directory", "WritableDirectory"): + copytree_with_merge(vol.resolved, vol.target) + else: + os.symlink(vol.resolved, vol.target) class JobBase(object): def __init__(self): # type: () -> None @@ -278,13 +285,16 @@ def run(self, pull_image=True, rm_container=True, if vars_to_preserve is not None: for key, value in os.environ.items(): if key in vars_to_preserve and key not in env: - env[key] = value - env["HOME"] = self.outdir - env["TMPDIR"] = self.tmpdir - - stageFiles(self.pathmapper, os.symlink, ignoreWritable=True) + # On Windows, subprocess env can't handle unicode. + env[key] = str(value) if onWindows() else value + env["HOME"] = str(self.outdir) if onWindows() else self.outdir + env["TMPDIR"] = str(self.tmpdir) if onWindows() else self.tmpdir + if "PATH" not in env: + env["PATH"] = str(os.environ["PATH"]) if onWindows() else os.environ["PATH"] + + stageFiles(self.pathmapper, ignoreWritable=True, symLink=True) if self.generatemapper: - stageFiles(self.generatemapper, os.symlink, ignoreWritable=self.inplace_update) + stageFiles(self.generatemapper, ignoreWritable=self.inplace_update, symLink=True) relink_initialworkdir(self.generatemapper, inplace_update=self.inplace_update) self._execute([], env, rm_tmpdir=rm_tmpdir, move_outputs=move_outputs) @@ -306,10 +316,10 @@ def add_volumes(self, pathmapper, runtime, stage_output): containertgt = vol.target if vol.type in ("File", "Directory"): if not vol.resolved.startswith("_:"): - runtime.append(u"--volume=%s:%s:ro" % (vol.resolved, containertgt)) + runtime.append(u"--volume=%s:%s:ro" % (docker_windows_path_adjust(vol.resolved), docker_windows_path_adjust(containertgt))) elif vol.type == "WritableFile": if self.inplace_update: - runtime.append(u"--volume=%s:%s:rw" % (vol.resolved, containertgt)) + runtime.append(u"--volume=%s:%s:rw" % (docker_windows_path_adjust(vol.resolved), docker_windows_path_adjust(containertgt))) else: shutil.copy(vol.resolved, vol.target) elif vol.type == "WritableDirectory": @@ -317,14 +327,15 @@ def add_volumes(self, pathmapper, runtime, stage_output): os.makedirs(vol.target, 0o0755) else: if self.inplace_update: - runtime.append(u"--volume=%s:%s:rw" % (vol.resolved, containertgt)) + runtime.append(u"--volume=%s:%s:rw" % (docker_windows_path_adjust(vol.resolved), docker_windows_path_adjust(containertgt))) else: shutil.copytree(vol.resolved, vol.target) elif vol.type == "CreateFile": createtmp = os.path.join(host_outdir, os.path.basename(vol.target)) with open(createtmp, "wb") as f: f.write(vol.resolved.encode("utf-8")) - runtime.append(u"--volume=%s:%s:ro" % (createtmp, vol.target)) + runtime.append(u"--volume=%s:%s:ro" % (docker_windows_path_adjust(createtmp), docker_windows_path_adjust(vol.target))) + def run(self, pull_image=True, rm_container=True, rm_tmpdir=True, move_outputs="move", **kwargs): @@ -361,14 +372,14 @@ def run(self, pull_image=True, rm_container=True, runtime = [u"docker", u"run", u"-i"] - runtime.append(u"--volume=%s:%s:rw" % (os.path.realpath(self.outdir), self.builder.outdir)) - runtime.append(u"--volume=%s:%s:rw" % (os.path.realpath(self.tmpdir), "/tmp")) + runtime.append(u"--volume=%s:%s:rw" % (docker_windows_path_adjust(os.path.realpath(self.outdir)), self.builder.outdir)) + runtime.append(u"--volume=%s:%s:rw" % (docker_windows_path_adjust(os.path.realpath(self.tmpdir)), "/tmp")) self.add_volumes(self.pathmapper, runtime, False) if self.generatemapper: self.add_volumes(self.generatemapper, runtime, True) - runtime.append(u"--workdir=%s" % (self.builder.outdir)) + runtime.append(u"--workdir=%s" % (docker_windows_path_adjust(self.builder.outdir))) runtime.append(u"--read-only=true") if kwargs.get("custom_net", None) is not None: @@ -379,9 +390,12 @@ def run(self, pull_image=True, rm_container=True, if self.stdout: runtime.append("--log-driver=none") - euid = docker_vm_uid() or os.geteuid() + if onWindows(): # windows os dont have getuid or geteuid functions + euid = docker_vm_uid() + else: + euid = docker_vm_uid() or os.geteuid() - if kwargs.get("no_match_user", None) is False: + if kwargs.get("no_match_user", None) is False and euid is not None: runtime.append(u"--user=%s" % (euid)) if rm_container: @@ -436,7 +450,7 @@ def _job_popen( sp = subprocess.Popen(commands, shell=False, - close_fds=True, + close_fds=not onWindows(), stdin=stdin, stdout=stdout, stderr=stderr, @@ -478,14 +492,14 @@ def _job_popen( stderr_path=stderr_path, stdin_path=stdin_path, ) - with open(os.path.join(job_dir, "job.json"), "w") as f: + with open(os.path.join(job_dir, "job.json"), "wb") as f: json.dump(job_description, f) try: job_script = os.path.join(job_dir, "run_job.bash") with open(job_script, "wb") as f: f.write(job_script_contents.encode('utf-8')) job_run = os.path.join(job_dir, "run_job.py") - with open(job_run, "w") as f: + with open(job_run, "wb") as f: f.write(PYTHON_RUN_SCRIPT) sp = subprocess.Popen( ["bash", job_script.encode("utf-8")], diff --git a/cwltool/load_tool.py b/cwltool/load_tool.py index 0a56b0794..986255364 100644 --- a/cwltool/load_tool.py +++ b/cwltool/load_tool.py @@ -47,7 +47,8 @@ def fetch_document(argsworkflow, # type: Union[Text, Dict[Text, Any]] workflowobj = None # type: CommentedMap if isinstance(argsworkflow, string_types): split = urllib.parse.urlsplit(argsworkflow) - if split.scheme: + # In case of Windows path, urlsplit misjudge Drive letters as scheme, here we are skipping that + if split.scheme and split.scheme in [u'http',u'https',u'file']: uri = argsworkflow elif os.path.exists(os.path.abspath(argsworkflow)): uri = file_uri(str(os.path.abspath(argsworkflow))) diff --git a/cwltool/main.py b/cwltool/main.py index e70674a64..001792ced 100755 --- a/cwltool/main.py +++ b/cwltool/main.py @@ -39,6 +39,8 @@ from .software_requirements import DependenciesConfiguration, get_container_from_software_requirements, SOFTWARE_REQUIREMENTS_ENABLED from .stdfsaccess import StdFsAccess from .update import ALLUPDATES, UPDATES +from .utils import onWindows +from ruamel.yaml.comments import Comment, CommentedSeq, CommentedMap _logger = logging.getLogger("cwltool") @@ -695,6 +697,10 @@ def main(argsl=None, # type: List[str] argsl = sys.argv[1:] args = arg_parser().parse_args(argsl) + # If On windows platform, A default Docker Container is Used if not explicitely provided by user + if onWindows() and not args.default_container: + args.default_container = "ubuntu" + # If caller provided custom arguments, it may be not every expected # option is set, so fill in no-op defaults to avoid crashing when # dereferencing them in args. diff --git a/cwltool/pathmapper.py b/cwltool/pathmapper.py index 13559cff3..0964186b6 100644 --- a/cwltool/pathmapper.py +++ b/cwltool/pathmapper.py @@ -12,6 +12,8 @@ from schema_salad.sourceline import SourceLine from six.moves import urllib +from .utils import convert_pathsep_to_unix + from .stdfsaccess import StdFsAccess, abspath _logger = logging.getLogger("cwltool") @@ -186,7 +188,8 @@ def visitlisting(self, listing, stagedir, basedir, copy=False, staged=False): def visit(self, obj, stagedir, basedir, copy=False, staged=False): # type: (Dict[Text, Any], Text, Text, bool, bool) -> None - tgt = os.path.join(stagedir, obj["basename"]) + tgt = convert_pathsep_to_unix( + os.path.join(stagedir, obj["basename"])) if obj["location"] in self._pathmap: return if obj["class"] == "Directory": diff --git a/cwltool/process.py b/cwltool/process.py index 6ab6ba9fc..ca32e0ccd 100644 --- a/cwltool/process.py +++ b/cwltool/process.py @@ -36,7 +36,7 @@ from .pathmapper import (PathMapper, adjustDirObjs, get_listing, normalizeFilesDirs, visit_class) from .stdfsaccess import StdFsAccess -from .utils import aslist, get_feature +from .utils import aslist, get_feature, copytree_with_merge, onWindows # if six.PY3: # AvroSchemaFromJSONData = avro.schema.SchemaFromJSONData @@ -206,15 +206,26 @@ def adjustFilesWithSecondary(rec, op, primary=None): adjustFilesWithSecondary(d, op, primary) -def stageFiles(pm, stageFunc, ignoreWritable=False): - # type: (PathMapper, Callable[..., Any], bool) -> None +def stageFiles(pm, stageFunc=None, ignoreWritable=False, symLink=True): + # type: (PathMapper, Callable[..., Any], bool, bool) -> None for f, p in pm.items(): if not p.staged: continue if not os.path.exists(os.path.dirname(p.target)): os.makedirs(os.path.dirname(p.target), 0o0755) - if p.type in ("File", "Directory") and (p.resolved.startswith("/") or p.resolved.startswith("file:///")): - stageFunc(p.resolved, p.target) + if p.type in ("File", "Directory") and (os.path.exists(p.resolved)): + if symLink: # Use symlink func if allowed + if onWindows(): + if p.type == "File": + shutil.copy(p.resolved, p.target) + elif p.type == "Directory": + if os.path.exists(p.target) and os.path.isdir(p.target): + shutil.rmtree(p.target) + copytree_with_merge(p.resolved, p.target) + else: + os.symlink(p.resolved, p.target) + elif stageFunc is not None: + stageFunc(p.resolved, p.target) elif p.type == "Directory" and not os.path.exists(p.target) and p.resolved.startswith("_:"): os.makedirs(p.target, 0o0755) elif p.type == "WritableFile" and not ignoreWritable: @@ -244,7 +255,6 @@ def collectFilesAndDirs(obj, out): def relocateOutputs(outputObj, outdir, output_dirs, action, fs_access): # type: (Union[Dict[Text, Any], List[Dict[Text, Any]]], Text, Set[Text], Text, StdFsAccess) -> Union[Dict[Text, Any], List[Dict[Text, Any]]] - adjustDirObjs(outputObj, functools.partial(get_listing, fs_access, recursive=True)) if action not in ("move", "copy"): @@ -273,7 +283,7 @@ def moveIt(src, dst): outfiles = [] # type: List[Dict[Text, Any]] collectFilesAndDirs(outputObj, outfiles) pm = PathMapper(outfiles, "", outdir, separateDirs=False) - stageFiles(pm, moveIt) + stageFiles(pm, stageFunc=moveIt,symLink=False) def _check_adjust(f): f["location"] = file_uri(pm.mapper(f["location"])[1]) @@ -296,8 +306,15 @@ def _check_adjust(f): rp = os.path.realpath(path) if path != rp: if rp in relinked: - os.unlink(path) - os.symlink(os.path.relpath(relinked[rp], path), path) + if onWindows(): + if os.path.isfile(path): + shutil.copy(os.path.relpath(relinked[rp], path), path) + elif os.path.exists(path) and os.path.isdir(path): + shutil.rmtree(path) + copytree_with_merge(os.path.relpath(relinked[rp], path), path) + else: + os.unlink(path) + os.symlink(os.path.relpath(relinked[rp], path), path) else: for od in output_dirs: if rp.startswith(od+"/"): @@ -540,7 +557,6 @@ def _init_job(self, joborder, **kwargs): builder.debug = kwargs.get("debug") builder.mutation_manager = kwargs.get("mutation_manager") - dockerReq, is_req = self.get_requirement("DockerRequirement") builder.make_fs_access = kwargs.get("make_fs_access") or StdFsAccess builder.fs_access = builder.make_fs_access(kwargs["basedir"]) @@ -548,11 +564,25 @@ def _init_job(self, joborder, **kwargs): if loadListingReq: builder.loadListing = loadListingReq.get("loadListing") - if dockerReq and kwargs.get("use_container"): - builder.outdir = builder.fs_access.realpath( - dockerReq.get("dockerOutputDirectory") or kwargs.get("docker_outdir") or "/var/spool/cwl") - builder.tmpdir = builder.fs_access.realpath(kwargs.get("docker_tmpdir") or "/tmp") - builder.stagedir = builder.fs_access.realpath(kwargs.get("docker_stagedir") or "/var/lib/cwl") + dockerReq, is_req = self.get_requirement("DockerRequirement") + defaultDocker = None + + if dockerReq is None and "default_container" in kwargs: + defaultDocker = kwargs["default_container"] + + if dockerReq or defaultDocker and kwargs.get("use_container"): + if dockerReq: + # Check if docker output directory is absolute + if dockerReq.get("dockerOutputDirectory") and dockerReq.get("dockerOutputDirectory").startswith('/'): + builder.outdir = dockerReq.get("dockerOutputDirectory") + else: + builder.outdir = builder.fs_access.docker_compatible_realpath( + dockerReq.get("dockerOutputDirectory") or kwargs.get("docker_outdir") or "/var/spool/cwl") + elif defaultDocker: + builder.outdir = builder.fs_access.docker_compatible_realpath( + kwargs.get("docker_outdir") or "/var/spool/cwl") + builder.tmpdir = builder.fs_access.docker_compatible_realpath(kwargs.get("docker_tmpdir") or "/tmp") + builder.stagedir = builder.fs_access.docker_compatible_realpath(kwargs.get("docker_stagedir") or "/var/lib/cwl") else: builder.outdir = builder.fs_access.realpath(kwargs.get("outdir") or tempfile.mkdtemp()) builder.tmpdir = builder.fs_access.realpath(kwargs.get("tmpdir") or tempfile.mkdtemp()) @@ -611,7 +641,6 @@ def _init_job(self, joborder, **kwargs): key = lambda dict: dict["position"] builder.bindings.sort(key=key) builder.resources = self.evalResources(builder, kwargs) - builder.job_script_provider = kwargs.get("job_script_provider", None) return builder diff --git a/cwltool/resolver.py b/cwltool/resolver.py index fab6b0bcb..56e16e37e 100644 --- a/cwltool/resolver.py +++ b/cwltool/resolver.py @@ -12,7 +12,7 @@ def resolve_local(document_loader, uri): if uri.startswith("/"): return None - shares = [os.environ.get("XDG_DATA_HOME", os.path.join(os.environ["HOME"], ".local", "share"))] + shares = [os.environ.get("XDG_DATA_HOME", os.path.join(os.path.expanduser('~'), ".local", "share"))] shares.extend(os.environ.get("XDG_DATA_DIRS", "/usr/local/share/:/usr/share/").split(":")) shares = [os.path.join(s, "commonwl", uri) for s in shares] shares.insert(0, os.path.join(os.getcwd(), uri)) diff --git a/cwltool/sandboxjs.py b/cwltool/sandboxjs.py index c3960daa2..8e7b91543 100644 --- a/cwltool/sandboxjs.py +++ b/cwltool/sandboxjs.py @@ -6,13 +6,19 @@ import select import subprocess import threading +import sys from io import BytesIO from typing import Any, Dict, List, Mapping, Text, Tuple, Union - +from .utils import onWindows from pkg_resources import resource_stream import six +try: + import queue # type: ignore +except ImportError: + import Queue as queue # type: ignore + class JavascriptException(Exception): pass @@ -118,7 +124,7 @@ def new_js_proc(): def execjs(js, jslib, timeout=None, debug=False): # type: (Union[Mapping, Text], Any, int, bool) -> JSON - if not hasattr(localdata, "proc") or localdata.proc.poll() is not None: + if not hasattr(localdata, "proc") or localdata.proc.poll() is not None or onWindows(): localdata.proc = new_js_proc() nodejs = localdata.proc @@ -147,26 +153,116 @@ def term(): rselect = [nodejs.stdout, nodejs.stderr] # type: List[BytesIO] wselect = [nodejs.stdin] # type: List[BytesIO] - while (len(wselect) + len(rselect)) > 0: - rready, wready, _ = select.select(rselect, wselect, []) - try: - if nodejs.stdin in wready: - b = stdin_buf.read(select.PIPE_BUF) + + # On windows system standard input/output are not handled properly by select module(modules like pywin32, msvcrt, gevent don't work either) + if sys.platform=='win32': + READ_BYTES_SIZE = 512 + + # creating queue for reading from a thread to queue + input_queue = queue.Queue() + output_queue = queue.Queue() + error_queue = queue.Queue() + + # To tell threads that output has ended and threads can safely exit + no_more_output = threading.Lock() + no_more_output.acquire() + no_more_error = threading.Lock() + no_more_error.acquire() + + # put constructed command to input queue which then will be passed to nodejs's stdin + def put_input(input_queue): + while True: + b = stdin_buf.read(READ_BYTES_SIZE) if b: - os.write(nodejs.stdin.fileno(), b) + input_queue.put(b) else: - wselect = [] - for pipes in ((nodejs.stdout, stdout_buf), (nodejs.stderr, stderr_buf)): - if pipes[0] in rready: - b = os.read(pipes[0].fileno(), select.PIPE_BUF) + break + + # get the output from nodejs's stdout and continue till otuput ends + def get_output(output_queue): + while not no_more_output.acquire(False): + b=os.read(nodejs.stdout.fileno(), READ_BYTES_SIZE) + if b: + output_queue.put(b) + + # get the output from nodejs's stderr and continue till error output ends + def get_error(error_queue): + while not no_more_error.acquire(False): + b = os.read(nodejs.stderr.fileno(), READ_BYTES_SIZE) + if b: + error_queue.put(b) + + # Threads managing nodejs.stdin, nodejs.stdout and nodejs.stderr respectively + input_thread = threading.Thread(target=put_input, args=(input_queue,)) + input_thread.daemon=True + input_thread.start() + output_thread = threading.Thread(target=get_output, args=(output_queue,)) + output_thread.daemon=True + output_thread.start() + error_thread = threading.Thread(target=get_error, args=(error_queue,)) + error_thread.daemon=True + error_thread.start() + + # mark if output/error is ready + output_ready=False + error_ready=False + + while (len(wselect) + len(rselect)) > 0: + try: + if nodejs.stdin in wselect: + if not input_queue.empty(): + os.write(nodejs.stdin.fileno(), input_queue.get()) + elif not input_thread.is_alive(): + wselect = [] + if nodejs.stdout in rselect: + if not output_queue.empty(): + output_ready = True + stdout_buf.write(output_queue.get()) + elif output_ready: + rselect = [] + no_more_output.release() + no_more_error.release() + output_thread.join() + + if nodejs.stderr in rselect: + if not error_queue.empty(): + error_ready = True + stderr_buf.write(error_queue.get()) + elif error_ready: + rselect = [] + no_more_output.release() + no_more_error.release() + output_thread.join() + error_thread.join() + if stdout_buf.getvalue().endswith("\n"): + rselect = [] + no_more_output.release() + no_more_error.release() + output_thread.join() + except OSError as e: + break + + else: + while (len(wselect) + len(rselect)) > 0: + rready, wready, _ = select.select(rselect, wselect, []) + try: + if nodejs.stdin in wready: + b = stdin_buf.read(select.PIPE_BUF) if b: - pipes[1].write(b) + os.write(nodejs.stdin.fileno(), b) else: - rselect.remove(pipes[0]) - if stdout_buf.getvalue().endswith("\n".encode()): - rselect = [] - except OSError as e: - break + wselect = [] + for pipes in ((nodejs.stdout, stdout_buf), (nodejs.stderr, stderr_buf)): + if pipes[0] in rready: + b = os.read(pipes[0].fileno(), select.PIPE_BUF) + if b: + pipes[1].write(b) + else: + rselect.remove(pipes[0]) + if stdout_buf.getvalue().endswith("\n".encode()): + rselect = [] + except OSError as e: + break tm.cancel() stdin_buf.close() @@ -202,6 +298,9 @@ def stdfmt(data): # type: (Text) -> Text raise JavascriptException(info) else: try: + # On windows currently a new instance of nodejs process is used due to problem with blocking on read operation on windows + if onWindows(): + nodejs.kill() return json.loads(stdoutdata.decode('utf-8')) except ValueError as e: raise JavascriptException(u"%s\nscript was:\n%s\nstdout was: '%s'\nstderr was: '%s'\n" % diff --git a/cwltool/stdfsaccess.py b/cwltool/stdfsaccess.py index 5b26b7410..0c03e5804 100644 --- a/cwltool/stdfsaccess.py +++ b/cwltool/stdfsaccess.py @@ -3,6 +3,8 @@ import os from typing import BinaryIO, List, Union, Text, IO, overload +from .utils import onWindows + import six from six.moves import urllib from schema_salad.ref_resolver import file_uri, uri_file_path @@ -11,7 +13,10 @@ def abspath(src, basedir): # type: (Text, Text) -> Text if src.startswith(u"file://"): ab = six.text_type(uri_file_path(str(src))) else: - ab = src if os.path.isabs(src) else os.path.join(basedir, src) + if basedir.startswith(u"file://"): + ab = src if os.path.isabs(src) else basedir+ '/'+ src + else: + ab = src if os.path.isabs(src) else os.path.join(basedir, src) return ab class StdFsAccess(object): @@ -54,3 +59,11 @@ def join(self, path, *paths): # type: (Text, *Text) -> Text def realpath(self, path): # type: (Text) -> Text return os.path.realpath(path) + + # On windows os.path.realpath appends unecessary Drive, here we would avoid that + def docker_compatible_realpath(self, path): # type: (Text) -> Text + if onWindows(): + if path.startswith('/'): + return path + return '/'+path + return os.path.realpath(path) diff --git a/cwltool/utils.py b/cwltool/utils.py index da71c9699..b78359e51 100644 --- a/cwltool/utils.py +++ b/cwltool/utils.py @@ -2,8 +2,13 @@ # no imports from cwltool allowed +import os +import shutil +import stat +import six +from six.moves import urllib from six.moves import zip_longest -from typing import Any, Dict, List, Tuple, Text, Union +from typing import Any,Callable, Dict, List, Tuple, Text, Union def aslist(l): # type: (Any) -> List[Any] @@ -22,6 +27,95 @@ def get_feature(self, feature): # type: (Any, Any) -> Tuple[Any, bool] return (t, False) return (None, None) + +def copytree_with_merge(src, dst, symlinks=False, ignore=None): + # type: (Text, Text, bool, Callable[..., Any]) -> None + if not os.path.exists(dst): + os.makedirs(dst) + shutil.copystat(src, dst) + lst = os.listdir(src) + if ignore: + excl = ignore(src, lst) + lst = [x for x in lst if x not in excl] + for item in lst: + s = os.path.join(src, item) + d = os.path.join(dst, item) + if symlinks and os.path.islink(s): + if os.path.lexists(d): + os.remove(d) + os.symlink(os.readlink(s), d) + try: + st = os.lstat(s) + mode = stat.S_IMODE(st.st_mode) + os.lchmod(d, mode) + except: + pass # lchmod not available, only available on unix + elif os.path.isdir(s): + copytree_with_merge(s, d, symlinks, ignore) + else: + shutil.copy2(s, d) + + +# changes windowspath(only) appropriately to be passed to docker run command +# as docker treat them as unix paths so convert C:\Users\foo to /C/Users/foo +def docker_windows_path_adjust(path): + # type: (Text) -> (Text) + if path is not None and onWindows(): + sp=path.split(':') + if len(sp)==2: + sp[0]=sp[0].capitalize() # Capitalizing windows Drive letters + path=':'.join(sp) + path = path.replace(':', '').replace('\\', '/') + return path if path[0] == '/' else '/' + path + return path + + +# changes docker path(only on windows os) appropriately back to Windows path +# so convert /C/Users/foo to C:\Users\foo +def docker_windows_reverse_path_adjust(path): + # type: (Text) -> (Text) + if path is not None and onWindows(): + if path[0] == '/': + path=path[1:] + else: + raise ValueError("not a docker path") + splitpath=path.split('/') + splitpath[0]= splitpath[0]+':' + return '\\'.join(splitpath) + return path + + +# On docker in windows fileuri do not contain : in path +# To convert this file uri to windows compatible add : after drove letter, +# so file:///E/var becomes file:///E:/var +def docker_windows_reverse_fileuri_adjust(fileuri): + # type: (Text) -> (Text) + if fileuri is not None and onWindows(): + if urllib.parse.urlsplit(fileuri).scheme == "file": + filesplit= fileuri.split("/") + if filesplit[3][-1] != ':': + filesplit[3]=filesplit[3]+':' + return '/'.join(filesplit) + else: + return fileuri + else: + raise ValueError("not a file URI") + return fileuri + + +# Check if we are on windows OS +def onWindows(): + # type: () -> (bool) + return os.name == 'nt' + + + +# On windows os.path.join would use backslash to join path, since we would use these paths in Docker we would convert it to / +def convert_pathsep_to_unix(path): # type: (Text) -> (Text) + if path is not None and onWindows(): + return path.replace('\\', '/') + return path + # comparision function to be used in sorting # python3 doesn't allow sorting of different # types like str() and int(). @@ -52,6 +146,7 @@ def cmp_like_py2(dict1, dict2): # type: (Dict[Text, Any], Dict[Text, Any]) -> i # if both lists are equal return 0 + # util function to convert any present byte string # to unicode string. input is a dict of nested dicts and lists def bytes2str_in_dicts(a): diff --git a/requirements.txt b/requirements.txt index 1adf38667..320d77bb2 100755 --- a/requirements.txt +++ b/requirements.txt @@ -5,3 +5,4 @@ rdflib-jsonld==0.4.0 shellescape==3.4.1 schema-salad>=2.6,<3 typing==3.5.3 + diff --git a/tests/test_check.py b/tests/test_check.py index 4e8b13efc..cd715412b 100644 --- a/tests/test_check.py +++ b/tests/test_check.py @@ -6,12 +6,16 @@ import cwltool.pathmapper import cwltool.process import cwltool.workflow +import pytest from cwltool.main import main +from cwltool.utils import onWindows from .util import get_data class TestCheck(unittest.TestCase): + @pytest.mark.skipif(onWindows(), + reason="Instance of Cwltool is used, On windows that invoke a default docker Container") def test_output_checking(self): self.assertEquals(main([get_data('tests/wf/badout1.cwl')]), 1) self.assertEquals(main([get_data('tests/wf/badout2.cwl')]), 1) diff --git a/tests/test_ext.py b/tests/test_ext.py index dedfa4d74..6f690d350 100644 --- a/tests/test_ext.py +++ b/tests/test_ext.py @@ -3,6 +3,7 @@ import shutil import tempfile import unittest +import pytest import cwltool.expression as expr import cwltool.factory @@ -10,10 +11,12 @@ import cwltool.process import cwltool.workflow from cwltool.main import main - +from cwltool.utils import onWindows from .util import get_data +@pytest.mark.skipif(onWindows(), + reason="Instance of Cwltool is used, On windows that invoke a default docker Container") class TestListing(unittest.TestCase): def test_missing_enable_ext(self): # Require that --enable-ext is provided. @@ -40,6 +43,8 @@ def test_listing_v1_0(self): # # Default behavior in 1.1 will be no expansion # self.assertEquals(main([get_data('tests/wf/listing_v1_1.cwl'), get_data('tests/listing-job.yml')]), 1) +@pytest.mark.skipif(onWindows(), + reason="InplaceUpdate uses symlinks,does not run on windows without admin privileges") class TestInplaceUpdate(unittest.TestCase): def test_updateval(self): diff --git a/tests/test_relax_path_checks.py b/tests/test_relax_path_checks.py index 559c11099..52bf5e4ce 100644 --- a/tests/test_relax_path_checks.py +++ b/tests/test_relax_path_checks.py @@ -1,8 +1,10 @@ from __future__ import absolute_import import unittest +import pytest from tempfile import NamedTemporaryFile from cwltool.main import main +from cwltool.utils import onWindows class ToolArgparse(unittest.TestCase): @@ -24,11 +26,15 @@ class ToolArgparse(unittest.TestCase): baseCommand: [cat] ''' + @pytest.mark.skipif(onWindows(), + reason="Instance of Cwltool is used, On windows that invoke a default docker Container") def test_spaces_in_input_files(self): - with NamedTemporaryFile(mode='w') as f: + with NamedTemporaryFile(mode='w', delete=False) as f: f.write(self.script) f.flush() - with NamedTemporaryFile(prefix="test with spaces") as spaces: + f.close() + with NamedTemporaryFile(prefix="test with spaces", delete=False) as spaces: + spaces.close() self.assertEquals( main(["--debug", f.name, '--input', spaces.name]), 1) self.assertEquals( diff --git a/tests/test_toolargparse.py b/tests/test_toolargparse.py index ad2cd55ba..62dcca130 100644 --- a/tests/test_toolargparse.py +++ b/tests/test_toolargparse.py @@ -1,12 +1,13 @@ from __future__ import absolute_import import unittest +import pytest from tempfile import NamedTemporaryFile from cwltool.main import main +from cwltool.utils import onWindows from .util import get_data - class ToolArgparse(unittest.TestCase): script = ''' #!/usr/bin/env cwl-runner @@ -65,37 +66,46 @@ class ToolArgparse(unittest.TestCase): outputs: [] ''' + @pytest.mark.skipif(onWindows(), + reason="Instance of Cwltool is used, On windows that invoke a default docker Container") def test_help(self): - with NamedTemporaryFile(mode='w') as f: + with NamedTemporaryFile(mode='w', delete=False) as f: f.write(self.script) f.flush() + f.close() self.assertEquals(main(["--debug", f.name, '--input', get_data('tests/echo.cwl')]), 0) self.assertEquals(main(["--debug", f.name, '--input', get_data('tests/echo.cwl')]), 0) + + @pytest.mark.skipif(onWindows(), + reason="Instance of Cwltool is used, On windows that invoke a default docker Container") def test_bool(self): - with NamedTemporaryFile(mode='w') as f: + with NamedTemporaryFile(mode='w', delete=False) as f: f.write(self.script2) f.flush() + f.close() try: self.assertEquals(main([f.name, '--help']), 0) except SystemExit as e: self.assertEquals(e.code, 0) def test_record_help(self): - with NamedTemporaryFile(mode='w') as f: + with NamedTemporaryFile(mode='w', delete=False) as f: f.write(self.script3) f.flush() + f.close() try: self.assertEquals(main([f.name, '--help']), 0) except SystemExit as e: self.assertEquals(e.code, 0) def test_record(self): - with NamedTemporaryFile(mode='w') as f: + with NamedTemporaryFile(mode='w', delete=False) as f: f.write(self.script3) f.flush() + f.close() try: self.assertEquals(main([f.name, '--foo.one', get_data('tests/echo.cwl'), '--foo.two', 'test']), 0) diff --git a/tests/util.py b/tests/util.py index a9251ccbf..3195a305d 100644 --- a/tests/util.py +++ b/tests/util.py @@ -6,6 +6,8 @@ def get_data(filename): + filename = os.path.normpath( + filename) # normalizing path depending on OS or else it will cause problem when joining path filepath = None try: filepath = resource_filename( diff --git a/tests/wf/echo.cwl b/tests/wf/echo.cwl index 9634be278..0dbbffc12 100644 --- a/tests/wf/echo.cwl +++ b/tests/wf/echo.cwl @@ -11,8 +11,9 @@ inputs: if sys.argv[1] == "2": exit(1) else: - f = open("foo"+sys.argv[1]+".txt", "w") - f.write(sys.argv[1]+"\n") + f = open("foo"+sys.argv[1]+".txt", "wb") + content = sys.argv[1]+"\n" + f.write(content.encode('utf-8')) if sys.argv[1] == "5": exit(1) outputs: