2
0
Fork 0

feat: add custom security policy, login/logout for pyramid

aka. the `wuttaweb.auth` module
This commit is contained in:
Lance Edgar 2024-08-04 21:54:46 -05:00
parent c6f0007908
commit e296b50aa4
5 changed files with 296 additions and 0 deletions

View file

@ -0,0 +1,6 @@
``wuttaweb.auth``
=================
.. automodule:: wuttaweb.auth
:members:

View file

@ -8,6 +8,7 @@
:maxdepth: 1 :maxdepth: 1
app app
auth
db db
forms forms
forms.base forms.base

View file

@ -32,6 +32,7 @@ from wuttjamaican.conf import make_config
from pyramid.config import Configurator from pyramid.config import Configurator
import wuttaweb.db import wuttaweb.db
from wuttaweb.auth import WuttaSecurityPolicy
class WebAppProvider(AppProvider): class WebAppProvider(AppProvider):
@ -115,6 +116,9 @@ def make_pyramid_config(settings):
pyramid_config = Configurator(settings=settings) pyramid_config = Configurator(settings=settings)
# configure user authorization / authentication
pyramid_config.set_security_policy(WuttaSecurityPolicy())
pyramid_config.include('pyramid_beaker') pyramid_config.include('pyramid_beaker')
pyramid_config.include('pyramid_deform') pyramid_config.include('pyramid_deform')
pyramid_config.include('pyramid_mako') pyramid_config.include('pyramid_mako')

146
src/wuttaweb/auth.py Normal file
View file

@ -0,0 +1,146 @@
# -*- coding: utf-8; -*-
################################################################################
#
# wuttaweb -- Web App for Wutta Framework
# Copyright © 2024 Lance Edgar
#
# This file is part of Wutta Framework.
#
# Wutta Framework is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# Wutta Framework is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# Wutta Framework. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
Auth Utility Logic
"""
import re
from pyramid.authentication import SessionAuthenticationHelper
from pyramid.request import RequestLocalCache
from pyramid.security import remember, forget
from wuttaweb.db import Session
def login_user(request, user):
"""
Perform the steps necessary to "login" the given user. This
returns a ``headers`` dict which you should pass to the final
redirect, like so::
from pyramid.httpexceptions import HTTPFound
headers = login_user(request, user)
return HTTPFound(location='/', headers=headers)
.. warning::
This logic does not "authenticate" the user! It assumes caller
has already authenticated the user and they are safe to login.
See also :func:`logout_user()`.
"""
headers = remember(request, user.uuid)
return headers
def logout_user(request):
"""
Perform the logout action for the given request. This returns a
``headers`` dict which you should pass to the final redirect, like
so::
from pyramid.httpexceptions import HTTPFound
headers = logout_user(request)
return HTTPFound(location='/', headers=headers)
See also :func:`login_user()`.
"""
request.session.delete()
request.session.invalidate()
headers = forget(request)
return headers
class WuttaSecurityPolicy:
"""
Pyramid :term:`security policy` for WuttaWeb.
For more on the Pyramid details, see :doc:`pyramid:narr/security`.
But the idea here is that you should be able to just use this,
without thinking too hard::
from pyramid.config import Configurator
from wuttaweb.auth import WuttaSecurityPolicy
pyramid_config = Configurator()
pyramid_config.set_security_policy(WuttaSecurityPolicy())
This security policy will then do the following:
* use the request "web session" for auth storage (e.g. current
``user.uuid``)
* check permissions as needed, by calling
:meth:`~wuttjamaican:wuttjamaican.auth.AuthHandler.has_permission()`
for current user
:param db_session: Optional :term:`db session` to use, instead of
:class:`wuttaweb.db.Session`. Probably only useful for tests.
"""
def __init__(self, db_session=None):
self.session_helper = SessionAuthenticationHelper()
self.identity_cache = RequestLocalCache(self.load_identity)
self.db_session = db_session or Session()
def load_identity(self, request):
config = request.registry.settings['wutta_config']
app = config.get_app()
model = app.model
# fetch user uuid from current session
uuid = self.session_helper.authenticated_userid(request)
if not uuid:
return
# fetch user object from db
user = self.db_session.get(model.User, uuid)
if not user:
return
return user
def identity(self, request):
return self.identity_cache.get_or_create(request)
def authenticated_userid(self, request):
user = self.identity(request)
if user is not None:
return user.uuid
def remember(self, request, userid, **kw):
return self.session_helper.remember(request, userid, **kw)
def forget(self, request, **kw):
return self.session_helper.forget(request, **kw)
def permits(self, request, context, permission):
config = request.registry.settings['wutta_config']
app = config.get_app()
auth = app.get_auth_handler()
user = self.identity(request)
return auth.has_permission(self.db_session, user, permission)

139
tests/test_auth.py Normal file
View file

@ -0,0 +1,139 @@
# -*- coding: utf-8; -*-
from unittest import TestCase
from unittest.mock import MagicMock
from pyramid import testing
from wuttjamaican.conf import WuttaConfig
from wuttaweb import auth as mod
class TestLoginUser(TestCase):
def test_basic(self):
config = WuttaConfig()
app = config.get_app()
model = app.model
request = testing.DummyRequest(wutta_config=config)
user = model.User(username='barney')
headers = mod.login_user(request, user)
self.assertEqual(headers, [])
class TestLogoutUser(TestCase):
def test_basic(self):
config = WuttaConfig()
request = testing.DummyRequest(wutta_config=config)
request.session.delete = MagicMock()
headers = mod.logout_user(request)
request.session.delete.assert_called_once_with()
self.assertEqual(headers, [])
class TestWuttaSecurityPolicy(TestCase):
def setUp(self):
self.config = WuttaConfig(defaults={
'wutta.db.default.url': 'sqlite://',
})
self.request = testing.DummyRequest()
self.pyramid_config = testing.setUp(request=self.request, settings={
'wutta_config': self.config,
})
self.app = self.config.get_app()
model = self.app.model
model.Base.metadata.create_all(bind=self.config.appdb_engine)
self.session = self.app.make_session()
self.user = model.User(username='barney')
self.session.add(self.user)
self.session.commit()
self.policy = self.make_policy()
def tearDown(self):
testing.tearDown()
def make_policy(self):
return mod.WuttaSecurityPolicy(db_session=self.session)
def test_remember(self):
uuid = self.user.uuid
self.assertIsNotNone(uuid)
self.assertIsNone(self.policy.session_helper.authenticated_userid(self.request))
self.policy.remember(self.request, uuid)
self.assertEqual(self.policy.session_helper.authenticated_userid(self.request), uuid)
def test_forget(self):
uuid = self.user.uuid
self.policy.remember(self.request, uuid)
self.assertEqual(self.policy.session_helper.authenticated_userid(self.request), uuid)
self.policy.forget(self.request)
self.assertIsNone(self.policy.session_helper.authenticated_userid(self.request))
def test_identity(self):
# no identity
user = self.policy.identity(self.request)
self.assertIsNone(user)
# identity is remembered (must use new policy to bust cache)
self.policy = self.make_policy()
uuid = self.user.uuid
self.assertIsNotNone(uuid)
self.policy.remember(self.request, uuid)
user = self.policy.identity(self.request)
self.assertIs(user, self.user)
# invalid identity yields no user
self.policy = self.make_policy()
self.policy.remember(self.request, 'bogus-user-uuid')
user = self.policy.identity(self.request)
self.assertIsNone(user)
def test_authenticated_userid(self):
# no identity
uuid = self.policy.authenticated_userid(self.request)
self.assertIsNone(uuid)
# identity is remembered (must use new policy to bust cache)
self.policy = self.make_policy()
self.policy.remember(self.request, self.user.uuid)
uuid = self.policy.authenticated_userid(self.request)
self.assertEqual(uuid, self.user.uuid)
def test_permits(self):
auth = self.app.get_auth_handler()
model = self.app.model
# anon has no perms
self.assertFalse(self.policy.permits(self.request, None, 'foo.bar'))
# but we can grant it
anons = auth.get_role_anonymous(self.session)
self.user.roles.append(anons)
auth.grant_permission(anons, 'foo.bar')
self.session.commit()
# and then perm check is satisfied
self.assertTrue(self.policy.permits(self.request, None, 'foo.bar'))
# now, create a separate role and grant another perm
# (but user does not yet belong to this role)
role = model.Role(name='whatever')
self.session.add(role)
auth.grant_permission(role, 'baz.edit')
self.session.commit()
# so far then, user does not have the permission
self.policy = self.make_policy()
self.policy.remember(self.request, self.user.uuid)
self.assertFalse(self.policy.permits(self.request, None, 'baz.edit'))
# but if we assign user to role, perm check should pass
self.user.roles.append(role)
self.session.commit()
self.assertTrue(self.policy.permits(self.request, None, 'baz.edit'))