diff --git a/tests/unit/accounts/test_core.py b/tests/unit/accounts/test_core.py index 18bec5f54009..ffcf4125a724 100644 --- a/tests/unit/accounts/test_core.py +++ b/tests/unit/accounts/test_core.py @@ -390,14 +390,8 @@ def test_unauthenticated_userid(self): def test_includeme(monkeypatch): - authz_obj = pretend.stub() - authz_cls = pretend.call_recorder(lambda *a, **kw: authz_obj) - monkeypatch.setattr(accounts, "ACLAuthorizationPolicy", authz_cls) - monkeypatch.setattr(accounts, "MacaroonAuthorizationPolicy", authz_cls) - monkeypatch.setattr(accounts, "TwoFactorAuthorizationPolicy", authz_cls) - multi_policy_obj = pretend.stub() - multi_policy_cls = pretend.call_recorder(lambda ps, authz: multi_policy_obj) + multi_policy_cls = pretend.call_recorder(lambda ps: multi_policy_obj) monkeypatch.setattr(accounts, "MultiSecurityPolicy", multi_policy_cls) session_policy_obj = pretend.stub() @@ -474,6 +468,10 @@ def test_includeme(monkeypatch): assert config.set_security_policy.calls == [pretend.call(multi_policy_obj)] assert multi_policy_cls.calls == [ pretend.call( - [session_policy_obj, basic_policy_obj, macaroon_policy_obj], authz_obj + [ + session_policy_obj, + basic_policy_obj, + macaroon_policy_obj, + ] ) ] diff --git a/tests/unit/accounts/test_models.py b/tests/unit/accounts/test_models.py index 5223818539a8..515a49048ed2 100644 --- a/tests/unit/accounts/test_models.py +++ b/tests/unit/accounts/test_models.py @@ -11,10 +11,14 @@ # limitations under the License. import datetime +import uuid import pytest +from pyramid.authorization import Authenticated + from warehouse.accounts.models import Email, RecoveryCode, User, UserFactory +from warehouse.utils.security_policy import principals_for from ...common.db.accounts import ( EmailFactory as DBEmailFactory, @@ -162,3 +166,62 @@ def test_acl(self, db_session): ("Allow", "group:admins", "admin"), ("Allow", "group:moderators", "moderator"), ] + + @pytest.mark.parametrize( + ( + "is_superuser", + "is_moderator", + "is_psf_staff", + "expected", + ), + [ + (False, False, False, []), + ( + True, + False, + False, + ["group:admins", "group:moderators", "group:psf_staff"], + ), + ( + False, + True, + False, + ["group:moderators"], + ), + ( + True, + True, + False, + ["group:admins", "group:moderators", "group:psf_staff"], + ), + ( + False, + False, + True, + ["group:psf_staff"], + ), + ( + False, + True, + True, + ["group:moderators", "group:psf_staff"], + ), + ], + ) + def test_principals( + self, + is_superuser, + is_moderator, + is_psf_staff, + expected, + ): + user = User( + id=uuid.uuid4(), + is_superuser=is_superuser, + is_moderator=is_moderator, + is_psf_staff=is_psf_staff, + ) + + expected = expected[:] + [f"user:{user.id}", Authenticated] + + assert set(principals_for(user)) == set(expected) diff --git a/tests/unit/accounts/test_security_policy.py b/tests/unit/accounts/test_security_policy.py index 969a33c726e8..3bbd14ba85dd 100644 --- a/tests/unit/accounts/test_security_policy.py +++ b/tests/unit/accounts/test_security_policy.py @@ -14,17 +14,14 @@ import pretend import pytest -from pyramid.interfaces import IAuthorizationPolicy, ISecurityPolicy -from pyramid.security import Allowed, Denied +from pyramid.authorization import Allow +from pyramid.interfaces import ISecurityPolicy from zope.interface.verify import verifyClass from warehouse.accounts import security_policy from warehouse.accounts.interfaces import IUserService -from warehouse.errors import WarehouseDenied from warehouse.utils.security_policy import AuthenticationMethod -from ...common.db.packaging import ProjectFactory - class TestBasicAuthSecurityPolicy: def test_verify(self): @@ -37,8 +34,6 @@ def test_noops(self): policy = security_policy.BasicAuthSecurityPolicy() with pytest.raises(NotImplementedError): policy.authenticated_userid(pretend.stub()) - with pytest.raises(NotImplementedError): - policy.permits(pretend.stub(), pretend.stub(), pretend.stub()) def test_forget_and_remember(self): policy = security_policy.BasicAuthSecurityPolicy() @@ -221,8 +216,6 @@ def test_noops(self): policy = security_policy.SessionSecurityPolicy() with pytest.raises(NotImplementedError): policy.authenticated_userid(pretend.stub()) - with pytest.raises(NotImplementedError): - policy.permits(pretend.stub(), pretend.stub(), pretend.stub()) def test_forget_and_remember(self, monkeypatch): request = pretend.stub() @@ -510,205 +503,134 @@ def test_identity_ip_banned(self, monkeypatch): assert request.add_response_callback.calls == [pretend.call(vary_cb)] -class TestTwoFactorAuthorizationPolicy: - def test_verify(self): - assert verifyClass( - IAuthorizationPolicy, security_policy.TwoFactorAuthorizationPolicy - ) - - def test_permits_no_active_request(self, monkeypatch): - get_current_request = pretend.call_recorder(lambda: None) - monkeypatch.setattr(security_policy, "get_current_request", get_current_request) - - backing_policy = pretend.stub( - permits=pretend.call_recorder(lambda *a, **kw: pretend.stub()) - ) - policy = security_policy.TwoFactorAuthorizationPolicy(policy=backing_policy) - result = policy.permits(pretend.stub(), pretend.stub(), pretend.stub()) - - assert result == WarehouseDenied("") - assert result.s == "There was no active request." - - def test_permits_if_context_is_not_permitted_by_backing_policy(self, monkeypatch): - request = pretend.stub() - get_current_request = pretend.call_recorder(lambda: request) - monkeypatch.setattr(security_policy, "get_current_request", get_current_request) - - permits_result = Denied("Because") - backing_policy = pretend.stub( - permits=pretend.call_recorder(lambda *a, **kw: permits_result) - ) - policy = security_policy.TwoFactorAuthorizationPolicy(policy=backing_policy) - result = policy.permits(pretend.stub(), pretend.stub(), pretend.stub()) - - assert result == permits_result - - def test_permits_if_non_2fa_requireable_context(self, monkeypatch): - request = pretend.stub() - get_current_request = pretend.call_recorder(lambda: request) - monkeypatch.setattr(security_policy, "get_current_request", get_current_request) +@pytest.mark.parametrize( + "policy_class", + [security_policy.BasicAuthSecurityPolicy, security_policy.SessionSecurityPolicy], +) +class TestPermits: + @pytest.mark.parametrize( + "principals,expected", [("user:5", True), ("user:1", False)] + ) + def test_acl(self, monkeypatch, policy_class, principals, expected): + monkeypatch.setattr(security_policy, "User", pretend.stub) - permits_result = Allowed("Because") - backing_policy = pretend.stub( - permits=pretend.call_recorder(lambda *a, **kw: permits_result) - ) - policy = security_policy.TwoFactorAuthorizationPolicy(policy=backing_policy) - result = policy.permits(pretend.stub(), pretend.stub(), pretend.stub()) + request = pretend.stub(identity=pretend.stub(__principals__=lambda: principals)) + context = pretend.stub(__acl__=[(Allow, "user:5", "myperm")]) - assert result == permits_result + policy = policy_class() + assert bool(policy.permits(request, context, "myperm")) == expected - def test_permits_if_context_does_not_require_2fa(self, monkeypatch, db_request): - db_request.user = pretend.stub() - db_request.registry.settings = { - "warehouse.two_factor_mandate.enabled": True, - "warehouse.two_factor_mandate.available": True, - "warehouse.two_factor_requirement.enabled": True, - } - get_current_request = pretend.call_recorder(lambda: db_request) - monkeypatch.setattr(security_policy, "get_current_request", get_current_request) + @pytest.mark.parametrize( + "mfa_required,has_mfa,expected", + [ + (True, True, True), + (False, True, True), + (True, False, False), + (False, False, True), + ], + ) + def test_2fa_owner_requires( + self, monkeypatch, policy_class, mfa_required, has_mfa, expected + ): + monkeypatch.setattr(security_policy, "User", pretend.stub) + monkeypatch.setattr(security_policy, "TwoFactorRequireable", pretend.stub) - permits_result = Allowed("Because") - backing_policy = pretend.stub( - permits=pretend.call_recorder(lambda *a, **kw: permits_result) + request = pretend.stub( + identity=pretend.stub( + __principals__=lambda: ["user:5"], has_two_factor=has_mfa + ), + registry=pretend.stub( + settings={ + "warehouse.two_factor_requirement.enabled": True, + "warehouse.two_factor_mandate.enabled": False, + "warehouse.two_factor_mandate.available": False, + } + ), ) - policy = security_policy.TwoFactorAuthorizationPolicy(policy=backing_policy) - context = ProjectFactory.create( - owners_require_2fa=False, - pypi_mandates_2fa=False, + context = pretend.stub( + __acl__=[(Allow, "user:5", "myperm")], owners_require_2fa=mfa_required ) - result = policy.permits(context, pretend.stub(), pretend.stub()) - - assert result == permits_result - def test_flashes_if_context_requires_2fa_but_not_enabled( - self, monkeypatch, db_request - ): - db_request.registry.settings = { - "warehouse.two_factor_mandate.enabled": False, - "warehouse.two_factor_mandate.available": True, - "warehouse.two_factor_requirement.enabled": True, - } - db_request.session.flash = pretend.call_recorder(lambda m, queue: None) - db_request.user = pretend.stub(has_two_factor=False) - get_current_request = pretend.call_recorder(lambda: db_request) - monkeypatch.setattr(security_policy, "get_current_request", get_current_request) - - permits_result = Allowed("Because") - backing_policy = pretend.stub( - permits=pretend.call_recorder(lambda *a, **kw: permits_result) - ) - policy = security_policy.TwoFactorAuthorizationPolicy(policy=backing_policy) - context = ProjectFactory.create( - owners_require_2fa=False, - pypi_mandates_2fa=True, - ) - result = policy.permits(context, pretend.stub(), pretend.stub()) - - assert result == permits_result - assert db_request.session.flash.calls == [ - pretend.call( - "This project is included in PyPI's two-factor mandate " - "for critical projects. In the future, you will be unable to " - "perform this action without enabling 2FA for your account", - queue="warning", - ), - ] + policy = policy_class() + assert bool(policy.permits(request, context, "myperm")) == expected - @pytest.mark.parametrize("owners_require_2fa", [True, False]) - @pytest.mark.parametrize("pypi_mandates_2fa", [True, False]) - @pytest.mark.parametrize("two_factor_requirement_enabled", [True, False]) - @pytest.mark.parametrize("two_factor_mandate_available", [True, False]) - @pytest.mark.parametrize("two_factor_mandate_enabled", [True, False]) - def test_permits_if_user_has_2fa( - self, - monkeypatch, - owners_require_2fa, - pypi_mandates_2fa, - two_factor_requirement_enabled, - two_factor_mandate_available, - two_factor_mandate_enabled, - db_request, + @pytest.mark.parametrize( + "mfa_required,has_mfa,expected", + [ + (True, True, True), + (False, True, True), + (True, False, False), + (False, False, True), + ], + ) + def test_2fa_pypi_mandates_2fa( + self, monkeypatch, policy_class, mfa_required, has_mfa, expected ): - db_request.registry.settings = { - "warehouse.two_factor_requirement.enabled": two_factor_requirement_enabled, - "warehouse.two_factor_mandate.available": two_factor_mandate_available, - "warehouse.two_factor_mandate.enabled": two_factor_mandate_enabled, - } - user = pretend.stub(has_two_factor=True) - db_request.user = user - get_current_request = pretend.call_recorder(lambda: db_request) - monkeypatch.setattr(security_policy, "get_current_request", get_current_request) + monkeypatch.setattr(security_policy, "User", pretend.stub) + monkeypatch.setattr(security_policy, "TwoFactorRequireable", pretend.stub) - permits_result = Allowed("Because") - backing_policy = pretend.stub( - permits=pretend.call_recorder(lambda *a, **kw: permits_result) + request = pretend.stub( + identity=pretend.stub( + __principals__=lambda: ["user:5"], has_two_factor=has_mfa + ), + registry=pretend.stub( + settings={ + "warehouse.two_factor_requirement.enabled": False, + "warehouse.two_factor_mandate.enabled": True, + "warehouse.two_factor_mandate.available": False, + } + ), ) - policy = security_policy.TwoFactorAuthorizationPolicy(policy=backing_policy) - context = ProjectFactory.create( - owners_require_2fa=owners_require_2fa, pypi_mandates_2fa=pypi_mandates_2fa + context = pretend.stub( + __acl__=[(Allow, "user:5", "myperm")], pypi_mandates_2fa=mfa_required ) - result = policy.permits(context, pretend.stub(), pretend.stub()) - assert result == permits_result + policy = policy_class() + assert bool(policy.permits(request, context, "myperm")) == expected @pytest.mark.parametrize( - "owners_require_2fa, pypi_mandates_2fa, reason", + "mfa_required,has_mfa,expected", [ - (True, False, "owners_require_2fa"), - (False, True, "pypi_mandates_2fa"), - (True, True, "pypi_mandates_2fa"), + (True, True, True), + (False, True, True), + (True, False, False), + (False, False, True), ], ) - def test_denies_if_2fa_is_required_but_user_doesnt_have_2fa( - self, - monkeypatch, - owners_require_2fa, - pypi_mandates_2fa, - reason, - db_request, + def test_2fa_pypi_mandates_2fa_with_warning( + self, monkeypatch, policy_class, mfa_required, has_mfa, expected ): - db_request.registry.settings = { - "warehouse.two_factor_requirement.enabled": owners_require_2fa, - "warehouse.two_factor_mandate.enabled": pypi_mandates_2fa, - } - user = pretend.stub(has_two_factor=False) - db_request.user = user - get_current_request = pretend.call_recorder(lambda: db_request) - monkeypatch.setattr(security_policy, "get_current_request", get_current_request) - - permits_result = Allowed("Because") - backing_policy = pretend.stub( - permits=pretend.call_recorder(lambda *a, **kw: permits_result) - ) - policy = security_policy.TwoFactorAuthorizationPolicy(policy=backing_policy) - context = ProjectFactory.create( - owners_require_2fa=owners_require_2fa, pypi_mandates_2fa=pypi_mandates_2fa - ) - result = policy.permits(context, pretend.stub(), pretend.stub()) - - summary = { - "owners_require_2fa": ( - "This project requires two factor authentication to be enabled " - "for all contributors.", + monkeypatch.setattr(security_policy, "User", pretend.stub) + monkeypatch.setattr(security_policy, "TwoFactorRequireable", pretend.stub) + + request = pretend.stub( + identity=pretend.stub( + __principals__=lambda: ["user:5"], has_two_factor=has_mfa ), - "pypi_mandates_2fa": ( - "PyPI requires two factor authentication to be enabled " - "for all contributors to this project.", + registry=pretend.stub( + settings={ + "warehouse.two_factor_requirement.enabled": False, + "warehouse.two_factor_mandate.enabled": False, + "warehouse.two_factor_mandate.available": True, + } ), - }[reason] - - assert result == WarehouseDenied(summary, reason="two_factor_required") - - def test_principals_allowed_by_permission(self): - principals = pretend.stub() - backing_policy = pretend.stub( - principals_allowed_by_permission=pretend.call_recorder( - lambda *a: principals - ) - ) - policy = security_policy.TwoFactorAuthorizationPolicy(policy=backing_policy) - - assert ( - policy.principals_allowed_by_permission(pretend.stub(), pretend.stub()) - is principals - ) + session=pretend.stub(flash=pretend.call_recorder(lambda msg, queue: None)), + ) + context = pretend.stub( + __acl__=[(Allow, "user:5", "myperm")], pypi_mandates_2fa=mfa_required + ) + + policy = policy_class() + assert bool(policy.permits(request, context, "myperm")) + + if not expected: + assert request.session.flash.calls == [ + pretend.call( + "This project is included in PyPI's two-factor mandate " + "for critical projects. In the future, you will be unable to " + "perform this action without enabling 2FA for your account", + queue="warning", + ) + ] + else: + assert request.session.flash.calls == [] diff --git a/tests/unit/macaroons/test_security_policy.py b/tests/unit/macaroons/test_security_policy.py index eb4e0b08e815..eeb06ab20953 100644 --- a/tests/unit/macaroons/test_security_policy.py +++ b/tests/unit/macaroons/test_security_policy.py @@ -14,7 +14,8 @@ import pretend import pytest -from pyramid.interfaces import IAuthorizationPolicy, ISecurityPolicy +from pyramid.authorization import Allow +from pyramid.interfaces import ISecurityPolicy from pyramid.security import Denied from zope.interface.verify import verifyClass @@ -68,8 +69,6 @@ def test_noops(self): policy = security_policy.MacaroonSecurityPolicy() with pytest.raises(NotImplementedError): policy.authenticated_userid(pretend.stub()) - with pytest.raises(NotImplementedError): - policy.permits(pretend.stub(), pretend.stub(), pretend.stub()) def test_forget_and_remember(self): policy = security_policy.MacaroonSecurityPolicy() @@ -208,45 +207,6 @@ def test_identity_oidc_publisher(self, monkeypatch): assert add_vary_cb.calls == [pretend.call("Authorization")] assert request.add_response_callback.calls == [pretend.call(vary_cb)] - -class TestMacaroonAuthorizationPolicy: - def test_verify(self): - assert verifyClass( - IAuthorizationPolicy, security_policy.MacaroonAuthorizationPolicy - ) - - def test_permits_no_active_request(self, monkeypatch): - get_current_request = pretend.call_recorder(lambda: None) - monkeypatch.setattr(security_policy, "get_current_request", get_current_request) - - backing_policy = pretend.stub( - permits=pretend.call_recorder(lambda *a, **kw: pretend.stub()) - ) - policy = security_policy.MacaroonAuthorizationPolicy(policy=backing_policy) - result = policy.permits(pretend.stub(), pretend.stub(), pretend.stub()) - - assert result == Denied("") - assert result.s == "There was no active request." - - def test_permits_no_macaroon(self, monkeypatch): - request = pretend.stub() - get_current_request = pretend.call_recorder(lambda: request) - monkeypatch.setattr(security_policy, "get_current_request", get_current_request) - - _extract_http_macaroon = pretend.call_recorder(lambda r: None) - monkeypatch.setattr( - security_policy, "_extract_http_macaroon", _extract_http_macaroon - ) - - permits = pretend.stub() - backing_policy = pretend.stub( - permits=pretend.call_recorder(lambda *a, **kw: permits) - ) - policy = security_policy.MacaroonAuthorizationPolicy(policy=backing_policy) - result = policy.permits(pretend.stub(), pretend.stub(), pretend.stub()) - - assert result == permits - def test_permits_invalid_macaroon(self, monkeypatch): macaroon_service = pretend.stub( verify=pretend.raiser(InvalidMacaroonError("foo")) @@ -254,91 +214,58 @@ def test_permits_invalid_macaroon(self, monkeypatch): request = pretend.stub( find_service=pretend.call_recorder(lambda interface, **kw: macaroon_service) ) - get_current_request = pretend.call_recorder(lambda: request) - monkeypatch.setattr(security_policy, "get_current_request", get_current_request) - - _extract_http_macaroon = pretend.call_recorder(lambda r: b"not a real macaroon") + _extract_http_macaroon = pretend.call_recorder(lambda r: "not a real macaroon") monkeypatch.setattr( security_policy, "_extract_http_macaroon", _extract_http_macaroon ) - permits = pretend.stub() - backing_policy = pretend.stub( - permits=pretend.call_recorder(lambda *a, **kw: permits) - ) - policy = security_policy.MacaroonAuthorizationPolicy(policy=backing_policy) - result = policy.permits(pretend.stub(), pretend.stub(), pretend.stub()) + policy = security_policy.MacaroonSecurityPolicy() + result = policy.permits(request, pretend.stub(), "upload") assert result == Denied("") assert result.s == "Invalid API Token: foo" - def test_permits_valid_macaroon(self, monkeypatch): + @pytest.mark.parametrize( + "principals,expected", [(["user:5"], True), (["user:1"], False)] + ) + def test_permits_valid_macaroon(self, monkeypatch, principals, expected): macaroon_service = pretend.stub( verify=pretend.call_recorder(lambda *a: pretend.stub()) ) request = pretend.stub( - find_service=pretend.call_recorder(lambda interface, **kw: macaroon_service) + identity=pretend.stub(__principals__=lambda: principals), + find_service=pretend.call_recorder( + lambda interface, **kw: macaroon_service + ), ) - get_current_request = pretend.call_recorder(lambda: request) - monkeypatch.setattr(security_policy, "get_current_request", get_current_request) - - _extract_http_macaroon = pretend.call_recorder(lambda r: b"not a real macaroon") + _extract_http_macaroon = pretend.call_recorder(lambda r: "not a real macaroon") monkeypatch.setattr( security_policy, "_extract_http_macaroon", _extract_http_macaroon ) - permits = pretend.stub() - backing_policy = pretend.stub( - permits=pretend.call_recorder(lambda *a, **kw: permits) - ) - policy = security_policy.MacaroonAuthorizationPolicy(policy=backing_policy) - result = policy.permits(pretend.stub(), pretend.stub(), "upload") + context = pretend.stub(__acl__=[(Allow, "user:5", ["upload"])]) - assert result == permits + policy = security_policy.MacaroonSecurityPolicy() + result = policy.permits(request, context, "upload") + + assert bool(result) == expected @pytest.mark.parametrize( "invalid_permission", - ["admin", "moderator", "manage:user", "manage:project", "nonexistant"], + ["admin", "moderator", "manage:user", "manage:project", "nonexistent"], ) def test_denies_valid_macaroon_for_incorrect_permission( self, monkeypatch, invalid_permission ): - macaroon_service = pretend.stub( - verify=pretend.call_recorder(lambda *a: pretend.stub()) - ) - request = pretend.stub( - find_service=pretend.call_recorder(lambda interface, **kw: macaroon_service) - ) - get_current_request = pretend.call_recorder(lambda: request) - monkeypatch.setattr(security_policy, "get_current_request", get_current_request) - - _extract_http_macaroon = pretend.call_recorder(lambda r: b"not a real macaroon") + _extract_http_macaroon = pretend.call_recorder(lambda r: "not a real macaroon") monkeypatch.setattr( security_policy, "_extract_http_macaroon", _extract_http_macaroon ) - permits = pretend.stub() - backing_policy = pretend.stub( - permits=pretend.call_recorder(lambda *a, **kw: permits) - ) - policy = security_policy.MacaroonAuthorizationPolicy(policy=backing_policy) + policy = security_policy.MacaroonSecurityPolicy() result = policy.permits(pretend.stub(), pretend.stub(), invalid_permission) assert result == Denied("") assert result.s == ( f"API tokens are not valid for permission: {invalid_permission}!" ) - - def test_principals_allowed_by_permission(self): - principals = pretend.stub() - backing_policy = pretend.stub( - principals_allowed_by_permission=pretend.call_recorder( - lambda *a: principals - ) - ) - policy = security_policy.MacaroonAuthorizationPolicy(policy=backing_policy) - - assert ( - policy.principals_allowed_by_permission(pretend.stub(), pretend.stub()) - is principals - ) diff --git a/tests/unit/oidc/test_utils.py b/tests/unit/oidc/test_utils.py index 769ee4ae11bc..496806c68c43 100644 --- a/tests/unit/oidc/test_utils.py +++ b/tests/unit/oidc/test_utils.py @@ -15,8 +15,11 @@ import pretend import pytest +from pyramid.authorization import Authenticated + from tests.common.db.oidc import GitHubPublisherFactory from warehouse.oidc import utils +from warehouse.utils.security_policy import principals_for def test_find_publisher_by_issuer_bad_issuer_url(): @@ -71,3 +74,12 @@ def test_find_publisher_by_issuer_github(db_request, environment, expected_id): ).id == expected_id ) + + +def test_oidc_context_principals(): + assert principals_for( + utils.OIDCContext(publisher=pretend.stub(id=17), claims=None) + ) == [ + Authenticated, + "oidc:17", + ] diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index c65ddbf6b3a1..a0f01434396b 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -470,13 +470,12 @@ def __init__(self): def test_root_factory_access_control_list(): acl = config.RootFactory.__acl__ - assert len(acl) == 5 - assert acl[0] == (Allow, "group:admins", "admin") - assert acl[1] == (Allow, "group:moderators", "moderator") - assert acl[2] == (Allow, "group:psf_staff", "psf_staff") - assert acl[3] == ( - Allow, - "group:with_admin_dashboard_access", - "admin_dashboard_access", - ) - assert acl[4] == (Allow, Authenticated, "manage:user") + assert acl == [ + (Allow, "group:admins", "admin"), + (Allow, "group:admins", "admin_dashboard_access"), + (Allow, "group:moderators", "moderator"), + (Allow, "group:moderators", "admin_dashboard_access"), + (Allow, "group:psf_staff", "psf_staff"), + (Allow, "group:psf_staff", "admin_dashboard_access"), + (Allow, Authenticated, "manage:user"), + ] diff --git a/tests/unit/utils/test_security_policy.py b/tests/unit/utils/test_security_policy.py index 38e7d287c899..c2054b3fcb27 100644 --- a/tests/unit/utils/test_security_policy.py +++ b/tests/unit/utils/test_security_policy.py @@ -11,256 +11,127 @@ # limitations under the License. import pretend -import pytest -from pyramid.authorization import Authenticated -from pyramid.security import Denied +from pyramid.interfaces import ISecurityPolicy +from zope.interface.verify import verifyClass -from warehouse.oidc.utils import OIDCContext from warehouse.utils import security_policy -from ...common.db.accounts import UserFactory -from ...common.db.oidc import GitHubPublisherFactory +def test_principals_for(): + identity = pretend.stub(__principals__=lambda: ["a", "b", "z"]) + assert security_policy.principals_for(identity) == ["a", "b", "z"] -@pytest.mark.parametrize( - ( - "is_superuser", - "is_moderator", - "is_psf_staff", - "expected", - ), - [ - (False, False, False, []), - ( - True, - False, - False, - [ - "group:admins", - "group:moderators", - "group:psf_staff", - "group:with_admin_dashboard_access", - ], - ), - ( - False, - True, - False, - ["group:moderators", "group:with_admin_dashboard_access"], - ), - ( - True, - True, - False, - [ - "group:admins", - "group:moderators", - "group:psf_staff", - "group:with_admin_dashboard_access", - ], - ), - ( - False, - False, - True, - ["group:psf_staff", "group:with_admin_dashboard_access"], - ), - ( - False, - True, - True, - [ - "group:moderators", - "group:psf_staff", - "group:with_admin_dashboard_access", - ], - ), - ], -) -def test_principals_for_authenticated_user( - is_superuser, - is_moderator, - is_psf_staff, - expected, -): - user = pretend.stub( - id=1, - is_superuser=is_superuser, - is_moderator=is_moderator, - is_psf_staff=is_psf_staff, - ) - assert security_policy._principals_for_authenticated_user(user) == expected +def test_principals_for_with_none(): + assert security_policy.principals_for(pretend.stub()) == [] -class TestMultiSecurityPolicy: - def test_initializes(self): - subpolicies = pretend.stub() - authz = pretend.stub() - policy = security_policy.MultiSecurityPolicy(subpolicies, authz) - assert policy._policies is subpolicies - assert policy._authz is authz +class TestMultiSecurityPolicy: + def test_verify(self): + assert verifyClass( + ISecurityPolicy, + security_policy.MultiSecurityPolicy, + ) def test_identity_none(self): - subpolicies = [pretend.stub(identity=pretend.call_recorder(lambda r: None))] - authz = pretend.stub() - policy = security_policy.MultiSecurityPolicy(subpolicies, authz) + subpolicies = [pretend.stub(identity=lambda r: None)] + policy = security_policy.MultiSecurityPolicy(subpolicies) - request = pretend.stub() + request = pretend.stub(add_finished_callback=lambda *a, **kw: None) assert policy.identity(request) is None - for p in subpolicies: - assert p.identity.calls == [pretend.call(request)] def test_identity_first_come_first_serve(self): identity1 = pretend.stub() identity2 = pretend.stub() subpolicies = [ - pretend.stub(identity=pretend.call_recorder(lambda r: None)), - pretend.stub(identity=pretend.call_recorder(lambda r: identity1)), - pretend.stub(identity=pretend.call_recorder(lambda r: identity2)), + pretend.stub(identity=lambda r: None), + pretend.stub(identity=lambda r: identity1), + pretend.stub(identity=lambda r: identity2), ] - authz = pretend.stub() - policy = security_policy.MultiSecurityPolicy(subpolicies, authz) + policy = security_policy.MultiSecurityPolicy(subpolicies) - request = pretend.stub() + request = pretend.stub(add_finished_callback=lambda *a, **kw: None) assert policy.identity(request) is identity1 - assert subpolicies[0].identity.calls == [pretend.call(request)] - assert subpolicies[1].identity.calls == [pretend.call(request)] - assert subpolicies[2].identity.calls == [] def test_authenticated_userid_no_identity(self): - request = pretend.stub() - subpolicies = [pretend.stub(identity=pretend.call_recorder(lambda r: None))] - authz = pretend.stub() - policy = security_policy.MultiSecurityPolicy(subpolicies, authz) + request = pretend.stub(add_finished_callback=lambda *a, **kw: None) + subpolicies = [pretend.stub(identity=lambda r: None)] + policy = security_policy.MultiSecurityPolicy(subpolicies) assert policy.authenticated_userid(request) is None - assert subpolicies[0].identity.calls == [pretend.call(request)] def test_authenticated_userid_nonuser_identity(self, db_request): - request = pretend.stub() + request = pretend.stub(add_finished_callback=lambda *a, **kw: None) nonuser = pretend.stub(id="not-a-user-instance") - subpolicies = [pretend.stub(identity=pretend.call_recorder(lambda r: nonuser))] - authz = pretend.stub() - policy = security_policy.MultiSecurityPolicy(subpolicies, authz) + subpolicies = [pretend.stub(identity=lambda r: nonuser)] + policy = security_policy.MultiSecurityPolicy(subpolicies) assert policy.authenticated_userid(request) is None - assert subpolicies[0].identity.calls == [pretend.call(request)] - def test_authenticated_userid(self, db_request): - request = pretend.stub() - user = UserFactory.create() - subpolicies = [pretend.stub(identity=pretend.call_recorder(lambda r: user))] - authz = pretend.stub() - policy = security_policy.MultiSecurityPolicy(subpolicies, authz) + def test_authenticated_userid(self, monkeypatch): + monkeypatch.setattr(security_policy, "User", pretend.stub) + + request = pretend.stub(add_finished_callback=lambda *a, **kw: None) + user = pretend.stub(id="a fake user") + subpolicies = [pretend.stub(identity=lambda r: user)] + policy = security_policy.MultiSecurityPolicy(subpolicies) assert policy.authenticated_userid(request) == str(user.id) - assert subpolicies[0].identity.calls == [pretend.call(request)] def test_forget(self): - header = pretend.stub() - subpolicies = [ - pretend.stub(forget=pretend.call_recorder(lambda r, **kw: [header])) - ] - authz = pretend.stub() - policy = security_policy.MultiSecurityPolicy(subpolicies, authz) + subpolicies = [pretend.stub(forget=lambda r, **kw: [("ForgetMe", "1")])] + policy = security_policy.MultiSecurityPolicy(subpolicies) request = pretend.stub() - assert policy.forget(request, foo=None) == [header] - assert subpolicies[0].forget.calls == [pretend.call(request, foo=None)] + assert policy.forget(request, foo=None) == [("ForgetMe", "1")] def test_remember(self): - header = pretend.stub() subpolicies = [ - pretend.stub(remember=pretend.call_recorder(lambda r, uid, **kw: [header])) + pretend.stub(remember=lambda r, uid, foo, **kw: [("RememberMe", foo)]) ] - authz = pretend.stub() - policy = security_policy.MultiSecurityPolicy(subpolicies, authz) + policy = security_policy.MultiSecurityPolicy(subpolicies) request = pretend.stub() userid = pretend.stub() - assert policy.remember(request, userid, foo=None) == [header] - assert subpolicies[0].remember.calls == [ - pretend.call(request, userid, foo=None) - ] + assert policy.remember(request, userid, foo="bob") == [("RememberMe", "bob")] - def test_permits_user(self, db_request, monkeypatch): - subpolicies = pretend.stub() - status = pretend.stub() - authz = pretend.stub(permits=pretend.call_recorder(lambda *a: status)) - policy = security_policy.MultiSecurityPolicy(subpolicies, authz) - - principals_for_authenticated_user = pretend.call_recorder( - lambda *a: ["some:principal"] - ) - monkeypatch.setattr( - security_policy, - "_principals_for_authenticated_user", - principals_for_authenticated_user, - ) - - user = UserFactory.create() - request = pretend.stub(identity=user) + def test_permits(self): + identity1 = pretend.stub() + identity2 = pretend.stub() context = pretend.stub() - permission = pretend.stub() - assert policy.permits(request, context, permission) is status - assert authz.permits.calls == [ - pretend.call( - context, - [Authenticated, f"user:{user.id}", "some:principal"], - permission, - ) - ] - def test_permits_oidc_publisher(self, db_request): - subpolicies = pretend.stub() - status = pretend.stub() - authz = pretend.stub(permits=pretend.call_recorder(lambda *a: status)) - policy = security_policy.MultiSecurityPolicy(subpolicies, authz) - - publisher = GitHubPublisherFactory.create() - request = pretend.stub(identity=OIDCContext(publisher, None)) - context = pretend.stub() - permission = pretend.stub() - assert policy.permits(request, context, permission) is status - assert authz.permits.calls == [ - pretend.call( - context, - [Authenticated, f"oidc:{publisher.id}"], - permission, - ) + subpolicies = [ + pretend.stub(identity=lambda r: None), + pretend.stub( + identity=lambda r: identity1, + permits=( + lambda r, c, p: r.identity == identity1 + and c == context + and p == "myperm" + ), + ), + pretend.stub(identity=lambda r: identity2), ] + policy = security_policy.MultiSecurityPolicy(subpolicies) - def test_permits_nonuser_denied(self): - subpolicies = pretend.stub() - authz = pretend.stub(permits=pretend.call_recorder(lambda *a: pretend.stub())) - policy = security_policy.MultiSecurityPolicy(subpolicies, authz) - - # Anything that doesn't pass an isinstance check for User - fakeuser = pretend.stub() - request = pretend.stub(identity=fakeuser) - context = pretend.stub() - permission = pretend.stub() - assert policy.permits(request, context, permission) == Denied("unimplemented") - assert authz.permits.calls == [] + request = pretend.stub( + identity=identity1, + add_finished_callback=lambda *a, **kw: None, + ) - def test_permits_no_identity(self): - subpolicies = pretend.stub() - status = pretend.stub() - authz = pretend.stub(permits=pretend.call_recorder(lambda *a: status)) - policy = security_policy.MultiSecurityPolicy(subpolicies, authz) + assert policy.permits(request, context, "myperm") - request = pretend.stub(identity=None) + def test_permits_no_policy(self): + subpolicies = [ + pretend.stub(identity=lambda r: None), + pretend.stub(identity=lambda r: None), + pretend.stub(identity=lambda r: None), + ] + policy = security_policy.MultiSecurityPolicy(subpolicies) + request = pretend.stub( + identity=None, add_finished_callback=lambda *a, **kw: None + ) context = pretend.stub() - permission = pretend.stub() - assert policy.permits(request, context, permission) is status - assert authz.permits.calls == [pretend.call(context, [], permission)] - - def test_cant_use_unauthenticated_userid(self): - subpolicies = pretend.stub() - authz = pretend.stub(permits=pretend.call_recorder(lambda *a: pretend.stub())) - policy = security_policy.MultiSecurityPolicy(subpolicies, authz) - with pytest.raises(NotImplementedError): - policy.unauthenticated_userid(pretend.stub()) + assert not policy.permits(request, context, "myperm") diff --git a/warehouse/accounts/__init__.py b/warehouse/accounts/__init__.py index 05cc2e97518c..9667b3a064c9 100644 --- a/warehouse/accounts/__init__.py +++ b/warehouse/accounts/__init__.py @@ -10,8 +10,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from pyramid.authorization import ACLAuthorizationPolicy - from warehouse.accounts.interfaces import ( IEmailBreachedService, IPasswordBreachedService, @@ -22,7 +20,6 @@ from warehouse.accounts.security_policy import ( BasicAuthSecurityPolicy, SessionSecurityPolicy, - TwoFactorAuthorizationPolicy, ) from warehouse.accounts.services import ( HaveIBeenPwnedEmailBreachedService, @@ -33,10 +30,7 @@ database_login_factory, ) from warehouse.admin.flags import AdminFlagValue -from warehouse.macaroons.security_policy import ( - MacaroonAuthorizationPolicy, - MacaroonSecurityPolicy, -) +from warehouse.macaroons.security_policy import MacaroonSecurityPolicy from warehouse.oidc.utils import OIDCContext from warehouse.organizations.services import IOrganizationService from warehouse.rate_limiting import IRateLimiter, RateLimit @@ -127,10 +121,7 @@ def includeme(config): breached_email_class.create_service, IEmailBreachedService ) - # Register our security policies (AuthN + AuthZ) - authz_policy = TwoFactorAuthorizationPolicy( - policy=MacaroonAuthorizationPolicy(policy=ACLAuthorizationPolicy()) - ) + # Register our security policies. config.set_security_policy( MultiSecurityPolicy( [ @@ -138,7 +129,6 @@ def includeme(config): BasicAuthSecurityPolicy(), MacaroonSecurityPolicy(), ], - authz_policy, ) ) diff --git a/warehouse/accounts/models.py b/warehouse/accounts/models.py index 08c464bd1950..5aad274d43f5 100644 --- a/warehouse/accounts/models.py +++ b/warehouse/accounts/models.py @@ -14,7 +14,7 @@ import enum from citext import CIText -from pyramid.authorization import Allow +from pyramid.authorization import Allow, Authenticated from sqlalchemy import ( Boolean, CheckConstraint, @@ -192,6 +192,18 @@ def can_reset_password(self): ] ) + def __principals__(self) -> list[str]: + principals = [Authenticated, f"user:{self.id}"] + + if self.is_superuser: + principals.append("group:admins") + if self.is_moderator or self.is_superuser: + principals.append("group:moderators") + if self.is_psf_staff or self.is_superuser: + principals.append("group:psf_staff") + + return principals + def __acl__(self): return [ (Allow, "group:admins", "admin"), diff --git a/warehouse/accounts/security_policy.py b/warehouse/accounts/security_policy.py index 16dc2d6775e9..463ba6c8cf15 100644 --- a/warehouse/accounts/security_policy.py +++ b/warehouse/accounts/security_policy.py @@ -16,13 +16,14 @@ SessionAuthenticationHelper, extract_http_basic_credentials, ) +from pyramid.authorization import ACLHelper from pyramid.httpexceptions import HTTPUnauthorized -from pyramid.interfaces import IAuthorizationPolicy, ISecurityPolicy -from pyramid.threadlocal import get_current_request +from pyramid.interfaces import ISecurityPolicy +from pyramid.security import Allowed from zope.interface import implementer from warehouse.accounts.interfaces import IPasswordBreachedService, IUserService -from warehouse.accounts.models import DisableReason +from warehouse.accounts.models import DisableReason, User from warehouse.cache.http import add_vary_callback from warehouse.email import send_password_compromised_email_hibp from warehouse.errors import ( @@ -33,7 +34,7 @@ ) from warehouse.events.tags import EventTag from warehouse.packaging.models import TwoFactorRequireable -from warehouse.utils.security_policy import AuthenticationMethod +from warehouse.utils.security_policy import AuthenticationMethod, principals_for def _format_exc_status(exc, message): @@ -124,6 +125,7 @@ def _basic_auth_check(username, password, request): class SessionSecurityPolicy: def __init__(self): self._session_helper = SessionAuthenticationHelper() + self._acl = ACLHelper() def identity(self, request): # If we're calling into this API on a request, then we want to register @@ -184,12 +186,14 @@ def authenticated_userid(self, request): raise NotImplementedError def permits(self, request, context, permission): - # Handled by MultiSecurityPolicy - raise NotImplementedError + return _permits_for_user_policy(self._acl, request, context, permission) @implementer(ISecurityPolicy) class BasicAuthSecurityPolicy: + def __init__(self): + self._acl = ACLHelper() + def identity(self, request): # If we're calling into this API on a request, then we want to register # a callback which will ensure that the response varies based on the @@ -225,82 +229,77 @@ def authenticated_userid(self, request): raise NotImplementedError def permits(self, request, context, permission): - # Handled by MultiSecurityPolicy - raise NotImplementedError + return _permits_for_user_policy(self._acl, request, context, permission) -@implementer(IAuthorizationPolicy) -class TwoFactorAuthorizationPolicy: - def __init__(self, policy): - self.policy = policy +def _permits_for_user_policy(acl, request, context, permission): + # It should only be possible for request.identity to be a User object + # at this point, and we only a User in these policies. + assert isinstance(request.identity, User) - def permits(self, context, principals, permission): - # The Pyramid API doesn't let us access the request here, so we have to pull it - # out of the thread local instead. - # TODO: Work with Pyramid devs to figure out if there is a better way to support - # the worklow we are using here or not. - request = get_current_request() + # Dispatch to our ACL + # NOTE: These parameters are in a different order than the signature of this method. + res = acl.permits(context, principals_for(request.identity), permission) - # Our request could possibly be a None, if there isn't an active request, in - # that case we're going to always deny, because without a request, we can't - # determine if this request is authorized or not. - if request is None: + # If our underlying permits allowed this, we will check our 2FA status, + # that might possibly return a reason to deny the request anyways, and if + # it does we'll return that. + if isinstance(res, Allowed): + mfa = _check_for_mfa(request, context) + if mfa is not None: + return mfa + + return res + + +def _check_for_mfa(request, context) -> WarehouseDenied | None: + # It should only be possible for request.identity to be a User object + # at this point, and we only a User in these policies. + assert isinstance(request.identity, User) + + # Check if the context is 2FA requireable, if 2FA is indeed required, and if + # the user has 2FA enabled. + if isinstance(context, TwoFactorRequireable): + # Check if we allow owners to require 2FA, and if so does our context owner + # require 2FA? And if so does our user have 2FA? + if ( + request.registry.settings["warehouse.two_factor_requirement.enabled"] + and context.owners_require_2fa + and not request.identity.has_two_factor + ): return WarehouseDenied( - "There was no active request.", reason="no_active_request" + "This project requires two factor authentication to be enabled " + "for all contributors.", + reason="owners_require_2fa", ) - # Check if the subpolicy permits authorization - subpolicy_permits = self.policy.permits(context, principals, permission) - - # If the request is permitted by the subpolicy, check if the context is - # 2FA requireable, if 2FA is indeed required, and if the user has 2FA - # enabled. - # - # We check for `request.user` explicitly because we don't perform - # this check for non-user identities: the only way a non-user - # identity can be created is after a 2FA check on a 2FA-mandated - # project. + # Check if PyPI is enforcing the 2FA mandate on "critical" projects, and if it + # is does our current context require it, and if it does, does our user have + # 2FA? if ( - subpolicy_permits - and isinstance(context, TwoFactorRequireable) - and request.user + request.registry.settings["warehouse.two_factor_mandate.enabled"] + and context.pypi_mandates_2fa + and not request.identity.has_two_factor ): - if ( - request.registry.settings["warehouse.two_factor_requirement.enabled"] - and context.owners_require_2fa - and not request.user.has_two_factor - ): - return WarehouseDenied( - "This project requires two factor authentication to be enabled " - "for all contributors.", - reason="owners_require_2fa", - ) - if ( - request.registry.settings["warehouse.two_factor_mandate.enabled"] - and context.pypi_mandates_2fa - and not request.user.has_two_factor - ): - return WarehouseDenied( - "PyPI requires two factor authentication to be enabled " - "for all contributors to this project.", - reason="pypi_mandates_2fa", - ) - if ( - request.registry.settings["warehouse.two_factor_mandate.available"] - and context.pypi_mandates_2fa - and not request.user.has_two_factor - ): - request.session.flash( - "This project is included in PyPI's two-factor mandate " - "for critical projects. In the future, you will be unable to " - "perform this action without enabling 2FA for your account", - queue="warning", - ) + return WarehouseDenied( + "PyPI requires two factor authentication to be enabled " + "for all contributors to this project.", + reason="pypi_mandates_2fa", + ) - return subpolicy_permits + # Check if PyPI's 2FA mandate is available, but not enforcing, and if it is and + # the current context would require 2FA, and if our user doesn't have have 2FA + # then we'll flash a warning. + if ( + request.registry.settings["warehouse.two_factor_mandate.available"] + and context.pypi_mandates_2fa + and not request.identity.has_two_factor + ): + request.session.flash( + "This project is included in PyPI's two-factor mandate " + "for critical projects. In the future, you will be unable to " + "perform this action without enabling 2FA for your account", + queue="warning", + ) - def principals_allowed_by_permission(self, context, permission): - # We just dispatch this, because this policy doesn't restrict what - # principals are allowed by a particular permission, it just restricts - # specific requests to not have that permission. - return self.policy.principals_allowed_by_permission(context, permission) + return None diff --git a/warehouse/config.py b/warehouse/config.py index b89a5bf04828..57b734c31462 100644 --- a/warehouse/config.py +++ b/warehouse/config.py @@ -58,9 +58,11 @@ class RootFactory: __acl__ = [ (Allow, "group:admins", "admin"), + (Allow, "group:admins", "admin_dashboard_access"), (Allow, "group:moderators", "moderator"), + (Allow, "group:moderators", "admin_dashboard_access"), (Allow, "group:psf_staff", "psf_staff"), - (Allow, "group:with_admin_dashboard_access", "admin_dashboard_access"), + (Allow, "group:psf_staff", "admin_dashboard_access"), (Allow, Authenticated, "manage:user"), ] diff --git a/warehouse/macaroons/security_policy.py b/warehouse/macaroons/security_policy.py index 0ec3872cf0aa..56b0bcb1ac2a 100644 --- a/warehouse/macaroons/security_policy.py +++ b/warehouse/macaroons/security_policy.py @@ -12,8 +12,8 @@ import base64 -from pyramid.interfaces import IAuthorizationPolicy, ISecurityPolicy -from pyramid.threadlocal import get_current_request +from pyramid.authorization import ACLHelper +from pyramid.interfaces import ISecurityPolicy from zope.interface import implementer from warehouse.cache.http import add_vary_callback @@ -21,7 +21,7 @@ from warehouse.macaroons import InvalidMacaroonError from warehouse.macaroons.interfaces import IMacaroonService from warehouse.oidc.utils import OIDCContext -from warehouse.utils.security_policy import AuthenticationMethod +from warehouse.utils.security_policy import AuthenticationMethod, principals_for def _extract_basic_macaroon(auth): @@ -73,6 +73,9 @@ def _extract_http_macaroon(request): @implementer(ISecurityPolicy) class MacaroonSecurityPolicy: + def __init__(self): + self._acl = ACLHelper() + def identity(self, request): # If we're calling into this API on a request, then we want to register # a callback which will ensure that the response varies based on the @@ -122,65 +125,41 @@ def authenticated_userid(self, request): raise NotImplementedError def permits(self, request, context, permission): - # Handled by MultiSecurityPolicy - raise NotImplementedError - - -@implementer(IAuthorizationPolicy) -class MacaroonAuthorizationPolicy: - def __init__(self, policy): - self.policy = policy - - def permits(self, context, principals, permission): - # The Pyramid API doesn't let us access the request here, so we have to pull it - # out of the thread local instead. - # TODO: Work with Pyramid devs to figure out if there is a better way to support - # the worklow we are using here or not. - request = get_current_request() - - # Our request could possibly be a None, if there isn't an active request, in - # that case we're going to always deny, because without a request, we can't - # determine if this request is authorized or not. - if request is None: - return WarehouseDenied( - "There was no active request.", reason="no_active_request" - ) - # Re-extract our Macaroon from the request, it sucks to have to do this work # twice, but I believe it is inevitable unless we pass the Macaroon back as # a principal-- which doesn't seem to be the right fit for it. macaroon = _extract_http_macaroon(request) - # This logic will only happen on requests that are being authenticated with - # Macaroons. Any other request will just fall back to the standard Authorization - # policy. - if macaroon is not None: - valid_permissions = ["upload"] - macaroon_service = request.find_service(IMacaroonService, context=None) - - try: - macaroon_service.verify(macaroon, request, context, permission) - except InvalidMacaroonError as exc: - return WarehouseDenied( - f"Invalid API Token: {exc}", reason="invalid_api_token" - ) - - # If our Macaroon is verified, and for a valid permission then we'll pass - # this request to our underlying Authorization policy, so it can handle its - # own authorization logic on the principal. - if permission in valid_permissions: - return self.policy.permits(context, principals, permission) - else: - return WarehouseDenied( - f"API tokens are not valid for permission: {permission}!", - reason="invalid_permission", - ) - - else: - return self.policy.permits(context, principals, permission) - - def principals_allowed_by_permission(self, context, permission): - # We just dispatch this, because Macaroons don't restrict what principals are - # allowed by a particular permission, they just restrict specific requests - # to not have that permission. - return self.policy.principals_allowed_by_permission(context, permission) + # It should not be possible to *not* have a macaroon at this point, because we + # can't call this function without an identity that came from a macaroon + assert isinstance(macaroon, str), "no valid macaroon" + + # Check to make sure that the permission we're attempting to permit is one that + # is allowed to be used for macaroons. + # TODO: This should be moved out of there and into the macaroons themselves, it + # doesn't really make a lot of sense here and it makes things more + # complicated if we want to allow the use of macaroons for actions other + # than uploading. + if permission not in ["upload"]: + return WarehouseDenied( + f"API tokens are not valid for permission: {permission}!", + reason="invalid_permission", + ) + + # Check if our macaroon itself is valid. This does not actually check if the + # identity bound to that macaroon has permission to do what it's trying to do + # but rather that the caveats embedded into the macaroon are valid for the given + # request, context, and permission. + macaroon_service = request.find_service(IMacaroonService, context=None) + try: + macaroon_service.verify(macaroon, request, context, permission) + except InvalidMacaroonError as exc: + return WarehouseDenied( + f"Invalid API Token: {exc}", reason="invalid_api_token" + ) + + # The macaroon is valid, so we can actually see if request.identity is + # authorized now or not. + # NOTE: These parameters are in a different order than the signature of this + # method. + return self._acl.permits(context, principals_for(request.identity), permission) diff --git a/warehouse/oidc/utils.py b/warehouse/oidc/utils.py index c5e2f3d701aa..d692adb3867d 100644 --- a/warehouse/oidc/utils.py +++ b/warehouse/oidc/utils.py @@ -14,6 +14,7 @@ from dataclasses import dataclass +from pyramid.authorization import Authenticated from sqlalchemy.sql.expression import func, literal from warehouse.oidc.interfaces import SignedClaims @@ -117,3 +118,6 @@ class OIDCContext: """ Pertinent OIDC claims from the token, if they exist. """ + + def __principals__(self) -> list[str]: + return [Authenticated, f"oidc:{self.publisher.id}"] diff --git a/warehouse/utils/security_policy.py b/warehouse/utils/security_policy.py index 4bb3c7c6150d..f0874eb343ed 100644 --- a/warehouse/utils/security_policy.py +++ b/warehouse/utils/security_policy.py @@ -12,13 +12,20 @@ import enum -from pyramid.authorization import Authenticated from pyramid.interfaces import ISecurityPolicy +from pyramid.request import RequestLocalCache from pyramid.security import Denied from zope.interface import implementer from warehouse.accounts.models import User -from warehouse.oidc.utils import OIDCContext + + +# NOTE: Is there a better place for this to live? It may not even need to exist +# since it's so small, it may be easier to just inline it. +def principals_for(identity) -> list[str]: + if hasattr(identity, "__principals__"): + return identity.__principals__() + return [] class AuthenticationMethod(enum.Enum): @@ -27,23 +34,6 @@ class AuthenticationMethod(enum.Enum): MACAROON = "macaroon" -def _principals_for_authenticated_user(user): - """Apply the necessary principals to the authenticated user""" - principals = [] - if user.is_superuser: - principals.append("group:admins") - if user.is_moderator or user.is_superuser: - principals.append("group:moderators") - if user.is_psf_staff or user.is_superuser: - principals.append("group:psf_staff") - - # user must have base admin access if any admin permission - if principals: - principals.append("group:with_admin_dashboard_access") - - return principals - - @implementer(ISecurityPolicy) class MultiSecurityPolicy: """ @@ -54,35 +44,45 @@ class MultiSecurityPolicy: with the following semantics: * `identity`: Selected from the first policy to return non-`None` - * `authenticated_userid`: Selected from the first policy to return a user + * `authenticated_userid`: Selected from the first policy to return an identity * `forget`: Combined from all policies * `remember`: Combined from all policies - * `permits`: Uses the AuthZ policy passed during initialization + * `permits`: Uses the the policy that returned the identity. These semantics mostly mirror those of `pyramid-multiauth`. """ - def __init__(self, policies, authz): + def __init__(self, policies): self._policies = policies - self._authz = authz + self._identity_cache = RequestLocalCache(self._get_identity_with_policy) - def identity(self, request): + def _get_identity_with_policy(self, request): + # This will be cached per request, which means that we'll have a stable + # result for both the identity AND the policy that produced it. Further + # uses can then make sure to use the same policy throughout, at least + # where it makes sense to. for policy in self._policies: if ident := policy.identity(request): - return ident + return ident, policy - return None + return None, None + + def identity(self, request): + identity, _policy = self._identity_cache.get_or_create(request) + return identity def authenticated_userid(self, request): if ident := self.identity(request): + # TODO: Note, this logic breaks the contract of a SecurityPolicy, the + # authenticated_userid is intended to be used to fetch the unique + # identifier that represents the current identity. We're leaving + # it here for now, because there are a number of views directly + # using this to detect user vs not, which we'll need to move to a + # more correct pattern before fixing this. if isinstance(ident, User): return str(ident.id) return None - def unauthenticated_userid(self, request): - # This is deprecated and we shouldn't use it - raise NotImplementedError - def forget(self, request, **kw): headers = [] for policy in self._policies: @@ -96,22 +96,16 @@ def remember(self, request, userid, **kw): return headers def permits(self, request, context, permission): - identity = request.identity - principals = [] - if identity is not None: - principals.append(Authenticated) - - if isinstance(identity, User): - principals.append(f"user:{identity.id}") - principals.extend(_principals_for_authenticated_user(identity)) - elif isinstance(identity, OIDCContext): - principals.append(f"oidc:{identity.publisher.id}") - else: - return Denied("unknown identity") - - # NOTE: Observe that the parameters passed into the underlying AuthZ - # policy here are not the same (or in the same order) as the ones - # passed into `permits` above. This is because the underlying AuthZ - # policy is a "legacy" Pyramid 1.0 style one that implements the - # `IAuthorizationPolicy` interface rather than `ISecurityPolicy`. - return self._authz.permits(context, principals, permission) + identity, policy = self._identity_cache.get_or_create(request) + # Sanity check that somehow our cached identity + policy didn't end up + # different than what the request.identity is. This shouldn't be possible + # but we'll assert it because if we let it pass silently it may mean that + # some kind of confused-deputy attack is possible. + assert request.identity == identity, "request has a different identity" + + # Dispatch to the underlying policy for the given identity, if there was one + # for this request. + if policy is not None: + return policy.permits(request, context, permission) + else: + return Denied("unknown identity")