From e296b50aa461d7ec39ad73a1287b9a80c1f1e814 Mon Sep 17 00:00:00 2001 From: Lance Edgar Date: Sun, 4 Aug 2024 21:54:46 -0500 Subject: [PATCH] feat: add custom security policy, login/logout for pyramid aka. the `wuttaweb.auth` module --- docs/api/wuttaweb/auth.rst | 6 ++ docs/api/wuttaweb/index.rst | 1 + src/wuttaweb/app.py | 4 + src/wuttaweb/auth.py | 146 ++++++++++++++++++++++++++++++++++++ tests/test_auth.py | 139 ++++++++++++++++++++++++++++++++++ 5 files changed, 296 insertions(+) create mode 100644 docs/api/wuttaweb/auth.rst create mode 100644 src/wuttaweb/auth.py create mode 100644 tests/test_auth.py diff --git a/docs/api/wuttaweb/auth.rst b/docs/api/wuttaweb/auth.rst new file mode 100644 index 0000000..d645c67 --- /dev/null +++ b/docs/api/wuttaweb/auth.rst @@ -0,0 +1,6 @@ + +``wuttaweb.auth`` +================= + +.. automodule:: wuttaweb.auth + :members: diff --git a/docs/api/wuttaweb/index.rst b/docs/api/wuttaweb/index.rst index 5a65f11..8ee6ff4 100644 --- a/docs/api/wuttaweb/index.rst +++ b/docs/api/wuttaweb/index.rst @@ -8,6 +8,7 @@ :maxdepth: 1 app + auth db forms forms.base diff --git a/src/wuttaweb/app.py b/src/wuttaweb/app.py index a35d00d..f8bfc3a 100644 --- a/src/wuttaweb/app.py +++ b/src/wuttaweb/app.py @@ -32,6 +32,7 @@ from wuttjamaican.conf import make_config from pyramid.config import Configurator import wuttaweb.db +from wuttaweb.auth import WuttaSecurityPolicy class WebAppProvider(AppProvider): @@ -115,6 +116,9 @@ def make_pyramid_config(settings): pyramid_config = Configurator(settings=settings) + # configure user authorization / authentication + pyramid_config.set_security_policy(WuttaSecurityPolicy()) + pyramid_config.include('pyramid_beaker') pyramid_config.include('pyramid_deform') pyramid_config.include('pyramid_mako') diff --git a/src/wuttaweb/auth.py b/src/wuttaweb/auth.py new file mode 100644 index 0000000..0c2f26d --- /dev/null +++ b/src/wuttaweb/auth.py @@ -0,0 +1,146 @@ +# -*- coding: utf-8; -*- +################################################################################ +# +# wuttaweb -- Web App for Wutta Framework +# Copyright © 2024 Lance Edgar +# +# This file is part of Wutta Framework. +# +# Wutta Framework is free software: you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) any +# later version. +# +# Wutta Framework is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# Wutta Framework. If not, see . +# +################################################################################ +""" +Auth Utility Logic +""" + +import re + +from pyramid.authentication import SessionAuthenticationHelper +from pyramid.request import RequestLocalCache +from pyramid.security import remember, forget + +from wuttaweb.db import Session + + +def login_user(request, user): + """ + Perform the steps necessary to "login" the given user. This + returns a ``headers`` dict which you should pass to the final + redirect, like so:: + + from pyramid.httpexceptions import HTTPFound + + headers = login_user(request, user) + return HTTPFound(location='/', headers=headers) + + .. warning:: + + This logic does not "authenticate" the user! It assumes caller + has already authenticated the user and they are safe to login. + + See also :func:`logout_user()`. + """ + headers = remember(request, user.uuid) + return headers + + +def logout_user(request): + """ + Perform the logout action for the given request. This returns a + ``headers`` dict which you should pass to the final redirect, like + so:: + + from pyramid.httpexceptions import HTTPFound + + headers = logout_user(request) + return HTTPFound(location='/', headers=headers) + + See also :func:`login_user()`. + """ + request.session.delete() + request.session.invalidate() + headers = forget(request) + return headers + + +class WuttaSecurityPolicy: + """ + Pyramid :term:`security policy` for WuttaWeb. + + For more on the Pyramid details, see :doc:`pyramid:narr/security`. + + But the idea here is that you should be able to just use this, + without thinking too hard:: + + from pyramid.config import Configurator + from wuttaweb.auth import WuttaSecurityPolicy + + pyramid_config = Configurator() + pyramid_config.set_security_policy(WuttaSecurityPolicy()) + + This security policy will then do the following: + + * use the request "web session" for auth storage (e.g. current + ``user.uuid``) + * check permissions as needed, by calling + :meth:`~wuttjamaican:wuttjamaican.auth.AuthHandler.has_permission()` + for current user + + :param db_session: Optional :term:`db session` to use, instead of + :class:`wuttaweb.db.Session`. Probably only useful for tests. + """ + + def __init__(self, db_session=None): + self.session_helper = SessionAuthenticationHelper() + self.identity_cache = RequestLocalCache(self.load_identity) + self.db_session = db_session or Session() + + def load_identity(self, request): + config = request.registry.settings['wutta_config'] + app = config.get_app() + model = app.model + + # fetch user uuid from current session + uuid = self.session_helper.authenticated_userid(request) + if not uuid: + return + + # fetch user object from db + user = self.db_session.get(model.User, uuid) + if not user: + return + + return user + + def identity(self, request): + return self.identity_cache.get_or_create(request) + + def authenticated_userid(self, request): + user = self.identity(request) + if user is not None: + return user.uuid + + def remember(self, request, userid, **kw): + return self.session_helper.remember(request, userid, **kw) + + def forget(self, request, **kw): + return self.session_helper.forget(request, **kw) + + def permits(self, request, context, permission): + config = request.registry.settings['wutta_config'] + app = config.get_app() + auth = app.get_auth_handler() + + user = self.identity(request) + return auth.has_permission(self.db_session, user, permission) diff --git a/tests/test_auth.py b/tests/test_auth.py new file mode 100644 index 0000000..a6bea29 --- /dev/null +++ b/tests/test_auth.py @@ -0,0 +1,139 @@ +# -*- coding: utf-8; -*- + +from unittest import TestCase +from unittest.mock import MagicMock + +from pyramid import testing + +from wuttjamaican.conf import WuttaConfig +from wuttaweb import auth as mod + + +class TestLoginUser(TestCase): + + def test_basic(self): + config = WuttaConfig() + app = config.get_app() + model = app.model + request = testing.DummyRequest(wutta_config=config) + user = model.User(username='barney') + headers = mod.login_user(request, user) + self.assertEqual(headers, []) + +class TestLogoutUser(TestCase): + + def test_basic(self): + config = WuttaConfig() + request = testing.DummyRequest(wutta_config=config) + request.session.delete = MagicMock() + headers = mod.logout_user(request) + request.session.delete.assert_called_once_with() + self.assertEqual(headers, []) + + +class TestWuttaSecurityPolicy(TestCase): + + def setUp(self): + self.config = WuttaConfig(defaults={ + 'wutta.db.default.url': 'sqlite://', + }) + + self.request = testing.DummyRequest() + self.pyramid_config = testing.setUp(request=self.request, settings={ + 'wutta_config': self.config, + }) + + self.app = self.config.get_app() + model = self.app.model + model.Base.metadata.create_all(bind=self.config.appdb_engine) + self.session = self.app.make_session() + self.user = model.User(username='barney') + self.session.add(self.user) + self.session.commit() + + self.policy = self.make_policy() + + def tearDown(self): + testing.tearDown() + + def make_policy(self): + return mod.WuttaSecurityPolicy(db_session=self.session) + + def test_remember(self): + uuid = self.user.uuid + self.assertIsNotNone(uuid) + self.assertIsNone(self.policy.session_helper.authenticated_userid(self.request)) + self.policy.remember(self.request, uuid) + self.assertEqual(self.policy.session_helper.authenticated_userid(self.request), uuid) + + def test_forget(self): + uuid = self.user.uuid + self.policy.remember(self.request, uuid) + self.assertEqual(self.policy.session_helper.authenticated_userid(self.request), uuid) + self.policy.forget(self.request) + self.assertIsNone(self.policy.session_helper.authenticated_userid(self.request)) + + def test_identity(self): + + # no identity + user = self.policy.identity(self.request) + self.assertIsNone(user) + + # identity is remembered (must use new policy to bust cache) + self.policy = self.make_policy() + uuid = self.user.uuid + self.assertIsNotNone(uuid) + self.policy.remember(self.request, uuid) + user = self.policy.identity(self.request) + self.assertIs(user, self.user) + + # invalid identity yields no user + self.policy = self.make_policy() + self.policy.remember(self.request, 'bogus-user-uuid') + user = self.policy.identity(self.request) + self.assertIsNone(user) + + def test_authenticated_userid(self): + + # no identity + uuid = self.policy.authenticated_userid(self.request) + self.assertIsNone(uuid) + + # identity is remembered (must use new policy to bust cache) + self.policy = self.make_policy() + self.policy.remember(self.request, self.user.uuid) + uuid = self.policy.authenticated_userid(self.request) + self.assertEqual(uuid, self.user.uuid) + + def test_permits(self): + auth = self.app.get_auth_handler() + model = self.app.model + + # anon has no perms + self.assertFalse(self.policy.permits(self.request, None, 'foo.bar')) + + # but we can grant it + anons = auth.get_role_anonymous(self.session) + self.user.roles.append(anons) + auth.grant_permission(anons, 'foo.bar') + self.session.commit() + + # and then perm check is satisfied + self.assertTrue(self.policy.permits(self.request, None, 'foo.bar')) + + # now, create a separate role and grant another perm + # (but user does not yet belong to this role) + role = model.Role(name='whatever') + self.session.add(role) + auth.grant_permission(role, 'baz.edit') + self.session.commit() + + # so far then, user does not have the permission + self.policy = self.make_policy() + self.policy.remember(self.request, self.user.uuid) + self.assertFalse(self.policy.permits(self.request, None, 'baz.edit')) + + # but if we assign user to role, perm check should pass + self.user.roles.append(role) + self.session.commit() + self.assertTrue(self.policy.permits(self.request, None, 'baz.edit'))