Skip to content

Commit

Permalink
Merge pull request #2436 from lukpueh/add-dsse
Browse files Browse the repository at this point in the history
Add basic DSSE equivalent for Metadata API and configurable DSSE support in ngclient
  • Loading branch information
lukpueh authored Feb 22, 2024
2 parents 3077932 + 4005e76 commit 52fa73a
Show file tree
Hide file tree
Showing 10 changed files with 2,353 additions and 1,929 deletions.
5 changes: 4 additions & 1 deletion examples/uploader/_localrepo.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ def open(self, role: str) -> Metadata:
# if there is a metadata version fetched from remote, use that
# HACK: access Updater internals
if role in self.updater._trusted_set:
return copy.deepcopy(self.updater._trusted_set[role])
# NOTE: The original signature wrapper (Metadata) was verified and
# discarded upon inclusion in the trusted set. It is safe to use
# a fresh wrapper. `close` will override existing signatures anyway.
return Metadata(copy.deepcopy(self.updater._trusted_set[role]))

# otherwise we're creating metadata from scratch
md = Metadata(Targets())
Expand Down
91 changes: 91 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import unittest
from copy import copy, deepcopy
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, ClassVar, Dict, Optional

from securesystemslib import exceptions as sslib_exceptions
Expand All @@ -33,6 +34,7 @@

from tests import utils
from tuf.api import exceptions
from tuf.api.dsse import SimpleEnvelope
from tuf.api.metadata import (
TOP_LEVEL_ROLE_NAMES,
DelegatedRole,
Expand Down Expand Up @@ -1144,6 +1146,95 @@ def test_delegations_get_delegated_role(self) -> None:
)


class TestSimpleEnvelope(unittest.TestCase):
"""Tests for public API in 'tuf/api/dsse.py'."""

@classmethod
def setUpClass(cls) -> None:
repo_data_dir = Path(utils.TESTS_DIR) / "repository_data"
cls.metadata_dir = repo_data_dir / "repository" / "metadata"
cls.signer_store = {}
for role in [Snapshot, Targets, Timestamp]:
key_path = repo_data_dir / "keystore" / f"{role.type}_key"
key = import_ed25519_privatekey_from_file(
str(key_path),
password="password",
)
cls.signer_store[role.type] = SSlibSigner(key)

def test_serialization(self) -> None:
"""Basic de/serialization test.
1. Load test metadata for each role
2. Wrap metadata payloads in envelope serializing the payload
3. Serialize envelope
4. De-serialize envelope
5. De-serialize payload
"""
for role in [Root, Timestamp, Snapshot, Targets]:
metadata_path = self.metadata_dir / f"{role.type}.json"
metadata = Metadata.from_file(str(metadata_path))
self.assertIsInstance(metadata.signed, role)

envelope = SimpleEnvelope.from_signed(metadata.signed)
envelope_bytes = envelope.to_bytes()

envelope2 = SimpleEnvelope.from_bytes(envelope_bytes)
payload = envelope2.get_signed()
self.assertEqual(metadata.signed, payload)

def test_fail_envelope_serialization(self) -> None:
envelope = SimpleEnvelope(b"foo", "bar", ["baz"])
with self.assertRaises(SerializationError):
envelope.to_bytes()

def test_fail_envelope_deserialization(self) -> None:
with self.assertRaises(DeserializationError):
SimpleEnvelope.from_bytes(b"[")

def test_fail_payload_serialization(self) -> None:
with self.assertRaises(SerializationError):
SimpleEnvelope.from_signed("foo") # type: ignore

def test_fail_payload_deserialization(self) -> None:
payloads = [b"[", b'{"_type": "foo"}']
for payload in payloads:
envelope = SimpleEnvelope(payload, "bar", [])
with self.assertRaises(DeserializationError):
envelope.get_signed()

def test_verify_delegate(self) -> None:
"""Basic verification test.
1. Load test metadata for each role
2. Wrap non-root payloads in envelope serializing the payload
3. Sign with correct delegated key
4. Verify delegate with root
"""
root_path = self.metadata_dir / "root.json"
root = Metadata[Root].from_file(str(root_path)).signed

for role in [Timestamp, Snapshot, Targets]:
metadata_path = self.metadata_dir / f"{role.type}.json"
metadata = Metadata.from_file(str(metadata_path))
self.assertIsInstance(metadata.signed, role)

signer = self.signer_store[role.type]
self.assertIn(
signer.key_dict["keyid"], root.roles[role.type].keyids
)

envelope = SimpleEnvelope.from_signed(metadata.signed)
envelope.sign(signer)
self.assertTrue(len(envelope.signatures) == 1)

root.verify_delegate(
role.type, envelope.pae(), envelope.signatures_dict
)


# Run unit test.
if __name__ == "__main__":
utils.configure_test_logging(sys.argv)
Expand Down
117 changes: 93 additions & 24 deletions tests/test_trusted_metadata_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,22 @@

from tests import utils
from tuf.api import exceptions
from tuf.api.dsse import SimpleEnvelope
from tuf.api.metadata import (
Metadata,
MetaFile,
Root,
Signed,
Snapshot,
Targets,
Timestamp,
)
from tuf.api.serialization.json import JSONSerializer
from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet
from tuf.ngclient._internal.trusted_metadata_set import (
TrustedMetadataSet,
_load_from_simple_envelope,
)
from tuf.ngclient.config import EnvelopeType

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -93,7 +99,9 @@ def hashes_length_modifier(timestamp: Timestamp) -> None:
)

def setUp(self) -> None:
self.trusted_set = TrustedMetadataSet(self.metadata[Root.type])
self.trusted_set = TrustedMetadataSet(
self.metadata[Root.type], EnvelopeType.METADATA
)

def _update_all_besides_targets(
self,
Expand Down Expand Up @@ -132,7 +140,7 @@ def test_update(self) -> None:

count = 0
for md in self.trusted_set:
self.assertIsInstance(md, Metadata)
self.assertIsInstance(md, Signed)
count += 1

self.assertTrue(count, 6)
Expand All @@ -149,11 +157,11 @@ def test_update_metadata_output(self) -> None:
delegeted_targets_2 = self.trusted_set.update_delegated_targets(
self.metadata["role2"], "role2", "role1"
)
self.assertIsInstance(timestamp.signed, Timestamp)
self.assertIsInstance(snapshot.signed, Snapshot)
self.assertIsInstance(targets.signed, Targets)
self.assertIsInstance(delegeted_targets_1.signed, Targets)
self.assertIsInstance(delegeted_targets_2.signed, Targets)
self.assertIsInstance(timestamp, Timestamp)
self.assertIsInstance(snapshot, Snapshot)
self.assertIsInstance(targets, Targets)
self.assertIsInstance(delegeted_targets_1, Targets)
self.assertIsInstance(delegeted_targets_2, Targets)

def test_out_of_order_ops(self) -> None:
# Update snapshot before timestamp
Expand Down Expand Up @@ -192,25 +200,40 @@ def test_out_of_order_ops(self) -> None:
self.metadata["role1"], "role1", Targets.type
)

def test_root_with_invalid_json(self) -> None:
# Test loading initial root and root update
for test_func in [TrustedMetadataSet, self.trusted_set.update_root]:
# root is not json
with self.assertRaises(exceptions.RepositoryError):
test_func(b"")
def test_bad_initial_root(self) -> None:
# root is not json
with self.assertRaises(exceptions.RepositoryError):
TrustedMetadataSet(b"", EnvelopeType.METADATA)

# root is invalid
root = Metadata.from_bytes(self.metadata[Root.type])
root.signed.version += 1
with self.assertRaises(exceptions.UnsignedMetadataError):
test_func(root.to_bytes())
# root is invalid
root = Metadata.from_bytes(self.metadata[Root.type])
root.signed.version += 1
with self.assertRaises(exceptions.UnsignedMetadataError):
TrustedMetadataSet(root.to_bytes(), EnvelopeType.METADATA)

# metadata is of wrong type
with self.assertRaises(exceptions.RepositoryError):
test_func(self.metadata[Snapshot.type])
# metadata is of wrong type
with self.assertRaises(exceptions.RepositoryError):
TrustedMetadataSet(
self.metadata[Snapshot.type], EnvelopeType.METADATA
)

def test_bad_root_update(self) -> None:
# root is not json
with self.assertRaises(exceptions.RepositoryError):
self.trusted_set.update_root(b"")

# root is invalid
root = Metadata.from_bytes(self.metadata[Root.type])
root.signed.version += 1
with self.assertRaises(exceptions.UnsignedMetadataError):
self.trusted_set.update_root(root.to_bytes())

# metadata is of wrong type
with self.assertRaises(exceptions.RepositoryError):
self.trusted_set.update_root(self.metadata[Snapshot.type])

def test_top_level_md_with_invalid_json(self) -> None:
top_level_md: List[Tuple[bytes, Callable[[bytes], Metadata]]] = [
top_level_md: List[Tuple[bytes, Callable[[bytes], Signed]]] = [
(self.metadata[Timestamp.type], self.trusted_set.update_timestamp),
(self.metadata[Snapshot.type], self.trusted_set.update_snapshot),
(self.metadata[Targets.type], self.trusted_set.update_targets),
Expand Down Expand Up @@ -260,7 +283,7 @@ def root_expired_modifier(root: Root) -> None:

# intermediate root can be expired
root = self.modify_metadata(Root.type, root_expired_modifier)
tmp_trusted_set = TrustedMetadataSet(root)
tmp_trusted_set = TrustedMetadataSet(root, EnvelopeType.METADATA)
# update timestamp to trigger final root expiry check
with self.assertRaises(exceptions.ExpiredMetadataError):
tmp_trusted_set.update_timestamp(self.metadata[Timestamp.type])
Expand Down Expand Up @@ -471,6 +494,52 @@ def target_expired_modifier(target: Targets) -> None:

# TODO test updating over initial metadata (new keys, newer timestamp, etc)

def test_load_from_simple_envelope(self) -> None:
"""Basic unit test for ``_load_from_simple_envelope`` helper.
TODO: Test via trusted metadata set tests like for traditional metadata
"""
metadata = Metadata.from_bytes(self.metadata[Root.type])
root = metadata.signed
envelope = SimpleEnvelope.from_signed(root)

# Unwrap unsigned envelope without verification
envelope_bytes = envelope.to_bytes()
payload_obj, signed_bytes, signatures = _load_from_simple_envelope(
Root, envelope_bytes
)

self.assertEqual(payload_obj, root)
self.assertEqual(signed_bytes, envelope.pae())
self.assertDictEqual(signatures, {})

# Unwrap correctly signed envelope (use default role name)
sig = envelope.sign(self.keystore[Root.type])
envelope_bytes = envelope.to_bytes()
_, _, signatures = _load_from_simple_envelope(
Root, envelope_bytes, root
)
self.assertDictEqual(signatures, {sig.keyid: sig})

# Load correctly signed envelope (with explicit role name)
_, _, signatures = _load_from_simple_envelope(
Root, envelope.to_bytes(), root, Root.type
)
self.assertDictEqual(signatures, {sig.keyid: sig})

# Fail load envelope with unexpected 'payload_type'
envelope_bad_type = SimpleEnvelope.from_signed(root)
envelope_bad_type.payload_type = "foo"
envelope_bad_type_bytes = envelope_bad_type.to_bytes()
with self.assertRaises(exceptions.RepositoryError):
_load_from_simple_envelope(Root, envelope_bad_type_bytes)

# Fail load envelope with unexpected payload type
envelope_bad_signed = SimpleEnvelope.from_signed(root)
envelope_bad_signed_bytes = envelope_bad_signed.to_bytes()
with self.assertRaises(exceptions.RepositoryError):
_load_from_simple_envelope(Targets, envelope_bad_signed_bytes)


if __name__ == "__main__":
utils.configure_test_logging(sys.argv)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_updater_ng.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ def test_updating_root(self) -> None:
# Bump root version, resign and refresh
self._modify_repository_root(lambda root: None, bump_version=True)
self.updater.refresh()
self.assertEqual(self.updater._trusted_set.root.signed.version, 2)
self.assertEqual(self.updater._trusted_set.root.version, 2)

def test_missing_targetinfo(self) -> None:
self.updater.refresh()
Expand Down
Loading

0 comments on commit 52fa73a

Please sign in to comment.