From d4851f60f4d7f8a4d52f52c63f687ebab2ec7a65 Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Tue, 16 Aug 2022 16:13:43 +0200 Subject: [PATCH 01/49] Implementation of store RDF export of the workflow in CWL Prov RO-Bundle. Added file size to prov RO-Bundle Creates a workflow.ttl file in the default folder while running. Then closes the file and after the provenance folder is created it is copied to the provenance/workflow/ folder. Update cwltool/main.py Co-authored-by: Michael R. Crusoe <1330696+mr-c@users.noreply.github.com> temp directory creation and removal added --- cwltool/main.py | 29 +++++++++++++++++++++++--- cwltool/provenance_profile.py | 39 ++++++++++++++++++++++++++++++++--- cwltool/workflow.py | 1 + 3 files changed, 63 insertions(+), 6 deletions(-) diff --git a/cwltool/main.py b/cwltool/main.py index 63e3d8679..8e21b1e19 100755 --- a/cwltool/main.py +++ b/cwltool/main.py @@ -8,9 +8,11 @@ import io import logging import os +import shutil import signal import subprocess # nosec import sys +import tempfile import time import urllib import warnings @@ -710,6 +712,7 @@ def setup_provenance( orcid=args.orcid, full_name=args.cwl_full_name, ) + runtimeContext.research_obj = ro log_file_io = ro.open_log_file_for_activity(ro.engine_uuid) prov_log_handler = logging.StreamHandler(log_file_io) @@ -1172,12 +1175,27 @@ def main( print(f"{args.workflow} is valid CWL.", file=stdout) return 0 - if args.print_rdf: + if args.print_rdf or args.provenance: + output = stdout + if args.provenance: + # Write workflow to temp directory + temp_workflow_dir = tempfile.TemporaryDirectory() + os.makedirs(temp_workflow_dir.name, exist_ok=True) + workflow_provenance = temp_workflow_dir.name + "/workflow.ttl" + # Sets up a turtle file for the workflow information (not yet in the provenance folder as it does + # not exist and creating it will give issues). 
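The hunk above stages the --print-rdf output in a temporary directory because the provenance folder does not exist yet, then (later in this patch) copies it into <provenance>/workflow/ and removes the directory. A minimal sketch of that round trip with illustrative names (export_workflow_rdf and serialized_rdf are not part of the patch); note that tempfile.TemporaryDirectory() already creates its directory, so the extra os.makedirs(..., exist_ok=True) is redundant, and os.path.join avoids the hard-coded "/" separator:

import os
import shutil
import tempfile

def export_workflow_rdf(serialized_rdf: str, provenance_dir: str) -> None:
    """Sketch: write the workflow RDF to a temp file, then copy it into the research object."""
    with tempfile.TemporaryDirectory() as tmp:  # the directory already exists at this point
        workflow_ttl = os.path.join(tmp, "workflow.ttl")
        with open(workflow_ttl, "w") as handle:
            handle.write(serialized_rdf)
        # in the patch this copy only happens later, once research_obj.close()
        # has created <provenance_dir>/workflow/
        if os.path.isfile(workflow_ttl):
            shutil.copy(workflow_ttl, os.path.join(provenance_dir, "workflow", "workflow.ttl"))
    # leaving the context manager removes the temporary directory,
    # which is what the explicit shutil.rmtree() call does in the patch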
+ output = open(workflow_provenance, "w") + _logger.info("Writing workflow rdf to %s", workflow_provenance) print( printrdf(tool, loadingContext.loader.ctx, args.rdf_serializer), - file=stdout, + file=output, ) - return 0 + # close the output + if args.provenance: + output.close() + # Only print_rdf exits this way + if args.print_rdf: + return 0 if args.print_dot: printdot(tool, loadingContext.loader.ctx, stdout) @@ -1455,6 +1473,11 @@ def loc_to_path(obj: CWLObjectType) -> None: # public API for logging.StreamHandler prov_log_handler.close() research_obj.close(args.provenance) + # Copy workflow.ttl to args.provenance + if os.path.isfile(workflow_provenance): + shutil.copy(workflow_provenance, args.provenance + "/workflow/workflow.ttl") + # Remove temp directory + shutil.rmtree(temp_workflow_dir.name) _logger.removeHandler(stderr_handler) _logger.addHandler(defaultStreamHandler) diff --git a/cwltool/provenance_profile.py b/cwltool/provenance_profile.py index 0252127bb..edb47ab2b 100644 --- a/cwltool/provenance_profile.py +++ b/cwltool/provenance_profile.py @@ -19,7 +19,8 @@ cast, ) -from prov.identifier import Identifier, QualifiedName +from prov.identifier import Identifier, QualifiedName, Namespace + from prov.model import ( PROV, PROV_LABEL, @@ -32,6 +33,7 @@ from schema_salad.sourceline import SourceLine from typing_extensions import TYPE_CHECKING +import cwltool.workflow from .errors import WorkflowException from .job import CommandLineJob, JobBase from .loghandler import _logger @@ -272,11 +274,20 @@ def record_process_start( if isinstance(job, (CommandLineJob, JobBase, WorkflowJob)): name = job.name process_name = urllib.parse.quote(name, safe=":/,#") - process_run_id = self.start_process(process_name, datetime.datetime.now()) + # Iterator as step is not always 1, check with process_name to find the correct step.id + step = None + for step in process.steps: + if step.id.endswith("#" + process_name): + break + if step is None: + raise Exception("No / wrong step detected...!") + + process_run_id = self.start_process(step.id, process_name, datetime.datetime.now()) return process_run_id def start_process( self, + step_id: str, # The ID of the step involved process_name: str, when: datetime.datetime, process_run_id: Optional[str] = None, @@ -285,12 +296,28 @@ def start_process( if process_run_id is None: process_run_id = uuid.uuid4().urn prov_label = "Run of workflow/packed.cwl#main/" + process_name + # TESTING to include the Steps URI so linking to --print-rdf becomes possible + FILE_PATH = None + WORKFLOW_STEP = None + # Not sure if steps is always 1 element so a step name check including the # is performed + if step_id.endswith("#" + process_name): + # Temp import maybe there is another way to create the URI's ? 
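For reference on the question in the comment above, this is what the two prov Namespace objects built just below evaluate to; a minimal sketch using an illustrative step id (the real value is the file://...#main/<step> id resolved from the packed workflow):

from prov.identifier import Namespace

workflow_ns = Namespace("Workflow", "https://w3id.org/cwl/cwl#Workflow/")
predicate = workflow_ns["steps"]  # QualifiedName for https://w3id.org/cwl/cwl#Workflow/steps
# an empty prefix/URI namespace, so the local part is used as the full IRI of the step
file_ns = Namespace("", "")
step_uri = file_ns["file:///tmp/packed.cwl#main/echo_step"]  # illustrative step id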
+ # Looked at --print-rdf for a possible URI + WORKFLOW = Namespace('Workflow', 'https://w3id.org/cwl/cwl#Workflow/') + WORKFLOW_STEP = WORKFLOW['steps'] + # Was not sure how to create a URI without a namespace + FILE = Namespace('', '') + # The entire file://....#step path + FILE_PATH = FILE[step_id] + + # Added the WORKFLOW_STEP and FILE_PATH to the object self.document.activity( process_run_id, None, None, - {PROV_TYPE: WFPROV["ProcessRun"], PROV_LABEL: prov_label}, + {PROV_TYPE: WFPROV["ProcessRun"], PROV_LABEL: prov_label, WORKFLOW_STEP: FILE_PATH}, ) + self.document.wasAssociatedWith( process_run_id, self.engine_uuid, str("wf:main/" + process_name) ) @@ -367,6 +394,12 @@ def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, st file_entity.add_attributes( {CWLPROV["nameext"]: cast(str, value["nameext"])} ) + + if "size" in value: + file_entity.add_attributes( + {CWLPROV["size"]: cast(int, value["size"])} + ) + self.document.specializationOf(file_entity, entity) # Check for secondaries diff --git a/cwltool/workflow.py b/cwltool/workflow.py index 986db5b99..586a0f888 100644 --- a/cwltool/workflow.py +++ b/cwltool/workflow.py @@ -442,6 +442,7 @@ def job( self.embedded_tool.parent_wf = self.prov_obj process_name = self.tool["id"].split("#")[1] self.prov_obj.start_process( + self.id, process_name, datetime.datetime.now(), self.embedded_tool.provenance_object.workflow_run_uri, From 00ce190683af0bfa7e239ca937ab6195603c78f7 Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Tue, 16 Aug 2022 16:34:37 +0200 Subject: [PATCH 02/49] formatting changes according to make format --- cwltool/main.py | 4 +++- cwltool/provenance_profile.py | 20 ++++++++++++-------- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/cwltool/main.py b/cwltool/main.py index 8e21b1e19..e8d4921eb 100755 --- a/cwltool/main.py +++ b/cwltool/main.py @@ -1475,7 +1475,9 @@ def loc_to_path(obj: CWLObjectType) -> None: research_obj.close(args.provenance) # Copy workflow.ttl to args.provenance if os.path.isfile(workflow_provenance): - shutil.copy(workflow_provenance, args.provenance + "/workflow/workflow.ttl") + shutil.copy( + workflow_provenance, args.provenance + "/workflow/workflow.ttl" + ) # Remove temp directory shutil.rmtree(temp_workflow_dir.name) diff --git a/cwltool/provenance_profile.py b/cwltool/provenance_profile.py index edb47ab2b..2270bcd65 100644 --- a/cwltool/provenance_profile.py +++ b/cwltool/provenance_profile.py @@ -282,7 +282,9 @@ def record_process_start( if step is None: raise Exception("No / wrong step detected...!") - process_run_id = self.start_process(step.id, process_name, datetime.datetime.now()) + process_run_id = self.start_process( + step.id, process_name, datetime.datetime.now() + ) return process_run_id def start_process( @@ -303,10 +305,10 @@ def start_process( if step_id.endswith("#" + process_name): # Temp import maybe there is another way to create the URI's ? 
# Looked at --print-rdf for a possible URI - WORKFLOW = Namespace('Workflow', 'https://w3id.org/cwl/cwl#Workflow/') - WORKFLOW_STEP = WORKFLOW['steps'] + WORKFLOW = Namespace("Workflow", "https://w3id.org/cwl/cwl#Workflow/") + WORKFLOW_STEP = WORKFLOW["steps"] # Was not sure how to create a URI without a namespace - FILE = Namespace('', '') + FILE = Namespace("", "") # The entire file://....#step path FILE_PATH = FILE[step_id] @@ -315,7 +317,11 @@ def start_process( process_run_id, None, None, - {PROV_TYPE: WFPROV["ProcessRun"], PROV_LABEL: prov_label, WORKFLOW_STEP: FILE_PATH}, + { + PROV_TYPE: WFPROV["ProcessRun"], + PROV_LABEL: prov_label, + WORKFLOW_STEP: FILE_PATH, + }, ) self.document.wasAssociatedWith( @@ -396,9 +402,7 @@ def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, st ) if "size" in value: - file_entity.add_attributes( - {CWLPROV["size"]: cast(int, value["size"])} - ) + file_entity.add_attributes({CWLPROV["size"]: cast(int, value["size"])}) self.document.specializationOf(file_entity, entity) From 562d13c8bd85dc8d946b1971d18bb6907375d49b Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Tue, 16 Aug 2022 16:44:34 +0200 Subject: [PATCH 03/49] formatting corrections Fix needed for the type for the activity function in adding the workflow step --- cwltool/provenance_profile.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/cwltool/provenance_profile.py b/cwltool/provenance_profile.py index 2270bcd65..fd84bd599 100644 --- a/cwltool/provenance_profile.py +++ b/cwltool/provenance_profile.py @@ -19,8 +19,7 @@ cast, ) -from prov.identifier import Identifier, QualifiedName, Namespace - +from prov.identifier import Identifier, Namespace, QualifiedName from prov.model import ( PROV, PROV_LABEL, @@ -34,6 +33,7 @@ from typing_extensions import TYPE_CHECKING import cwltool.workflow + from .errors import WorkflowException from .job import CommandLineJob, JobBase from .loghandler import _logger @@ -268,7 +268,9 @@ def record_process_start( ) -> Optional[str]: if not hasattr(process, "steps"): process_run_id = self.workflow_run_uri - elif not hasattr(job, "workflow"): + elif not hasattr(job, "workflow") and isinstance( + process, cwltool.workflow.Workflow + ): # commandline tool execution as part of workflow name = "" if isinstance(job, (CommandLineJob, JobBase, WorkflowJob)): @@ -320,7 +322,8 @@ def start_process( { PROV_TYPE: WFPROV["ProcessRun"], PROV_LABEL: prov_label, - WORKFLOW_STEP: FILE_PATH, + WORKFLOW_STEP: FILE_PATH, # type: ignore[dict-item] + # TODO fix type for the activity function }, ) From e063735d45da07215372508a78c179db5a18efe6 Mon Sep 17 00:00:00 2001 From: "Michael R. 
Crusoe" Date: Tue, 16 Aug 2022 18:02:23 +0200 Subject: [PATCH 04/49] remove need for type ignore --- cwltool/provenance_profile.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/cwltool/provenance_profile.py b/cwltool/provenance_profile.py index fd84bd599..adc6baccc 100644 --- a/cwltool/provenance_profile.py +++ b/cwltool/provenance_profile.py @@ -314,18 +314,17 @@ def start_process( # The entire file://....#step path FILE_PATH = FILE[step_id] - # Added the WORKFLOW_STEP and FILE_PATH to the object - self.document.activity( - process_run_id, - None, - None, - { - PROV_TYPE: WFPROV["ProcessRun"], - PROV_LABEL: prov_label, - WORKFLOW_STEP: FILE_PATH, # type: ignore[dict-item] - # TODO fix type for the activity function - }, - ) + # Added the WORKFLOW_STEP and FILE_PATH to the object + self.document.activity( + process_run_id, + None, + None, + { + PROV_TYPE: WFPROV["ProcessRun"], + PROV_LABEL: prov_label, + WORKFLOW_STEP: FILE_PATH, + }, + ) self.document.wasAssociatedWith( process_run_id, self.engine_uuid, str("wf:main/" + process_name) From e0bb0e01d19b4add16c1250e37a6a56265c78f94 Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Tue, 16 Aug 2022 20:01:46 +0200 Subject: [PATCH 05/49] hard change to checksum_only looking into accessing the arguments parser from the checksum_only option so we can add a --no-data like option to cwltool --- cwltool/provenance.py | 51 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 45 insertions(+), 6 deletions(-) diff --git a/cwltool/provenance.py b/cwltool/provenance.py index 5654c6eef..671a637c9 100644 --- a/cwltool/provenance.py +++ b/cwltool/provenance.py @@ -426,14 +426,14 @@ def add_tagfile( # adding checksums after closing. # Below probably OK for now as metadata files # are not too large..? - - checksums[SHA1] = checksum_copy(tag_file, hasher=hashlib.sha1) + _logger.info("Performing checksum calculations") + checksums[SHA1] = checksum_only(tag_file, hasher=hashlib.sha1) tag_file.seek(0) - checksums[SHA256] = checksum_copy(tag_file, hasher=hashlib.sha256) + checksums[SHA256] = checksum_only(tag_file, hasher=hashlib.sha256) tag_file.seek(0) - checksums[SHA512] = checksum_copy(tag_file, hasher=hashlib.sha512) + checksums[SHA512] = checksum_only(tag_file, hasher=hashlib.sha512) rel_path = posix_path(os.path.relpath(path, self.folder)) self.tagfiles.add(rel_path) @@ -799,7 +799,7 @@ def add_data_file( with tempfile.NamedTemporaryFile( prefix=tmp_prefix, dir=tmp_dir, delete=False ) as tmp: - checksum = checksum_copy(from_fp, tmp) + checksum = checksum_only(from_fp, tmp) # Calculate hash-based file path folder = os.path.join(self.folder, DATA, checksum[0:2]) @@ -887,7 +887,7 @@ def _add_to_bagit(self, rel_path: str, **checksums: str) -> None: checksums = dict(checksums) with open(lpath, "rb") as file_path: # FIXME: Need sha-256 / sha-512 as well for Research Object BagIt profile? - checksums[SHA1] = checksum_copy(file_path, hasher=hashlib.sha1) + checksums[SHA1] = checksum_only(file_path, hasher=hashlib.sha1) self.add_to_manifest(rel_path, checksums) @@ -1042,3 +1042,42 @@ def checksum_copy( if dst_file is not None: dst_file.flush() return checksum.hexdigest().lower() + + +def checksum_only( + src_file: IO[Any], + dst_file: Optional[IO[Any]] = None, + hasher=Hasher, # type: Callable[[], hashlib._Hash] + buffersize: int = 1024 * 1024, +) -> str: + # TODO, one level up with a provenance -no-data option? 
+ # First step, force dst_file to be none so it computes the checksum but does not write it to its destination + _logger.error("Hard force for dst_file to be None") + dst_file = None + + """Compute checksums while copying a file.""" + # TODO: Use hashlib.new(Hasher_str) instead? + checksum = hasher() + contents = src_file.read(buffersize) + if dst_file and hasattr(dst_file, "name") and hasattr(src_file, "name"): + temp_location = os.path.join(os.path.dirname(dst_file.name), str(uuid.uuid4())) + _logger.warning("Is it now writing to...: %s", temp_location) + try: + os.rename(dst_file.name, temp_location) + os.link(src_file.name, dst_file.name) + dst_file = None + os.unlink(temp_location) + except OSError: + pass + if os.path.exists(temp_location): + pass # os.rename(temp_location, dst_file.name) # type: ignore + + while contents != b"": + if dst_file is not None: + _logger.error("WRITING!!! %s", dst_file) + dst_file.write(contents) + checksum.update(contents) + contents = src_file.read(buffersize) + if dst_file is not None: + dst_file.flush() + return checksum.hexdigest().lower() From 16335b719cdf84356d171b749d4f4318e6d3a7fb Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Tue, 16 Aug 2022 20:02:31 +0200 Subject: [PATCH 06/49] Added sha checksum to file_entity, need to look into what predicate should be used and if the SHA1 string can be a URI (faster indexing in triple stores) --- cwltool/provenance_profile.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cwltool/provenance_profile.py b/cwltool/provenance_profile.py index fd84bd599..57abf67ef 100644 --- a/cwltool/provenance_profile.py +++ b/cwltool/provenance_profile.py @@ -407,6 +407,11 @@ def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, st if "size" in value: file_entity.add_attributes({CWLPROV["size"]: cast(int, value["size"])}) + # TODO check is there a URI for a checksum? and a base uri for checksum? + # e.g. + if "checksum" in value: + file_entity.add_attributes({CWLPROV["checksum"]: cast(str, value["checksum"])}) + self.document.specializationOf(file_entity, entity) # Check for secondaries From 40c6705341080685952b95c2de6a1c01dfaacfbb Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Wed, 17 Aug 2022 08:16:17 +0200 Subject: [PATCH 07/49] formatting cleanup --- cwltool/provenance.py | 2 +- cwltool/provenance_profile.py | 4 +++- mypy-stubs/prov/model.pyi | 1 - 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/cwltool/provenance.py b/cwltool/provenance.py index 671a637c9..4738158a4 100644 --- a/cwltool/provenance.py +++ b/cwltool/provenance.py @@ -1070,7 +1070,7 @@ def checksum_only( except OSError: pass if os.path.exists(temp_location): - pass # os.rename(temp_location, dst_file.name) # type: ignore + pass # os.rename(temp_location, dst_file.name) # type: ignore while contents != b"": if dst_file is not None: diff --git a/cwltool/provenance_profile.py b/cwltool/provenance_profile.py index 412b6f98b..b96de7d90 100644 --- a/cwltool/provenance_profile.py +++ b/cwltool/provenance_profile.py @@ -409,7 +409,9 @@ def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, st # TODO check is there a URI for a checksum? and a base uri for checksum? # e.g. 
if "checksum" in value: - file_entity.add_attributes({CWLPROV["checksum"]: cast(str, value["checksum"])}) + file_entity.add_attributes( + {CWLPROV["checksum"]: cast(str, value["checksum"])} + ) self.document.specializationOf(file_entity, entity) diff --git a/mypy-stubs/prov/model.pyi b/mypy-stubs/prov/model.pyi index 57ac5cb1e..7e816a989 100644 --- a/mypy-stubs/prov/model.pyi +++ b/mypy-stubs/prov/model.pyi @@ -3,7 +3,6 @@ from typing import IO, Any, Dict, Iterable, List, Set, Tuple from _typeshed import Incomplete from prov.constants import * - # from prov import Error as Error, serializers as serializers from prov.identifier import Identifier, Namespace, QualifiedName From 87c304bac319a74ffd7754edc6e0f7d05e60bf96 Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Wed, 17 Aug 2022 16:01:17 +0200 Subject: [PATCH 08/49] --no-data argument added --- cwltool/argparser.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/cwltool/argparser.py b/cwltool/argparser.py index 8dba4ce28..1ac44228a 100644 --- a/cwltool/argparser.py +++ b/cwltool/argparser.py @@ -249,6 +249,13 @@ def arg_parser() -> argparse.ArgumentParser: help="Record user account info as part of provenance.", dest="user_provenance", ) + provgroup.add_argument( + "--no-data", + default=False, + action="store_true", + help="Disables the storage of input and output data files of the workflow in the provenance data folder", + dest="no_data", + ) provgroup.add_argument( "--disable-user-provenance", default=False, From 049dcd71ceabd0a2253455f25231c9f6e07e3da8 Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Wed, 17 Aug 2022 16:02:15 +0200 Subject: [PATCH 09/49] added no_data variable to some functions as i was unable to access the arguments from the provenance environment --- cwltool/main.py | 3 +- cwltool/provenance.py | 70 +++++++++++++++++++++++-------------------- 2 files changed, 39 insertions(+), 34 deletions(-) diff --git a/cwltool/main.py b/cwltool/main.py index e8d4921eb..53f116709 100755 --- a/cwltool/main.py +++ b/cwltool/main.py @@ -8,6 +8,7 @@ import io import logging import os +import parser import shutil import signal import subprocess # nosec @@ -1451,7 +1452,7 @@ def loc_to_path(obj: CWLObjectType) -> None: research_obj = runtimeContext.research_obj if loadingContext.loader is not None: research_obj.generate_snapshot( - prov_deps(workflowobj, loadingContext.loader, uri) + prov_deps(workflowobj, loadingContext.loader, uri), args.no_data ) else: _logger.warning( diff --git a/cwltool/provenance.py b/cwltool/provenance.py index 4738158a4..c57ee2835 100644 --- a/cwltool/provenance.py +++ b/cwltool/provenance.py @@ -36,6 +36,7 @@ from schema_salad.utils import json_dumps from typing_extensions import TYPE_CHECKING, TypedDict +import cwltool from .loghandler import _logger from .provenance_constants import ( ACCOUNT_UUID, @@ -411,7 +412,10 @@ def write_bag_file( return bag_file def add_tagfile( - self, path: str, timestamp: Optional[datetime.datetime] = None + self, + path: str, + no_data: bool = False, + timestamp: Optional[datetime.datetime] = None, ) -> None: """Add tag files to our research object.""" self.self_check() @@ -421,19 +425,28 @@ def add_tagfile( return # FIXME: do the right thing for directories with open(path, "rb") as tag_file: + _logger.error("Path: %s", path) # FIXME: Should have more efficient open_tagfile() that # does all checksums in one go while writing through, # adding checksums after closing. # Below probably OK for now as metadata files # are not too large..? 
- _logger.info("Performing checksum calculations") - checksums[SHA1] = checksum_only(tag_file, hasher=hashlib.sha1) - tag_file.seek(0) - checksums[SHA256] = checksum_only(tag_file, hasher=hashlib.sha256) + _logger.info("Performing checksum calculations with no_data %s", no_data) + if no_data: + checksums[SHA1] = checksum_only(tag_file, hasher=hashlib.sha1) + tag_file.seek(0) + checksums[SHA256] = checksum_only(tag_file, hasher=hashlib.sha256) + tag_file.seek(0) + checksums[SHA512] = checksum_only(tag_file, hasher=hashlib.sha512) + else: + checksums[SHA1] = checksum_copy(tag_file, hasher=hashlib.sha1) + tag_file.seek(0) + checksums[SHA256] = checksum_copy(tag_file, hasher=hashlib.sha256) + tag_file.seek(0) + checksums[SHA512] = checksum_copy(tag_file, hasher=hashlib.sha512) + - tag_file.seek(0) - checksums[SHA512] = checksum_only(tag_file, hasher=hashlib.sha512) rel_path = posix_path(os.path.relpath(path, self.folder)) self.tagfiles.add(rel_path) @@ -738,7 +751,7 @@ def _write_bag_info(self) -> None: info_file.write("Payload-Oxum: %d.%d\n" % (total_size, num_files)) _logger.debug("[provenance] Generated bagit metadata: %s", self.folder) - def generate_snapshot(self, prov_dep: CWLObjectType) -> None: + def generate_snapshot(self, prov_dep: CWLObjectType, no_data: bool) -> None: """Copy all of the CWL files to the snapshot/ directory.""" self.self_check() for key, value in prov_dep.items(): @@ -754,6 +767,7 @@ def generate_snapshot(self, prov_dep: CWLObjectType) -> None: # FIXME: What if destination path already exists? if os.path.exists(filepath): + _logger.error("Filepath: %s", filepath) try: if os.path.isdir(filepath): shutil.copytree(filepath, path) @@ -762,13 +776,13 @@ def generate_snapshot(self, prov_dep: CWLObjectType) -> None: timestamp = datetime.datetime.fromtimestamp( os.path.getmtime(filepath) ) - self.add_tagfile(path, timestamp) + self.add_tagfile(path, no_data, timestamp) except PermissionError: pass # FIXME: avoids duplicate snapshotting; need better solution elif key in ("secondaryFiles", "listing"): for files in cast(MutableSequence[CWLObjectType], value): if isinstance(files, MutableMapping): - self.generate_snapshot(files) + self.generate_snapshot(files, no_data) else: pass @@ -1023,17 +1037,17 @@ def checksum_copy( # TODO: Use hashlib.new(Hasher_str) instead? checksum = hasher() contents = src_file.read(buffersize) - if dst_file and hasattr(dst_file, "name") and hasattr(src_file, "name"): - temp_location = os.path.join(os.path.dirname(dst_file.name), str(uuid.uuid4())) - try: - os.rename(dst_file.name, temp_location) - os.link(src_file.name, dst_file.name) - dst_file = None - os.unlink(temp_location) - except OSError: - pass - if os.path.exists(temp_location): - os.rename(temp_location, dst_file.name) # type: ignore + # if dst_file and hasattr(dst_file, "name") and hasattr(src_file, "name"): + # temp_location = os.path.join(os.path.dirname(dst_file.name), str(uuid.uuid4())) + # try: + # os.rename(dst_file.name, temp_location) + # os.link(src_file.name, dst_file.name) + # dst_file = None + # os.unlink(temp_location) + # except OSError: + # pass + # if os.path.exists(temp_location): + # os.rename(temp_location, dst_file.name) # type: ignore while contents != b"": if dst_file is not None: dst_file.write(contents) @@ -1053,25 +1067,15 @@ def checksum_only( # TODO, one level up with a provenance -no-data option? 
# First step, force dst_file to be none so it computes the checksum but does not write it to its destination _logger.error("Hard force for dst_file to be None") + _logger.error("src_file: %s", src_file) dst_file = None """Compute checksums while copying a file.""" # TODO: Use hashlib.new(Hasher_str) instead? checksum = hasher() contents = src_file.read(buffersize) - if dst_file and hasattr(dst_file, "name") and hasattr(src_file, "name"): - temp_location = os.path.join(os.path.dirname(dst_file.name), str(uuid.uuid4())) - _logger.warning("Is it now writing to...: %s", temp_location) - try: - os.rename(dst_file.name, temp_location) - os.link(src_file.name, dst_file.name) - dst_file = None - os.unlink(temp_location) - except OSError: - pass - if os.path.exists(temp_location): - pass # os.rename(temp_location, dst_file.name) # type: ignore + # TODO Could be a function for both checksum_only and checksum_copy? while contents != b"": if dst_file is not None: _logger.error("WRITING!!! %s", dst_file) From 21ecba9f6dddf238da6143a1f850609b7452b078 Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Wed, 17 Aug 2022 16:07:49 +0200 Subject: [PATCH 10/49] test provenance --no-data added and a TODO check for check_bagit if we can run it when there is no data in the data provenance folder. --- tests/test_provenance.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_provenance.py b/tests/test_provenance.py index 7dd7790df..92c3f5a1d 100644 --- a/tests/test_provenance.py +++ b/tests/test_provenance.py @@ -34,7 +34,7 @@ def cwltool(tmp_path: Path, *args: Any) -> Path: prov_folder = tmp_path / "provenance" prov_folder.mkdir() - new_args = ["--provenance", str(prov_folder)] + new_args = ["--no-data", "--provenance", str(prov_folder)] new_args.extend(args) # Run within a temporary directory to not pollute git checkout tmp_dir = tmp_path / "cwltool-run" @@ -240,6 +240,7 @@ def check_provenance( secondary_files: bool = False, ) -> None: check_folders(base_path) + # TODO can we run check_bagit if there is no data in data? 
check_bagit(base_path) check_ro(base_path, nested=nested) check_prov( From 879d5cedd876082dfa3b68598fb1389b0ec48a77 Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Wed, 17 Aug 2022 16:20:11 +0200 Subject: [PATCH 11/49] Global no-data option for now to test the same environment with or without data --- tests/test_provenance.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/tests/test_provenance.py b/tests/test_provenance.py index 92c3f5a1d..9e5de636e 100644 --- a/tests/test_provenance.py +++ b/tests/test_provenance.py @@ -30,11 +30,14 @@ CWLPROV = Namespace("https://w3id.org/cwl/prov#") OA = Namespace("http://www.w3.org/ns/oa#") +NO_DATA = True def cwltool(tmp_path: Path, *args: Any) -> Path: prov_folder = tmp_path / "provenance" prov_folder.mkdir() - new_args = ["--no-data", "--provenance", str(prov_folder)] + new_args = ["--provenance", str(prov_folder)] + if NO_DATA: + new_args = ["--no-data"] + new_args new_args.extend(args) # Run within a temporary directory to not pollute git checkout tmp_dir = tmp_path / "cwltool-run" @@ -266,6 +269,9 @@ def check_folders(base_path: Path) -> None: def check_bagit(base_path: Path) -> None: + if NO_DATA: + return + # check bagit structure required_files = [ "bagit.txt", @@ -525,7 +531,8 @@ def check_prov( g2.parse(file=f, format="nt", publicID=nt_uri) # TODO: Check g2 statements that it's the same UUID activity inside # as in the outer step - if directory: + # TODO check with NO_DATA being true is this then false?... + if directory and not NO_DATA: directories = set(g.subjects(RDF.type, RO.Folder)) assert directories From 723c643293231f0d6542e1e66a991fc6e750d1c5 Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Thu, 18 Aug 2022 11:19:46 +0200 Subject: [PATCH 12/49] NO_DATA global variable added to know if there should be no data for the provenance --- cwltool/main.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/cwltool/main.py b/cwltool/main.py index 53f116709..fe2974ebf 100755 --- a/cwltool/main.py +++ b/cwltool/main.py @@ -973,6 +973,9 @@ def print_targets( ) +NO_DATA = None + + def main( argsl: Optional[List[str]] = None, args: Optional[argparse.Namespace] = None, @@ -1049,6 +1052,11 @@ def main( return 0 _logger.info(versionfunc()) + # TODO How can we access args.no_data from other places in a nice way?... 
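Two answers to the TODO above appear later in this series: patch 22 moves the flag to cwltool/utils.py, and patch 23 passes it into ResearchObject instead of keeping a global. A minimal sketch of the constructor-injection variant, with an abbreviated, illustrative signature rather than the full cwltool class:

class ResearchObject:  # abbreviated sketch of the relevant part of the constructor
    def __init__(self, fsaccess, temp_prefix_ro: str = "tmp", orcid: str = "",
                 full_name: str = "", no_data: bool = False) -> None:
        # keeping the flag on the instance avoids reaching back into cwltool.main
        self.no_data = no_data

ro = ResearchObject(fsaccess=None, no_data=True)  # e.g. no_data=args.no_data in setup_provenance()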
+ _logger.error("No data status %s", args.no_data) + global NO_DATA + NO_DATA = args.no_data + if args.print_supported_versions: print("\n".join(supported_cwl_versions(args.enable_dev)), file=stdout) return 0 From 540a5a8c51095f8de85568ea81a99d6a695a19ad Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Thu, 18 Aug 2022 11:19:59 +0200 Subject: [PATCH 13/49] formatting --- tests/test_provenance.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_provenance.py b/tests/test_provenance.py index 9e5de636e..302a6d48b 100644 --- a/tests/test_provenance.py +++ b/tests/test_provenance.py @@ -32,6 +32,7 @@ NO_DATA = True + def cwltool(tmp_path: Path, *args: Any) -> Path: prov_folder = tmp_path / "provenance" prov_folder.mkdir() From 211348af528a1b0d334c64cbd95a86d5d4b9c158 Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Thu, 18 Aug 2022 11:20:46 +0200 Subject: [PATCH 14/49] cleaning logger and no_data access implementation --- cwltool/provenance.py | 82 +++++++++++++++++++++++-------------------- 1 file changed, 44 insertions(+), 38 deletions(-) diff --git a/cwltool/provenance.py b/cwltool/provenance.py index c57ee2835..d8aa6ba3a 100644 --- a/cwltool/provenance.py +++ b/cwltool/provenance.py @@ -412,10 +412,10 @@ def write_bag_file( return bag_file def add_tagfile( - self, - path: str, - no_data: bool = False, - timestamp: Optional[datetime.datetime] = None, + self, + path: str, + no_data: bool = False, + timestamp: Optional[datetime.datetime] = None, ) -> None: """Add tag files to our research object.""" self.self_check() @@ -425,15 +425,12 @@ def add_tagfile( return # FIXME: do the right thing for directories with open(path, "rb") as tag_file: - _logger.error("Path: %s", path) # FIXME: Should have more efficient open_tagfile() that # does all checksums in one go while writing through, # adding checksums after closing. # Below probably OK for now as metadata files # are not too large..? - - _logger.info("Performing checksum calculations with no_data %s", no_data) - if no_data: + if cwltool.main.NO_DATA: checksums[SHA1] = checksum_only(tag_file, hasher=hashlib.sha1) tag_file.seek(0) checksums[SHA256] = checksum_only(tag_file, hasher=hashlib.sha256) @@ -446,8 +443,6 @@ def add_tagfile( tag_file.seek(0) checksums[SHA512] = checksum_copy(tag_file, hasher=hashlib.sha512) - - rel_path = posix_path(os.path.relpath(path, self.folder)) self.tagfiles.add(rel_path) self.add_to_manifest(rel_path, checksums) @@ -767,7 +762,6 @@ def generate_snapshot(self, prov_dep: CWLObjectType, no_data: bool) -> None: # FIXME: What if destination path already exists? if os.path.exists(filepath): - _logger.error("Filepath: %s", filepath) try: if os.path.isdir(filepath): shutil.copytree(filepath, path) @@ -807,13 +801,18 @@ def add_data_file( timestamp: Optional[datetime.datetime] = None, content_type: Optional[str] = None, ) -> str: + # TODO only when --no-data is not used! """Copy inputs to data/ folder.""" self.self_check() tmp_dir, tmp_prefix = os.path.split(self.temp_prefix) with tempfile.NamedTemporaryFile( prefix=tmp_prefix, dir=tmp_dir, delete=False ) as tmp: - checksum = checksum_only(from_fp, tmp) + # TODO this should depend on the arguments + if cwltool.main.NO_DATA: + checksum = checksum_only(from_fp) + else: + checksum = checksum_copy(from_fp, tmp) # Calculate hash-based file path folder = os.path.join(self.folder, DATA, checksum[0:2]) @@ -825,6 +824,7 @@ def add_data_file( os.rename(tmp.name, path) # Relative posix path + # TODO only when no-data is False?... 
rel_path = posix_path(os.path.relpath(path, self.folder)) # Register in bagit checksum @@ -901,7 +901,10 @@ def _add_to_bagit(self, rel_path: str, **checksums: str) -> None: checksums = dict(checksums) with open(lpath, "rb") as file_path: # FIXME: Need sha-256 / sha-512 as well for Research Object BagIt profile? - checksums[SHA1] = checksum_only(file_path, hasher=hashlib.sha1) + if cwltool.main.NO_DATA: + checksums[SHA1] = checksum_only(file_path, hasher=hashlib.sha1) + else: + checksums[SHA1] = checksum_copy(file_path, hasher=hashlib.sha1) self.add_to_manifest(rel_path, checksums) @@ -1037,17 +1040,22 @@ def checksum_copy( # TODO: Use hashlib.new(Hasher_str) instead? checksum = hasher() contents = src_file.read(buffersize) - # if dst_file and hasattr(dst_file, "name") and hasattr(src_file, "name"): - # temp_location = os.path.join(os.path.dirname(dst_file.name), str(uuid.uuid4())) - # try: - # os.rename(dst_file.name, temp_location) - # os.link(src_file.name, dst_file.name) - # dst_file = None - # os.unlink(temp_location) - # except OSError: - # pass - # if os.path.exists(temp_location): - # os.rename(temp_location, dst_file.name) # type: ignore + if dst_file and hasattr(dst_file, "name") and hasattr(src_file, "name"): + temp_location = os.path.join(os.path.dirname(dst_file.name), str(uuid.uuid4())) + try: + os.rename(dst_file.name, temp_location) + os.link(src_file.name, dst_file.name) + dst_file = None + os.unlink(temp_location) + except OSError: + pass + if os.path.exists(temp_location): + os.rename(temp_location, dst_file.name) # type: ignore + + return content_processor(contents, src_file, dst_file, checksum, buffersize) + + +def content_processor(contents, src_file, dst_file, checksum, buffersize): while contents != b"": if dst_file is not None: dst_file.write(contents) @@ -1064,11 +1072,9 @@ def checksum_only( hasher=Hasher, # type: Callable[[], hashlib._Hash] buffersize: int = 1024 * 1024, ) -> str: - # TODO, one level up with a provenance -no-data option? - # First step, force dst_file to be none so it computes the checksum but does not write it to its destination - _logger.error("Hard force for dst_file to be None") - _logger.error("src_file: %s", src_file) - dst_file = None + + if dst_file != None: + _logger.error("Destination file should be None but it is %s", dst_file) """Compute checksums while copying a file.""" # TODO: Use hashlib.new(Hasher_str) instead? @@ -1076,12 +1082,12 @@ def checksum_only( contents = src_file.read(buffersize) # TODO Could be a function for both checksum_only and checksum_copy? - while contents != b"": - if dst_file is not None: - _logger.error("WRITING!!! 
%s", dst_file) - dst_file.write(contents) - checksum.update(contents) - contents = src_file.read(buffersize) - if dst_file is not None: - dst_file.flush() - return checksum.hexdigest().lower() + return content_processor(contents, src_file, dst_file, checksum, buffersize) + # while contents != b"": + # if dst_file is not None: + # dst_file.write(contents) + # checksum.update(contents) + # contents = src_file.read(buffersize) + # if dst_file is not None: + # dst_file.flush() + # return checksum.hexdigest().lower() From ad90be653357b076524362623141c3d553e05780 Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Thu, 25 Aug 2022 07:06:44 +0200 Subject: [PATCH 15/49] cleaning up imports --- cwltool/main.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/cwltool/main.py b/cwltool/main.py index fe2974ebf..c8f341220 100755 --- a/cwltool/main.py +++ b/cwltool/main.py @@ -8,7 +8,6 @@ import io import logging import os -import parser import shutil import signal import subprocess # nosec @@ -18,7 +17,6 @@ import urllib import warnings from codecs import StreamWriter, getwriter -from collections.abc import MutableMapping, MutableSequence from typing import ( IO, Any, From 76abff01114323efaf5dfa4e621beb9871c8e366 Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Mon, 5 Sep 2022 18:09:25 +0200 Subject: [PATCH 16/49] make remove_unused_imports, cleaning up all kinds of imports --- .gitignore | 1 + cwltool/argparser.py | 2 -- cwltool/command_line_tool.py | 1 - cwltool/cuda.py | 2 +- cwltool/docker.py | 1 - cwltool/provenance_profile.py | 1 - tests/test_cuda.py | 6 +++--- tests/test_stdout_stderr_log_dir.py | 4 +--- tests/test_toolargparse.py | 2 +- 9 files changed, 7 insertions(+), 13 deletions(-) diff --git a/.gitignore b/.gitignore index 3300dfac2..1707a206e 100644 --- a/.gitignore +++ b/.gitignore @@ -55,3 +55,4 @@ value # Folder created when using make cwltool_deps +testenv1 diff --git a/cwltool/argparser.py b/cwltool/argparser.py index 1ac44228a..341c95e9c 100644 --- a/cwltool/argparser.py +++ b/cwltool/argparser.py @@ -5,7 +5,6 @@ import urllib from typing import ( Any, - AnyStr, Callable, Dict, List, @@ -18,7 +17,6 @@ cast, ) -from schema_salad.ref_resolver import file_uri from .loghandler import _logger from .process import Process, shortname diff --git a/cwltool/command_line_tool.py b/cwltool/command_line_tool.py index e4ba6f961..6125d5d17 100644 --- a/cwltool/command_line_tool.py +++ b/cwltool/command_line_tool.py @@ -15,7 +15,6 @@ from functools import cmp_to_key, partial from typing import ( Any, - Callable, Dict, Generator, List, diff --git a/cwltool/cuda.py b/cwltool/cuda.py index 65dc19c10..50bee5599 100644 --- a/cwltool/cuda.py +++ b/cwltool/cuda.py @@ -1,6 +1,6 @@ import subprocess # nosec import xml.dom.minidom # nosec -from typing import Tuple, cast +from typing import Tuple from .loghandler import _logger from .utils import CWLObjectType diff --git a/cwltool/docker.py b/cwltool/docker.py index b7578faf9..1d7c9f978 100644 --- a/cwltool/docker.py +++ b/cwltool/docker.py @@ -16,7 +16,6 @@ from .builder import Builder from .context import RuntimeContext -from .cuda import cuda_check from .docker_id import docker_vm_id from .errors import WorkflowException from .job import ContainerCommandLineJob diff --git a/cwltool/provenance_profile.py b/cwltool/provenance_profile.py index b96de7d90..824a57a67 100644 --- a/cwltool/provenance_profile.py +++ b/cwltool/provenance_profile.py @@ -27,7 +27,6 @@ PROV_VALUE, ProvDocument, ProvEntity, - ProvRecord, ) from schema_salad.sourceline import 
SourceLine from typing_extensions import TYPE_CHECKING diff --git a/tests/test_cuda.py b/tests/test_cuda.py index 61920a969..3ebe476fb 100644 --- a/tests/test_cuda.py +++ b/tests/test_cuda.py @@ -11,10 +11,10 @@ from cwltool.job import CommandLineJob from cwltool.load_tool import load_tool from cwltool.main import main -from cwltool.pathmapper import MapperEnt, PathMapper -from cwltool.process import use_custom_schema, use_standard_schema +from cwltool.pathmapper import PathMapper +from cwltool.process import use_custom_schema from cwltool.stdfsaccess import StdFsAccess -from cwltool.update import INTERNAL_VERSION, ORIGINAL_CWLVERSION +from cwltool.update import INTERNAL_VERSION from cwltool.utils import CWLObjectType from .util import get_data, needs_docker, needs_singularity_3_or_newer diff --git a/tests/test_stdout_stderr_log_dir.py b/tests/test_stdout_stderr_log_dir.py index d80ca1b67..945841789 100644 --- a/tests/test_stdout_stderr_log_dir.py +++ b/tests/test_stdout_stderr_log_dir.py @@ -1,10 +1,8 @@ import json -import os from pathlib import Path -from cwltool.main import main -from .util import get_data, get_main_output, needs_docker +from .util import get_data, get_main_output def test_log_dir_echo_output(tmp_path: Path) -> None: diff --git a/tests/test_toolargparse.py b/tests/test_toolargparse.py index d7807084a..8200b6598 100644 --- a/tests/test_toolargparse.py +++ b/tests/test_toolargparse.py @@ -1,7 +1,7 @@ import argparse from io import StringIO from pathlib import Path -from typing import Any, Callable, Dict, List +from typing import Callable, List import pytest From 33d1551874420a0753938c7f25e448422e360d37 Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Mon, 5 Sep 2022 18:15:50 +0200 Subject: [PATCH 17/49] some empty line formatting --- cwltool/argparser.py | 1 - cwltool/provenance.py | 1 + mypy-stubs/rdflib/graph.pyi | 5 ----- 3 files changed, 1 insertion(+), 6 deletions(-) diff --git a/cwltool/argparser.py b/cwltool/argparser.py index 341c95e9c..c17df0872 100644 --- a/cwltool/argparser.py +++ b/cwltool/argparser.py @@ -17,7 +17,6 @@ cast, ) - from .loghandler import _logger from .process import Process, shortname from .resolver import ga4gh_tool_registries diff --git a/cwltool/provenance.py b/cwltool/provenance.py index d8aa6ba3a..b96736de0 100644 --- a/cwltool/provenance.py +++ b/cwltool/provenance.py @@ -37,6 +37,7 @@ from typing_extensions import TYPE_CHECKING, TypedDict import cwltool + from .loghandler import _logger from .provenance_constants import ( ACCOUNT_UUID, diff --git a/mypy-stubs/rdflib/graph.pyi b/mypy-stubs/rdflib/graph.pyi index 23c2e6e1f..8f356d857 100644 --- a/mypy-stubs/rdflib/graph.pyi +++ b/mypy-stubs/rdflib/graph.pyi @@ -106,7 +106,6 @@ class Graph(Node): ) -> Any: ... def namespaces(self) -> Iterator[Tuple[Any, Any]]: ... def absolutize(self, uri: Any, defrag: int = ...) -> Any: ... - # no destination and non-None positional encoding @overload def serialize( @@ -117,7 +116,6 @@ class Graph(Node): encoding: str, **args: Any, ) -> bytes: ... - # no destination and non-None keyword encoding @overload def serialize( @@ -129,7 +127,6 @@ class Graph(Node): encoding: str, **args: Any, ) -> bytes: ... - # no destination and None encoding @overload def serialize( @@ -140,7 +137,6 @@ class Graph(Node): encoding: None = ..., **args: Any, ) -> str: ... - # non-None destination @overload def serialize( @@ -151,7 +147,6 @@ class Graph(Node): encoding: Optional[str] = ..., **args: Any, ) -> "Graph": ... 
- # fallback @overload def serialize( From bc56733fcd230c40ad3dd59cbef33bd00745dc96 Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Mon, 5 Sep 2022 20:09:09 +0200 Subject: [PATCH 18/49] if not none instead of != --- cwltool/provenance.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cwltool/provenance.py b/cwltool/provenance.py index e44bf69ce..75907789e 100644 --- a/cwltool/provenance.py +++ b/cwltool/provenance.py @@ -1078,7 +1078,7 @@ def checksum_only( buffersize: int = 1024 * 1024, ) -> str: - if dst_file != None: + if dst_file is not None: _logger.error("Destination file should be None but it is %s", dst_file) """Compute checksums while copying a file.""" From e5b498d1bb6e8c57952c16960da5b42a9bd9fa1f Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Tue, 6 Sep 2022 16:24:43 +0200 Subject: [PATCH 19/49] make cleanup sync --- cwltool/provenance_profile.py | 9 +-------- mypy-stubs/prov/model.pyi | 1 + 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/cwltool/provenance_profile.py b/cwltool/provenance_profile.py index 824a57a67..d192c343f 100644 --- a/cwltool/provenance_profile.py +++ b/cwltool/provenance_profile.py @@ -20,14 +20,7 @@ ) from prov.identifier import Identifier, Namespace, QualifiedName -from prov.model import ( - PROV, - PROV_LABEL, - PROV_TYPE, - PROV_VALUE, - ProvDocument, - ProvEntity, -) +from prov.model import PROV, PROV_LABEL, PROV_TYPE, PROV_VALUE, ProvDocument, ProvEntity from schema_salad.sourceline import SourceLine from typing_extensions import TYPE_CHECKING diff --git a/mypy-stubs/prov/model.pyi b/mypy-stubs/prov/model.pyi index 7e816a989..57ac5cb1e 100644 --- a/mypy-stubs/prov/model.pyi +++ b/mypy-stubs/prov/model.pyi @@ -3,6 +3,7 @@ from typing import IO, Any, Dict, Iterable, List, Set, Tuple from _typeshed import Incomplete from prov.constants import * + # from prov import Error as Error, serializers as serializers from prov.identifier import Identifier, Namespace, QualifiedName From 3666b653e9c5670f2f27e62afd2432b702719f48 Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Tue, 6 Sep 2022 16:29:29 +0200 Subject: [PATCH 20/49] docstrings added --- cwltool/provenance.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/cwltool/provenance.py b/cwltool/provenance.py index 75907789e..670353302 100644 --- a/cwltool/provenance.py +++ b/cwltool/provenance.py @@ -1061,6 +1061,11 @@ def checksum_copy( def content_processor(contents, src_file, dst_file, checksum, buffersize): + """ + Calculate the checksum based on the content. + + @rtype: checksum + """ while contents != b"": if dst_file is not None: dst_file.write(contents) @@ -1077,7 +1082,11 @@ def checksum_only( hasher=Hasher, # type: Callable[[], hashlib._Hash] buffersize: int = 1024 * 1024, ) -> str: + """ + Calculate the checksum only, does not copy the data files. + @rtype: checksum + """ if dst_file is not None: _logger.error("Destination file should be None but it is %s", dst_file) @@ -1088,11 +1097,3 @@ def checksum_only( # TODO Could be a function for both checksum_only and checksum_copy? 
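A usage sketch of the two helpers side by side, assuming the module-level checksum_only and checksum_copy defined in this file: both return the same digest, but only checksum_copy streams the bytes into a destination.

import hashlib
import io

src = io.BytesIO(b"workflow input data")
digest_only = checksum_only(src, hasher=hashlib.sha1)            # hashes, writes nothing
src.seek(0)
dst = io.BytesIO()
digest_and_copy = checksum_copy(src, dst, hasher=hashlib.sha1)   # hashes and copies
assert digest_only == digest_and_copy
assert dst.getvalue() == b"workflow input data"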
return content_processor(contents, src_file, dst_file, checksum, buffersize) - # while contents != b"": - # if dst_file is not None: - # dst_file.write(contents) - # checksum.update(contents) - # contents = src_file.read(buffersize) - # if dst_file is not None: - # dst_file.flush() - # return checksum.hexdigest().lower() From f58e90e974d6dcbd1217eb7f840a9416d3f7389c Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Tue, 6 Sep 2022 16:42:17 +0200 Subject: [PATCH 21/49] Default NO_DATA set to false --- cwltool/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cwltool/main.py b/cwltool/main.py index c8f341220..ace0caf9d 100755 --- a/cwltool/main.py +++ b/cwltool/main.py @@ -971,7 +971,7 @@ def print_targets( ) -NO_DATA = None +NO_DATA = False def main( From 4a6906bd6f8c818024e334de11f61b59d8664f0a Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Tue, 6 Sep 2022 17:09:29 +0200 Subject: [PATCH 22/49] move NO_DATA to utils --- cwltool/main.py | 205 +++++++++++++++++++++--------------------- cwltool/provenance.py | 26 +++--- cwltool/utils.py | 2 + 3 files changed, 113 insertions(+), 120 deletions(-) diff --git a/cwltool/main.py b/cwltool/main.py index ace0caf9d..d3fa2e6e9 100755 --- a/cwltool/main.py +++ b/cwltool/main.py @@ -45,7 +45,7 @@ from schema_salad.sourceline import cmap, strip_dup_lineno from schema_salad.utils import ContextType, FetcherCallableType, json_dumps, yaml_no_ts -from . import CWL_CONTENT_TYPES, workflow +from . import CWL_CONTENT_TYPES, workflow, utils from .argparser import arg_parser, generate_parser, get_default_args from .context import LoadingContext, RuntimeContext, getdefault from .cwlrdf import printdot, printrdf @@ -161,8 +161,8 @@ def _signal_handler(signum: int, _: Any) -> None: def generate_example_input( - inptype: Optional[CWLOutputType], - default: Optional[CWLOutputType], + inptype: Optional[CWLOutputType], + default: Optional[CWLOutputType], ) -> Tuple[Any, str]: """Convert a single input schema into an example.""" example = None @@ -258,8 +258,8 @@ def generate_example_input( def realize_input_schema( - input_types: MutableSequence[Union[str, CWLObjectType]], - schema_defs: MutableMapping[str, CWLObjectType], + input_types: MutableSequence[Union[str, CWLObjectType]], + schema_defs: MutableMapping[str, CWLObjectType], ) -> MutableSequence[Union[str, CWLObjectType]]: """Replace references to named typed with the actual types.""" for index, entry in enumerate(input_types): @@ -329,8 +329,8 @@ def generate_input_template(tool: Process) -> CWLObjectType: """Generate an example input object for the given CWL process.""" template = ruamel.yaml.comments.CommentedMap() for inp in cast( - List[MutableMapping[str, str]], - realize_input_schema(tool.tool["inputs"], tool.schemaDefs), + List[MutableMapping[str, str]], + realize_input_schema(tool.tool["inputs"], tool.schemaDefs), ): name = shortname(inp["id"]) value, comment = generate_example_input(inp["type"], inp.get("default", None)) @@ -339,13 +339,12 @@ def generate_input_template(tool: Process) -> CWLObjectType: def load_job_order( - args: argparse.Namespace, - stdin: IO[Any], - fetcher_constructor: Optional[FetcherCallableType], - overrides_list: List[CWLObjectType], - tool_file_uri: str, + args: argparse.Namespace, + stdin: IO[Any], + fetcher_constructor: Optional[FetcherCallableType], + overrides_list: List[CWLObjectType], + tool_file_uri: str, ) -> Tuple[Optional[CWLObjectType], str, Loader]: - job_order_object = None job_order_file = None @@ -378,8 +377,8 @@ def load_job_order( 
) if ( - job_order_object is not None - and "http://commonwl.org/cwltool#overrides" in job_order_object + job_order_object is not None + and "http://commonwl.org/cwltool#overrides" in job_order_object ): ov_uri = file_uri(job_order_file or input_basedir) overrides_list.extend( @@ -391,7 +390,7 @@ def load_job_order( input_basedir = args.basedir if args.basedir else os.getcwd() if job_order_object is not None and not isinstance( - job_order_object, MutableMapping + job_order_object, MutableMapping ): _logger.error( "CWL input object at %s is not formatted correctly, it should be a " @@ -406,18 +405,18 @@ def load_job_order( def init_job_order( - job_order_object: Optional[CWLObjectType], - args: argparse.Namespace, - process: Process, - loader: Loader, - stdout: Union[TextIO, StreamWriter], - print_input_deps: bool = False, - relative_deps: str = "primary", - make_fs_access: Callable[[str], StdFsAccess] = StdFsAccess, - input_basedir: str = "", - secret_store: Optional[SecretStore] = None, - input_required: bool = True, - runtime_context: Optional[RuntimeContext] = None, + job_order_object: Optional[CWLObjectType], + args: argparse.Namespace, + process: Process, + loader: Loader, + stdout: Union[TextIO, StreamWriter], + print_input_deps: bool = False, + relative_deps: str = "primary", + make_fs_access: Callable[[str], StdFsAccess] = StdFsAccess, + input_basedir: str = "", + secret_store: Optional[SecretStore] = None, + input_required: bool = True, + runtime_context: Optional[RuntimeContext] = None, ) -> CWLObjectType: secrets_req, _ = process.get_requirement("http://commonwl.org/cwltool#Secrets") if job_order_object is None: @@ -442,7 +441,7 @@ def init_job_order( k: v for k, v in cmd_line.items() if k.startswith(record_name) } for key, value in record_items.items(): - record[key[len(record_name) + 1 :]] = value + record[key[len(record_name) + 1:]] = value del cmd_line[key] cmd_line[str(record_name)] = record if "job_order" in cmd_line and cmd_line["job_order"]: @@ -477,7 +476,7 @@ def init_job_order( for inp in process.tool["inputs"]: if "default" in inp and ( - not job_order_object or shortname(inp["id"]) not in job_order_object + not job_order_object or shortname(inp["id"]) not in job_order_object ): if not job_order_object: job_order_object = {} @@ -557,13 +556,13 @@ def make_relative(base: str, obj: CWLObjectType) -> None: def printdeps( - obj: CWLObjectType, - document_loader: Loader, - stdout: Union[TextIO, StreamWriter], - relative_deps: str, - uri: str, - basedir: Optional[str] = None, - nestdirs: bool = True, + obj: CWLObjectType, + document_loader: Loader, + stdout: Union[TextIO, StreamWriter], + relative_deps: str, + uri: str, + basedir: Optional[str] = None, + nestdirs: bool = True, ) -> None: """Print a JSON representation of the dependencies of the CWL document.""" deps = find_deps(obj, document_loader, uri, basedir=basedir, nestdirs=nestdirs) @@ -576,10 +575,10 @@ def printdeps( def prov_deps( - obj: CWLObjectType, - document_loader: Loader, - uri: str, - basedir: Optional[str] = None, + obj: CWLObjectType, + document_loader: Loader, + uri: str, + basedir: Optional[str] = None, ) -> CWLObjectType: deps = find_deps(obj, document_loader, uri, basedir=basedir) @@ -597,11 +596,11 @@ def remove_non_cwl(deps: CWLObjectType) -> None: def find_deps( - obj: CWLObjectType, - document_loader: Loader, - uri: str, - basedir: Optional[str] = None, - nestdirs: bool = True, + obj: CWLObjectType, + document_loader: Loader, + uri: str, + basedir: Optional[str] = None, + nestdirs: bool = True, 
) -> CWLObjectType: """Find the dependencies of the CWL document.""" deps = { @@ -630,8 +629,8 @@ def loadref(base: str, uri: str) -> Union[CommentedMap, CommentedSeq, str, None] def print_pack( - loadingContext: LoadingContext, - uri: str, + loadingContext: LoadingContext, + uri: str, ) -> str: """Return a CWL serialization of the CWL document in JSON.""" packed = pack(loadingContext, uri) @@ -653,7 +652,7 @@ def supported_cwl_versions(enable_dev: bool) -> List[str]: def setup_schema( - args: argparse.Namespace, custom_schema_callback: Optional[Callable[[], None]] + args: argparse.Namespace, custom_schema_callback: Optional[Callable[[], None]] ) -> None: if custom_schema_callback is not None: custom_schema_callback() @@ -685,7 +684,7 @@ def __init__(self) -> None: super().__init__("[%(asctime)sZ] %(message)s") def formatTime( - self, record: logging.LogRecord, datefmt: Optional[str] = None + self, record: logging.LogRecord, datefmt: Optional[str] = None ) -> str: formatted_time = time.strftime( "%Y-%m-%dT%H:%M:%S", time.gmtime(float(record.created)) @@ -698,9 +697,9 @@ def formatTime( def setup_provenance( - args: argparse.Namespace, - argsl: List[str], - runtimeContext: RuntimeContext, + args: argparse.Namespace, + argsl: List[str], + runtimeContext: RuntimeContext, ) -> Tuple[ProvOut, "logging.StreamHandler[ProvOut]"]: if not args.compute_checksum: _logger.error("--provenance incompatible with --no-compute-checksum") @@ -727,9 +726,9 @@ def setup_provenance( def setup_loadingContext( - loadingContext: Optional[LoadingContext], - runtimeContext: RuntimeContext, - args: argparse.Namespace, + loadingContext: Optional[LoadingContext], + runtimeContext: RuntimeContext, + args: argparse.Namespace, ) -> LoadingContext: """Prepare a LoadingContext from the given arguments.""" if loadingContext is None: @@ -758,12 +757,12 @@ def setup_loadingContext( def make_template( - tool: Process, + tool: Process, ) -> None: """Make a template CWL input object for the give Process.""" def my_represent_none( - self: Any, data: Any + self: Any, data: Any ) -> Any: # pylint: disable=unused-argument """Force clean representation of 'null'.""" return self.represent_scalar("tag:yaml.org,2002:null", "null") @@ -807,9 +806,9 @@ def inherit_reqshints(tool: Process, parent: Process) -> None: def choose_target( - args: argparse.Namespace, - tool: Process, - loading_context: LoadingContext, + args: argparse.Namespace, + tool: Process, + loading_context: LoadingContext, ) -> Optional[Process]: """Walk the Workflow, extract the subset matches all the args.targets.""" if loading_context.loader is None: @@ -843,9 +842,9 @@ def choose_target( def choose_step( - args: argparse.Namespace, - tool: Process, - loading_context: LoadingContext, + args: argparse.Namespace, + tool: Process, + loading_context: LoadingContext, ) -> Optional[Process]: """Walk the given Workflow and extract just args.single_step.""" if loading_context.loader is None: @@ -875,9 +874,9 @@ def choose_step( def choose_process( - args: argparse.Namespace, - tool: Process, - loadingContext: LoadingContext, + args: argparse.Namespace, + tool: Process, + loadingContext: LoadingContext, ) -> Optional[Process]: """Walk the given Workflow and extract just args.single_process.""" if loadingContext.loader is None: @@ -909,18 +908,18 @@ def choose_process( def check_working_directories( - runtimeContext: RuntimeContext, + runtimeContext: RuntimeContext, ) -> Optional[int]: """Make any needed working directories.""" for dirprefix in ("tmpdir_prefix", 
"tmp_outdir_prefix", "cachedir"): if ( - getattr(runtimeContext, dirprefix) - and getattr(runtimeContext, dirprefix) != DEFAULT_TMP_PREFIX + getattr(runtimeContext, dirprefix) + and getattr(runtimeContext, dirprefix) != DEFAULT_TMP_PREFIX ): sl = ( "/" if getattr(runtimeContext, dirprefix).endswith("/") - or dirprefix == "cachedir" + or dirprefix == "cachedir" else "" ) setattr( @@ -938,10 +937,10 @@ def check_working_directories( def print_targets( - tool: Process, - stdout: Union[TextIO, StreamWriter], - loading_context: LoadingContext, - prefix: str = "", + tool: Process, + stdout: Union[TextIO, StreamWriter], + loading_context: LoadingContext, + prefix: str = "", ) -> None: """Recursively find targets for --subgraph and friends.""" for f in ("outputs", "inputs"): @@ -971,28 +970,25 @@ def print_targets( ) -NO_DATA = False - - def main( - argsl: Optional[List[str]] = None, - args: Optional[argparse.Namespace] = None, - job_order_object: Optional[CWLObjectType] = None, - stdin: IO[Any] = sys.stdin, - stdout: Optional[Union[TextIO, StreamWriter]] = None, - stderr: IO[Any] = sys.stderr, - versionfunc: Callable[[], str] = versionstring, - logger_handler: Optional[logging.Handler] = None, - custom_schema_callback: Optional[Callable[[], None]] = None, - executor: Optional[JobExecutor] = None, - loadingContext: Optional[LoadingContext] = None, - runtimeContext: Optional[RuntimeContext] = None, - input_required: bool = True, + argsl: Optional[List[str]] = None, + args: Optional[argparse.Namespace] = None, + job_order_object: Optional[CWLObjectType] = None, + stdin: IO[Any] = sys.stdin, + stdout: Optional[Union[TextIO, StreamWriter]] = None, + stderr: IO[Any] = sys.stderr, + versionfunc: Callable[[], str] = versionstring, + logger_handler: Optional[logging.Handler] = None, + custom_schema_callback: Optional[Callable[[], None]] = None, + executor: Optional[JobExecutor] = None, + loadingContext: Optional[LoadingContext] = None, + runtimeContext: Optional[RuntimeContext] = None, + input_required: bool = True, ) -> int: if not stdout: # force UTF-8 even if the console is configured differently if hasattr(sys.stdout, "encoding") and sys.stdout.encoding.upper() not in ( - "UTF-8", - "UTF8", + "UTF-8", + "UTF8", ): if hasattr(sys.stdout, "detach"): stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8") @@ -1051,9 +1047,8 @@ def main( _logger.info(versionfunc()) # TODO How can we access args.no_data from other places in a nice way?... 
- _logger.error("No data status %s", args.no_data) - global NO_DATA - NO_DATA = args.no_data + utils.NO_DATA = args.no_data + _logger.error("utils.NO_DATA: %s", utils.NO_DATA) if args.print_supported_versions: print("\n".join(supported_cwl_versions(args.enable_dev)), file=stdout) @@ -1449,16 +1444,16 @@ def loc_to_path(obj: CWLObjectType) -> None: finally: if ( - args - and runtimeContext - and runtimeContext.research_obj - and workflowobj - and loadingContext + args + and runtimeContext + and runtimeContext.research_obj + and workflowobj + and loadingContext ): research_obj = runtimeContext.research_obj if loadingContext.loader is not None: research_obj.generate_snapshot( - prov_deps(workflowobj, loadingContext.loader, uri), args.no_data + prov_deps(workflowobj, loadingContext.loader, uri) ) else: _logger.warning( @@ -1493,9 +1488,9 @@ def loc_to_path(obj: CWLObjectType) -> None: def find_default_container( - builder: HasReqsHints, - default_container: Optional[str] = None, - use_biocontainers: Optional[bool] = None, + builder: HasReqsHints, + default_container: Optional[str] = None, + use_biocontainers: Optional[bool] = None, ) -> Optional[str]: """Find a container.""" if not default_container and use_biocontainers: diff --git a/cwltool/provenance.py b/cwltool/provenance.py index 670353302..a327d0132 100644 --- a/cwltool/provenance.py +++ b/cwltool/provenance.py @@ -36,9 +36,9 @@ from schema_salad.utils import json_dumps from typing_extensions import TYPE_CHECKING, TypedDict -import cwltool - +from . import utils from .loghandler import _logger + from .provenance_constants import ( ACCOUNT_UUID, CWLPROV, @@ -71,11 +71,7 @@ ) if TYPE_CHECKING: - from .command_line_tool import ( # pylint: disable=unused-import - CommandLineTool, - ExpressionTool, - ) - from .workflow import Workflow # pylint: disable=unused-import + pass def _whoami() -> Tuple[str, str]: @@ -418,7 +414,6 @@ def write_bag_file( def add_tagfile( self, path: str, - no_data: bool = False, timestamp: Optional[datetime.datetime] = None, ) -> None: """Add tag files to our research object.""" @@ -434,7 +429,8 @@ def add_tagfile( # adding checksums after closing. # Below probably OK for now as metadata files # are not too large..? 
- if cwltool.main.NO_DATA: + + if utils.NO_DATA: checksums[SHA1] = checksum_only(tag_file, hasher=hashlib.sha1) tag_file.seek(0) checksums[SHA256] = checksum_only(tag_file, hasher=hashlib.sha256) @@ -750,8 +746,8 @@ def _write_bag_info(self) -> None: info_file.write("Payload-Oxum: %d.%d\n" % (total_size, num_files)) _logger.debug("[provenance] Generated bagit metadata: %s", self.folder) - def generate_snapshot(self, prov_dep: CWLObjectType, no_data: bool) -> None: - """Copy all of the CWL files to the snapshot/ directory.""" + def generate_snapshot(self, prov_dep: CWLObjectType) -> None: + """Copy all the CWL files to the snapshot/ directory.""" self.self_check() for key, value in prov_dep.items(): if key == "location" and cast(str, value).split("/")[-1]: @@ -774,13 +770,13 @@ def generate_snapshot(self, prov_dep: CWLObjectType, no_data: bool) -> None: timestamp = datetime.datetime.fromtimestamp( os.path.getmtime(filepath) ) - self.add_tagfile(path, no_data, timestamp) + self.add_tagfile(path, timestamp) except PermissionError: pass # FIXME: avoids duplicate snapshotting; need better solution elif key in ("secondaryFiles", "listing"): for files in cast(MutableSequence[CWLObjectType], value): if isinstance(files, MutableMapping): - self.generate_snapshot(files, no_data) + self.generate_snapshot(files) else: pass @@ -813,7 +809,7 @@ def add_data_file( prefix=tmp_prefix, dir=tmp_dir, delete=False ) as tmp: # TODO this should depend on the arguments - if cwltool.main.NO_DATA: + if NO_DATA: checksum = checksum_only(from_fp) else: checksum = checksum_copy(from_fp, tmp) @@ -906,7 +902,7 @@ def _add_to_bagit(self, rel_path: str, **checksums: str) -> None: checksums = dict(checksums) with open(lpath, "rb") as file_path: # FIXME: Need sha-256 / sha-512 as well for Research Object BagIt profile? - if cwltool.main.NO_DATA: + if NO_DATA: checksums[SHA1] = checksum_only(file_path, hasher=hashlib.sha1) else: checksums[SHA1] = checksum_copy(file_path, hasher=hashlib.sha1) diff --git a/cwltool/utils.py b/cwltool/utils.py index 824aeb843..9b0c132a3 100644 --- a/cwltool/utils.py +++ b/cwltool/utils.py @@ -58,6 +58,8 @@ processes_to_kill = collections.deque() # type: Deque[subprocess.Popen[str]] +NO_DATA = False + CWLOutputAtomType = Union[ None, bool, From 08e18b00da2ae63e056bd4717254434b781e4cfa Mon Sep 17 00:00:00 2001 From: "Michael R. Crusoe" Date: Tue, 6 Sep 2022 17:09:59 +0200 Subject: [PATCH 23/49] remove global NO_DATA --- cwltool/main.py | 11 ++--------- cwltool/provenance.py | 37 +++++++++++++++++-------------------- 2 files changed, 19 insertions(+), 29 deletions(-) diff --git a/cwltool/main.py b/cwltool/main.py index ace0caf9d..2c369f935 100755 --- a/cwltool/main.py +++ b/cwltool/main.py @@ -710,6 +710,7 @@ def setup_provenance( temp_prefix_ro=args.tmpdir_prefix, orcid=args.orcid, full_name=args.cwl_full_name, + no_data=args.no_data, ) runtimeContext.research_obj = ro @@ -971,9 +972,6 @@ def print_targets( ) -NO_DATA = False - - def main( argsl: Optional[List[str]] = None, args: Optional[argparse.Namespace] = None, @@ -1050,11 +1048,6 @@ def main( return 0 _logger.info(versionfunc()) - # TODO How can we access args.no_data from other places in a nice way?... 
- _logger.error("No data status %s", args.no_data) - global NO_DATA - NO_DATA = args.no_data - if args.print_supported_versions: print("\n".join(supported_cwl_versions(args.enable_dev)), file=stdout) return 0 @@ -1458,7 +1451,7 @@ def loc_to_path(obj: CWLObjectType) -> None: research_obj = runtimeContext.research_obj if loadingContext.loader is not None: research_obj.generate_snapshot( - prov_deps(workflowobj, loadingContext.loader, uri), args.no_data + prov_deps(workflowobj, loadingContext.loader, uri) ) else: _logger.warning( diff --git a/cwltool/provenance.py b/cwltool/provenance.py index 670353302..1699349cf 100644 --- a/cwltool/provenance.py +++ b/cwltool/provenance.py @@ -36,8 +36,6 @@ from schema_salad.utils import json_dumps from typing_extensions import TYPE_CHECKING, TypedDict -import cwltool - from .loghandler import _logger from .provenance_constants import ( ACCOUNT_UUID, @@ -278,6 +276,7 @@ def __init__( temp_prefix_ro: str = "tmp", orcid: str = "", full_name: str = "", + no_data: bool = False, ) -> None: """Initialize the ResearchObject.""" self.temp_prefix = temp_prefix_ro @@ -301,6 +300,7 @@ def __init__( ## self.relativised_input_object = {} # type: CWLObjectType self.has_manifest = False + self.no_data = no_data self._initialize() _logger.debug("[provenance] Temporary research object: %s", self.folder) @@ -418,7 +418,6 @@ def write_bag_file( def add_tagfile( self, path: str, - no_data: bool = False, timestamp: Optional[datetime.datetime] = None, ) -> None: """Add tag files to our research object.""" @@ -434,7 +433,7 @@ def add_tagfile( # adding checksums after closing. # Below probably OK for now as metadata files # are not too large..? - if cwltool.main.NO_DATA: + if self.no_data: checksums[SHA1] = checksum_only(tag_file, hasher=hashlib.sha1) tag_file.seek(0) checksums[SHA256] = checksum_only(tag_file, hasher=hashlib.sha256) @@ -750,7 +749,7 @@ def _write_bag_info(self) -> None: info_file.write("Payload-Oxum: %d.%d\n" % (total_size, num_files)) _logger.debug("[provenance] Generated bagit metadata: %s", self.folder) - def generate_snapshot(self, prov_dep: CWLObjectType, no_data: bool) -> None: + def generate_snapshot(self, prov_dep: CWLObjectType) -> None: """Copy all of the CWL files to the snapshot/ directory.""" self.self_check() for key, value in prov_dep.items(): @@ -774,13 +773,13 @@ def generate_snapshot(self, prov_dep: CWLObjectType, no_data: bool) -> None: timestamp = datetime.datetime.fromtimestamp( os.path.getmtime(filepath) ) - self.add_tagfile(path, no_data, timestamp) + self.add_tagfile(path, timestamp) except PermissionError: pass # FIXME: avoids duplicate snapshotting; need better solution elif key in ("secondaryFiles", "listing"): for files in cast(MutableSequence[CWLObjectType], value): if isinstance(files, MutableMapping): - self.generate_snapshot(files, no_data) + self.generate_snapshot(files) else: pass @@ -813,7 +812,7 @@ def add_data_file( prefix=tmp_prefix, dir=tmp_dir, delete=False ) as tmp: # TODO this should depend on the arguments - if cwltool.main.NO_DATA: + if self.no_data: checksum = checksum_only(from_fp) else: checksum = checksum_copy(from_fp, tmp) @@ -906,7 +905,7 @@ def _add_to_bagit(self, rel_path: str, **checksums: str) -> None: checksums = dict(checksums) with open(lpath, "rb") as file_path: # FIXME: Need sha-256 / sha-512 as well for Research Object BagIt profile? 
- if cwltool.main.NO_DATA: + if self.no_data: checksums[SHA1] = checksum_only(file_path, hasher=hashlib.sha1) else: checksums[SHA1] = checksum_copy(file_path, hasher=hashlib.sha1) @@ -1060,12 +1059,14 @@ def checksum_copy( return content_processor(contents, src_file, dst_file, checksum, buffersize) -def content_processor(contents, src_file, dst_file, checksum, buffersize): - """ - Calculate the checksum based on the content. - - @rtype: checksum - """ +def content_processor( + contents: Any, + src_file: IO[Any], + dst_file: Optional[IO[Any]], + checksum: hashlib._Hash, + buffersize: int, +) -> str: + """Calculate the checksum based on the content.""" while contents != b"": if dst_file is not None: dst_file.write(contents) @@ -1082,11 +1083,7 @@ def checksum_only( hasher=Hasher, # type: Callable[[], hashlib._Hash] buffersize: int = 1024 * 1024, ) -> str: - """ - Calculate the checksum only, does not copy the data files. - - @rtype: checksum - """ + """Calculate the checksum only, does not copy the data files.""" if dst_file is not None: _logger.error("Destination file should be None but it is %s", dst_file) From 33f706bee284de72bbb5ae9c5d8f61cf5bc9253e Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Tue, 6 Sep 2022 17:14:51 +0200 Subject: [PATCH 24/49] missed two NO_DATA's --- cwltool/provenance.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cwltool/provenance.py b/cwltool/provenance.py index a327d0132..490002d30 100644 --- a/cwltool/provenance.py +++ b/cwltool/provenance.py @@ -809,7 +809,7 @@ def add_data_file( prefix=tmp_prefix, dir=tmp_dir, delete=False ) as tmp: # TODO this should depend on the arguments - if NO_DATA: + if utils.NO_DATA: checksum = checksum_only(from_fp) else: checksum = checksum_copy(from_fp, tmp) @@ -902,7 +902,7 @@ def _add_to_bagit(self, rel_path: str, **checksums: str) -> None: checksums = dict(checksums) with open(lpath, "rb") as file_path: # FIXME: Need sha-256 / sha-512 as well for Research Object BagIt profile? - if NO_DATA: + if utils.NO_DATA: checksums[SHA1] = checksum_only(file_path, hasher=hashlib.sha1) else: checksums[SHA1] = checksum_copy(file_path, hasher=hashlib.sha1) From b288fb452b1169bbfd26c96562c99d7a45cbafcc Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Tue, 6 Sep 2022 17:18:20 +0200 Subject: [PATCH 25/49] added return type str: to the checksum content processor --- cwltool/provenance.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cwltool/provenance.py b/cwltool/provenance.py index 490002d30..786ccd632 100644 --- a/cwltool/provenance.py +++ b/cwltool/provenance.py @@ -1056,7 +1056,7 @@ def checksum_copy( return content_processor(contents, src_file, dst_file, checksum, buffersize) -def content_processor(contents, src_file, dst_file, checksum, buffersize): +def content_processor(contents, src_file, dst_file, checksum, buffersize) -> str: """ Calculate the checksum based on the content. From 1dbcdad6fc6a08cfb1568e01380cfdbbc0381855 Mon Sep 17 00:00:00 2001 From: "Michael R. 
Crusoe" Date: Tue, 6 Sep 2022 17:21:31 +0200 Subject: [PATCH 26/49] fix type --- cwltool/provenance.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cwltool/provenance.py b/cwltool/provenance.py index 1699349cf..0021e0016 100644 --- a/cwltool/provenance.py +++ b/cwltool/provenance.py @@ -1063,7 +1063,7 @@ def content_processor( contents: Any, src_file: IO[Any], dst_file: Optional[IO[Any]], - checksum: hashlib._Hash, + checksum: "hashlib._Hash", buffersize: int, ) -> str: """Calculate the checksum based on the content.""" From ab71278bb4c6b86d70ac633fb7d411e351086e3b Mon Sep 17 00:00:00 2001 From: "Michael R. Crusoe" Date: Tue, 6 Sep 2022 17:31:30 +0200 Subject: [PATCH 27/49] restore regular prov tests --- tests/test_provenance.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/tests/test_provenance.py b/tests/test_provenance.py index d8fdd174f..4adb5e5c1 100644 --- a/tests/test_provenance.py +++ b/tests/test_provenance.py @@ -30,15 +30,11 @@ CWLPROV = Namespace("https://w3id.org/cwl/prov#") OA = Namespace("http://www.w3.org/ns/oa#") -NO_DATA = True - def cwltool(tmp_path: Path, *args: Any) -> Path: prov_folder = tmp_path / "provenance" prov_folder.mkdir() new_args = ["--provenance", str(prov_folder)] - if NO_DATA: - new_args = ["--no-data"] + new_args new_args.extend(args) # Run within a temporary directory to not pollute git checkout tmp_dir = tmp_path / "cwltool-run" @@ -253,7 +249,6 @@ def check_provenance( secondary_files: bool = False, ) -> None: check_folders(base_path) - # TODO can we run check_bagit if there is no data in data? check_bagit(base_path) check_ro(base_path, nested=nested) check_prov( @@ -279,9 +274,6 @@ def check_folders(base_path: Path) -> None: def check_bagit(base_path: Path) -> None: - if NO_DATA: - return - # check bagit structure required_files = [ "bagit.txt", @@ -541,8 +533,7 @@ def check_prov( g2.parse(file=f, format="nt", publicID=nt_uri) # TODO: Check g2 statements that it's the same UUID activity inside # as in the outer step - # TODO check with NO_DATA being true is this then false?... - if directory and not NO_DATA: + if directory: directories = set(g.subjects(RDF.type, RO.Folder)) assert directories From 112f4f074561757bb4068491c685cf8df70f3545 Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Tue, 6 Sep 2022 19:04:11 +0200 Subject: [PATCH 28/49] Duplicated a test case and the cwltool function to allow for --no-data testing. 
The file is still created but remains empty at the moment --- tests/test_provenance.py | 59 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/tests/test_provenance.py b/tests/test_provenance.py index 4adb5e5c1..61c796b48 100644 --- a/tests/test_provenance.py +++ b/tests/test_provenance.py @@ -44,6 +44,18 @@ def cwltool(tmp_path: Path, *args: Any) -> Path: assert status == 0, f"Failed: cwltool.main({args})" return prov_folder +def cwltool_no_data(tmp_path: Path, *args: Any) -> Path: + prov_folder = tmp_path / "provenance" + prov_folder.mkdir() + new_args = ["--no-data", "--provenance", str(prov_folder)] + new_args.extend(args) + # Run within a temporary directory to not pollute git checkout + tmp_dir = tmp_path / "cwltool-run" + tmp_dir.mkdir() + with working_directory(tmp_dir): + status = main(new_args) + assert status == 0, f"Failed: cwltool.main({args})" + return prov_folder @needs_docker def test_hello_workflow(tmp_path: Path) -> None: @@ -191,6 +203,53 @@ def test_directory_workflow(tmp_path: Path) -> None: p = folder / "data" / prefix / l_hash assert p.is_file(), f"Could not find {l} as {p}" +@needs_docker +def test_directory_workflow_no_data(tmp_path: Path) -> None: + dir2 = tmp_path / "dir2" + dir2.mkdir() + sha1 = { + # Expected hashes of ASCII letters (no linefeed) + # as returned from: + # for x in a b c ; do echo -n $x | sha1sum ; done + "a": "86f7e437faa5a7fce15d1ddcb9eaeaea377667b8", + "b": "e9d71f5ee7c92d6dc9e92ffdad17b8bd49418f98", + "c": "84a516841ba77a5b4648de2cd0dfcb30ea46dbb4", + } + for x in "abc": + # Make test files with predictable hashes + with open(dir2 / x, "w", encoding="ascii") as f: + f.write(x) + + folder = cwltool_no_data(tmp_path, get_data("tests/wf/directory.cwl"), "--dir", str(dir2)) + # check invert? as there should be no data in there + # check_provenance(folder, directory=True) + + # Output should include ls stdout of filenames a b c on each line + file_list = ( + folder + / "data" + / "3c" + / "3ca69e8d6c234a469d16ac28a4a658c92267c423" + # checksum as returned from: + # echo -e "a\nb\nc" | sha1sum + # 3ca69e8d6c234a469d16ac28a4a658c92267c423 - + ) + # File should be empty and in the future not existing... + assert os.path.getsize(file_list.absolute()) == 0 + # To be discared when file really does not exist anymore + assert file_list.is_file() + + # Input files should be captured by hash value, + # even if they were inside a class: Directory + for (l, l_hash) in sha1.items(): + prefix = l_hash[:2] # first 2 letters + p = folder / "data" / prefix / l_hash + # File should be empty and in the future not existing... 
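The expected checksums hard-coded in these tests can be re-derived in a few lines; the snippet below only restates values already present in the test comments above.

import hashlib

assert hashlib.sha1(b"a").hexdigest() == "86f7e437faa5a7fce15d1ddcb9eaeaea377667b8"
assert hashlib.sha1(b"b").hexdigest() == "e9d71f5ee7c92d6dc9e92ffdad17b8bd49418f98"
assert hashlib.sha1(b"c").hexdigest() == "84a516841ba77a5b4648de2cd0dfcb30ea46dbb4"
# "echo -e 'a\nb\nc'" emits a trailing newline, hence the listing checksum:
assert hashlib.sha1(b"a\nb\nc\n").hexdigest() == "3ca69e8d6c234a469d16ac28a4a658c92267c423"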
+ assert os.path.getsize(p.absolute()) == 0 + # To be discared when file really does not exist anymore + assert p.is_file(), f"Could not find {l} as {p}" + + @needs_docker def test_no_data_files(tmp_path: Path) -> None: From cd0a4aff2c86a9eed8085752a900d229df728568 Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Tue, 6 Sep 2022 19:40:13 +0200 Subject: [PATCH 29/49] formatting --- tests/test_provenance.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/test_provenance.py b/tests/test_provenance.py index 61c796b48..9b3ef70fb 100644 --- a/tests/test_provenance.py +++ b/tests/test_provenance.py @@ -44,6 +44,7 @@ def cwltool(tmp_path: Path, *args: Any) -> Path: assert status == 0, f"Failed: cwltool.main({args})" return prov_folder + def cwltool_no_data(tmp_path: Path, *args: Any) -> Path: prov_folder = tmp_path / "provenance" prov_folder.mkdir() @@ -57,6 +58,7 @@ def cwltool_no_data(tmp_path: Path, *args: Any) -> Path: assert status == 0, f"Failed: cwltool.main({args})" return prov_folder + @needs_docker def test_hello_workflow(tmp_path: Path) -> None: check_provenance( @@ -203,6 +205,7 @@ def test_directory_workflow(tmp_path: Path) -> None: p = folder / "data" / prefix / l_hash assert p.is_file(), f"Could not find {l} as {p}" + @needs_docker def test_directory_workflow_no_data(tmp_path: Path) -> None: dir2 = tmp_path / "dir2" @@ -220,7 +223,9 @@ def test_directory_workflow_no_data(tmp_path: Path) -> None: with open(dir2 / x, "w", encoding="ascii") as f: f.write(x) - folder = cwltool_no_data(tmp_path, get_data("tests/wf/directory.cwl"), "--dir", str(dir2)) + folder = cwltool_no_data( + tmp_path, get_data("tests/wf/directory.cwl"), "--dir", str(dir2) + ) # check invert? as there should be no data in there # check_provenance(folder, directory=True) @@ -250,7 +255,6 @@ def test_directory_workflow_no_data(tmp_path: Path) -> None: assert p.is_file(), f"Could not find {l} as {p}" - @needs_docker def test_no_data_files(tmp_path: Path) -> None: folder = cwltool( From 50dac835a7df4b15bdc19a53e6efa7126340575a Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Wed, 7 Sep 2022 13:10:08 +0200 Subject: [PATCH 30/49] nolisting workflow and test added --- tests/test_provenance.py | 31 ++++++++++--- tests/wf/directory_no_listing.cwl | 73 +++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+), 5 deletions(-) create mode 100755 tests/wf/directory_no_listing.cwl diff --git a/tests/test_provenance.py b/tests/test_provenance.py index 9b3ef70fb..0a329a3de 100644 --- a/tests/test_provenance.py +++ b/tests/test_provenance.py @@ -48,7 +48,7 @@ def cwltool(tmp_path: Path, *args: Any) -> Path: def cwltool_no_data(tmp_path: Path, *args: Any) -> Path: prov_folder = tmp_path / "provenance" prov_folder.mkdir() - new_args = ["--no-data", "--provenance", str(prov_folder)] + new_args = ["--enable-ext", "--no-data", "--provenance", str(prov_folder)] new_args.extend(args) # Run within a temporary directory to not pollute git checkout tmp_dir = tmp_path / "cwltool-run" @@ -207,7 +207,7 @@ def test_directory_workflow(tmp_path: Path) -> None: @needs_docker -def test_directory_workflow_no_data(tmp_path: Path) -> None: +def test_directory_workflow_no_listing(tmp_path: Path) -> None: dir2 = tmp_path / "dir2" dir2.mkdir() sha1 = { @@ -223,8 +223,28 @@ def test_directory_workflow_no_data(tmp_path: Path) -> None: with open(dir2 / x, "w", encoding="ascii") as f: f.write(x) - folder = cwltool_no_data( - tmp_path, get_data("tests/wf/directory.cwl"), "--dir", str(dir2) + dir3 = tmp_path / 
"dirIgnore" + dir3.mkdir() + sha1 = { + # Expected hashes of ASCII letters (no linefeed) + # as returned from: + # for x in a b c ; do echo -n $x | sha1sum ; done + "d": "3c363836cf4e16666669a25da280a1865c2d2874", + "e": "58e6b3a414a1e090dfc6029add0f3555ccba127f", + "f": "4a0a19218e082a343a1b17e5333409af9d98f0f5", + } + for x in "def": + # Make test files with predictable hashes + with open(dir3 / x, "w", encoding="ascii") as f: + f.write(x) + + folder = cwltool( + tmp_path, + get_data("tests/wf/directory_no_listing.cwl"), + "--dir", + str(dir2), + "--ignore", + str(dir3), ) # check invert? as there should be no data in there # check_provenance(folder, directory=True) @@ -234,12 +254,13 @@ def test_directory_workflow_no_data(tmp_path: Path) -> None: folder / "data" / "3c" - / "3ca69e8d6c234a469d16ac28a4a658c92267c423" + / "3c363836cf4e16666669a25da280a1865c2d2874" # checksum as returned from: # echo -e "a\nb\nc" | sha1sum # 3ca69e8d6c234a469d16ac28a4a658c92267c423 - ) # File should be empty and in the future not existing... + print("FILE LIST: ", file_list.absolute()) assert os.path.getsize(file_list.absolute()) == 0 # To be discared when file really does not exist anymore assert file_list.is_file() diff --git a/tests/wf/directory_no_listing.cwl b/tests/wf/directory_no_listing.cwl new file mode 100755 index 000000000..19a376bf1 --- /dev/null +++ b/tests/wf/directory_no_listing.cwl @@ -0,0 +1,73 @@ +#!/usr/bin/env cwl-runner +cwlVersion: v1.2 +class: Workflow + +doc: > + Inspect provided directory and return filenames. + Generate a new directory and return it (including content). + +hints: + - class: DockerRequirement + dockerPull: docker.io/debian:stable-slim + +inputs: + dir: + type: Directory + ignore: + type: Directory + loadListing: no_listing + +steps: + ls: + in: + dir: dir + ignore: ignore + out: + [listing] + run: + class: CommandLineTool + baseCommand: ls + inputs: + dir: + type: Directory + inputBinding: + position: 1 + ignore: + type: Directory + inputBinding: + position: 2 + outputs: + listing: + type: stdout + + generate: + in: [] + out: + [dir1] + run: + class: CommandLineTool + requirements: + - class: ShellCommandRequirement + arguments: + - shellQuote: false + valueFrom: > + pwd; + mkdir -p dir1/a/b; + echo -n a > dir1/a.txt; + echo -n b > dir1/a/b.txt; + echo -n c > dir1/a/b/c.txt; + inputs: [] + outputs: + dir1: + type: Directory + outputBinding: + glob: "dir1" + +outputs: + listing: + type: File + outputSource: ls/listing + dir1: + type: Directory + outputSource: generate/dir1 + From d3048af62a27b0331dcdc938788d7915bb4bbcca Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Wed, 7 Sep 2022 13:13:18 +0200 Subject: [PATCH 31/49] with copy files but excluding a specific folder test --- cwltool/provenance_profile.py | 36 ++++++++++++++++++++++++++++++----- tests/test_provenance.py | 6 +++--- 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/cwltool/provenance_profile.py b/cwltool/provenance_profile.py index d192c343f..f926d8df3 100644 --- a/cwltool/provenance_profile.py +++ b/cwltool/provenance_profile.py @@ -25,6 +25,7 @@ from typing_extensions import TYPE_CHECKING import cwltool.workflow +from . 
import process from .errors import WorkflowException from .job import CommandLineJob, JobBase @@ -57,7 +58,7 @@ def copy_job_order( - job: Union[Process, JobsType], job_order_object: CWLObjectType + job: Union[Process, JobsType], job_order_object: CWLObjectType, process ) -> CWLObjectType: """Create copy of job object for provenance.""" if not isinstance(job, WorkflowJob): @@ -66,12 +67,34 @@ def copy_job_order( customised_job: CWLObjectType = {} # new job object for RO debug = _logger.isEnabledFor(logging.DEBUG) + # Process the process object first + load_listing = {} + + # Implementation to capture the loadlisting from cwl to skip the inclusion of for example files of big database + # folders + for index, entry in enumerate(process.inputs_record_schema["fields"]): + if ( + entry["type"] == "org.w3id.cwl.cwl.Directory" + and "loadListing" in entry + and entry["loadListing"] + ): + load_listing[entry["name"]] = entry["loadListing"] + + # print("LOAD LISTING: ", load_listing) + # PROCESS:Workflow: file:///Users/jasperk/gitlab/cwltool/tests/wf/directory_no_listing.cwl + # print("PROCESS:" + str(process)) + for each, i in enumerate(job.tool["inputs"]): with SourceLine(job.tool["inputs"], each, WorkflowException, debug): iid = shortname(i["id"]) + # if iid in the load listing object and no_listing then.... if iid in job_order_object: - customised_job[iid] = copy.deepcopy(job_order_object[iid]) - # add the input element in dictionary for provenance + if iid in load_listing and load_listing[iid] != "no_listing": + customised_job[iid] = copy.deepcopy(job_order_object[iid]) + # TODO Other listing options here? + else: + # add the input element in dictionary for provenance + customised_job[iid] = copy.deepcopy(job_order_object[iid]) elif "default" in i: customised_job[iid] = copy.deepcopy(i["default"]) # add the default elements in the dictionary for provenance @@ -246,13 +269,13 @@ def evaluate( if not hasattr(process, "steps"): # record provenance of independent commandline tool executions self.prospective_prov(job) - customised_job = copy_job_order(job, job_order_object) + customised_job = copy_job_order(job, job_order_object, process) self.used_artefacts(customised_job, self.workflow_run_uri) research_obj.create_job(customised_job) elif hasattr(job, "workflow"): # record provenance of workflow executions self.prospective_prov(job) - customised_job = copy_job_order(job, job_order_object) + customised_job = copy_job_order(job, job_order_object, process) self.used_artefacts(customised_job, self.workflow_run_uri) def record_process_start( @@ -472,8 +495,11 @@ def declare_directory(self, value: CWLObjectType) -> ProvEntity: # a later call to this method will sort that is_empty = True + # if value['basename'] == "dirIgnore": + # pass if "listing" not in value: get_listing(self.fsaccess, value) + for entry in cast(MutableSequence[CWLObjectType], value.get("listing", [])): is_empty = False # Declare child-artifacts diff --git a/tests/test_provenance.py b/tests/test_provenance.py index 0a329a3de..5b4150839 100644 --- a/tests/test_provenance.py +++ b/tests/test_provenance.py @@ -260,8 +260,8 @@ def test_directory_workflow_no_listing(tmp_path: Path) -> None: # 3ca69e8d6c234a469d16ac28a4a658c92267c423 - ) # File should be empty and in the future not existing... 
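Behind these test adjustments, the substantive piece of this commit is the scan over process.inputs_record_schema that copy_job_order gains above. A condensed, standalone rendering of that scan follows; the helper name is invented, and the schema is assumed to be the plain dict shape visible in the diff.

def directory_listing_hints(inputs_record_schema: dict) -> dict:
    # Remember the loadListing setting of every Directory input so provenance
    # capture can later skip expanding directories marked no_listing.
    hints = {}
    for entry in inputs_record_schema.get("fields", []):
        if entry.get("type") == "org.w3id.cwl.cwl.Directory" and entry.get("loadListing"):
            hints[entry["name"]] = entry["loadListing"]
    return hints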
- print("FILE LIST: ", file_list.absolute()) - assert os.path.getsize(file_list.absolute()) == 0 + # print("FILE LIST: ", file_list.absolute()) + # assert os.path.getsize(file_list.absolute()) == 0 # To be discared when file really does not exist anymore assert file_list.is_file() @@ -271,7 +271,7 @@ def test_directory_workflow_no_listing(tmp_path: Path) -> None: prefix = l_hash[:2] # first 2 letters p = folder / "data" / prefix / l_hash # File should be empty and in the future not existing... - assert os.path.getsize(p.absolute()) == 0 + # assert os.path.getsize(p.absolute()) == 0 # To be discared when file really does not exist anymore assert p.is_file(), f"Could not find {l} as {p}" From ac532d4d2b2e37f1f88d07a392ff2fe4d832e34b Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Thu, 8 Sep 2022 14:53:58 +0200 Subject: [PATCH 32/49] working on load listing recognition for files and provenance --- cwltool/job.py | 3 +- cwltool/provenance_profile.py | 68 ++++++++++++++++++++++--------- cwltool/utils.py | 2 +- tests/test_provenance.py | 13 ++++-- tests/wf/directory_no_listing.cwl | 10 +++-- 5 files changed, 67 insertions(+), 29 deletions(-) diff --git a/cwltool/job.py b/cwltool/job.py index a327773c5..31eacde76 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -299,7 +299,7 @@ def _execute( and isinstance(job_order, (list, dict)) ): runtimeContext.prov_obj.used_artefacts( - job_order, runtimeContext.process_run_id, str(self.name) + job_order, runtimeContext.process_run_id, str(self.name), load_listing=self.builder.loadListing ) else: _logger.warning( @@ -426,6 +426,7 @@ def stderr_stdout_log_path( runtimeContext.process_run_id, outputs, datetime.datetime.now(), + builder.loadListing # TODO FIX THIS ) if processStatus != "success": _logger.warning("[job %s] completed %s", self.name, processStatus) diff --git a/cwltool/provenance_profile.py b/cwltool/provenance_profile.py index f926d8df3..0a5acdb5d 100644 --- a/cwltool/provenance_profile.py +++ b/cwltool/provenance_profile.py @@ -89,8 +89,15 @@ def copy_job_order( iid = shortname(i["id"]) # if iid in the load listing object and no_listing then.... if iid in job_order_object: - if iid in load_listing and load_listing[iid] != "no_listing": - customised_job[iid] = copy.deepcopy(job_order_object[iid]) + if iid in load_listing: + if load_listing[iid] == "no_listing": + _logger.warning("Skip listing of " + iid) + job_order_object[iid]['loadListing'] = 'no_listing' + job_order_object[iid]['listing'] = [] + customised_job[iid] = job_order_object[iid] + else: + # Normal deep copy + customised_job[iid] = copy.deepcopy(job_order_object[iid]) # TODO Other listing options here? 
else: # add the input element in dictionary for provenance @@ -270,13 +277,14 @@ def evaluate( # record provenance of independent commandline tool executions self.prospective_prov(job) customised_job = copy_job_order(job, job_order_object, process) - self.used_artefacts(customised_job, self.workflow_run_uri) + self.used_artefacts(customised_job, self.workflow_run_uri, job.builder.loadListing) research_obj.create_job(customised_job) elif hasattr(job, "workflow"): # record provenance of workflow executions self.prospective_prov(job) customised_job = copy_job_order(job, job_order_object, process) - self.used_artefacts(customised_job, self.workflow_run_uri) + self.used_artefacts(customised_job, self.workflow_run_uri, schema=process.inputs_record_schema) + def record_process_start( self, process: Process, job: JobsType, process_run_id: Optional[str] = None @@ -355,11 +363,12 @@ def record_process_end( process_run_id: str, outputs: Union[CWLObjectType, MutableSequence[CWLObjectType], None], when: datetime.datetime, + load_listing: str = "deep_listing", ) -> None: - self.generate_output_prov(outputs, process_run_id, process_name) + self.generate_output_prov(outputs, process_run_id, process_name, load_listing) self.document.wasEndedBy(process_run_id, None, self.workflow_run_uri, when) - def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, str]: + def declare_file(self, value: CWLObjectType, load_listing: str = "deep_listing") -> Tuple[ProvEntity, ProvEntity, str]: if value["class"] != "File": raise ValueError("Must have class:File: %s" % value) # Need to determine file hash aka RO filename @@ -436,9 +445,9 @@ def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, st ): # TODO: Record these in a specializationOf entity with UUID? 
if sec["class"] == "File": - (sec_entity, _, _) = self.declare_file(sec) + (sec_entity, _, _) = self.declare_file(sec, load_listing) elif sec["class"] == "Directory": - sec_entity = self.declare_directory(sec) + sec_entity = self.declare_directory(sec, load_listing) else: raise ValueError(f"Got unexpected secondaryFiles value: {sec}") # We don't know how/when/where the secondary file was generated, @@ -453,7 +462,7 @@ def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, st return file_entity, entity, checksum - def declare_directory(self, value: CWLObjectType) -> ProvEntity: + def declare_directory(self, value: CWLObjectType, load_listing: str = "deep_listing") -> ProvEntity: """Register any nested files/directories.""" # FIXME: Calculate a hash-like identifier for directory # so we get same value if it's the same filenames/hashes @@ -498,12 +507,19 @@ def declare_directory(self, value: CWLObjectType) -> ProvEntity: # if value['basename'] == "dirIgnore": # pass if "listing" not in value: - get_listing(self.fsaccess, value) + if load_listing == "no_listing": + pass + elif load_listing == "deep_listing": + get_listing(self.fsaccess, value) + elif load_listing == "shallow_listing": + get_listing(self.fsaccess, value, False) + else: + raise ValueError("Invalid listing value: %s", load_listing) for entry in cast(MutableSequence[CWLObjectType], value.get("listing", [])): is_empty = False # Declare child-artifacts - entity = self.declare_artefact(entry) + entity = self.declare_artefact(entry, load_listing) self.document.membership(coll, entity) # Membership relation aka our ORE Proxy m_id = uuid.uuid4().urn @@ -573,7 +589,7 @@ def declare_string(self, value: str) -> Tuple[ProvEntity, str]: ) return entity, checksum - def declare_artefact(self, value: Any) -> ProvEntity: + def declare_artefact(self, value: Any, load_listing: str = "deep_listing") -> ProvEntity: """Create data artefact entities for all file objects.""" if value is None: # FIXME: If this can happen in CWL, we'll @@ -615,12 +631,12 @@ def declare_artefact(self, value: Any) -> ProvEntity: # Base case - we found a File we need to update if value.get("class") == "File": - (entity, _, _) = self.declare_file(value) + (entity, _, _) = self.declare_file(value, load_listing) value["@id"] = entity.identifier.uri return entity if value.get("class") == "Directory": - entity = self.declare_directory(value) + entity = self.declare_directory(value, load_listing) value["@id"] = entity.identifier.uri return entity coll_id = value.setdefault("@id", uuid.uuid4().urn) @@ -643,7 +659,7 @@ def declare_artefact(self, value: Any) -> ProvEntity: # Let's iterate and recurse coll_attribs: List[Tuple[Union[str, Identifier], Any]] = [] for (key, val) in value.items(): - v_ent = self.declare_artefact(val) + v_ent = self.declare_artefact(val, load_listing) self.document.membership(coll, v_ent) m_entity = self.document.entity(uuid.uuid4().urn) # Note: only support PROV-O style dictionary @@ -664,7 +680,7 @@ def declare_artefact(self, value: Any) -> ProvEntity: members = [] for each_input_obj in iter(value): # Recurse and register any nested objects - e = self.declare_artefact(each_input_obj) + e = self.declare_artefact(each_input_obj, load_listing) members.append(e) # If we reached this, then we were allowed to iterate @@ -698,11 +714,16 @@ def used_artefacts( job_order: Union[CWLObjectType, List[CWLObjectType]], process_run_id: str, name: Optional[str] = None, + schema: Any = None, + load_listing: Optional[str] = None, ) -> None: """Add 
used() for each data artefact.""" if isinstance(job_order, list): for entry in job_order: - self.used_artefacts(entry, process_run_id, name) + # for field in schema.fields: + # if field['name'] == entry. + # load_listing = schema.fields + self.used_artefacts(entry, process_run_id, name, load_listing) else: # FIXME: Use workflow name in packed.cwl, "main" is wrong for nested workflows base = "main" @@ -710,8 +731,14 @@ def used_artefacts( base += "/" + name for key, value in job_order.items(): prov_role = self.wf_ns[f"{base}/{key}"] + if not load_listing: + load_listing = "deep_listing" + for field in schema['fields']: + if field['name'] == key: + load_listing = field['loadListing'] + break try: - entity = self.declare_artefact(value) + entity = self.declare_artefact(value, load_listing) self.document.used( process_run_id, entity, @@ -727,11 +754,12 @@ def generate_output_prov( final_output: Union[CWLObjectType, MutableSequence[CWLObjectType], None], process_run_id: Optional[str], name: Optional[str], + load_listing: str = "deep_listing" ) -> None: """Call wasGeneratedBy() for each output,copy the files into the RO.""" if isinstance(final_output, MutableSequence): for entry in final_output: - self.generate_output_prov(entry, process_run_id, name) + self.generate_output_prov(entry, process_run_id, name, load_listing) elif final_output is not None: # Timestamp should be created at the earliest timestamp = datetime.datetime.now() @@ -740,7 +768,7 @@ def generate_output_prov( # entity (UUID) and document it as generated in # a role corresponding to the output for output, value in final_output.items(): - entity = self.declare_artefact(value) + entity = self.declare_artefact(value, load_listing) if name is not None: name = urllib.parse.quote(str(name), safe=":/,#") # FIXME: Probably not "main" in nested workflows diff --git a/cwltool/utils.py b/cwltool/utils.py index abc5c587a..6b88779a0 100644 --- a/cwltool/utils.py +++ b/cwltool/utils.py @@ -97,7 +97,7 @@ ScatterOutputCallbackType = Callable[[Optional[ScatterDestinationsType], str], None] SinkType = Union[CWLOutputType, CWLObjectType] DirectoryType = TypedDict( - "DirectoryType", {"class": str, "listing": List[CWLObjectType], "basename": str} + "DirectoryType", {"class": str, "listing": List[CWLObjectType], "basename": str, "loadListing": str} ) JSONAtomType = Union[Dict[str, Any], List[Any], str, int, float, bool, None] JSONType = Union[ diff --git a/tests/test_provenance.py b/tests/test_provenance.py index 5b4150839..4cfd06827 100644 --- a/tests/test_provenance.py +++ b/tests/test_provenance.py @@ -208,6 +208,10 @@ def test_directory_workflow(tmp_path: Path) -> None: @needs_docker def test_directory_workflow_no_listing(tmp_path: Path) -> None: + """ + This test will check for 3 files that should be there and 3 files that should not be there. + @param tmp_path: + """ dir2 = tmp_path / "dir2" dir2.mkdir() sha1 = { @@ -253,8 +257,8 @@ def test_directory_workflow_no_listing(tmp_path: Path) -> None: file_list = ( folder / "data" - / "3c" - / "3c363836cf4e16666669a25da280a1865c2d2874" + / "84" + / "84a516841ba77a5b4648de2cd0dfcb30ea46dbb4" # checksum as returned from: # echo -e "a\nb\nc" | sha1sum # 3ca69e8d6c234a469d16ac28a4a658c92267c423 - @@ -273,8 +277,9 @@ def test_directory_workflow_no_listing(tmp_path: Path) -> None: # File should be empty and in the future not existing... 
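The same loadListing information is consulted again, per input key, in the used_artefacts changes earlier in this commit's provenance_profile.py hunks. Read in isolation, that lookup amounts to a small helper along these lines; the name and the default argument are illustrative.

def listing_policy(schema: dict, key: str, default: str = "deep_listing") -> str:
    # Honour the input field's declared loadListing, otherwise keep the default.
    for field in schema.get("fields", []):
        if field.get("name") == key and "loadListing" in field:
            return field["loadListing"]
    return default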
# assert os.path.getsize(p.absolute()) == 0 # To be discared when file really does not exist anymore - assert p.is_file(), f"Could not find {l} as {p}" - + if l not in ['d', 'e', 'f']: + print("Analysing file %s", l) + assert p.is_file(), f"Could not find {l} as {p}" @needs_docker def test_no_data_files(tmp_path: Path) -> None: diff --git a/tests/wf/directory_no_listing.cwl b/tests/wf/directory_no_listing.cwl index 19a376bf1..58efc8071 100755 --- a/tests/wf/directory_no_listing.cwl +++ b/tests/wf/directory_no_listing.cwl @@ -13,6 +13,7 @@ hints: inputs: dir: type: Directory + loadListing: deep_listing ignore: type: Directory loadListing: no_listing @@ -47,7 +48,10 @@ steps: run: class: CommandLineTool requirements: - - class: ShellCommandRequirement + ShellCommandRequirement: {} + LoadListingRequirement: + loadListing: deep_listing + arguments: - shellQuote: false valueFrom: > @@ -64,10 +68,10 @@ steps: glob: "dir1" outputs: - listing: + output_1: type: File outputSource: ls/listing - dir1: + output_2: type: Directory outputSource: generate/dir1 From fb5a65ab04a8c28d827683327ee8016920dafbd4 Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Thu, 8 Sep 2022 15:16:30 +0200 Subject: [PATCH 33/49] expanded the test case, server testing showed a loadListing option not present in the field object. --- cwltool/provenance_profile.py | 7 +++++-- tests/test_provenance.py | 25 +++++++++++++++++++++---- tests/wf/directory_no_listing.cwl | 3 +++ 3 files changed, 29 insertions(+), 6 deletions(-) diff --git a/cwltool/provenance_profile.py b/cwltool/provenance_profile.py index 0a5acdb5d..848234919 100644 --- a/cwltool/provenance_profile.py +++ b/cwltool/provenance_profile.py @@ -735,8 +735,11 @@ def used_artefacts( load_listing = "deep_listing" for field in schema['fields']: if field['name'] == key: - load_listing = field['loadListing'] - break + if 'loadListing' in field: + load_listing = field['loadListing'] + break + else: + _logger.warning("No loadListing info in object") try: entity = self.declare_artefact(value, load_listing) self.document.used( diff --git a/tests/test_provenance.py b/tests/test_provenance.py index 4cfd06827..d7b3fccae 100644 --- a/tests/test_provenance.py +++ b/tests/test_provenance.py @@ -212,7 +212,7 @@ def test_directory_workflow_no_listing(tmp_path: Path) -> None: This test will check for 3 files that should be there and 3 files that should not be there. 
@param tmp_path: """ - dir2 = tmp_path / "dir2" + dir2 = tmp_path / "dir_deep_listing" dir2.mkdir() sha1 = { # Expected hashes of ASCII letters (no linefeed) @@ -227,12 +227,12 @@ def test_directory_workflow_no_listing(tmp_path: Path) -> None: with open(dir2 / x, "w", encoding="ascii") as f: f.write(x) - dir3 = tmp_path / "dirIgnore" + dir3 = tmp_path / "dir_no_listing" dir3.mkdir() sha1 = { # Expected hashes of ASCII letters (no linefeed) # as returned from: - # for x in a b c ; do echo -n $x | sha1sum ; done + # for x in d e f ; do echo -n $x | sha1sum ; done "d": "3c363836cf4e16666669a25da280a1865c2d2874", "e": "58e6b3a414a1e090dfc6029add0f3555ccba127f", "f": "4a0a19218e082a343a1b17e5333409af9d98f0f5", @@ -242,6 +242,21 @@ def test_directory_workflow_no_listing(tmp_path: Path) -> None: with open(dir3 / x, "w", encoding="ascii") as f: f.write(x) + dir4 = tmp_path / "dir_no_info" + dir4.mkdir() + sha1 = { + # Expected hashes of ASCII letters (no linefeed) + # as returned from: + # for x in g h i ; do echo -n $x | sha1sum ; done + "g": "54fd1711209fb1c0781092374132c66e79e2241b", + "h": "27d5482eebd075de44389774fce28c69f45c8a75", + "i": "042dc4512fa3d391c5170cf3aa61e6a638f84342", + } + for x in "def": + # Make test files with predictable hashes + with open(dir4 / x, "w", encoding="ascii") as f: + f.write(x) + folder = cwltool( tmp_path, get_data("tests/wf/directory_no_listing.cwl"), @@ -249,6 +264,8 @@ def test_directory_workflow_no_listing(tmp_path: Path) -> None: str(dir2), "--ignore", str(dir3), + "--ignore_no_info", + str(dir4), ) # check invert? as there should be no data in there # check_provenance(folder, directory=True) @@ -277,7 +294,7 @@ def test_directory_workflow_no_listing(tmp_path: Path) -> None: # File should be empty and in the future not existing... 
# assert os.path.getsize(p.absolute()) == 0 # To be discared when file really does not exist anymore - if l not in ['d', 'e', 'f']: + if l not in ['d', 'e', 'f', 'g', 'h', 'i']: print("Analysing file %s", l) assert p.is_file(), f"Could not find {l} as {p}" diff --git a/tests/wf/directory_no_listing.cwl b/tests/wf/directory_no_listing.cwl index 58efc8071..a8f32fca5 100755 --- a/tests/wf/directory_no_listing.cwl +++ b/tests/wf/directory_no_listing.cwl @@ -17,6 +17,9 @@ inputs: ignore: type: Directory loadListing: no_listing + ignore_no_info: + type: Directory + steps: ls: From 373b600a5ca750cc72d78ea09a2833f3954a1d30 Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Thu, 8 Sep 2022 15:32:14 +0200 Subject: [PATCH 34/49] issue with load listing field --- cwltool/provenance_profile.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cwltool/provenance_profile.py b/cwltool/provenance_profile.py index 848234919..8e78590f2 100644 --- a/cwltool/provenance_profile.py +++ b/cwltool/provenance_profile.py @@ -739,7 +739,9 @@ def used_artefacts( load_listing = field['loadListing'] break else: + # Need to find a way to reproduce this _logger.warning("No loadListing info in object") + load_listing = "no_listing" try: entity = self.declare_artefact(value, load_listing) self.document.used( From eb93204bb47c205a36267c5b1a39bda0d448efc9 Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Thu, 8 Sep 2022 15:35:47 +0200 Subject: [PATCH 35/49] unused import removal --- cwltool/provenance_profile.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/cwltool/provenance_profile.py b/cwltool/provenance_profile.py index 8e78590f2..e7071720e 100644 --- a/cwltool/provenance_profile.py +++ b/cwltool/provenance_profile.py @@ -25,8 +25,6 @@ from typing_extensions import TYPE_CHECKING import cwltool.workflow -from . 
import process - from .errors import WorkflowException from .job import CommandLineJob, JobBase from .loghandler import _logger From a4b26af76774cdb1e912b5813d7687c1b2a6a4ac Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Fri, 9 Sep 2022 13:09:23 +0200 Subject: [PATCH 36/49] show file name with debugger --- cwltool/provenance.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cwltool/provenance.py b/cwltool/provenance.py index 0021e0016..ac3bdaa90 100644 --- a/cwltool/provenance.py +++ b/cwltool/provenance.py @@ -839,6 +839,10 @@ def add_data_file( ) # Inefficient, bagit support need to checksum again self._add_to_bagit(rel_path) + if 'dir' in self.relativised_input_object: + _logger.debug("[provenance] Directory :%s", self.relativised_input_object['dir']['basename']) + else: + _logger.debug("[provenance] File: %s", from_fp.name) _logger.debug("[provenance] Added data file %s", path) if timestamp is not None: createdOn, createdBy = self._self_made(timestamp) From 95c2c63a0de1755e786205981024d8584c956634 Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Fri, 9 Sep 2022 13:18:58 +0200 Subject: [PATCH 37/49] from_fp does not always carry name --- cwltool/provenance.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cwltool/provenance.py b/cwltool/provenance.py index ac3bdaa90..82f0930c4 100644 --- a/cwltool/provenance.py +++ b/cwltool/provenance.py @@ -842,7 +842,7 @@ def add_data_file( if 'dir' in self.relativised_input_object: _logger.debug("[provenance] Directory :%s", self.relativised_input_object['dir']['basename']) else: - _logger.debug("[provenance] File: %s", from_fp.name) + _logger.debug("[provenance] File: %s", str(from_fp)) _logger.debug("[provenance] Added data file %s", path) if timestamp is not None: createdOn, createdBy = self._self_made(timestamp) From 401918e650bfda1ca89cfb409cb9757769743f1b Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Tue, 20 Sep 2022 09:03:52 +0200 Subject: [PATCH 38/49] testing to print stacktrace to identify path to print file --- cwltool/provenance.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cwltool/provenance.py b/cwltool/provenance.py index 82f0930c4..b0f699e60 100644 --- a/cwltool/provenance.py +++ b/cwltool/provenance.py @@ -8,6 +8,7 @@ import re import shutil import tempfile +import traceback import uuid from array import array from collections import OrderedDict @@ -842,7 +843,9 @@ def add_data_file( if 'dir' in self.relativised_input_object: _logger.debug("[provenance] Directory :%s", self.relativised_input_object['dir']['basename']) else: + # This still shows up when no_listing _logger.debug("[provenance] File: %s", str(from_fp)) + traceback.print_stack() _logger.debug("[provenance] Added data file %s", path) if timestamp is not None: createdOn, createdBy = self._self_made(timestamp) From 6fe74f30a4379acec5f1600a8af10733dad18514 Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Tue, 20 Sep 2022 09:17:56 +0200 Subject: [PATCH 39/49] check listing value --- cwltool/provenance_profile.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cwltool/provenance_profile.py b/cwltool/provenance_profile.py index e7071720e..f7fbf1673 100644 --- a/cwltool/provenance_profile.py +++ b/cwltool/provenance_profile.py @@ -367,6 +367,7 @@ def record_process_end( self.document.wasEndedBy(process_run_id, None, self.workflow_run_uri, when) def declare_file(self, value: CWLObjectType, load_listing: str = "deep_listing") -> Tuple[ProvEntity, ProvEntity, str]: + print("What listing? 
" + load_listing) if value["class"] != "File": raise ValueError("Must have class:File: %s" % value) # Need to determine file hash aka RO filename From 7f370bb64a38a7f6ed54b28233f45ca514c835f4 Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Tue, 20 Sep 2022 10:11:58 +0200 Subject: [PATCH 40/49] change default to invalid_listing --- cwltool/provenance_profile.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/cwltool/provenance_profile.py b/cwltool/provenance_profile.py index f7fbf1673..8a7ed9225 100644 --- a/cwltool/provenance_profile.py +++ b/cwltool/provenance_profile.py @@ -361,12 +361,12 @@ def record_process_end( process_run_id: str, outputs: Union[CWLObjectType, MutableSequence[CWLObjectType], None], when: datetime.datetime, - load_listing: str = "deep_listing", + load_listing: str = "invalid_listing", ) -> None: self.generate_output_prov(outputs, process_run_id, process_name, load_listing) self.document.wasEndedBy(process_run_id, None, self.workflow_run_uri, when) - def declare_file(self, value: CWLObjectType, load_listing: str = "deep_listing") -> Tuple[ProvEntity, ProvEntity, str]: + def declare_file(self, value: CWLObjectType, load_listing: str = "invalid_listing") -> Tuple[ProvEntity, ProvEntity, str]: print("What listing? " + load_listing) if value["class"] != "File": raise ValueError("Must have class:File: %s" % value) @@ -444,8 +444,10 @@ def declare_file(self, value: CWLObjectType, load_listing: str = "deep_listing") ): # TODO: Record these in a specializationOf entity with UUID? if sec["class"] == "File": + _logger.debug("447: " + load_listing) (sec_entity, _, _) = self.declare_file(sec, load_listing) elif sec["class"] == "Directory": + _logger.debug("450: " + load_listing) sec_entity = self.declare_directory(sec, load_listing) else: raise ValueError(f"Got unexpected secondaryFiles value: {sec}") @@ -461,7 +463,7 @@ def declare_file(self, value: CWLObjectType, load_listing: str = "deep_listing") return file_entity, entity, checksum - def declare_directory(self, value: CWLObjectType, load_listing: str = "deep_listing") -> ProvEntity: + def declare_directory(self, value: CWLObjectType, load_listing: str = "invalid_listing") -> ProvEntity: """Register any nested files/directories.""" # FIXME: Calculate a hash-like identifier for directory # so we get same value if it's the same filenames/hashes @@ -518,6 +520,7 @@ def declare_directory(self, value: CWLObjectType, load_listing: str = "deep_list for entry in cast(MutableSequence[CWLObjectType], value.get("listing", [])): is_empty = False # Declare child-artifacts + _logger.debug("523: " + load_listing) entity = self.declare_artefact(entry, load_listing) self.document.membership(coll, entity) # Membership relation aka our ORE Proxy @@ -588,7 +591,7 @@ def declare_string(self, value: str) -> Tuple[ProvEntity, str]: ) return entity, checksum - def declare_artefact(self, value: Any, load_listing: str = "deep_listing") -> ProvEntity: + def declare_artefact(self, value: Any, load_listing: str = "invalid_listing") -> ProvEntity: """Create data artefact entities for all file objects.""" if value is None: # FIXME: If this can happen in CWL, we'll @@ -630,6 +633,7 @@ def declare_artefact(self, value: Any, load_listing: str = "deep_listing") -> Pr # Base case - we found a File we need to update if value.get("class") == "File": + _logger.debug("635: " + load_listing) (entity, _, _) = self.declare_file(value, load_listing) value["@id"] = entity.identifier.uri return entity @@ -758,7 +762,7 @@ def 
generate_output_prov( final_output: Union[CWLObjectType, MutableSequence[CWLObjectType], None], process_run_id: Optional[str], name: Optional[str], - load_listing: str = "deep_listing" + load_listing: str = "invalid_listing" ) -> None: """Call wasGeneratedBy() for each output,copy the files into the RO.""" if isinstance(final_output, MutableSequence): From 26fec21a89997ac5c4aadfbc97310ec3f23d992c Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Tue, 20 Sep 2022 12:19:45 +0200 Subject: [PATCH 41/49] debugging in progress --- cwltool/executors.py | 2 +- cwltool/provenance_profile.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cwltool/executors.py b/cwltool/executors.py index 02f5480ce..0e1904f7e 100644 --- a/cwltool/executors.py +++ b/cwltool/executors.py @@ -179,7 +179,7 @@ def check_for_abstract_op(tool: CWLObjectType) -> None: process_run_id = None # type: Optional[str] name = "primary" process.parent_wf.generate_output_prov( - self.final_output[0], process_run_id, name + self.final_output[0], process_run_id, name, "generate_output_prov" ) process.parent_wf.document.wasEndedBy( process.parent_wf.workflow_run_uri, diff --git a/cwltool/provenance_profile.py b/cwltool/provenance_profile.py index 8a7ed9225..2b23b1f18 100644 --- a/cwltool/provenance_profile.py +++ b/cwltool/provenance_profile.py @@ -367,7 +367,7 @@ def record_process_end( self.document.wasEndedBy(process_run_id, None, self.workflow_run_uri, when) def declare_file(self, value: CWLObjectType, load_listing: str = "invalid_listing") -> Tuple[ProvEntity, ProvEntity, str]: - print("What listing? " + load_listing) + _logger.debug("What listing? " + load_listing) if value["class"] != "File": raise ValueError("Must have class:File: %s" % value) # Need to determine file hash aka RO filename From d01a0df0d00945c8a8ce63aa13e0e31dd9c39225 Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Wed, 12 Oct 2022 08:31:16 +0200 Subject: [PATCH 42/49] trace in debug --- cwltool/provenance.py | 2 +- tests/wf/directory_no_listing_nested.cwl | 81 ++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 1 deletion(-) create mode 100755 tests/wf/directory_no_listing_nested.cwl diff --git a/cwltool/provenance.py b/cwltool/provenance.py index b0f699e60..84830a1d1 100644 --- a/cwltool/provenance.py +++ b/cwltool/provenance.py @@ -845,7 +845,7 @@ def add_data_file( else: # This still shows up when no_listing _logger.debug("[provenance] File: %s", str(from_fp)) - traceback.print_stack() + _logger.debug(traceback.print_stack()) _logger.debug("[provenance] Added data file %s", path) if timestamp is not None: createdOn, createdBy = self._self_made(timestamp) diff --git a/tests/wf/directory_no_listing_nested.cwl b/tests/wf/directory_no_listing_nested.cwl new file mode 100755 index 000000000..25e25008d --- /dev/null +++ b/tests/wf/directory_no_listing_nested.cwl @@ -0,0 +1,81 @@ +#!/usr/bin/env cwl-runner +cwlVersion: v1.2 +class: Workflow + +doc: > + Inspect provided directory and return filenames. + Generate a new directory and return it (including content). + +hints: + - class: DockerRequirement + dockerPull: docker.io/debian:stable-slim + +inputs: + dir: + type: Directory? + loadListing: no_listing + ignore: + type: Directory? + loadListing: no_listing + ignore_no_info: + type: Directory? 
+ loadListing: no_listing + + +steps: + ls: + in: + dir: dir + ignore: ignore + out: + [listing] + run: + class: CommandLineTool + baseCommand: ls + inputs: + dir: + type: Directory + inputBinding: + position: 1 + ignore: + type: Directory + inputBinding: + position: 2 + outputs: + listing: + type: stdout + + generate: + in: [] + out: + [dir1] + run: + class: CommandLineTool + requirements: + ShellCommandRequirement: {} + LoadListingRequirement: + loadListing: deep_listing + + arguments: + - shellQuote: false + valueFrom: > + pwd; + mkdir -p dir1/a/b; + echo -n a > dir1/a.txt; + echo -n b > dir1/a/b.txt; + echo -n c > dir1/a/b/c.txt; + inputs: [] + outputs: + dir1: + type: Directory + outputBinding: + glob: "dir1" + +outputs: + output_1: + type: File + outputSource: ls/listing + output_2: + type: Directory + outputSource: generate/dir1 + From c15156b6913f9859ab2d528701d61b7f9e6c7e7a Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Wed, 12 Oct 2022 10:26:36 +0200 Subject: [PATCH 43/49] stack trace only at debug level --- cwltool/provenance.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cwltool/provenance.py b/cwltool/provenance.py index 84830a1d1..a4f466c2c 100644 --- a/cwltool/provenance.py +++ b/cwltool/provenance.py @@ -844,7 +844,9 @@ def add_data_file( _logger.debug("[provenance] Directory :%s", self.relativised_input_object['dir']['basename']) else: # This still shows up when no_listing - _logger.debug("[provenance] File: %s", str(from_fp)) + if _logger.DEBUG >= _logger.root.level: + _logger.debug("[provenance] File: %s", str(from_fp)) + # If debug is enabled? _logger.debug(traceback.print_stack()) _logger.debug("[provenance] Added data file %s", path) if timestamp is not None: From 315e78fe13efd39e213cd4da4747012171e713d2 Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Wed, 12 Oct 2022 10:30:24 +0200 Subject: [PATCH 44/49] stacktrace disabled --- cwltool/provenance.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/cwltool/provenance.py b/cwltool/provenance.py index a4f466c2c..593bbf4db 100644 --- a/cwltool/provenance.py +++ b/cwltool/provenance.py @@ -843,11 +843,9 @@ def add_data_file( if 'dir' in self.relativised_input_object: _logger.debug("[provenance] Directory :%s", self.relativised_input_object['dir']['basename']) else: - # This still shows up when no_listing - if _logger.DEBUG >= _logger.root.level: - _logger.debug("[provenance] File: %s", str(from_fp)) + _logger.debug("[provenance] File: %s", str(from_fp)) # If debug is enabled? 
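The question in the comment above has a direct answer in the standard logging API: guard the stack capture with isEnabledFor and hand the formatted stack to the logger instead of printing it. A standalone sketch, with the logger name assumed:

import logging
import traceback

_logger = logging.getLogger("cwltool")

def debug_stack() -> None:
    # Capture and log the call stack only when debug logging is active.
    if _logger.isEnabledFor(logging.DEBUG):
        _logger.debug("Call stack:\n%s", "".join(traceback.format_stack()))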
- _logger.debug(traceback.print_stack()) + # _logger.debug(traceback.print_stack()) _logger.debug("[provenance] Added data file %s", path) if timestamp is not None: createdOn, createdBy = self._self_made(timestamp) From cad4896734922b8473c5867e69437bad57e6168b Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Wed, 16 Aug 2023 21:18:38 +0200 Subject: [PATCH 45/49] formatting --- cwltool/cwlprov/provenance_profile.py | 39 ++++++++++++++------------- cwltool/cwlprov/ro.py | 11 ++++---- cwltool/job.py | 7 +++-- cwltool/main.py | 2 +- cwltool/utils.py | 3 ++- tests/test_provenance.py | 5 ++-- 6 files changed, 36 insertions(+), 31 deletions(-) diff --git a/cwltool/cwlprov/provenance_profile.py b/cwltool/cwlprov/provenance_profile.py index 24bbd7a9b..dd043d49e 100644 --- a/cwltool/cwlprov/provenance_profile.py +++ b/cwltool/cwlprov/provenance_profile.py @@ -88,8 +88,8 @@ def copy_job_order(job: Union[Process, JobsType], job_order_object: CWLObjectTyp if iid in load_listing: if load_listing[iid] == "no_listing": _logger.warning("Skip listing of " + iid) - job_order_object[iid]['loadListing'] = 'no_listing' - job_order_object[iid]['listing'] = [] + job_order_object[iid]["loadListing"] = "no_listing" + job_order_object[iid]["listing"] = [] customised_job[iid] = job_order_object[iid] else: # Normal deep copy @@ -272,17 +272,16 @@ def evaluate( # record provenance of workflow executions self.prospective_prov(job) customised_job = copy_job_order(job, job_order_object, process) - self.used_artefacts(customised_job, self.workflow_run_uri, schema=process.inputs_record_schema) - + self.used_artefacts( + customised_job, self.workflow_run_uri, schema=process.inputs_record_schema + ) def record_process_start( self, process: Process, job: JobsType, process_run_id: Optional[str] = None ) -> Optional[str]: if not hasattr(process, "steps"): process_run_id = self.workflow_run_uri - elif not hasattr(job, "workflow") and isinstance( - process, cwltool.workflow.Workflow - ): + elif not hasattr(job, "workflow") and isinstance(process, cwltool.workflow.Workflow): # commandline tool execution as part of workflow name = "" if isinstance(job, (CommandLineJob, JobBase, WorkflowJob)): @@ -296,9 +295,7 @@ def record_process_start( if step is None: raise Exception("No / wrong step detected...!") - process_run_id = self.start_process( - step.id, process_name, datetime.datetime.now() - ) + process_run_id = self.start_process(step.id, process_name, datetime.datetime.now()) return process_run_id def start_process( @@ -355,7 +352,9 @@ def record_process_end( self.generate_output_prov(outputs, process_run_id, process_name, load_listing) self.document.wasEndedBy(process_run_id, None, self.workflow_run_uri, when) - def declare_file(self, value: CWLObjectType, load_listing: str = "invalid_listing") -> Tuple[ProvEntity, ProvEntity, str]: + def declare_file( + self, value: CWLObjectType, load_listing: str = "invalid_listing" + ) -> Tuple[ProvEntity, ProvEntity, str]: _logger.debug("What listing? 
" + load_listing) if value["class"] != "File": raise ValueError("Must have class:File: %s" % value) @@ -429,7 +428,9 @@ def declare_file(self, value: CWLObjectType, load_listing: str = "invalid_listin return file_entity, entity, checksum - def declare_directory(self, value: CWLObjectType, load_listing: str = "invalid_listing") -> ProvEntity: + def declare_directory( + self, value: CWLObjectType, load_listing: str = "invalid_listing" + ) -> ProvEntity: """Register any nested files/directories.""" # FIXME: Calculate a hash-like identifier for directory # so we get same value if it's the same filenames/hashes @@ -690,8 +691,8 @@ def used_artefacts( if isinstance(job_order, list): for entry in job_order: # for field in schema.fields: - # if field['name'] == entry. - # load_listing = schema.fields + # if field['name'] == entry. + # load_listing = schema.fields self.used_artefacts(entry, process_run_id, name, load_listing) else: # FIXME: Use workflow name in packed.cwl, "main" is wrong for nested workflows @@ -702,10 +703,10 @@ def used_artefacts( prov_role = self.wf_ns[f"{base}/{key}"] if not load_listing: load_listing = "deep_listing" - for field in schema['fields']: - if field['name'] == key: - if 'loadListing' in field: - load_listing = field['loadListing'] + for field in schema["fields"]: + if field["name"] == key: + if "loadListing" in field: + load_listing = field["loadListing"] break else: # Need to find a way to reproduce this @@ -728,7 +729,7 @@ def generate_output_prov( final_output: Union[CWLObjectType, MutableSequence[CWLObjectType], None], process_run_id: Optional[str], name: Optional[str], - load_listing: str = "invalid_listing" + load_listing: str = "invalid_listing", ) -> None: """Call wasGeneratedBy() for each output,copy the files into the RO.""" if isinstance(final_output, MutableSequence): diff --git a/cwltool/cwlprov/ro.py b/cwltool/cwlprov/ro.py index 7884f5f49..f2bb9cf74 100644 --- a/cwltool/cwlprov/ro.py +++ b/cwltool/cwlprov/ro.py @@ -478,9 +478,7 @@ def add_data_file( """Copy inputs to data/ folder.""" self.self_check() tmp_dir, tmp_prefix = os.path.split(self.temp_prefix) - with tempfile.NamedTemporaryFile( - prefix=tmp_prefix, dir=tmp_dir, delete=False - ) as tmp: + with tempfile.NamedTemporaryFile(prefix=tmp_prefix, dir=tmp_dir, delete=False) as tmp: # TODO this should depend on the arguments if self.no_data: checksum = checksum_only(from_fp) @@ -506,8 +504,10 @@ def add_data_file( _logger.warning("[provenance] Unknown hash method %s for bagit manifest", Hasher) # Inefficient, bagit support need to checksum again self._add_to_bagit(rel_path) - if 'dir' in self.relativised_input_object: - _logger.debug("[provenance] Directory :%s", self.relativised_input_object['dir']['basename']) + if "dir" in self.relativised_input_object: + _logger.debug( + "[provenance] Directory :%s", self.relativised_input_object["dir"]["basename"] + ) else: _logger.debug("[provenance] File: %s", str(from_fp)) # If debug is enabled? @@ -634,7 +634,6 @@ def _relativise_files( for obj in structure: # Recurse and rewrite any nested File objects self._relativise_files(cast(CWLOutputType, obj)) -<<<<<<< HEAD:cwltool/provenance.py def close(self, save_to: Optional[str] = None) -> None: """Close the Research Object, optionally saving to specified folder. 
diff --git a/cwltool/job.py b/cwltool/job.py index dd4e177a0..05e14900c 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -285,7 +285,10 @@ def _execute( and isinstance(job_order, (list, dict)) ): runtimeContext.prov_obj.used_artefacts( - job_order, runtimeContext.process_run_id, str(self.name), load_listing=self.builder.loadListing + job_order, + runtimeContext.process_run_id, + str(self.name), + load_listing=self.builder.loadListing, ) else: _logger.warning( @@ -411,7 +414,7 @@ def stderr_stdout_log_path( runtimeContext.process_run_id, outputs, datetime.datetime.now(), - builder.loadListing # TODO FIX THIS + builder.loadListing, # TODO FIX THIS ) if processStatus != "success": _logger.warning("[job %s] completed %s", self.name, processStatus) diff --git a/cwltool/main.py b/cwltool/main.py index 560c003e8..b9f06fd18 100755 --- a/cwltool/main.py +++ b/cwltool/main.py @@ -16,7 +16,7 @@ import time import urllib import warnings -from codecs import + from typing import ( IO, Any, diff --git a/cwltool/utils.py b/cwltool/utils.py index 10530c102..a81caf8e5 100644 --- a/cwltool/utils.py +++ b/cwltool/utils.py @@ -96,7 +96,8 @@ ScatterOutputCallbackType = Callable[[Optional[ScatterDestinationsType], str], None] SinkType = Union[CWLOutputType, CWLObjectType] DirectoryType = TypedDict( - "DirectoryType", {"class": str, "listing": List[CWLObjectType], "basename": str, "loadListing": str} + "DirectoryType", + {"class": str, "listing": List[CWLObjectType], "basename": str, "loadListing": str}, ) JSONAtomType = Union[Dict[str, Any], List[Any], str, int, float, bool, None] JSONType = Union[Dict[str, JSONAtomType], List[JSONAtomType], str, int, float, bool, None] diff --git a/tests/test_provenance.py b/tests/test_provenance.py index 66ae995a5..10e590ab7 100644 --- a/tests/test_provenance.py +++ b/tests/test_provenance.py @@ -303,16 +303,17 @@ def test_directory_workflow_no_listing(tmp_path: Path) -> None: # Input files should be captured by hash value, # even if they were inside a class: Directory - for (l, l_hash) in sha1.items(): + for l, l_hash in sha1.items(): prefix = l_hash[:2] # first 2 letters p = folder / "data" / prefix / l_hash # File should be empty and in the future not existing... 
# assert os.path.getsize(p.absolute()) == 0 # To be discared when file really does not exist anymore - if l not in ['d', 'e', 'f', 'g', 'h', 'i']: + if l not in ["d", "e", "f", "g", "h", "i"]: print("Analysing file %s", l) assert p.is_file(), f"Could not find {l} as {p}" + @needs_docker def test_no_data_files(tmp_path: Path) -> None: folder = cwltool( From b9308425d476803bfb577fdf07f667eca00faa3a Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Wed, 16 Aug 2023 21:19:08 +0200 Subject: [PATCH 46/49] sort imports --- cwltool/main.py | 1 - mypy-stubs/prov/model.pyi | 1 - 2 files changed, 2 deletions(-) diff --git a/cwltool/main.py b/cwltool/main.py index b9f06fd18..2ae9f9f6a 100755 --- a/cwltool/main.py +++ b/cwltool/main.py @@ -16,7 +16,6 @@ import time import urllib import warnings - from typing import ( IO, Any, diff --git a/mypy-stubs/prov/model.pyi b/mypy-stubs/prov/model.pyi index ee2688a4d..cc768a6b4 100644 --- a/mypy-stubs/prov/model.pyi +++ b/mypy-stubs/prov/model.pyi @@ -3,7 +3,6 @@ from typing import IO, Any, Dict, Iterable, List, Set, Tuple from _typeshed import Incomplete from prov.constants import * - # from prov import Error as Error, serializers as serializers from prov.identifier import Identifier, Namespace, QualifiedName From aa0054ee585ca938dfbf740c99bf7a2498c44953 Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Thu, 17 Aug 2023 06:57:16 +0200 Subject: [PATCH 47/49] No warnings test --- cwltool/argparser.py | 1 + cwltool/cwlprov/ro.py | 1 - cwltool/loghandler.py | 3 +++ cwltool/main.py | 1 - tests/test_js_sandbox.py | 2 +- tests/test_load_tool.py | 2 +- 6 files changed, 6 insertions(+), 4 deletions(-) diff --git a/cwltool/argparser.py b/cwltool/argparser.py index 332f18d86..90629d5a1 100644 --- a/cwltool/argparser.py +++ b/cwltool/argparser.py @@ -385,6 +385,7 @@ def arg_parser() -> argparse.ArgumentParser: volumegroup = parser.add_mutually_exclusive_group() volumegroup.add_argument("--verbose", action="store_true", help="Default logging") + volumegroup.add_argument("--no-warnings", action="store_true", help="Only print errors.") volumegroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.") volumegroup.add_argument("--debug", action="store_true", help="Print even more logging") diff --git a/cwltool/cwlprov/ro.py b/cwltool/cwlprov/ro.py index f2bb9cf74..87c060cdc 100644 --- a/cwltool/cwlprov/ro.py +++ b/cwltool/cwlprov/ro.py @@ -5,7 +5,6 @@ import os import shutil import tempfile -import traceback import uuid from pathlib import Path, PurePosixPath from typing import ( diff --git a/cwltool/loghandler.py b/cwltool/loghandler.py index c1f451991..7ef71206c 100644 --- a/cwltool/loghandler.py +++ b/cwltool/loghandler.py @@ -11,6 +11,7 @@ def configure_logging( stderr_handler: logging.Handler, + no_warnings: bool, quiet: bool, debug: bool, enable_color: bool, @@ -21,6 +22,8 @@ def configure_logging( rdflib_logger = logging.getLogger("rdflib.term") rdflib_logger.addHandler(stderr_handler) rdflib_logger.setLevel(logging.ERROR) + if no_warnings: + stderr_handler.setLevel(logging.ERROR) if quiet: # Silence STDERR, not an eventual provenance log file stderr_handler.setLevel(logging.WARN) diff --git a/cwltool/main.py b/cwltool/main.py index 2ae9f9f6a..4d536a679 100755 --- a/cwltool/main.py +++ b/cwltool/main.py @@ -8,7 +8,6 @@ import io import logging import os -import shutil import signal import subprocess # nosec import sys diff --git a/tests/test_js_sandbox.py b/tests/test_js_sandbox.py index 9739c77a7..98b4cc115 100644 --- 
a/tests/test_js_sandbox.py +++ b/tests/test_js_sandbox.py @@ -22,7 +22,7 @@ ("v7.7.3\n", True), ] -configure_logging(_logger.handlers[-1], False, True, True, True) +configure_logging(_logger.handlers[-1], False, False, True, True, True) _logger.setLevel(logging.DEBUG) diff --git a/tests/test_load_tool.py b/tests/test_load_tool.py index 3d0cba161..f291d2985 100644 --- a/tests/test_load_tool.py +++ b/tests/test_load_tool.py @@ -15,7 +15,7 @@ from .util import get_data -configure_logging(_logger.handlers[-1], False, True, True, True) +configure_logging(_logger.handlers[-1], False, False, True, True, True) _logger.setLevel(logging.DEBUG) From 87946a301dd15a6303088faea9598bf0f448de0c Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Thu, 17 Aug 2023 07:01:10 +0200 Subject: [PATCH 48/49] missed one attribute --- cwltool/main.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cwltool/main.py b/cwltool/main.py index 4d536a679..dadb87bfa 100755 --- a/cwltool/main.py +++ b/cwltool/main.py @@ -1015,6 +1015,7 @@ def main( configure_logging( stderr_handler, + args.no_warnings, args.quiet, runtimeContext.debug, args.enable_color, From 420dd1c0a9102e56dc85d8cb0116ac593d3aae5c Mon Sep 17 00:00:00 2001 From: Jasper Koehorst Date: Thu, 17 Aug 2023 08:20:49 +0200 Subject: [PATCH 49/49] work in progress to fix the main merge --- cwltool/cwlprov/provenance_profile.py | 6 ++++-- cwltool/cwlprov/ro.py | 3 +++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/cwltool/cwlprov/provenance_profile.py b/cwltool/cwlprov/provenance_profile.py index dd043d49e..bd1c2fc02 100644 --- a/cwltool/cwlprov/provenance_profile.py +++ b/cwltool/cwlprov/provenance_profile.py @@ -24,6 +24,8 @@ from prov.model import PROV, PROV_LABEL, PROV_TYPE, PROV_VALUE, ProvDocument, ProvEntity from schema_salad.sourceline import SourceLine +import cwltool.workflow + from ..errors import WorkflowException from ..job import CommandLineJob, JobBase from ..loghandler import _logger @@ -55,7 +57,7 @@ from .ro import ResearchObject -def copy_job_order(job: Union[Process, JobsType], job_order_object: CWLObjectType) -> CWLObjectType: +def copy_job_order(job: Union[Process, JobsType], job_order_object: CWLObjectType, process) -> CWLObjectType: """Create copy of job object for provenance.""" if not isinstance(job, WorkflowJob): # direct command line tool execution @@ -265,7 +267,7 @@ def evaluate( if not hasattr(process, "steps"): # record provenance of independent commandline tool executions self.prospective_prov(job) - customised_job = copy_job_order(job, job_order_object) + customised_job = copy_job_order(job, job_order_object, process) self.used_artefacts(customised_job, self.workflow_run_uri) create_job(research_obj, customised_job) elif hasattr(job, "workflow"): diff --git a/cwltool/cwlprov/ro.py b/cwltool/cwlprov/ro.py index 87c060cdc..62b8cd61c 100644 --- a/cwltool/cwlprov/ro.py +++ b/cwltool/cwlprov/ro.py @@ -93,6 +93,9 @@ def __init__( self._initialize() _logger.debug("[provenance] Temporary research object: %s", self.folder) + # No data option + self.no_data = False + def self_check(self) -> None: """Raise ValueError if this RO is closed.""" if self.closed:
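
The loadListing handling threaded through this series (visible in the used_artefacts hunk of patch 45) boils down to one lookup: when provenance records an input artefact, the effective listing mode is taken from the matching field of the tool's inputs record schema, falling back to deep_listing when nothing is declared. The standalone sketch below illustrates that lookup under those assumptions; the function name and schema literal are illustrative, not cwltool API.

from typing import Any, Dict


def resolve_load_listing(schema: Dict[str, Any], key: str, default: str = "deep_listing") -> str:
    """Return the loadListing declared for input 'key', or the default when absent."""
    for field in schema.get("fields", []):
        if field.get("name") == key:
            # A field may be typed as File/Directory without an explicit loadListing hint.
            return field.get("loadListing", default)
    return default


# A Directory input declared with loadListing: no_listing (as in
# tests/wf/directory_no_listing_nested.cwl) should keep its contents out of the
# research object, while an undeclared input keeps the deep_listing default.
inputs_record_schema = {
    "fields": [
        {"name": "dir", "type": "Directory", "loadListing": "no_listing"},
        {"name": "ignore", "type": "Directory", "loadListing": "no_listing"},
    ]
}
assert resolve_load_listing(inputs_record_schema, "dir") == "no_listing"
assert resolve_load_listing(inputs_record_schema, "other_input") == "deep_listing"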
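
Patch 47 adds a --no-warnings option and a matching no_warnings argument to configure_logging, so the stderr handler can be raised to ERROR while --quiet keeps meaning warnings and errors. A minimal sketch of that level selection follows; the precedence shown, and the DEBUG/INFO fallbacks that sit outside the quoted hunk, are assumptions based on the flags being mutually exclusive in the argument parser.

import logging


def pick_stderr_level(no_warnings: bool, quiet: bool, debug: bool) -> int:
    """Map the mutually exclusive verbosity flags to a stderr log level."""
    if no_warnings:
        return logging.ERROR    # --no-warnings: only print errors
    if quiet:
        return logging.WARNING  # --quiet: warnings and errors
    if debug:
        return logging.DEBUG    # --debug: print even more logging
    return logging.INFO         # default logging


stderr_handler = logging.StreamHandler()
stderr_handler.setLevel(pick_stderr_level(no_warnings=True, quiet=False, debug=False))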
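
Patch 49 gives ResearchObject a no_data attribute (defaulting to False), and add_data_file already branches on it: with no_data set, an input is recorded by checksum only instead of being copied under data/<first two hex chars>/<sha1>, which is the layout the provenance tests inspect. The sketch below illustrates that idea in isolation; record_input and its directory handling are hypothetical helpers written under those assumptions, not the cwltool implementation.

import hashlib
import io
import os
from typing import IO


def record_input(from_fp: IO[bytes], ro_folder: str, no_data: bool) -> str:
    """Checksum an input stream and, unless no_data is set, store a copy under data/."""
    payload = from_fp.read()
    checksum = hashlib.sha1(payload).hexdigest()  # nosec - mirrors the sha1 naming of RO data files
    if not no_data:
        prefix_dir = os.path.join(ro_folder, "data", checksum[:2])
        os.makedirs(prefix_dir, exist_ok=True)
        with open(os.path.join(prefix_dir, checksum), "wb") as out:
            out.write(payload)
    return checksum


# With no_data=True only the hash is kept, so nothing is written below data/.
digest = record_input(io.BytesIO(b"example input"), "/tmp/ro_example", no_data=True)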