diff --git a/src/pyff/builtins.py b/src/pyff/builtins.py index 9c560956..dca81346 100644 --- a/src/pyff/builtins.py +++ b/src/pyff/builtins.py @@ -686,7 +686,7 @@ def load(req: Plumbing.Request, *opts): if elt == "as": child_opts.alias = value elif elt == "verify": - child_opts.verify = value + child_opts.verify = value.split(",") elif elt == "via": child_opts.via.append(PipelineCallback(value, req, store=req.md.store)) elif elt == "cleanup": @@ -698,7 +698,7 @@ def load(req: Plumbing.Request, *opts): "Usage: load resource [as url] [[verify] verification] [via pipeline]* [cleanup pipeline]*" ) else: - child_opts.verify = elt + child_opts.verify = elt.split(",") # override anything in child_opts with what is in opts child_opts = child_opts.model_copy(update=_opts) diff --git a/src/pyff/resource.py b/src/pyff/resource.py index 4d0d1177..4ab07a01 100644 --- a/src/pyff/resource.py +++ b/src/pyff/resource.py @@ -13,7 +13,7 @@ from datetime import datetime from enum import Enum from threading import Condition, Lock -from typing import TYPE_CHECKING, Any, Callable +from typing import TYPE_CHECKING, Any, Callable, Optional from urllib.parse import quote as urlescape import requests @@ -159,8 +159,8 @@ def i_handle(self, t: Resource, url=None, response=None, exception=None, last_fe class ResourceOpts(BaseModel): alias: str | None = Field(None, alias='as') # TODO: Rename to 'name'? - # a certificate (file) or a SHA1 fingerprint to use for signature verification - verify: str | None = None + # a list of certificate (file(s)) or a SHA1 fingerprint(s) to use for signature verification + verify: Optional[list[str]] = None # TODO: move classes to make the correct typing work: Iterable[Union[Lambda, PipelineCallback]] = Field([]) via: list[Callable] = Field([]) # A list of callbacks that can be used to pre-process parsed metadata before validation. Use as a clue-bat. diff --git a/src/pyff/samlmd.py b/src/pyff/samlmd.py index a0047a6c..d6a4e483 100644 --- a/src/pyff/samlmd.py +++ b/src/pyff/samlmd.py @@ -303,23 +303,20 @@ def parse(self, resource: Resource, content: str) -> EidasMDParserInfo: if location: certs = CertDict(ml) fingerprints = list(certs.keys()) - fp = None - if len(fingerprints) > 0: - fp = fingerprints[0] ep = ml.find("{{{}}}Endpoint".format(NS['ser'])) - if ep is not None and fp is not None: + if ep is not None and len(fingerprints) != 0: args = dict( country_code=mdl.get('Territory'), hide_from_discovery=str2bool(ep.get('HideFromDiscovery', 'false')), ) log.debug( "MDSL[{}]: {} verified by {} for country {}".format( - info.scheme_territory, location, fp, args.get('country_code') + info.scheme_territory, location, fingerprints, args.get('country_code') ) ) child_opts = resource.opts.model_copy(update={'alias': None}, deep=True) - child_opts.verify = fp + child_opts.verify = fingerprints r = resource.add_child(location, child_opts) # this is specific post-processing for MDSL files diff --git a/src/pyff/utils.py b/src/pyff/utils.py index 3df96e65..a43eef08 100644 --- a/src/pyff/utils.py +++ b/src/pyff/utils.py @@ -263,16 +263,27 @@ def redis(): return thread_data.redis -def check_signature(t: ElementTree, key: Optional[str], only_one_signature: bool = False) -> ElementTree: - if key is not None: - log.debug(f"verifying signature using {key}") - refs = xmlsec.verified(t, key, drop_signature=True) - if only_one_signature and len(refs) != 1: - raise MetadataException("XML metadata contains %d signatures - exactly 1 is required" % len(refs)) - t = refs[0] # prevent wrapping attacks +def check_signature(t: ElementTree, keys: Optional[list[str]] = None, only_one_signature: bool = False) -> ElementTree: + if keys: + refs = [] + for key in keys: + log.debug(f"verifying signature using {key}") + try: + refs = refs + xmlsec.verified(t, key, drop_signature=True) + except xmlsec.exceptions.XMLSigException: + continue - return t + if not refs: + raise MetadataException("No valid signature(s) found") + else: + if only_one_signature and len(refs) != 1: + raise MetadataException("XML metadata contains %d signatures - exactly 1 is required" % len(refs)) + # Make sure to only return one tree: + # - prevent wrapping attacks + # - pyff.samlmd.parse_saml_metadata doesn't handle when multiple trees are returned + t = refs[0] + return t def validate_document(t): schema().assertValid(t)