Skip to content

Send notification email on basic auth with 2FA #10836

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 78 additions & 9 deletions tests/unit/accounts/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def test_invalid_route(self, pyramid_request, pyramid_services):
pretend.stub(), IPasswordBreachedService, None
)
pyramid_request.matched_route = pretend.stub(name="route_name")
assert accounts._basic_auth_login("myuser", "mypass", pyramid_request) is None
assert accounts._basic_auth_check("myuser", "mypass", pyramid_request) is None
assert service.find_userid.calls == []

def test_with_no_user(self, pyramid_request, pyramid_services):
Expand All @@ -50,7 +50,7 @@ def test_with_no_user(self, pyramid_request, pyramid_services):
pretend.stub(), IPasswordBreachedService, None
)
pyramid_request.matched_route = pretend.stub(name="forklift.legacy.file_upload")
assert accounts._basic_auth_login("myuser", "mypass", pyramid_request) is None
assert accounts._basic_auth_check("myuser", "mypass", pyramid_request) is None
assert service.find_userid.calls == [pretend.call("myuser")]

def test_with_invalid_password(self, pyramid_request, pyramid_services):
Expand All @@ -75,7 +75,7 @@ def test_with_invalid_password(self, pyramid_request, pyramid_services):

with pytest.raises(BasicAuthFailedPassword) as excinfo:
assert (
accounts._basic_auth_login("myuser", "mypass", pyramid_request) is None
accounts._basic_auth_check("myuser", "mypass", pyramid_request) is None
)

assert excinfo.value.status == (
Expand Down Expand Up @@ -122,7 +122,7 @@ def test_with_disabled_user_no_reason(self, pyramid_request, pyramid_services):

with pytest.raises(BasicAuthFailedPassword) as excinfo:
assert (
accounts._basic_auth_login("myuser", "mypass", pyramid_request) is None
accounts._basic_auth_check("myuser", "mypass", pyramid_request) is None
)

assert excinfo.value.status == (
Expand Down Expand Up @@ -169,7 +169,7 @@ def test_with_disabled_user_compromised_pw(self, pyramid_request, pyramid_servic

with pytest.raises(BasicAuthBreachedPassword) as excinfo:
assert (
accounts._basic_auth_login("myuser", "mypass", pyramid_request) is None
accounts._basic_auth_check("myuser", "mypass", pyramid_request) is None
)

assert excinfo.value.status == "401 Bad Password!"
Expand All @@ -183,7 +183,7 @@ def test_with_valid_password(self, monkeypatch, pyramid_request, pyramid_service
authenticate = pretend.call_recorder(lambda userid, request: principals)
monkeypatch.setattr(accounts, "_authenticate", authenticate)

user = pretend.stub(id=2)
user = pretend.stub(id=2, has_two_factor=False)
service = pretend.stub(
get_user=pretend.call_recorder(lambda user_id: user),
find_userid=pretend.call_recorder(lambda username: 2),
Expand All @@ -208,7 +208,7 @@ def test_with_valid_password(self, monkeypatch, pyramid_request, pyramid_service

with freezegun.freeze_time(now):
assert (
accounts._basic_auth_login("myuser", "mypass", pyramid_request)
accounts._basic_auth_check("myuser", "mypass", pyramid_request)
is principals
)

Expand Down Expand Up @@ -259,7 +259,7 @@ def test_via_basic_auth_compromised(
pyramid_request.matched_route = pretend.stub(name="forklift.legacy.file_upload")

with pytest.raises(BasicAuthBreachedPassword) as excinfo:
accounts._basic_auth_login("myuser", "mypass", pyramid_request)
accounts._basic_auth_check("myuser", "mypass", pyramid_request)

assert excinfo.value.status == "401 Bad Password!"
assert service.find_userid.calls == [pretend.call("myuser")]
Expand All @@ -280,6 +280,64 @@ def test_via_basic_auth_compromised(
]
assert send_email.calls == [pretend.call(pyramid_request, user)]

def test_via_basic_auth_2fa_enabled(
self, monkeypatch, pyramid_request, pyramid_services
):
principals = pretend.stub()
authenticate = pretend.call_recorder(lambda userid, request: principals)
monkeypatch.setattr(accounts, "_authenticate", authenticate)
send_email = pretend.call_recorder(lambda *a, **kw: None)
monkeypatch.setattr(
accounts, "send_basic_auth_with_two_factor_email", send_email
)

user = pretend.stub(id=2, has_two_factor=True)
service = pretend.stub(
get_user=pretend.call_recorder(lambda user_id: user),
find_userid=pretend.call_recorder(lambda username: 2),
check_password=pretend.call_recorder(
lambda userid, password, tags=None: True
),
is_disabled=pretend.call_recorder(lambda user_id: (False, None)),
disable_password=pretend.call_recorder(lambda user_id, reason=None: None),
update_user=pretend.call_recorder(lambda userid, last_login: None),
)
breach_service = pretend.stub(
check_password=pretend.call_recorder(lambda pw, tags=None: False)
)

pyramid_services.register_service(service, IUserService, None)
pyramid_services.register_service(
breach_service, IPasswordBreachedService, None
)

pyramid_request.matched_route = pretend.stub(name="forklift.legacy.file_upload")

now = datetime.datetime.utcnow()

with freezegun.freeze_time(now):
assert (
accounts._basic_auth_check("myuser", "mypass", pyramid_request)
is principals
)

assert service.find_userid.calls == [pretend.call("myuser")]
assert service.get_user.calls == [pretend.call(2)]
assert service.is_disabled.calls == [pretend.call(2)]
assert service.check_password.calls == [
pretend.call(
2,
"mypass",
tags=["mechanism:basic_auth", "method:auth", "auth_method:basic"],
)
]
assert breach_service.check_password.calls == [
pretend.call("mypass", tags=["method:auth", "auth_method:basic"])
]
assert send_email.calls == [pretend.call(pyramid_request, user)]
assert service.update_user.calls == [pretend.call(2, last_login=now)]
assert authenticate.calls == [pretend.call(2, pyramid_request)]


class TestAuthenticate:
@pytest.mark.parametrize(
Expand Down Expand Up @@ -372,6 +430,17 @@ def test_route_matched_name_ok(self, monkeypatch):
assert authenticate_obj.calls == [pretend.call(1, request)]


class TestMacaroonAuthenticate:
def test_macaroon_authenticate(self, monkeypatch):
authenticate_obj = pretend.call_recorder(lambda *a, **kw: True)
monkeypatch.setattr(accounts, "_authenticate", authenticate_obj)
request = pretend.stub(
matched_route=pretend.stub(name="includes.current-user-indicator")
)
assert accounts._macaroon_authenticate(1, request) is True
assert authenticate_obj.calls == [pretend.call(1, request)]


class TestUser:
def test_with_user(self):
user = pretend.stub()
Expand Down Expand Up @@ -466,7 +535,7 @@ def test_includeme(monkeypatch):
]
assert config.set_authentication_policy.calls == [pretend.call(authn_obj)]
assert config.set_authorization_policy.calls == [pretend.call(authz_obj)]
assert basic_authn_cls.calls == [pretend.call(check=accounts._basic_auth_login)]
assert basic_authn_cls.calls == [pretend.call(check=accounts._basic_auth_check)]
assert session_authn_cls.calls == [
pretend.call(callback=accounts._session_authenticate)
]
Expand Down
71 changes: 71 additions & 0 deletions tests/unit/email/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -942,6 +942,77 @@ def test_password_compromised_email(
]


class TestBasicAuthWith2FAEmail:
@pytest.mark.parametrize("verified", [True, False])
def test_basic_auth_with_2fa_email(
self, pyramid_request, pyramid_config, monkeypatch, verified
):
stub_user = pretend.stub(
id="id",
username="username",
name="",
email="[email protected]",
primary_email=pretend.stub(email="[email protected]", verified=verified),
)
subject_renderer = pyramid_config.testing_add_renderer(
"email/basic-auth-with-2fa/subject.txt"
)
subject_renderer.string_response = "Email Subject"
body_renderer = pyramid_config.testing_add_renderer(
"email/basic-auth-with-2fa/body.txt"
)
body_renderer.string_response = "Email Body"
html_renderer = pyramid_config.testing_add_renderer(
"email/basic-auth-with-2fa/body.html"
)
html_renderer.string_response = "Email HTML Body"

send_email = pretend.stub(
delay=pretend.call_recorder(lambda *args, **kwargs: None)
)
pyramid_request.task = pretend.call_recorder(lambda *args, **kwargs: send_email)
monkeypatch.setattr(email, "send_email", send_email)

pyramid_request.db = pretend.stub(
query=lambda a: pretend.stub(
filter=lambda *a: pretend.stub(
one=lambda: pretend.stub(user_id=stub_user.id)
)
),
)
pyramid_request.user = stub_user
pyramid_request.registry.settings = {"mail.sender": "[email protected]"}

result = email.send_basic_auth_with_two_factor_email(pyramid_request, stub_user)

assert result == {}
assert pyramid_request.task.calls == [pretend.call(send_email)]
assert send_email.delay.calls == [
pretend.call(
f"{stub_user.username} <{stub_user.email}>",
{
"subject": "Email Subject",
"body_text": "Email Body",
"body_html": (
"<html>\n<head></head>\n"
"<body><p>Email HTML Body</p></body>\n</html>\n"
),
},
{
"tag": "account:email:sent",
"user_id": stub_user.id,
"ip_address": pyramid_request.remote_addr,
"additional": {
"from_": "[email protected]",
"to": stub_user.email,
"subject": "Email Subject",
"redact_ip": False,
},
},
)
]


class TestAccountDeletionEmail:
def test_account_deletion_email(self, pyramid_request, pyramid_config, monkeypatch):

Expand Down
76 changes: 44 additions & 32 deletions warehouse/accounts/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@
TokenServiceFactory,
database_login_factory,
)
from warehouse.email import send_password_compromised_email_hibp
from warehouse.email import (
send_basic_auth_with_two_factor_email,
send_password_compromised_email_hibp,
)
from warehouse.errors import BasicAuthBreachedPassword, BasicAuthFailedPassword
from warehouse.macaroons.auth_policy import (
MacaroonAuthenticationPolicy,
Expand All @@ -51,7 +54,32 @@ def _format_exc_status(exc, message):
return exc


def _basic_auth_login(username, password, request):
def _authenticate(userid, request):
"""Apply the necessary principals to the authenticated user"""
login_service = request.find_service(IUserService, context=None)
user = login_service.get_user(userid)

if user is None:
return

principals = []

if user.is_superuser:
principals.append("group:admins")
if user.is_moderator or user.is_superuser:
principals.append("group:moderators")
if user.is_psf_staff or user.is_superuser:
principals.append("group:psf_staff")

# user must have base admin access if any admin permission
if principals:
principals.append("group:with_admin_dashboard_access")

return principals


def _basic_auth_check(username, password, request):
# Basic authentication can only be used for uploading
if request.matched_route.name not in ["forklift.legacy.file_upload"]:
return

Expand Down Expand Up @@ -89,11 +117,13 @@ def _basic_auth_login(username, password, request):
raise _format_exc_status(
BasicAuthBreachedPassword(), breach_service.failure_message_plain
)
else:
login_service.update_user(
user.id, last_login=datetime.datetime.utcnow()
)
return _authenticate(user.id, request)

if user.has_two_factor:
send_basic_auth_with_two_factor_email(request, user)
# Eventually, raise here to disable basic auth with 2FA enabled

login_service.update_user(user.id, last_login=datetime.datetime.utcnow())
return _authenticate(user.id, request)
else:
user.record_event(
tag="account:login:failure",
Expand All @@ -109,36 +139,18 @@ def _basic_auth_login(username, password, request):
)


def _authenticate(userid, request):
login_service = request.find_service(IUserService, context=None)
user = login_service.get_user(userid)

if user is None:
return

principals = []

if user.is_superuser:
principals.append("group:admins")
if user.is_moderator or user.is_superuser:
principals.append("group:moderators")
if user.is_psf_staff or user.is_superuser:
principals.append("group:psf_staff")

# user must have base admin access if any admin permission
if principals:
principals.append("group:with_admin_dashboard_access")

return principals


def _session_authenticate(userid, request):
# Session authentication cannot be used for uploading
if request.matched_route.name in ["forklift.legacy.file_upload"]:
return

return _authenticate(userid, request)


def _macaroon_authenticate(userid, request):
return _authenticate(userid, request)


def _user(request):
userid = request.authenticated_userid

Expand Down Expand Up @@ -179,8 +191,8 @@ def includeme(config):
MultiAuthenticationPolicy(
[
SessionAuthenticationPolicy(callback=_session_authenticate),
BasicAuthAuthenticationPolicy(check=_basic_auth_login),
MacaroonAuthenticationPolicy(callback=_authenticate),
BasicAuthAuthenticationPolicy(check=_basic_auth_check),
MacaroonAuthenticationPolicy(callback=_macaroon_authenticate),
]
)
)
Expand Down
5 changes: 5 additions & 0 deletions warehouse/email/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,11 @@ def send_token_compromised_email_leak(request, user, *, public_url, origin):
return {"username": user.username, "public_url": public_url, "origin": origin}


@_email("basic-auth-with-2fa", allow_unverified=True)
def send_basic_auth_with_two_factor_email(request, user):
return {}


@_email("account-deleted")
def send_account_deletion_email(request, user):
return {"username": user.username}
Expand Down
Loading