diff --git a/case_utils/case_validate/__init__.py b/case_utils/case_validate/__init__.py index 6deaaa0..f7db9ab 100644 --- a/case_utils/case_validate/__init__.py +++ b/case_utils/case_validate/__init__.py @@ -71,6 +71,104 @@ def concept_is_cdo_concept(n_concept: rdflib.URIRef) -> bool: ) or concept_iri.startswith("https://ontology.caseontology.org/") +def get_invalid_cdo_concepts( + data_graph: rdflib.Graph, ontology_graph: rdflib.Graph +) -> Set[rdflib.URIRef]: + """ + Get the set of concepts in the data graph that are not part of the CDO ontologies as specified with the ontology_graph argument. + + :param data_graph: The data graph to validate. + :param ontology_graph: The ontology graph to use for validation. + :return: The list of concepts in the data graph that are not part of the CDO ontology. + + >>> from case_utils.namespace import NS_RDF, NS_OWL, NS_UCO_CORE + >>> from rdflib import Graph, Literal, Namespace, URIRef + >>> # Define a namespace for a knowledge base, and a namespace for custom extensions. + >>> ns_kb = Namespace("http://example.org/kb/") + >>> ns_ex = Namespace("http://example.org/ontology/") + >>> dg = Graph() + >>> og = Graph() + >>> # Use an ontology graph in review that includes only a single class and a single property excerpted from UCO, but also a single custom property. + >>> _ = og.add((NS_UCO_CORE.UcoObject, NS_RDF.type, NS_OWL.Class)) + >>> _ = og.add((NS_UCO_CORE.name, NS_RDF.type, NS_OWL.DatatypeProperty)) + >>> _ = og.add((ns_ex.ourCustomProperty, NS_RDF.type, NS_OWL.DatatypeProperty)) + >>> # Define an individual. + >>> n_uco_object = ns_kb["UcoObject-f494d239-d9fd-48da-bc07-461ba86d8c6c"] + >>> n_uco_object + rdflib.term.URIRef('http://example.org/kb/UcoObject-f494d239-d9fd-48da-bc07-461ba86d8c6c') + >>> # Review a data graph that includes only the single individual, class typo'd (capitalized incorrectly), but property OK. + >>> _ = dg.add((n_uco_object, NS_RDF.type, NS_UCO_CORE.UCOObject)) + >>> _ = dg.add((n_uco_object, NS_UCO_CORE.name, Literal("Test"))) + >>> _ = dg.add((n_uco_object, ns_ex.customProperty, Literal("Custom Value"))) + >>> invalid_cdo_concepts = get_invalid_cdo_concepts(dg, og) + >>> invalid_cdo_concepts + {rdflib.term.URIRef('https://ontology.unifiedcyberontology.org/uco/core/UCOObject')} + >>> # Note that the property "ourCustomProperty" was typo'd in the data graph, but this was not reported. + >>> assert ns_ex.ourCustomProperty not in invalid_cdo_concepts + """ + # Construct set of CDO concepts for data graph concept-existence review. + cdo_concepts: Set[rdflib.URIRef] = set() + + for n_structural_class in [ + NS_OWL.Class, + NS_OWL.AnnotationProperty, + NS_OWL.DatatypeProperty, + NS_OWL.ObjectProperty, + NS_RDFS.Datatype, + NS_SH.NodeShape, + NS_SH.PropertyShape, + NS_SH.Shape, + ]: + for ontology_triple in ontology_graph.triples( + (None, NS_RDF.type, n_structural_class) + ): + if not isinstance(ontology_triple[0], rdflib.URIRef): + continue + if concept_is_cdo_concept(ontology_triple[0]): + cdo_concepts.add(ontology_triple[0]) + for n_ontology_predicate in [ + NS_OWL.backwardCompatibleWith, + NS_OWL.imports, + NS_OWL.incompatibleWith, + NS_OWL.priorVersion, + NS_OWL.versionIRI, + ]: + for ontology_triple in ontology_graph.triples( + (None, n_ontology_predicate, None) + ): + assert isinstance(ontology_triple[0], rdflib.URIRef) + assert isinstance(ontology_triple[2], rdflib.URIRef) + cdo_concepts.add(ontology_triple[0]) + cdo_concepts.add(ontology_triple[2]) + for ontology_triple in ontology_graph.triples((None, NS_RDF.type, NS_OWL.Ontology)): + if not isinstance(ontology_triple[0], rdflib.URIRef): + continue + cdo_concepts.add(ontology_triple[0]) + + # Also load historical ontology and version IRIs. + ontology_and_version_iris_data = importlib.resources.read_text( + case_utils.ontology, "ontology_and_version_iris.txt" + ) + for line in ontology_and_version_iris_data.split("\n"): + cleaned_line = line.strip() + if cleaned_line == "": + continue + cdo_concepts.add(rdflib.URIRef(cleaned_line)) + + data_cdo_concepts: Set[rdflib.URIRef] = set() + for data_triple in data_graph.triples((None, None, None)): + for data_triple_member in data_triple: + if isinstance(data_triple_member, rdflib.URIRef): + if concept_is_cdo_concept(data_triple_member): + data_cdo_concepts.add(data_triple_member) + elif isinstance(data_triple_member, rdflib.Literal): + if isinstance(data_triple_member.datatype, rdflib.URIRef): + if concept_is_cdo_concept(data_triple_member.datatype): + data_cdo_concepts.add(data_triple_member.datatype) + + return data_cdo_concepts - cdo_concepts + + def main() -> None: parser = argparse.ArgumentParser( description="CASE wrapper to pySHACL command line tool." @@ -181,67 +279,9 @@ def main() -> None: _logger.debug("arg_ontology_graph = %r.", arg_ontology_graph) ontology_graph.parse(arg_ontology_graph) - # Construct set of CDO concepts for data graph concept-existence review. - cdo_concepts: Set[rdflib.URIRef] = set() - - for n_structural_class in [ - NS_OWL.Class, - NS_OWL.AnnotationProperty, - NS_OWL.DatatypeProperty, - NS_OWL.ObjectProperty, - NS_RDFS.Datatype, - NS_SH.NodeShape, - NS_SH.PropertyShape, - NS_SH.Shape, - ]: - for ontology_triple in ontology_graph.triples( - (None, NS_RDF.type, n_structural_class) - ): - if not isinstance(ontology_triple[0], rdflib.URIRef): - continue - if concept_is_cdo_concept(ontology_triple[0]): - cdo_concepts.add(ontology_triple[0]) - for n_ontology_predicate in [ - NS_OWL.backwardCompatibleWith, - NS_OWL.imports, - NS_OWL.incompatibleWith, - NS_OWL.priorVersion, - NS_OWL.versionIRI, - ]: - for ontology_triple in ontology_graph.triples( - (None, n_ontology_predicate, None) - ): - assert isinstance(ontology_triple[0], rdflib.URIRef) - assert isinstance(ontology_triple[2], rdflib.URIRef) - cdo_concepts.add(ontology_triple[0]) - cdo_concepts.add(ontology_triple[2]) - for ontology_triple in ontology_graph.triples((None, NS_RDF.type, NS_OWL.Ontology)): - if not isinstance(ontology_triple[0], rdflib.URIRef): - continue - cdo_concepts.add(ontology_triple[0]) - - # Also load historical ontology and version IRIs. - ontology_and_version_iris_data = importlib.resources.read_text( - case_utils.ontology, "ontology_and_version_iris.txt" - ) - for line in ontology_and_version_iris_data.split("\n"): - cleaned_line = line.strip() - if cleaned_line == "": - continue - cdo_concepts.add(rdflib.URIRef(cleaned_line)) - - data_cdo_concepts: Set[rdflib.URIRef] = set() - for data_triple in data_graph.triples((None, None, None)): - for data_triple_member in data_triple: - if isinstance(data_triple_member, rdflib.URIRef): - if concept_is_cdo_concept(data_triple_member): - data_cdo_concepts.add(data_triple_member) - elif isinstance(data_triple_member, rdflib.Literal): - if isinstance(data_triple_member.datatype, rdflib.URIRef): - if concept_is_cdo_concept(data_triple_member.datatype): - data_cdo_concepts.add(data_triple_member.datatype) + # Get the list of undefined CDO concepts in the graph + undefined_cdo_concepts = get_invalid_cdo_concepts(data_graph, ontology_graph) - undefined_cdo_concepts = data_cdo_concepts - cdo_concepts for undefined_cdo_concept in sorted(undefined_cdo_concepts): warnings.warn(undefined_cdo_concept, NonExistentCDOConceptWarning) undefined_cdo_concepts_message = (