Skip to content

New API: Add Key and Role classes #1360

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 81 additions & 4 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
Metadata,
Snapshot,
Timestamp,
Targets
Targets,
Key,
Role
)

from tuf.api.serialization import (
Expand Down Expand Up @@ -292,6 +294,69 @@ def test_metadata_timestamp(self):
self.assertEqual(timestamp.signed.meta['snapshot.json'], fileinfo)


def test_key_class(self):
keys = {
"59a4df8af818e9ed7abe0764c0b47b4240952aa0d179b5b78346c470ac30278d":{
"keytype": "ed25519",
"keyval": {
"public": "edcd0a32a07dce33f7c7873aaffbff36d20ea30787574ead335eefd337e4dacd"
},
"scheme": "ed25519"
},
}
for key_dict in keys.values():
# Testing that the workflow of deserializing and serializing
# a key dictionary doesn't change the content.
test_key_dict = key_dict.copy()
key_obj = Key.from_dict(test_key_dict)
self.assertEqual(key_dict, key_obj.to_dict())
# Test creating an instance without a required attribute.
for key in key_dict.keys():
test_key_dict = key_dict.copy()
del test_key_dict[key]
with self.assertRaises(KeyError):
Key.from_dict(test_key_dict)
# Test creating a Key instance with wrong keyval format.
key_dict["keyval"] = {}
with self.assertRaises(ValueError):
Key.from_dict(key_dict)


def test_role_class(self):
roles = {
"root": {
"keyids": [
"4e777de0d275f9d28588dd9a1606cc748e548f9e22b6795b7cb3f63f98035fcb"
],
"threshold": 1
},
"snapshot": {
"keyids": [
"59a4df8af818e9ed7abe0764c0b47b4240952aa0d179b5b78346c470ac30278d"
],
"threshold": 1
},
}
for role_dict in roles.values():
# Testing that the workflow of deserializing and serializing
# a role dictionary doesn't change the content.
test_role_dict = role_dict.copy()
role_obj = Role.from_dict(test_role_dict)
self.assertEqual(role_dict, role_obj.to_dict())
# Test creating an instance without a required attribute.
for role_attr in role_dict.keys():
test_role_dict = role_dict.copy()
del test_role_dict[role_attr]
with self.assertRaises(KeyError):
Key.from_dict(test_role_dict)
# Test creating a Role instance with keyid dublicates.
# for keyid in role_dict["keyids"]:
role_dict["keyids"].append(role_dict["keyids"][0])
test_role_dict = role_dict.copy()
with self.assertRaises(ValueError):
Role.from_dict(test_role_dict)


def test_metadata_root(self):
root_path = os.path.join(
self.repo_dir, 'metadata', 'root.json')
Expand All @@ -306,21 +371,26 @@ def test_metadata_root(self):
root_key2['keytype'], root_key2['scheme'], root_key2['keyval'])

# Assert that root does not contain the new key
self.assertNotIn(keyid, root.signed.roles['root']['keyids'])
self.assertNotIn(keyid, root.signed.roles['root'].keyids)
self.assertNotIn(keyid, root.signed.keys)

# Add new root key
root.signed.add_key('root', keyid, key_metadata)

# Assert that key is added
self.assertIn(keyid, root.signed.roles['root']['keyids'])
self.assertIn(keyid, root.signed.roles['root'].keyids)
self.assertIn(keyid, root.signed.keys)

# Try adding the same key again and assert its ignored.
pre_add_keyid = root.signed.roles['root'].keyids.copy()
root.signed.add_key('root', keyid, key_metadata)
self.assertEqual(pre_add_keyid, root.signed.roles['root'].keyids)

# Remove the key
root.signed.remove_key('root', keyid)

# Assert that root does not contain the new key anymore
self.assertNotIn(keyid, root.signed.roles['root']['keyids'])
self.assertNotIn(keyid, root.signed.roles['root'].keyids)
self.assertNotIn(keyid, root.signed.keys)


Expand Down Expand Up @@ -365,6 +435,13 @@ def test_support_for_unrecognized_fields(self):
# Test that the metadata classes store unrecognized fields when
# initializing and passes them when casting the instance to a dict.

# Add unrecognized fields to all metadata sub (helper) classes.
if metadata == "root":
for keyid in dict1["signed"]["keys"].keys():
dict1["signed"]["keys"][keyid]["d"] = "c"
for role_str in dict1["signed"]["roles"].keys():
dict1["signed"]["roles"][role_str]["e"] = "g"
Comment on lines +438 to +443
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks like it will become hard to maintain when more tests are added but I'm fine with it for now: can you make an issue about doing this in a more structured way (like automate the dictionary poisoning so that ever dictionary is injected, not just ones we list here)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you are right. We should automate this.
In a discussion, you showed a prototype automating this process.
Will appreciate it if you propose these changes and we can discuss them, but I don't think this is in the scope of this pr.


temp_copy = copy.deepcopy(dict1)
metadata_obj = Metadata.from_dict(temp_copy)

Expand Down
140 changes: 113 additions & 27 deletions tuf/api/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,97 @@ def bump_version(self) -> None:
self.version += 1


class Key:
"""A container class representing the public portion of a Key.

Attributes:
keytype: A string denoting a public key signature system,
such as "rsa", "ed25519", and "ecdsa-sha2-nistp256".
scheme: A string denoting a corresponding signature scheme. For example:
"rsassa-pss-sha256", "ed25519", and "ecdsa-sha2-nistp256".
keyval: A dictionary containing the public portion of the key.
unrecognized_fields: Dictionary of all unrecognized fields.

"""

def __init__(
self,
keytype: str,
scheme: str,
keyval: Mapping[str, str],
unrecognized_fields: Optional[Mapping[str, Any]] = None,
) -> None:
if not keyval.get("public"):
raise ValueError("keyval doesn't follow the specification format!")
self.keytype = keytype
self.scheme = scheme
self.keyval = keyval
self.unrecognized_fields = unrecognized_fields or {}

@classmethod
def from_dict(cls, key_dict: Mapping[str, Any]) -> "Key":
"""Creates Key object from its dict representation."""
keytype = key_dict.pop("keytype")
scheme = key_dict.pop("scheme")
keyval = key_dict.pop("keyval")
# All fields left in the key_dict are unrecognized.
return cls(keytype, scheme, keyval, key_dict)

def to_dict(self) -> Dict:
"""Returns the dictionary representation of self."""
return {
"keytype": self.keytype,
"scheme": self.scheme,
"keyval": self.keyval,
**self.unrecognized_fields,
}


class Role:
"""A container class containing the set of keyids and threshold associated
with a particular role.

Attributes:
keyids: A set of strings each of which represents a given key.
threshold: An integer representing the required number of keys for that
particular role.
unrecognized_fields: Dictionary of all unrecognized fields.

"""

def __init__(
self,
keyids: list,
threshold: int,
unrecognized_fields: Optional[Mapping[str, Any]] = None,
) -> None:
keyids_set = set(keyids)
if len(keyids_set) != len(keyids):
raise ValueError(
f"keyids should be a list of unique strings,"
f" instead got {keyids}"
)
self.keyids = keyids_set
self.threshold = threshold
self.unrecognized_fields = unrecognized_fields or {}

@classmethod
def from_dict(cls, role_dict: Mapping[str, Any]) -> "Role":
"""Creates Role object from its dict representation."""
keyids = role_dict.pop("keyids")
threshold = role_dict.pop("threshold")
# All fields left in the role_dict are unrecognized.
return cls(keyids, threshold, role_dict)

def to_dict(self) -> Dict:
"""Returns the dictionary representation of self."""
return {
"keyids": list(self.keyids),
"threshold": self.threshold,
**self.unrecognized_fields,
}


class Root(Signed):
"""A container for the signed part of root metadata.

Expand All @@ -414,27 +505,13 @@ class Root(Signed):
keys: A dictionary that contains a public key store used to verify
top level roles metadata signatures::
{
'<KEYID>': {
'keytype': '<KEY TYPE>',
'scheme': '<KEY SCHEME>',
'keyid_hash_algorithms': [
'<HASH ALGO 1>',
'<HASH ALGO 2>'
...
],
'keyval': {
'public': '<PUBLIC KEY HEX REPRESENTATION>'
}
},
'<KEYID>': <Key instance>,
...
},
roles: A dictionary that contains a list of signing keyids and
a signature threshold for each top level role::
{
'<ROLE>': {
'keyids': ['<SIGNING KEY KEYID>', ...],
'threshold': <SIGNATURE THRESHOLD>,
},
'<ROLE>': <Role istance>,
...
}

Expand All @@ -451,14 +528,13 @@ def __init__(
spec_version: str,
expires: datetime,
consistent_snapshot: bool,
keys: Mapping[str, Any],
roles: Mapping[str, Any],
keys: Mapping[str, Key],
roles: Mapping[str, Role],
unrecognized_fields: Optional[Mapping[str, Any]] = None,
) -> None:
super().__init__(
_type, version, spec_version, expires, unrecognized_fields
)
# TODO: Add classes for keys and roles
self.consistent_snapshot = consistent_snapshot
self.keys = keys
self.roles = roles
Expand All @@ -470,17 +546,28 @@ def from_dict(cls, root_dict: Mapping[str, Any]) -> "Root":
consistent_snapshot = root_dict.pop("consistent_snapshot")
keys = root_dict.pop("keys")
roles = root_dict.pop("roles")

for keyid, key_dict in keys.items():
keys[keyid] = Key.from_dict(key_dict)
for role_name, role_dict in roles.items():
roles[role_name] = Role.from_dict(role_dict)

# All fields left in the root_dict are unrecognized.
return cls(*common_args, consistent_snapshot, keys, roles, root_dict)

def to_dict(self) -> Dict[str, Any]:
"""Returns the dict representation of self."""
root_dict = self._common_fields_to_dict()
keys = {keyid: key.to_dict() for (keyid, key) in self.keys.items()}
roles = {}
for role_name, role in self.roles.items():
roles[role_name] = role.to_dict()

root_dict.update(
{
"consistent_snapshot": self.consistent_snapshot,
"keys": self.keys,
"roles": self.roles,
"keys": keys,
"roles": roles,
}
)
return root_dict
Expand All @@ -490,17 +577,16 @@ def add_key(
self, role: str, keyid: str, key_metadata: Mapping[str, Any]
) -> None:
"""Adds new key for 'role' and updates the key store."""
if keyid not in self.roles[role]["keyids"]:
self.roles[role]["keyids"].append(keyid)
self.keys[keyid] = key_metadata
self.roles[role].keyids.add(keyid)
self.keys[keyid] = key_metadata

# Remove key for a role.
def remove_key(self, role: str, keyid: str) -> None:
"""Removes key for 'role' and updates the key store."""
if keyid in self.roles[role]["keyids"]:
self.roles[role]["keyids"].remove(keyid)
if keyid in self.roles[role].keyids:
self.roles[role].keyids.remove(keyid)
for keyinfo in self.roles.values():
if keyid in keyinfo["keyids"]:
if keyid in keyinfo.keyids:
return

del self.keys[keyid]
Expand Down