diff --git a/.travis.yml b/.travis.yml index f5ffeec..9b2e08e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,12 +1,10 @@ language: python python: -- '2.7' -- '3.5' - '3.6' +- '3.7' +- '3.8' before_install: -- sudo apt-get update -qq -- pip install toil[all]==3.20.0 -- pip install . --process-dependency-links +- pip install .[toil] - pip install -r dev-requirements.txt script: - flake8 wes_service wes_client diff --git a/Dockerfile b/Dockerfile index 2f1e5d7..4a450a1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -12,8 +12,8 @@ RUN apt-get update && \ sh -c 'echo deb https://oss-binaries.phusionpassenger.com/apt/passenger stretch main > /etc/apt/sources.list.d/passenger.list' RUN apt-get update && \ - apt-get install -y --no-install-recommends passenger python-setuptools build-essential python-dev python-pip git && \ - pip install pip==9.0.3 + apt-get install -y --no-install-recommends passenger python3-setuptools build-essential python3-dev python3-pip git && \ + pip3 install pip==9.0.3 RUN apt-get install -y --no-install-recommends libcurl4-openssl-dev libssl1.0-dev diff --git a/cwl_flask.py b/cwl_flask.py index c269453..96d4a55 100644 --- a/cwl_flask.py +++ b/cwl_flask.py @@ -27,24 +27,27 @@ def begin(self): loghandle, self.logname = tempfile.mkstemp() with self.updatelock: self.outdir = tempfile.mkdtemp() - self.proc = subprocess.Popen(["cwl-runner", self.path, "-"], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=loghandle, - close_fds=True, - cwd=self.outdir) + self.proc = subprocess.Popen( + ["cwl-runner", self.path, "-"], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=loghandle, + close_fds=True, + cwd=self.outdir, + ) self.status = { "id": "%sjobs/%i" % (request.url_root, self.jobid), "log": "%sjobs/%i/log" % (request.url_root, self.jobid), "run": self.path, "state": "Running", "input": json.loads(self.inputobj), - "output": None} + "output": None, + } def run(self): self.stdoutdata, self.stderrdata = self.proc.communicate(self.inputobj) if self.proc.returncode == 0: - outobj = yaml.load(self.stdoutdata) + outobj = yaml.load(self.stdoutdata, Loader=yaml.FullLoader) with self.updatelock: self.status["state"] = "Success" self.status["output"] = outobj @@ -75,7 +78,7 @@ def resume(self): self.status["state"] = "Running" -@app.route("/run", methods=['POST']) +@app.route("/run", methods=["POST"]) def runworkflow(): path = request.args["wf"] with jobs_lock: @@ -86,11 +89,11 @@ def runworkflow(): return redirect("/jobs/%i" % jobid, code=303) -@app.route("/jobs/", methods=['GET', 'POST']) +@app.route("/jobs/", methods=["GET", "POST"]) def jobcontrol(jobid): with jobs_lock: job = jobs[jobid] - if request.method == 'POST': + if request.method == "POST": action = request.args.get("action") if action: if action == "cancel": @@ -117,14 +120,14 @@ def logspooler(job): time.sleep(1) -@app.route("/jobs//log", methods=['GET']) +@app.route("/jobs//log", methods=["GET"]) def getlog(jobid): with jobs_lock: job = jobs[jobid] return Response(logspooler(job)) -@app.route("/jobs", methods=['GET']) +@app.route("/jobs", methods=["GET"]) def getjobs(): with jobs_lock: jobscopy = copy.copy(jobs) @@ -139,6 +142,7 @@ def spool(jc): else: yield ", " + json.dumps(j.getstatus(), indent=4) yield "]" + return Response(spool(jobscopy)) diff --git a/cwltool_stream.py b/cwltool_stream.py index 4d9440a..c9d3d95 100644 --- a/cwltool_stream.py +++ b/cwltool_stream.py @@ -34,7 +34,12 @@ def main(args=None): t = StringIO.StringIO(msg) err = StringIO.StringIO() - if cwltool.main.main(["--outdir="+outdir] + args + ["-"], stdin=t, stderr=err) != 0: + if ( + cwltool.main.main( + ["--outdir=" + outdir] + args + ["-"], stdin=t, stderr=err + ) + != 0 + ): sys.stdout.write(json.dumps({"cwl:error": err.getvalue()})) sys.stdout.write("\n\n") sys.stdout.flush() diff --git a/dev-requirements.txt b/dev-requirements.txt index 28ecaca..1a68f8c 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,2 +1,3 @@ flake8 pytest +black diff --git a/setup.py b/setup.py index 44bdf76..5e20c7f 100644 --- a/setup.py +++ b/setup.py @@ -10,47 +10,50 @@ with open("README.pypi.rst") as readmeFile: long_description = readmeFile.read() -setup(name='wes-service', - version='3.3', - description='GA4GH Workflow Execution Service reference implementation', - long_description=long_description, - author='GA4GH Containers and Workflows task team', - author_email='common-workflow-language@googlegroups.com', - url="https://github.com/common-workflow-language/cwltool-service", - download_url="https://github.com/common-workflow-language/cwltool-service", - license='Apache 2.0', - packages=["wes_service", "wes_client"], - package_data={'wes_service': ['openapi/workflow_execution_service.swagger.yaml']}, - include_package_data=True, - install_requires=[ - 'future', - 'connexion >= 2.0.2, < 3', - 'ruamel.yaml >= 0.12.4, <= 0.15.77', - 'schema-salad', - 'subprocess32==3.5.2' - ], - entry_points={ - 'console_scripts': ["wes-server=wes_service.wes_service_main:main", - "wes-client=wes_client.wes_client_main:main"] - }, - extras_require={ - "cwltool": ['cwlref-runner'], - "arvados": ["arvados-cwl-runner" - ], - "toil": ["toil[all]==3.20.0" - ]}, - zip_safe=False, - platforms=['MacOS X', 'Posix'], - classifiers=[ - 'Intended Audience :: Developers', - 'License :: OSI Approved :: Apache Software License', - 'Operating System :: MacOS :: MacOS X', - 'Operating System :: POSIX', - 'Programming Language :: Python', - 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3.5', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7', - 'Topic :: Software Development :: Libraries :: Python Modules' +setup( + name="wes-service", + version="4.0", + description="GA4GH Workflow Execution Service reference implementation", + long_description=long_description, + author="GA4GH Containers and Workflows task team", + author_email="common-workflow-language@googlegroups.com", + url="https://github.com/common-workflow-language/cwltool-service", + download_url="https://github.com/common-workflow-language/cwltool-service", + license="Apache 2.0", + python_requires="~=3.5", + setup_requires=['pytest-runner'], + tests_require=['pytest'], + packages=["wes_service", "wes_client"], + package_data={"wes_service": ["openapi/workflow_execution_service.swagger.yaml"]}, + include_package_data=True, + install_requires=[ + "connexion >= 2.0.2, < 3", + "ruamel.yaml >= 0.15.78, < 0.16", + "schema-salad", + "subprocess32==3.5.2", + ], + entry_points={ + "console_scripts": [ + "wes-server=wes_service.wes_service_main:main", + "wes-client=wes_client.wes_client_main:main", ] - ) + }, + extras_require={ + "cwltool": ["cwlref-runner"], + "arvados": ["arvados-cwl-runner"], + "toil": ["toil[cwl]==4.1.0"], + }, + zip_safe=False, + platforms=["MacOS X", "Posix"], + classifiers=[ + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Operating System :: MacOS :: MacOS X", + "Operating System :: POSIX", + "Programming Language :: Python", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Topic :: Software Development :: Libraries :: Python Modules", + ], +) diff --git a/test/test_client_util.py b/test/test_client_util.py index 0122078..11ff7d6 100644 --- a/test/test_client_util.py +++ b/test/test_client_util.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - import unittest import os import logging diff --git a/test/test_integration.py b/test/test_integration.py index 3ed8ab2..cd17bcf 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - import unittest import time import os diff --git a/wes_client/util.py b/wes_client/util.py index 255e8de..3dd68cc 100644 --- a/wes_client/util.py +++ b/wes_client/util.py @@ -18,24 +18,28 @@ def two_seven_compatible(filePath): """Determines if a python file is 2.7 compatible by seeing if it compiles in a subprocess""" try: - check_call(['python2', '-m', 'py_compile', filePath], stderr=DEVNULL) + check_call(["python2", "-m", "py_compile", filePath], stderr=DEVNULL) except CalledProcessError: - raise RuntimeError('Python files must be 2.7 compatible') + raise RuntimeError("Python files must be 2.7 compatible") return True def get_version(extension, workflow_file): - '''Determines the version of a .py, .wdl, or .cwl file.''' - if extension == 'py' and two_seven_compatible(workflow_file): - return '2.7' - elif extension == 'cwl': - return yaml.load(open(workflow_file))['cwlVersion'] + """Determines the version of a .py, .wdl, or .cwl file.""" + if extension == "py" and two_seven_compatible(workflow_file): + return "2.7" + elif extension == "cwl": + return yaml.load(open(workflow_file), Loader=yaml.FullLoader)["cwlVersion"] else: # Must be a wdl file. # Borrowed from https://github.com/Sage-Bionetworks/synapse-orchestrator/blob/develop/synorchestrator/util.py#L142 try: - return [l.lstrip('version') for l in workflow_file.splitlines() if 'version' in l.split(' ')][0] + return [ + entry.lstrip("version") + for entry in workflow_file.splitlines() + if "version" in entry.split(" ") + ][0] except IndexError: - return 'draft-2' + return "draft-2" def wf_info(workflow_path): @@ -47,25 +51,39 @@ def wf_info(workflow_path): enable our approach to version checking, then removed after version is extracted. """ - supported_formats = ['py', 'wdl', 'cwl'] - file_type = workflow_path.lower().split('.')[-1] # Grab the file extension - workflow_path = workflow_path if ':' in workflow_path else 'file://' + workflow_path + supported_formats = ["py", "wdl", "cwl"] + file_type = workflow_path.lower().split(".")[-1] # Grab the file extension + workflow_path = workflow_path if ":" in workflow_path else "file://" + workflow_path if file_type in supported_formats: - if workflow_path.startswith('file://'): + if workflow_path.startswith("file://"): version = get_version(file_type, workflow_path[7:]) - elif workflow_path.startswith('https://') or workflow_path.startswith('http://'): + elif workflow_path.startswith("https://") or workflow_path.startswith( + "http://" + ): # If file not local go fetch it. html = urlopen(workflow_path).read() - local_loc = os.path.join(os.getcwd(), 'fetchedFromRemote.' + file_type) - with open(local_loc, 'w') as f: + local_loc = os.path.join(os.getcwd(), "fetchedFromRemote." + file_type) + with open(local_loc, "w") as f: f.write(html.decode()) - version = wf_info('file://' + local_loc)[0] # Don't take the file_type here, found it above. - os.remove(local_loc) # TODO: Find a way to avoid recreating file before version determination. + version = wf_info("file://" + local_loc)[ + 0 + ] # Don't take the file_type here, found it above. + os.remove( + local_loc + ) # TODO: Find a way to avoid recreating file before version determination. else: - raise NotImplementedError('Unsupported workflow file location: {}. Must be local or HTTP(S).'.format(workflow_path)) + raise NotImplementedError( + "Unsupported workflow file location: {}. Must be local or HTTP(S).".format( + workflow_path + ) + ) else: - raise TypeError('Unsupported workflow type: .{}. Must be {}.'.format(file_type, '.py, .cwl, or .wdl')) + raise TypeError( + "Unsupported workflow type: .{}. Must be {}.".format( + file_type, ".py, .cwl, or .wdl" + ) + ) return version, file_type.upper() @@ -76,10 +94,9 @@ def modify_jsonyaml_paths(jsonyaml_file): :param jsonyaml_file: Path to a json/yaml file. """ - loader = schema_salad.ref_resolver.Loader({ - "location": {"@type": "@id"}, - "path": {"@type": "@id"} - }) + loader = schema_salad.ref_resolver.Loader( + {"location": {"@type": "@id"}, "path": {"@type": "@id"}} + ) input_dict, _ = loader.resolve_ref(jsonyaml_file, checklinks=False) basedir = os.path.dirname(jsonyaml_file) @@ -88,7 +105,9 @@ def fixpaths(d): if isinstance(d, dict): if "path" in d: if ":" not in d["path"]: - local_path = os.path.normpath(os.path.join(os.getcwd(), basedir, d["path"])) + local_path = os.path.normpath( + os.path.join(os.getcwd(), basedir, d["path"]) + ) d["location"] = pathname2url(local_path) else: d["location"] = d["path"] @@ -106,7 +125,9 @@ def build_wes_request(workflow_file, json_path, attachments=None): :return: A list of tuples formatted to be sent in a post to the wes-server (Swagger API). """ - workflow_file = "file://" + workflow_file if ":" not in workflow_file else workflow_file + workflow_file = ( + "file://" + workflow_file if ":" not in workflow_file else workflow_file + ) wfbase = None if json_path.startswith("file://"): wfbase = os.path.dirname(json_path[7:]) @@ -119,14 +140,21 @@ def build_wes_request(workflow_file, json_path, attachments=None): wf_params = json_path wf_version, wf_type = wf_info(workflow_file) - parts = [("workflow_params", wf_params), - ("workflow_type", wf_type), - ("workflow_type_version", wf_version)] + parts = [ + ("workflow_params", wf_params), + ("workflow_type", wf_type), + ("workflow_type_version", wf_version), + ] if workflow_file.startswith("file://"): if wfbase is None: wfbase = os.path.dirname(workflow_file[7:]) - parts.append(("workflow_attachment", (os.path.basename(workflow_file[7:]), open(workflow_file[7:], "rb")))) + parts.append( + ( + "workflow_attachment", + (os.path.basename(workflow_file[7:]), open(workflow_file[7:], "rb")), + ) + ) parts.append(("workflow_url", os.path.basename(workflow_file[7:]))) else: parts.append(("workflow_url", workflow_file)) @@ -151,12 +179,12 @@ def build_wes_request(workflow_file, json_path, attachments=None): def expand_globs(attachments): expanded_list = [] for filepath in attachments: - if 'file://' in filepath: + if "file://" in filepath: for f in glob.glob(filepath[7:]): - expanded_list += ['file://' + os.path.abspath(f)] - elif ':' not in filepath: + expanded_list += ["file://" + os.path.abspath(f)] + elif ":" not in filepath: for f in glob.glob(filepath): - expanded_list += ['file://' + os.path.abspath(f)] + expanded_list += ["file://" + os.path.abspath(f)] else: expanded_list += [filepath] return set(expanded_list) @@ -173,9 +201,9 @@ def wes_reponse(postresult): class WESClient(object): def __init__(self, service): - self.auth = service['auth'] - self.proto = service['proto'] - self.host = service['host'] + self.auth = service["auth"] + self.proto = service["proto"] + self.host = service["host"] def get_service_info(self): """ @@ -190,8 +218,10 @@ def get_service_info(self): :param host: Port where the post request will be sent and the wes server listens at (default 8080) :return: The body of the get result as a dictionary. """ - postresult = requests.get("%s://%s/ga4gh/wes/v1/service-info" % (self.proto, self.host), - headers=self.auth) + postresult = requests.get( + "%s://%s/ga4gh/wes/v1/service-info" % (self.proto, self.host), + headers=self.auth, + ) return wes_reponse(postresult) def list_runs(self): @@ -206,8 +236,9 @@ def list_runs(self): :param host: Port where the post request will be sent and the wes server listens at (default 8080) :return: The body of the get result as a dictionary. """ - postresult = requests.get("%s://%s/ga4gh/wes/v1/runs" % (self.proto, self.host), - headers=self.auth) + postresult = requests.get( + "%s://%s/ga4gh/wes/v1/runs" % (self.proto, self.host), headers=self.auth + ) return wes_reponse(postresult) def run(self, wf, jsonyaml, attachments): @@ -225,9 +256,11 @@ def run(self, wf, jsonyaml, attachments): """ attachments = list(expand_globs(attachments)) parts = build_wes_request(wf, jsonyaml, attachments) - postresult = requests.post("%s://%s/ga4gh/wes/v1/runs" % (self.proto, self.host), - files=parts, - headers=self.auth) + postresult = requests.post( + "%s://%s/ga4gh/wes/v1/runs" % (self.proto, self.host), + files=parts, + headers=self.auth, + ) return wes_reponse(postresult) def cancel(self, run_id): @@ -240,8 +273,10 @@ def cancel(self, run_id): :param host: Port where the post request will be sent and the wes server listens at (default 8080) :return: The body of the delete result as a dictionary. """ - postresult = requests.post("%s://%s/ga4gh/wes/v1/runs/%s/cancel" % (self.proto, self.host, run_id), - headers=self.auth) + postresult = requests.post( + "%s://%s/ga4gh/wes/v1/runs/%s/cancel" % (self.proto, self.host, run_id), + headers=self.auth, + ) return wes_reponse(postresult) def get_run_log(self, run_id): @@ -254,8 +289,10 @@ def get_run_log(self, run_id): :param host: Port where the post request will be sent and the wes server listens at (default 8080) :return: The body of the get result as a dictionary. """ - postresult = requests.get("%s://%s/ga4gh/wes/v1/runs/%s" % (self.proto, self.host, run_id), - headers=self.auth) + postresult = requests.get( + "%s://%s/ga4gh/wes/v1/runs/%s" % (self.proto, self.host, run_id), + headers=self.auth, + ) return wes_reponse(postresult) def get_run_status(self, run_id): @@ -268,6 +305,8 @@ def get_run_status(self, run_id): :param host: Port where the post request will be sent and the wes server listens at (default 8080) :return: The body of the get result as a dictionary. """ - postresult = requests.get("%s://%s/ga4gh/wes/v1/runs/%s/status" % (self.proto, self.host, run_id), - headers=self.auth) + postresult = requests.get( + "%s://%s/ga4gh/wes/v1/runs/%s/status" % (self.proto, self.host, run_id), + headers=self.auth, + ) return wes_reponse(postresult) diff --git a/wes_client/wes_client_main.py b/wes_client/wes_client_main.py index e820a1c..34902c5 100644 --- a/wes_client/wes_client_main.py +++ b/wes_client/wes_client_main.py @@ -13,25 +13,50 @@ def main(argv=sys.argv[1:]): parser = argparse.ArgumentParser(description="Workflow Execution Service") - parser.add_argument("--host", type=str, default=os.environ.get("WES_API_HOST"), - help="Example: '--host=localhost:8080'. Defaults to WES_API_HOST.") - parser.add_argument("--auth", type=str, default=os.environ.get("WES_API_AUTH"), help="Format is 'Header: value' or just 'value'. If header name is not provided, value goes in the 'Authorization'. Defaults to WES_API_AUTH.") - parser.add_argument("--proto", type=str, default=os.environ.get("WES_API_PROTO", "https"), - help="Options: [http, https]. Defaults to WES_API_PROTO (https).") + parser.add_argument( + "--host", + type=str, + default=os.environ.get("WES_API_HOST"), + help="Example: '--host=localhost:8080'. Defaults to WES_API_HOST.", + ) + parser.add_argument( + "--auth", + type=str, + default=os.environ.get("WES_API_AUTH"), + help="Format is 'Header: value' or just 'value'. If header name is not provided, value goes in the 'Authorization'. Defaults to WES_API_AUTH.", + ) + parser.add_argument( + "--proto", + type=str, + default=os.environ.get("WES_API_PROTO", "https"), + help="Options: [http, https]. Defaults to WES_API_PROTO (https).", + ) parser.add_argument("--quiet", action="store_true", default=False) parser.add_argument("--outdir", type=str) - parser.add_argument("--attachments", type=str, default=None, - help='A comma separated list of attachments to include. Example: ' - '--attachments="testdata/dockstore-tool-md5sum.cwl,testdata/md5sum.input"') + parser.add_argument( + "--attachments", + type=str, + default=None, + help="A comma separated list of attachments to include. Example: " + '--attachments="testdata/dockstore-tool-md5sum.cwl,testdata/md5sum.input"', + ) parser.add_argument("--page", type=str, default=None) parser.add_argument("--page-size", type=int, default=None) exgroup = parser.add_mutually_exclusive_group() exgroup.add_argument("--run", action="store_true", default=False) - exgroup.add_argument("--get", type=str, default=None, - help="Specify a . Example: '--get='") - exgroup.add_argument("--log", type=str, default=None, - help="Specify a . Example: '--log='") + exgroup.add_argument( + "--get", + type=str, + default=None, + help="Specify a . Example: '--get='", + ) + exgroup.add_argument( + "--log", + type=str, + default=None, + help="Specify a . Example: '--log='", + ) exgroup.add_argument("--list", action="store_true", default=False) exgroup.add_argument("--info", action="store_true", default=False) exgroup.add_argument("--version", action="store_true", default=False) @@ -57,10 +82,12 @@ def main(argv=sys.argv[1:]): else: auth["Authorization"] = args.auth - client = WESClient({'auth': auth, 'proto': args.proto, 'host': args.host}) + client = WESClient({"auth": auth, "proto": args.proto, "host": args.host}) if args.list: - response = client.list_runs() # how to include: page_token=args.page, page_size=args.page_size ? + response = ( + client.list_runs() + ) # how to include: page_token=args.page, page_size=args.page_size ? json.dump(response, sys.stdout, indent=4) return 0 @@ -94,7 +121,7 @@ def main(argv=sys.argv[1:]): else: logging.basicConfig(level=logging.INFO) - args.attachments = "" if not args.attachments else args.attachments.split(',') + args.attachments = "" if not args.attachments else args.attachments.split(",") r = client.run(args.workflow_url, job_order, args.attachments) if args.wait: diff --git a/wes_service/arvados_wes.py b/wes_service/arvados_wes.py index 7e09a94..2157872 100644 --- a/wes_service/arvados_wes.py +++ b/wes_service/arvados_wes.py @@ -21,17 +21,22 @@ class MissingAuthorization(Exception): def get_api(authtoken=None): if authtoken is None: - if not connexion.request.headers.get('Authorization'): + if not connexion.request.headers.get("Authorization"): raise MissingAuthorization() - authtoken = connexion.request.headers['Authorization'] + authtoken = connexion.request.headers["Authorization"] if not authtoken.startswith("Bearer ") or authtoken.startswith("OAuth2 "): raise ValueError("Authorization token must start with 'Bearer '") authtoken = authtoken[7:] - return arvados.api_from_config(version="v1", apiconfig={ - "ARVADOS_API_HOST": os.environ["ARVADOS_API_HOST"], - "ARVADOS_API_TOKEN": authtoken, - "ARVADOS_API_HOST_INSECURE": os.environ.get("ARVADOS_API_HOST_INSECURE", "false"), # NOQA - }) + return arvados.api_from_config( + version="v1", + apiconfig={ + "ARVADOS_API_HOST": os.environ["ARVADOS_API_HOST"], + "ARVADOS_API_TOKEN": authtoken, + "ARVADOS_API_HOST_INSECURE": os.environ.get( + "ARVADOS_API_HOST_INSECURE", "false" + ), # NOQA + }, + ) statemap = { @@ -39,7 +44,7 @@ def get_api(authtoken=None): "Locked": "INITIALIZING", "Running": "RUNNING", "Complete": "COMPLETE", - "Cancelled": "CANCELED" + "Cancelled": "CANCELED", } @@ -52,11 +57,20 @@ def catch_exceptions_wrapper(self, *args, **kwargs): return orig_func(self, *args, **kwargs) except arvados.errors.ApiError as e: logging.exception("Failure") - return {"msg": e._get_reason(), "status_code": e.resp.status}, int(e.resp.status) + return ( + {"msg": e._get_reason(), "status_code": e.resp.status}, + int(e.resp.status), + ) except subprocess.CalledProcessError as e: return {"msg": str(e), "status_code": 500}, 500 except MissingAuthorization: - return {"msg": "'Authorization' header is missing or empty, expecting Arvados API token", "status_code": 401}, 401 + return ( + { + "msg": "'Authorization' header is missing or empty, expecting Arvados API token", + "status_code": 401, + }, + 401, + ) except ValueError as e: return {"msg": str(e), "status_code": 400}, 400 except Exception as e: @@ -67,22 +81,18 @@ def catch_exceptions_wrapper(self, *args, **kwargs): class ArvadosBackend(WESBackend): def GetServiceInfo(self): - stdout, stderr = subprocess.Popen(["arvados-cwl-runner", "--version"], stderr=subprocess.PIPE).communicate() + stdout, stderr = subprocess.Popen( + ["arvados-cwl-runner", "--version"], stderr=subprocess.PIPE + ).communicate() return { - "workflow_type_versions": { - "CWL": {"workflow_type_version": ["v1.0"]} - }, + "workflow_type_versions": {"CWL": {"workflow_type_version": ["v1.0"]}}, "supported_wes_versions": ["0.3.0", "1.0.0"], "supported_filesystem_protocols": ["http", "https", "keep"], - "workflow_engine_versions": { - "arvados-cwl-runner": str(stderr) - }, + "workflow_engine_versions": {"arvados-cwl-runner": str(stderr)}, "default_workflow_engine_parameters": [], "system_state_counts": {}, "auth_instructions_url": "http://doc.arvados.org/user/reference/api-tokens.html", - "tags": { - "ARVADOS_API_HOST": os.environ["ARVADOS_API_HOST"] - } + "tags": {"ARVADOS_API_HOST": os.environ["ARVADOS_API_HOST"]}, } @catch_exceptions @@ -93,43 +103,68 @@ def ListRuns(self, page_size=None, page_token=None, state_search=None): if page_token: paging = [["uuid", ">", page_token]] - requests = api.container_requests().list( - filters=[["requesting_container_uuid", "=", None], - ["container_uuid", "!=", None]] + paging, - select=["uuid", "command", "container_uuid"], - order=["uuid"], - limit=page_size).execute()["items"] - containers = api.containers().list( - filters=[["uuid", "in", [w["container_uuid"] for w in requests]]], - select=["uuid", "state"]).execute()["items"] + requests = ( + api.container_requests() + .list( + filters=[ + ["requesting_container_uuid", "=", None], + ["container_uuid", "!=", None], + ] + + paging, + select=["uuid", "command", "container_uuid"], + order=["uuid"], + limit=page_size, + ) + .execute()["items"] + ) + containers = ( + api.containers() + .list( + filters=[["uuid", "in", [w["container_uuid"] for w in requests]]], + select=["uuid", "state"], + ) + .execute()["items"] + ) uuidmap = {c["uuid"]: statemap[c["state"]] for c in containers} - workflow_list = [{"run_id": cr["uuid"], - "state": uuidmap.get(cr["container_uuid"])} - for cr in requests - if cr["command"] and cr["command"][0] == "arvados-cwl-runner"] + workflow_list = [ + {"run_id": cr["uuid"], "state": uuidmap.get(cr["container_uuid"])} + for cr in requests + if cr["command"] and cr["command"][0] == "arvados-cwl-runner" + ] return { "workflows": workflow_list, - "next_page_token": workflow_list[-1]["run_id"] if workflow_list else "" + "next_page_token": workflow_list[-1]["run_id"] if workflow_list else "", } def log_for_run(self, run_id, message, authtoken=None): - get_api(authtoken).logs().create(body={"log": {"object_uuid": run_id, - "event_type": "stderr", - "properties": {"text": message+"\n"}}}).execute() - - def invoke_cwl_runner(self, cr_uuid, workflow_url, workflow_params, - env, project_uuid, - tempdir): - api = arvados.api_from_config(version="v1", apiconfig={ - "ARVADOS_API_HOST": env["ARVADOS_API_HOST"], - "ARVADOS_API_TOKEN": env['ARVADOS_API_TOKEN'], - "ARVADOS_API_HOST_INSECURE": env["ARVADOS_API_HOST_INSECURE"] # NOQA - }) + get_api(authtoken).logs().create( + body={ + "log": { + "object_uuid": run_id, + "event_type": "stderr", + "properties": {"text": message + "\n"}, + } + } + ).execute() + + def invoke_cwl_runner( + self, cr_uuid, workflow_url, workflow_params, env, project_uuid, tempdir + ): + api = arvados.api_from_config( + version="v1", + apiconfig={ + "ARVADOS_API_HOST": env["ARVADOS_API_HOST"], + "ARVADOS_API_TOKEN": env["ARVADOS_API_TOKEN"], + "ARVADOS_API_HOST_INSECURE": env["ARVADOS_API_HOST_INSECURE"], # NOQA + }, + ) try: - with tempfile.NamedTemporaryFile("wt", dir=tempdir, suffix=".json") as inputtemp: + with tempfile.NamedTemporaryFile( + "wt", dir=tempdir, suffix=".json" + ) as inputtemp: json.dump(workflow_params, inputtemp) inputtemp.flush() @@ -138,47 +173,70 @@ def invoke_cwl_runner(self, cr_uuid, workflow_url, workflow_params, for f in files: msg += " " + dirpath + "/" + f + "\n" - self.log_for_run(cr_uuid, "Contents of %s:\n%s" % (tempdir, msg), - env['ARVADOS_API_TOKEN']) + self.log_for_run( + cr_uuid, + "Contents of %s:\n%s" % (tempdir, msg), + env["ARVADOS_API_TOKEN"], + ) # TODO: run submission process in a container to prevent # a-c-r submission processes from seeing each other. - cmd = ["arvados-cwl-runner", "--submit-request-uuid="+cr_uuid, - "--submit", "--no-wait", "--api=containers", "--debug"] + cmd = [ + "arvados-cwl-runner", + "--submit-request-uuid=" + cr_uuid, + "--submit", + "--no-wait", + "--api=containers", + "--debug", + ] if project_uuid: - cmd.append("--project-uuid="+project_uuid) + cmd.append("--project-uuid=" + project_uuid) cmd.append(workflow_url) cmd.append(inputtemp.name) - self.log_for_run(cr_uuid, "Executing %s" % cmd, env['ARVADOS_API_TOKEN']) - - proc = subprocess.Popen(cmd, env=env, - cwd=tempdir, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + self.log_for_run( + cr_uuid, "Executing %s" % cmd, env["ARVADOS_API_TOKEN"] + ) + + proc = subprocess.Popen( + cmd, + env=env, + cwd=tempdir, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) (stdoutdata, stderrdata) = proc.communicate() if proc.returncode != 0: - api.container_requests().update(uuid=cr_uuid, body={"priority": 0}).execute() + api.container_requests().update( + uuid=cr_uuid, body={"priority": 0} + ).execute() - self.log_for_run(cr_uuid, stderrdata.decode("utf-8"), env['ARVADOS_API_TOKEN']) + self.log_for_run( + cr_uuid, stderrdata.decode("utf-8"), env["ARVADOS_API_TOKEN"] + ) if tempdir: shutil.rmtree(tempdir) except subprocess.CalledProcessError as e: - api.container_requests().update(uuid=cr_uuid, body={"priority": 0, - "name": "Cancelled container request", - "properties": {"arvados-cwl-runner-log": str(e)}}).execute() + api.container_requests().update( + uuid=cr_uuid, + body={ + "priority": 0, + "name": "Cancelled container request", + "properties": {"arvados-cwl-runner-log": str(e)}, + }, + ).execute() @catch_exceptions def RunWorkflow(self, **args): - if not connexion.request.headers.get('Authorization'): + if not connexion.request.headers.get("Authorization"): raise MissingAuthorization() - authtoken = connexion.request.headers['Authorization'] + authtoken = connexion.request.headers["Authorization"] if authtoken.startswith("Bearer ") or authtoken.startswith("OAuth2 "): authtoken = authtoken[7:] @@ -186,17 +244,28 @@ def RunWorkflow(self, **args): "PATH": os.environ["PATH"], "ARVADOS_API_HOST": os.environ["ARVADOS_API_HOST"], "ARVADOS_API_TOKEN": authtoken, - "ARVADOS_API_HOST_INSECURE": os.environ.get("ARVADOS_API_HOST_INSECURE", "false") # NOQA + "ARVADOS_API_HOST_INSECURE": os.environ.get( + "ARVADOS_API_HOST_INSECURE", "false" + ), # NOQA } api = get_api() - cr = api.container_requests().create(body={"container_request": - {"command": [""], - "container_image": "n/a", - "state": "Uncommitted", - "output_path": "n/a", - "priority": 500}}).execute() + cr = ( + api.container_requests() + .create( + body={ + "container_request": { + "command": [""], + "container_image": "n/a", + "state": "Uncommitted", + "output_path": "n/a", + "priority": 500, + } + } + ) + .execute() + ) try: tempdir, body = self.collect_attachments(cr["uuid"]) @@ -206,26 +275,52 @@ def RunWorkflow(self, **args): if workflow_engine_parameters: project_uuid = workflow_engine_parameters.get("project_uuid") - threading.Thread(target=self.invoke_cwl_runner, args=(cr["uuid"], - body["workflow_url"], - body["workflow_params"], - env, - project_uuid, - tempdir)).start() + threading.Thread( + target=self.invoke_cwl_runner, + args=( + cr["uuid"], + body["workflow_url"], + body["workflow_params"], + env, + project_uuid, + tempdir, + ), + ).start() except ValueError as e: self.log_for_run(cr["uuid"], "Bad request: " + str(e)) - cr = api.container_requests().update(uuid=cr["uuid"], - body={"container_request": { - "name": "Cancelled container request", - "priority": 0}}).execute() + cr = ( + api.container_requests() + .update( + uuid=cr["uuid"], + body={ + "container_request": { + "name": "Cancelled container request", + "priority": 0, + } + }, + ) + .execute() + ) return {"msg": str(e), "status_code": 400}, 400 except Exception as e: logging.exception("Error") - self.log_for_run(cr["uuid"], "An exception ocurred while handling your request: " + str(e)) - cr = api.container_requests().update(uuid=cr["uuid"], - body={"container_request": { - "name": "Cancelled container request", - "priority": 0}}).execute() + self.log_for_run( + cr["uuid"], + "An exception ocurred while handling your request: " + str(e), + ) + cr = ( + api.container_requests() + .update( + uuid=cr["uuid"], + body={ + "container_request": { + "name": "Cancelled container request", + "priority": 0, + } + }, + ) + .execute() + ) return {"msg": str(e), "status_code": 500}, 500 else: return {"run_id": cr["uuid"]} @@ -236,16 +331,24 @@ def GetRunLog(self, run_id): request = api.container_requests().get(uuid=run_id).execute() if request["container_uuid"]: - container = api.containers().get(uuid=request["container_uuid"]).execute() # NOQA - task_reqs = arvados.util.list_all(api.container_requests().list, filters=[["requesting_container_uuid", "=", container["uuid"]]]) - tasks = arvados.util.list_all(api.containers().list, filters=[["uuid", "in", [tr["container_uuid"] for tr in task_reqs]]]) + container = ( + api.containers().get(uuid=request["container_uuid"]).execute() + ) # NOQA + task_reqs = arvados.util.list_all( + api.container_requests().list, + filters=[["requesting_container_uuid", "=", container["uuid"]]], + ) + tasks = arvados.util.list_all( + api.containers().list, + filters=[["uuid", "in", [tr["container_uuid"] for tr in task_reqs]]], + ) containers_map = {c["uuid"]: c for c in tasks} containers_map[container["uuid"]] = container else: container = { "state": "Queued" if request["priority"] > 0 else "Cancelled", "exit_code": None, - "log": None + "log": None, } tasks = [] containers_map = {} @@ -253,7 +356,9 @@ def GetRunLog(self, run_id): outputobj = {} if request["output_uuid"]: - c = arvados.collection.CollectionReader(request["output_uuid"], api_client=api) + c = arvados.collection.CollectionReader( + request["output_uuid"], api_client=api + ) with c.open("cwl.output.json") as f: try: outputobj = json.load(f) @@ -262,7 +367,11 @@ def GetRunLog(self, run_id): def keepref(d): if isinstance(d, dict) and "location" in d: - d["location"] = "%sc=%s/_/%s" % (api._resourceDesc["keepWebServiceUrl"], c.portable_data_hash(), d["location"]) # NOQA + d["location"] = "%sc=%s/_/%s" % ( + api._resourceDesc["keepWebServiceUrl"], + c.portable_data_hash(), + d["location"], + ) # NOQA visit(outputobj, keepref) @@ -270,10 +379,12 @@ def log_object(cr): if cr["container_uuid"]: containerlog = containers_map[cr["container_uuid"]] else: - containerlog = {"started_at": "", - "finished_at": "", - "exit_code": None, - "log": ""} + containerlog = { + "started_at": "", + "finished_at": "", + "exit_code": None, + "log": "", + } r = { "name": cr["name"] or "", "cmd": cr["command"], @@ -281,11 +392,19 @@ def log_object(cr): "end_time": containerlog["finished_at"] or "", "stdout": "", "stderr": "", - "exit_code": containerlog["exit_code"] or 0 + "exit_code": containerlog["exit_code"] or 0, } if containerlog["log"]: - r["stdout_keep"] = "%sc=%s/_/%s" % (api._resourceDesc["keepWebServiceUrl"], containerlog["log"], "stdout.txt") # NOQA - r["stderr_keep"] = "%sc=%s/_/%s" % (api._resourceDesc["keepWebServiceUrl"], containerlog["log"], "stderr.txt") # NOQA + r["stdout_keep"] = "%sc=%s/_/%s" % ( + api._resourceDesc["keepWebServiceUrl"], + containerlog["log"], + "stdout.txt", + ) # NOQA + r["stderr_keep"] = "%sc=%s/_/%s" % ( + api._resourceDesc["keepWebServiceUrl"], + containerlog["log"], + "stderr.txt", + ) # NOQA r["stdout"] = "%s/x-dynamic-logs/stdout" % (connexion.request.url) r["stderr"] = "%s/x-dynamic-logs/stderr" % (connexion.request.url) @@ -295,12 +414,14 @@ def log_object(cr): "run_id": request["uuid"], "request": { "workflow_url": "", - "workflow_params": request["mounts"].get("/var/lib/cwl/cwl.input.json", {}).get("content", {}) + "workflow_params": request["mounts"] + .get("/var/lib/cwl/cwl.input.json", {}) + .get("content", {}), }, "state": statemap[container["state"]], "run_log": log_object(request), "task_logs": [log_object(t) for t in task_reqs], - "outputs": outputobj + "outputs": outputobj, } return r @@ -308,7 +429,9 @@ def log_object(cr): @catch_exceptions def CancelRun(self, run_id): # NOQA api = get_api() - request = api.container_requests().update(uuid=run_id, body={"priority": 0}).execute() # NOQA + request = ( + api.container_requests().update(uuid=run_id, body={"priority": 0}).execute() + ) # NOQA return {"run_id": request["uuid"]} @catch_exceptions @@ -316,29 +439,43 @@ def GetRunStatus(self, run_id): api = get_api() request = api.container_requests().get(uuid=run_id).execute() if request["container_uuid"]: - container = api.containers().get(uuid=request["container_uuid"]).execute() # NOQA + container = ( + api.containers().get(uuid=request["container_uuid"]).execute() + ) # NOQA elif request["priority"] == 0: container = {"state": "Cancelled"} else: container = {"state": "Queued"} - return {"run_id": request["uuid"], - "state": statemap[container["state"]]} + return {"run_id": request["uuid"], "state": statemap[container["state"]]} def dynamic_logs(run_id, logstream): api = get_api() cr = api.container_requests().get(uuid=run_id).execute() - l1 = [t["properties"]["text"] - for t in api.logs().list(filters=[["object_uuid", "=", run_id], - ["event_type", "=", logstream]], - order="created_at desc", - limit=100).execute()["items"]] + l1 = [ + t["properties"]["text"] + for t in api.logs() + .list( + filters=[["object_uuid", "=", run_id], ["event_type", "=", logstream]], + order="created_at desc", + limit=100, + ) + .execute()["items"] + ] if cr["container_uuid"]: - l2 = [t["properties"]["text"] - for t in api.logs().list(filters=[["object_uuid", "=", cr["container_uuid"]], - ["event_type", "=", logstream]], - order="created_at desc", - limit=100).execute()["items"]] + l2 = [ + t["properties"]["text"] + for t in api.logs() + .list( + filters=[ + ["object_uuid", "=", cr["container_uuid"]], + ["event_type", "=", logstream], + ], + order="created_at desc", + limit=100, + ) + .execute()["items"] + ] else: l2 = [] return "".join(reversed(l1)) + "".join(reversed(l2)) @@ -346,5 +483,7 @@ def dynamic_logs(run_id, logstream): def create_backend(app, opts): ab = ArvadosBackend(opts) - app.app.route('/ga4gh/wes/v1/runs//x-dynamic-logs/')(dynamic_logs) + app.app.route("/ga4gh/wes/v1/runs//x-dynamic-logs/")( + dynamic_logs + ) return ab diff --git a/wes_service/cwl_runner.py b/wes_service/cwl_runner.py index 483b994..13fb3a4 100644 --- a/wes_service/cwl_runner.py +++ b/wes_service/cwl_runner.py @@ -1,4 +1,3 @@ -from __future__ import print_function import json import os import subprocess @@ -12,7 +11,7 @@ def __init__(self, run_id): super(Workflow, self).__init__() self.run_id = run_id self.workdir = os.path.join(os.getcwd(), "workflows", self.run_id) - self.outdir = os.path.join(self.workdir, 'outdir') + self.outdir = os.path.join(self.workdir, "outdir") if not os.path.exists(self.outdir): os.makedirs(self.outdir) @@ -43,7 +42,9 @@ def run(self, request, tempdir, opts): with open(os.path.join(self.workdir, "cwl.input.json"), "w") as inputtemp: json.dump(request["workflow_params"], inputtemp) - workflow_url = request.get("workflow_url") # Will always be local path to descriptor cwl, or url. + workflow_url = request.get( + "workflow_url" + ) # Will always be local path to descriptor cwl, or url. output = open(os.path.join(self.workdir, "cwl.output.json"), "w") stderr = open(os.path.join(self.workdir, "stderr"), "w") @@ -53,12 +54,12 @@ def run(self, request, tempdir, opts): # replace any locally specified outdir with the default for e in extra: - if e.startswith('--outdir='): + if e.startswith("--outdir="): extra.remove(e) - extra.append('--outdir=' + self.outdir) + extra.append("--outdir=" + self.outdir) # link the cwl and json into the tempdir/cwd - if workflow_url.startswith('file://'): + if workflow_url.startswith("file://"): os.symlink(workflow_url[7:], os.path.join(tempdir, "wes_workflow.cwl")) workflow_url = os.path.join(tempdir, "wes_workflow.cwl") os.symlink(inputtemp.name, os.path.join(tempdir, "cwl.input.json")) @@ -66,11 +67,9 @@ def run(self, request, tempdir, opts): # build args and run command_args = [runner] + extra + [workflow_url, jsonpath] - proc = subprocess.Popen(command_args, - stdout=output, - stderr=stderr, - close_fds=True, - cwd=tempdir) + proc = subprocess.Popen( + command_args, stdout=output, stderr=stderr, close_fds=True, cwd=tempdir + ) output.close() stderr.close() with open(os.path.join(self.workdir, "pid"), "w") as pid: @@ -118,10 +117,7 @@ def getstate(self): def getstatus(self): state, exit_code = self.getstate() - return { - "run_id": self.run_id, - "state": state - } + return {"run_id": self.run_id, "state": state} def getlog(self): state, exit_code = self.getstate() @@ -148,10 +144,10 @@ def getlog(self): "end_time": "", "stdout": "", "stderr": stderr, - "exit_code": exit_code + "exit_code": exit_code, }, "task_logs": [], - "outputs": outputobj + "outputs": outputobj, } def cancel(self): @@ -161,18 +157,16 @@ def cancel(self): class CWLRunnerBackend(WESBackend): def GetServiceInfo(self): runner = self.getopt("runner", default="cwl-runner") - stdout, stderr = subprocess.Popen([runner, "--version"], stderr=subprocess.PIPE).communicate() + stdout, stderr = subprocess.Popen( + [runner, "--version"], stderr=subprocess.PIPE + ).communicate() r = { - "workflow_type_versions": { - "CWL": {"workflow_type_version": ["v1.0"]} - }, + "workflow_type_versions": {"CWL": {"workflow_type_version": ["v1.0"]}}, "supported_wes_versions": ["0.3.0", "1.0.0"], "supported_filesystem_protocols": ["file", "http", "https"], - "workflow_engine_versions": { - "cwl-runner": str(stderr) - }, + "workflow_engine_versions": {"cwl-runner": str(stderr)}, "system_state_counts": {}, - "tags": {} + "tags": {}, } return r @@ -181,15 +175,12 @@ def ListRuns(self, page_size=None, page_token=None, state_search=None): if not os.path.exists(os.path.join(os.getcwd(), "workflows")): return {"workflows": [], "next_page_token": ""} wf = [] - for l in os.listdir(os.path.join(os.getcwd(), "workflows")): - if os.path.isdir(os.path.join(os.getcwd(), "workflows", l)): - wf.append(Workflow(l)) + for entry in os.listdir(os.path.join(os.getcwd(), "workflows")): + if os.path.isdir(os.path.join(os.getcwd(), "workflows", entry)): + wf.append(Workflow(entry)) workflows = [{"run_id": w.run_id, "state": w.getstate()[0]} for w in wf] # NOQA - return { - "workflows": workflows, - "next_page_token": "" - } + return {"workflows": workflows, "next_page_token": ""} def RunWorkflow(self, **args): tempdir, body = self.collect_attachments() diff --git a/wes_service/toil_wes.py b/wes_service/toil_wes.py index 6f244a2..9f9aa30 100644 --- a/wes_service/toil_wes.py +++ b/wes_service/toil_wes.py @@ -1,4 +1,3 @@ -from __future__ import print_function import json import os import subprocess @@ -24,75 +23,87 @@ def __init__(self, run_id): super(ToilWorkflow, self).__init__() self.run_id = run_id - self.workdir = os.path.join(os.getcwd(), 'workflows', self.run_id) - self.outdir = os.path.join(self.workdir, 'outdir') + self.workdir = os.path.join(os.getcwd(), "workflows", self.run_id) + self.outdir = os.path.join(self.workdir, "outdir") if not os.path.exists(self.outdir): os.makedirs(self.outdir) - self.outfile = os.path.join(self.workdir, 'stdout') - self.errfile = os.path.join(self.workdir, 'stderr') - self.starttime = os.path.join(self.workdir, 'starttime') - self.endtime = os.path.join(self.workdir, 'endtime') - self.pidfile = os.path.join(self.workdir, 'pid') - self.statcompletefile = os.path.join(self.workdir, 'status_completed') - self.staterrorfile = os.path.join(self.workdir, 'status_error') - self.cmdfile = os.path.join(self.workdir, 'cmd') - self.jobstorefile = os.path.join(self.workdir, 'jobstore') - self.request_json = os.path.join(self.workdir, 'request.json') + self.outfile = os.path.join(self.workdir, "stdout") + self.errfile = os.path.join(self.workdir, "stderr") + self.starttime = os.path.join(self.workdir, "starttime") + self.endtime = os.path.join(self.workdir, "endtime") + self.pidfile = os.path.join(self.workdir, "pid") + self.statcompletefile = os.path.join(self.workdir, "status_completed") + self.staterrorfile = os.path.join(self.workdir, "status_error") + self.cmdfile = os.path.join(self.workdir, "cmd") + self.jobstorefile = os.path.join(self.workdir, "jobstore") + self.request_json = os.path.join(self.workdir, "request.json") self.input_json = os.path.join(self.workdir, "wes_input.json") - self.jobstore_default = 'file:' + os.path.join(self.workdir, 'toiljobstore') + self.jobstore_default = "file:" + os.path.join(self.workdir, "toiljobstore") self.jobstore = None def sort_toil_options(self, extra): # determine jobstore and set a new default if the user did not set one cloud = False for e in extra: - if e.startswith('--jobStore='): + if e.startswith("--jobStore="): self.jobstore = e[11:] - if self.jobstore.startswith(('aws', 'google', 'azure')): + if self.jobstore.startswith(("aws", "google", "azure")): cloud = True - if e.startswith(('--outdir=', '-o=')): + if e.startswith(("--outdir=", "-o=")): extra.remove(e) if not cloud: - extra.append('--outdir=' + self.outdir) + extra.append("--outdir=" + self.outdir) if not self.jobstore: - extra.append('--jobStore=' + self.jobstore_default) + extra.append("--jobStore=" + self.jobstore_default) self.jobstore = self.jobstore_default # store the jobstore location - with open(self.jobstorefile, 'w') as f: + with open(self.jobstorefile, "w") as f: f.write(self.jobstore) return extra - def write_workflow(self, request, opts, cwd, wftype='cwl'): + def write_workflow(self, request, opts, cwd, wftype="cwl"): """Writes a cwl, wdl, or python file as appropriate from the request dictionary.""" workflow_url = request.get("workflow_url") # link the cwl and json into the cwd - if workflow_url.startswith('file://'): - os.link(workflow_url[7:], os.path.join(cwd, "wes_workflow." + wftype)) + if workflow_url.startswith("file://"): + try: + os.link(workflow_url[7:], os.path.join(cwd, "wes_workflow." + wftype)) + except OSError: + os.symlink(workflow_url[7:], os.path.join(cwd, "wes_workflow." + wftype)) workflow_url = os.path.join(cwd, "wes_workflow." + wftype) - os.link(self.input_json, os.path.join(cwd, "wes_input.json")) + try: + os.link(self.input_json, os.path.join(cwd, "wes_input.json")) + except OSError: + os.symlink(self.input_json, os.path.join(cwd, "wes_input.json")) self.input_json = os.path.join(cwd, "wes_input.json") extra_options = self.sort_toil_options(opts.getoptlist("extra")) - if wftype == 'cwl': - command_args = ['toil-cwl-runner'] + extra_options + [workflow_url, self.input_json] - elif wftype == 'wdl': - command_args = ['toil-wdl-runner'] + extra_options + [workflow_url, self.input_json] - elif wftype == 'py': - command_args = ['python'] + extra_options + [workflow_url] + if wftype == "cwl": + command_args = ( + ["toil-cwl-runner"] + extra_options + [workflow_url, self.input_json] + ) + elif wftype == "wdl": + command_args = ( + ["toil-wdl-runner"] + extra_options + [workflow_url, self.input_json] + ) + elif wftype == "py": + command_args = ["python"] + extra_options + [workflow_url] else: - raise RuntimeError('workflow_type is not "cwl", "wdl", or "py": ' + str(wftype)) + raise RuntimeError( + 'workflow_type is not "cwl", "wdl", or "py": ' + str(wftype) + ) return command_args def write_json(self, request_dict): - input_json = os.path.join(self.workdir, 'input.json') - with open(input_json, 'w') as f: - json.dump(request_dict['workflow_params'], f) + input_json = os.path.join(self.workdir, "input.json") + with open(input_json, "w") as f: + json.dump(request_dict["workflow_params"], f) return input_json def call_cmd(self, cmd, cwd): @@ -104,16 +115,14 @@ def call_cmd(self, cmd, cwd): :param tempdir: :return: The pid of the command. """ - with open(self.cmdfile, 'w') as f: + with open(self.cmdfile, "w") as f: f.write(str(cmd)) - stdout = open(self.outfile, 'w') - stderr = open(self.errfile, 'w') - logging.info('Calling: ' + ' '.join(cmd)) - process = subprocess.Popen(cmd, - stdout=stdout, - stderr=stderr, - close_fds=True, - cwd=cwd) + stdout = open(self.outfile, "w") + stderr = open(self.errfile, "w") + logging.info("Calling: " + " ".join(cmd)) + process = subprocess.Popen( + cmd, stdout=stdout, stderr=stderr, close_fds=True, cwd=cwd + ) stdout.close() stderr.close() @@ -124,17 +133,17 @@ def cancel(self): def fetch(self, filename): if os.path.exists(filename): - with open(filename, 'r') as f: + with open(filename, "r") as f: return f.read() - return '' + return "" def getlog(self): state, exit_code = self.getstate() - with open(self.request_json, 'r') as f: + with open(self.request_json, "r") as f: request = json.load(f) - with open(self.jobstorefile, 'r') as f: + with open(self.jobstorefile, "r") as f: self.jobstore = f.read() stderr = self.fetch(self.errfile) @@ -145,14 +154,16 @@ def getlog(self): outputobj = {} if state == "COMPLETE": # only tested locally - if self.jobstore.startswith('file:'): + if self.jobstore.startswith("file:"): for f in os.listdir(self.outdir): - if f.startswith('out_tmpdir'): + if f.startswith("out_tmpdir"): shutil.rmtree(os.path.join(self.outdir, f)) for f in os.listdir(self.outdir): - outputobj[f] = {'location': os.path.join(self.outdir, f), - 'size': os.stat(os.path.join(self.outdir, f)).st_size, - 'class': 'File'} + outputobj[f] = { + "location": os.path.join(self.outdir, f), + "size": os.stat(os.path.join(self.outdir, f)).st_size, + "class": "File", + } return { "run_id": self.run_id, @@ -164,10 +175,10 @@ def getlog(self): "end_time": endtime, "stdout": "", "stderr": stderr, - "exit_code": exit_code + "exit_code": exit_code, }, "task_logs": [], - "outputs": outputobj + "outputs": outputobj, } def run(self, request, tempdir, opts): @@ -192,21 +203,25 @@ def run(self, request, tempdir, opts): specifically the runner and runner options :return: {"run_id": self.run_id, "state": state} """ - wftype = request['workflow_type'].lower().strip() - version = request['workflow_type_version'] - - if version != 'v1.0' and wftype == 'cwl': - raise RuntimeError('workflow_type "cwl" requires ' - '"workflow_type_version" to be "v1.0": ' + str(version)) - if version != '2.7' and wftype == 'py': - raise RuntimeError('workflow_type "py" requires ' - '"workflow_type_version" to be "2.7": ' + str(version)) - - logging.info('Beginning Toil Workflow ID: ' + str(self.run_id)) - - with open(self.starttime, 'w') as f: + wftype = request["workflow_type"].lower().strip() + version = request["workflow_type_version"] + + if version != "v1.0" and wftype == "cwl": + raise RuntimeError( + 'workflow_type "cwl" requires ' + '"workflow_type_version" to be "v1.0": ' + str(version) + ) + if version != "2.7" and wftype == "py": + raise RuntimeError( + 'workflow_type "py" requires ' + '"workflow_type_version" to be "2.7": ' + str(version) + ) + + logging.info("Beginning Toil Workflow ID: " + str(self.run_id)) + + with open(self.starttime, "w") as f: f.write(str(time.time())) - with open(self.request_json, 'w') as f: + with open(self.request_json, "w") as f: json.dump(request, f) with open(self.input_json, "w") as inputtemp: json.dump(request["workflow_params"], inputtemp) @@ -214,9 +229,9 @@ def run(self, request, tempdir, opts): command_args = self.write_workflow(request, opts, tempdir, wftype=wftype) pid = self.call_cmd(command_args, tempdir) - with open(self.endtime, 'w') as f: + with open(self.endtime, "w") as f: f.write(str(time.time())) - with open(self.pidfile, 'w') as f: + with open(self.pidfile, "w") as f: f.write(str(pid)) return self.getstatus() @@ -232,51 +247,48 @@ def getstate(self): """ # the jobstore never existed if not os.path.exists(self.jobstorefile): - logging.info('Workflow ' + self.run_id + ': QUEUED') + logging.info("Workflow " + self.run_id + ": QUEUED") return "QUEUED", -1 # completed earlier if os.path.exists(self.statcompletefile): - logging.info('Workflow ' + self.run_id + ': COMPLETE') + logging.info("Workflow " + self.run_id + ": COMPLETE") return "COMPLETE", 0 # errored earlier if os.path.exists(self.staterrorfile): - logging.info('Workflow ' + self.run_id + ': EXECUTOR_ERROR') + logging.info("Workflow " + self.run_id + ": EXECUTOR_ERROR") return "EXECUTOR_ERROR", 255 # the workflow is staged but has not run yet if not os.path.exists(self.errfile): - logging.info('Workflow ' + self.run_id + ': INITIALIZING') + logging.info("Workflow " + self.run_id + ": INITIALIZING") return "INITIALIZING", -1 # TODO: Query with "toil status" completed = False - with open(self.errfile, 'r') as f: + with open(self.errfile, "r") as f: for line in f: - if 'Traceback (most recent call last)' in line: - logging.info('Workflow ' + self.run_id + ': EXECUTOR_ERROR') - open(self.staterrorfile, 'a').close() + if "Traceback (most recent call last)" in line: + logging.info("Workflow " + self.run_id + ": EXECUTOR_ERROR") + open(self.staterrorfile, "a").close() return "EXECUTOR_ERROR", 255 # run can complete successfully but fail to upload outputs to cloud buckets # so save the completed status and make sure there was no error elsewhere - if 'Finished toil run successfully.' in line: + if "Finished toil run successfully." in line: completed = True if completed: - logging.info('Workflow ' + self.run_id + ': COMPLETE') - open(self.statcompletefile, 'a').close() + logging.info("Workflow " + self.run_id + ": COMPLETE") + open(self.statcompletefile, "a").close() return "COMPLETE", 0 - logging.info('Workflow ' + self.run_id + ': RUNNING') + logging.info("Workflow " + self.run_id + ": RUNNING") return "RUNNING", -1 def getstatus(self): state, exit_code = self.getstate() - return { - "run_id": self.run_id, - "state": state - } + return {"run_id": self.run_id, "state": state} class ToilBackend(WESBackend): @@ -284,16 +296,16 @@ class ToilBackend(WESBackend): def GetServiceInfo(self): return { - 'workflow_type_versions': { - 'CWL': {'workflow_type_version': ['v1.0']}, - 'WDL': {'workflow_type_version': ['draft-2']}, - 'PY': {'workflow_type_version': ['2.7']} + "workflow_type_versions": { + "CWL": {"workflow_type_version": ["v1.0"]}, + "WDL": {"workflow_type_version": ["draft-2"]}, + "PY": {"workflow_type_version": ["2.7"]}, }, - 'supported_wes_versions': ['0.3.0', '1.0.0'], - 'supported_filesystem_protocols': ['file', 'http', 'https'], - 'workflow_engine_versions': ['3.16.0'], - 'system_state_counts': {}, - 'key_values': {} + "supported_wes_versions": ["0.3.0", "1.0.0"], + "supported_filesystem_protocols": ["file", "http", "https"], + "workflow_engine_versions": ["3.16.0"], + "system_state_counts": {}, + "key_values": {}, } def ListRuns(self, page_size=None, page_token=None, state_search=None): @@ -301,15 +313,12 @@ def ListRuns(self, page_size=None, page_token=None, state_search=None): if not os.path.exists(os.path.join(os.getcwd(), "workflows")): return {"workflows": [], "next_page_token": ""} wf = [] - for l in os.listdir(os.path.join(os.getcwd(), "workflows")): - if os.path.isdir(os.path.join(os.getcwd(), "workflows", l)): - wf.append(ToilWorkflow(l)) + for entry in os.listdir(os.path.join(os.getcwd(), "workflows")): + if os.path.isdir(os.path.join(os.getcwd(), "workflows", entry)): + wf.append(ToilWorkflow(entry)) workflows = [{"run_id": w.run_id, "state": w.getstate()[0]} for w in wf] # NOQA - return { - "workflows": workflows, - "next_page_token": "" - } + return {"workflows": workflows, "next_page_token": ""} def RunWorkflow(self): tempdir, body = self.collect_attachments() @@ -319,7 +328,7 @@ def RunWorkflow(self): p = Process(target=job.run, args=(body, tempdir, self)) p.start() self.processes[run_id] = p - return {'run_id': run_id} + return {"run_id": run_id} def GetRunLog(self, run_id): job = ToilWorkflow(run_id) @@ -329,7 +338,7 @@ def CancelRun(self, run_id): # should this block with `p.is_alive()`? if run_id in self.processes: self.processes[run_id].terminate() - return {'run_id': run_id} + return {"run_id": run_id} def GetRunStatus(self, run_id): job = ToilWorkflow(run_id) diff --git a/wes_service/util.py b/wes_service/util.py index f720e34..c100d8c 100644 --- a/wes_service/util.py +++ b/wes_service/util.py @@ -3,7 +3,6 @@ import os import logging -from six import itervalues, iterlists import connexion from werkzeug.utils import secure_filename @@ -15,12 +14,13 @@ def visit(d, op): for i in d: visit(i, op) elif isinstance(d, dict): - for i in itervalues(d): + for i in d.values(): visit(i, op) class WESBackend(object): """Stores and retrieves options. Intended to be inherited.""" + def __init__(self, opts): """Parse and store options as a list of tuples.""" self.pairs = [] @@ -50,7 +50,7 @@ def collect_attachments(self, run_id=None): tempdir = tempfile.mkdtemp() body = {} has_attachments = False - for k, ls in iterlists(connexion.request.files): + for k, ls in connexion.request.files.lists(): try: for v in ls: if k == "workflow_attachment": @@ -62,10 +62,15 @@ def collect_attachments(self, run_id=None): dest = os.path.join(tempdir, *fn) if not os.path.isdir(os.path.dirname(dest)): os.makedirs(os.path.dirname(dest)) - self.log_for_run(run_id, "Staging attachment '%s' to '%s'" % (v.filename, dest)) + self.log_for_run( + run_id, + "Staging attachment '%s' to '%s'" % (v.filename, dest), + ) v.save(dest) has_attachments = True - body[k] = "file://%s" % tempdir # Reference to temp working dir. + body[k] = ( + "file://%s" % tempdir + ) # Reference to temp working dir. elif k in ("workflow_params", "tags", "workflow_engine_parameters"): content = v.read() body[k] = json.loads(content.decode("utf-8")) @@ -73,7 +78,7 @@ def collect_attachments(self, run_id=None): body[k] = v.read().decode() except Exception as e: raise ValueError("Error reading parameter '%s': %s" % (k, e)) - for k, ls in iterlists(connexion.request.form): + for k, ls in connexion.request.form.lists(): try: for v in ls: if not v: @@ -88,9 +93,15 @@ def collect_attachments(self, run_id=None): if "workflow_url" in body: if ":" not in body["workflow_url"]: if not has_attachments: - raise ValueError("Relative 'workflow_url' but missing 'workflow_attachment'") - body["workflow_url"] = "file://%s" % os.path.join(tempdir, secure_filename(body["workflow_url"])) - self.log_for_run(run_id, "Using workflow_url '%s'" % body.get("workflow_url")) + raise ValueError( + "Relative 'workflow_url' but missing 'workflow_attachment'" + ) + body["workflow_url"] = "file://%s" % os.path.join( + tempdir, secure_filename(body["workflow_url"]) + ) + self.log_for_run( + run_id, "Using workflow_url '%s'" % body.get("workflow_url") + ) else: raise ValueError("Missing 'workflow_url' in submission") diff --git a/wes_service/wes_service_main.py b/wes_service/wes_service_main.py index c6d08e7..6d089df 100644 --- a/wes_service/wes_service_main.py +++ b/wes_service/wes_service_main.py @@ -29,27 +29,36 @@ def setup(args=None): logging.info(" %s: %s", n, getattr(args, n)) app = connexion.App(__name__) - backend = utils.get_function_from_name( - args.backend + ".create_backend")(app, args.opt) + backend = utils.get_function_from_name(args.backend + ".create_backend")( + app, args.opt + ) def rs(x): - return getattr(backend, x.split('.')[-1]) + return getattr(backend, x.split(".")[-1]) app.add_api( - 'openapi/workflow_execution_service.swagger.yaml', - resolver=Resolver(rs)) + "openapi/workflow_execution_service.swagger.yaml", resolver=Resolver(rs) + ) return app def main(argv=sys.argv[1:]): - parser = argparse.ArgumentParser(description='Workflow Execution Service') - parser.add_argument("--backend", type=str, default="wes_service.cwl_runner", - help="Either: '--backend=wes_service.arvados_wes' or '--backend=wes_service.cwl_runner'") + parser = argparse.ArgumentParser(description="Workflow Execution Service") + parser.add_argument( + "--backend", + type=str, + default="wes_service.cwl_runner", + help="Either: '--backend=wes_service.arvados_wes' or '--backend=wes_service.cwl_runner'", + ) parser.add_argument("--port", type=int, default=8080) - parser.add_argument("--opt", type=str, action="append", - help="Example: '--opt runner=cwltoil --opt extra=--logLevel=CRITICAL' " - "or '--opt extra=--workDir=/'. Accepts multiple values.") + parser.add_argument( + "--opt", + type=str, + action="append", + help="Example: '--opt runner=cwltoil --opt extra=--logLevel=CRITICAL' " + "or '--opt extra=--workDir=/'. Accepts multiple values.", + ) parser.add_argument("--debug", action="store_true", default=False) parser.add_argument("--version", action="store_true", default=False) args = parser.parse_args(argv)