diff --git a/src/wuttaweb/auth.py b/src/wuttaweb/auth.py
index 0c2f26d..de9b868 100644
--- a/src/wuttaweb/auth.py
+++ b/src/wuttaweb/auth.py
@@ -138,9 +138,13 @@ class WuttaSecurityPolicy:
return self.session_helper.forget(request, **kw)
def permits(self, request, context, permission):
+
+ # nb. root user can do anything
+ if getattr(request, 'is_root', False):
+ return True
+
config = request.registry.settings['wutta_config']
app = config.get_app()
auth = app.get_auth_handler()
-
user = self.identity(request)
return auth.has_permission(self.db_session, user, permission)
diff --git a/src/wuttaweb/subscribers.py b/src/wuttaweb/subscribers.py
index eebefb4..1b711e3 100644
--- a/src/wuttaweb/subscribers.py
+++ b/src/wuttaweb/subscribers.py
@@ -63,6 +63,16 @@ def new_request(event):
Reference to the app :term:`config object`.
+ .. method:: request.get_referrer(default=None)
+
+ Request method to get the "canonical" HTTP referrer value.
+ This has logic to check for referrer in the request params,
+ user session etc.
+
+ :param default: Optional default URL if none is found in
+ request params/session. If no default is specified,
+ the ``'home'`` route is used.
+
.. attribute:: request.use_oruga
Flag indicating whether the frontend should be displayed using
@@ -75,6 +85,19 @@ def new_request(event):
request.wutta_config = config
+ def get_referrer(default=None):
+ if request.params.get('referrer'):
+ return request.params['referrer']
+ if request.session.get('referrer'):
+ return request.session.pop('referrer')
+ referrer = getattr(request, 'referrer', None)
+ if (not referrer or referrer == request.current_route_url()
+ or not referrer.startswith(request.host_url)):
+ referrer = default or request.route_url('home')
+ return referrer
+
+ request.get_referrer = get_referrer
+
def use_oruga(request):
spec = config.get('wuttaweb.oruga_detector.spec')
if spec:
@@ -104,22 +127,44 @@ def new_request_set_user(event, db_session=None):
:class:`~wuttjamaican:wuttjamaican.db.model.auth.User` instance
(if logged in), or ``None``.
- :param db_session: Optional :term:`db session` to use, instead of
- :class:`wuttaweb.db.Session`. Probably only useful for tests.
+ .. attribute:: request.is_admin
+
+ Flag indicating whether current user is a member of the
+ Administrator role.
+
+ .. attribute:: request.is_root
+
+ Flag indicating whether user is currently elevated to root
+ privileges. This is only possible if :attr:`request.is_admin`
+ is also true.
"""
request = event.request
config = request.registry.settings['wutta_config']
app = config.get_app()
- model = app.model
def user(request):
uuid = request.authenticated_userid
if uuid:
session = db_session or Session()
+ model = app.model
return session.get(model.User, uuid)
request.set_property(user, reify=True)
+ def is_admin(request):
+ auth = app.get_auth_handler()
+ return auth.user_is_admin(request.user)
+
+ request.set_property(is_admin, reify=True)
+
+ def is_root(request):
+ if request.is_admin:
+ if request.session.get('is_root', False):
+ return True
+ return False
+
+ request.set_property(is_root, reify=True)
+
def before_render(event):
"""
diff --git a/src/wuttaweb/templates/base.mako b/src/wuttaweb/templates/base.mako
index dd51690..b04c980 100644
--- a/src/wuttaweb/templates/base.mako
+++ b/src/wuttaweb/templates/base.mako
@@ -314,8 +314,29 @@
<%def name="render_user_menu()">
% if request.user:
-
${request.user}
+
${request.user}
+ % if request.is_root:
+ ${h.form(url('stop_root'), ref='stopBeingRootForm')}
+ ## TODO
+ ## ${h.csrf_token(request)}
+
+
+ Stop being root
+
+ ${h.end_form()}
+ % elif request.is_admin:
+ ${h.form(url('become_root'), ref='startBeingRootForm')}
+ ## TODO
+ ## ${h.csrf_token(request)}
+
+
+ Become root
+
+ ${h.end_form()}
+ % endif
${h.link_to("Change Password", url('change_password'), class_='navbar-item')}
${h.link_to("Logout", url('logout'), class_='navbar-item')}
@@ -359,6 +380,18 @@
const key = 'menu_' + hash + '_shown'
this[key] = !this[key]
},
+
+ % if request.is_admin:
+
+ startBeingRoot() {
+ this.$refs.startBeingRootForm.submit()
+ },
+
+ stopBeingRoot() {
+ this.$refs.stopBeingRootForm.submit()
+ },
+
+ % endif
},
}
diff --git a/src/wuttaweb/views/auth.py b/src/wuttaweb/views/auth.py
index 9fc838c..389271b 100644
--- a/src/wuttaweb/views/auth.py
+++ b/src/wuttaweb/views/auth.py
@@ -198,6 +198,48 @@ class AuthView(View):
if auth.check_user_password(user, value):
node.raise_invalid("New password must be different from old password.")
+ def become_root(self):
+ """
+ Elevate the current request to 'root' for full system access.
+
+ This is only allowed if current (authenticated) user is a
+ member of the Administrator role. Also note that GET is not
+ allowed for this view, only POST.
+
+ See also :meth:`stop_root()`.
+ """
+ if self.request.method != 'POST':
+ raise self.forbidden()
+
+ if not self.request.is_admin:
+ raise self.forbidden()
+
+ self.request.session['is_root'] = True
+ self.request.session.flash("You have been elevated to 'root' and now have full system access")
+
+ url = self.request.get_referrer()
+ return self.redirect(url)
+
+ def stop_root(self):
+ """
+ Lower the current request from 'root' back to normal access.
+
+ Also note that GET is not allowed for this view, only POST.
+
+ See also :meth:`become_root()`.
+ """
+ if self.request.method != 'POST':
+ raise self.forbidden()
+
+ if not self.request.is_admin:
+ raise self.forbidden()
+
+ self.request.session['is_root'] = False
+ self.request.session.flash("Your normal system access has been restored")
+
+ url = self.request.get_referrer()
+ return self.redirect(url)
+
@classmethod
def defaults(cls, config):
cls._auth_defaults(config)
@@ -222,6 +264,18 @@ class AuthView(View):
route_name='change_password',
renderer='/auth/change_password.mako')
+ # become root
+ config.add_route('become_root', '/root/yes',
+ request_method='POST')
+ config.add_view(cls, attr='become_root',
+ route_name='become_root')
+
+ # stop root
+ config.add_route('stop_root', '/root/no',
+ request_method='POST')
+ config.add_view(cls, attr='stop_root',
+ route_name='stop_root')
+
def defaults(config, **kwargs):
base = globals()
diff --git a/src/wuttaweb/views/base.py b/src/wuttaweb/views/base.py
index e7bfea3..e412ed2 100644
--- a/src/wuttaweb/views/base.py
+++ b/src/wuttaweb/views/base.py
@@ -55,6 +55,14 @@ class View:
self.config = self.request.wutta_config
self.app = self.config.get_app()
+ def forbidden(self):
+ """
+ Convenience method, to raise a HTTP 403 Forbidden exception::
+
+ raise self.forbidden()
+ """
+ return httpexceptions.HTTPForbidden()
+
def make_form(self, **kwargs):
"""
Make and return a new :class:`~wuttaweb.forms.base.Form`
diff --git a/tests/test_auth.py b/tests/test_auth.py
index a6bea29..5d6c406 100644
--- a/tests/test_auth.py
+++ b/tests/test_auth.py
@@ -137,3 +137,9 @@ class TestWuttaSecurityPolicy(TestCase):
self.user.roles.append(role)
self.session.commit()
self.assertTrue(self.policy.permits(self.request, None, 'baz.edit'))
+
+ # now let's try another perm - we won't grant it, but will
+ # confirm user is denied access unless they become root
+ self.assertFalse(self.policy.permits(self.request, None, 'some-root-perm'))
+ self.request.is_root = True
+ self.assertTrue(self.policy.permits(self.request, None, 'some-root-perm'))
diff --git a/tests/test_subscribers.py b/tests/test_subscribers.py
index 63b6640..27c85c3 100644
--- a/tests/test_subscribers.py
+++ b/tests/test_subscribers.py
@@ -18,40 +18,78 @@ class TestNewRequest(TestCase):
def setUp(self):
self.config = WuttaConfig()
+ self.request = self.make_request()
+ self.pyramid_config = testing.setUp(request=self.request, settings={
+ 'wutta_config': self.config,
+ })
+
+ def tearDown(self):
+ testing.tearDown()
def make_request(self):
request = testing.DummyRequest()
- request.registry.settings = {'wutta_config': self.config}
+ # request.registry.settings = {'wutta_config': self.config}
return request
def test_wutta_config(self):
- request = self.make_request()
- event = MagicMock(request=request)
+ event = MagicMock(request=self.request)
# request gets a new attr
- self.assertFalse(hasattr(request, 'wutta_config'))
+ self.assertFalse(hasattr(self.request, 'wutta_config'))
subscribers.new_request(event)
- self.assertTrue(hasattr(request, 'wutta_config'))
- self.assertIs(request.wutta_config, self.config)
+ self.assertTrue(hasattr(self.request, 'wutta_config'))
+ self.assertIs(self.request.wutta_config, self.config)
def test_use_oruga_default(self):
- request = self.make_request()
- event = MagicMock(request=request)
+ event = MagicMock(request=self.request)
# request gets a new attr, false by default
- self.assertFalse(hasattr(request, 'use_oruga'))
+ self.assertFalse(hasattr(self.request, 'use_oruga'))
subscribers.new_request(event)
- self.assertFalse(request.use_oruga)
+ self.assertFalse(self.request.use_oruga)
def test_use_oruga_custom(self):
self.config.setdefault('wuttaweb.oruga_detector.spec', 'tests.test_subscribers:custom_oruga_detector')
- request = self.make_request()
- event = MagicMock(request=request)
+ event = MagicMock(request=self.request)
# request gets a new attr, which should be true
- self.assertFalse(hasattr(request, 'use_oruga'))
+ self.assertFalse(hasattr(self.request, 'use_oruga'))
subscribers.new_request(event)
- self.assertTrue(request.use_oruga)
+ self.assertTrue(self.request.use_oruga)
+
+ def test_get_referrer(self):
+ event = MagicMock(request=self.request)
+
+ def home(request):
+ pass
+
+ self.pyramid_config.add_route('home', '/')
+ self.pyramid_config.add_view(home, route_name='home')
+
+ self.assertFalse(hasattr(self.request, 'get_referrer'))
+ subscribers.new_request(event)
+ self.assertTrue(hasattr(self.request, 'get_referrer'))
+
+ # default if no referrer, is home route
+ url = self.request.get_referrer()
+ self.assertEqual(url, self.request.route_url('home'))
+
+ # can specify another default
+ url = self.request.get_referrer(default='https://wuttaproject.org')
+ self.assertEqual(url, 'https://wuttaproject.org')
+
+ # or referrer can come from user session
+ self.request.session['referrer'] = 'https://rattailproject.org'
+ self.assertIn('referrer', self.request.session)
+ url = self.request.get_referrer()
+ self.assertEqual(url, 'https://rattailproject.org')
+ # nb. referrer should also have been removed from user session
+ self.assertNotIn('referrer', self.request.session)
+
+ # or referrer can come from request params
+ self.request.params['referrer'] = 'https://kernel.org'
+ url = self.request.get_referrer()
+ self.assertEqual(url, 'https://kernel.org')
def custom_oruga_detector(request):
@@ -97,6 +135,81 @@ class TestNewRequestSetUser(TestCase):
subscribers.new_request_set_user(event, db_session=self.session)
self.assertIs(self.request.user, self.user)
+ def test_is_admin(self):
+ event = MagicMock(request=self.request)
+
+ # anonymous user
+ self.assertFalse(hasattr(self.request, 'user'))
+ self.assertFalse(hasattr(self.request, 'is_admin'))
+ subscribers.new_request_set_user(event, db_session=self.session)
+ self.assertIsNone(self.request.user)
+ self.assertFalse(self.request.is_admin)
+
+ # reset
+ del self.request.is_admin
+
+ # authenticated user, but still not an admin
+ self.request.user = self.user
+ subscribers.new_request_set_user(event, db_session=self.session)
+ self.assertIs(self.request.user, self.user)
+ self.assertFalse(self.request.is_admin)
+
+ # reset
+ del self.request.is_admin
+
+ # but if we make them an admin, it changes
+ auth = self.app.get_auth_handler()
+ admin = auth.get_role_administrator(self.session)
+ self.user.roles.append(admin)
+ self.session.commit()
+ subscribers.new_request_set_user(event, db_session=self.session)
+ self.assertIs(self.request.user, self.user)
+ self.assertTrue(self.request.is_admin)
+
+ def test_is_root(self):
+ event = MagicMock(request=self.request)
+
+ # anonymous user
+ self.assertFalse(hasattr(self.request, 'user'))
+ self.assertFalse(hasattr(self.request, 'is_root'))
+ subscribers.new_request_set_user(event, db_session=self.session)
+ self.assertIsNone(self.request.user)
+ self.assertFalse(self.request.is_root)
+
+ # reset
+ del self.request.is_admin
+ del self.request.is_root
+
+ # authenticated user, but still not an admin
+ self.request.user = self.user
+ subscribers.new_request_set_user(event, db_session=self.session)
+ self.assertIs(self.request.user, self.user)
+ self.assertFalse(self.request.is_root)
+
+ # reset
+ del self.request.is_admin
+ del self.request.is_root
+
+ # even if we make them an admin, still not yet root
+ auth = self.app.get_auth_handler()
+ admin = auth.get_role_administrator(self.session)
+ self.user.roles.append(admin)
+ self.session.commit()
+ subscribers.new_request_set_user(event, db_session=self.session)
+ self.assertIs(self.request.user, self.user)
+ self.assertTrue(self.request.is_admin)
+ self.assertFalse(self.request.is_root)
+
+ # reset
+ del self.request.is_admin
+ del self.request.is_root
+
+ # root status flag lives in user session
+ self.request.session['is_root'] = True
+ subscribers.new_request_set_user(event, db_session=self.session)
+ self.assertTrue(self.request.is_admin)
+ self.assertTrue(self.request.is_root)
+
class TestBeforeRender(TestCase):
diff --git a/tests/views/test_auth.py b/tests/views/test_auth.py
index b22989f..d10e759 100644
--- a/tests/views/test_auth.py
+++ b/tests/views/test_auth.py
@@ -4,11 +4,12 @@ from unittest import TestCase
from unittest.mock import MagicMock
from pyramid import testing
-from pyramid.httpexceptions import HTTPFound
+from pyramid.httpexceptions import HTTPFound, HTTPForbidden
from wuttjamaican.conf import WuttaConfig
from wuttaweb.views import auth as mod
from wuttaweb.auth import WuttaSecurityPolicy
+from wuttaweb.subscribers import new_request
class TestAuthView(TestCase):
@@ -19,7 +20,9 @@ class TestAuthView(TestCase):
})
self.request = testing.DummyRequest(wutta_config=self.config, user=None)
- self.pyramid_config = testing.setUp(request=self.request)
+ self.pyramid_config = testing.setUp(request=self.request, settings={
+ 'wutta_config': self.config,
+ })
self.app = self.config.get_app()
auth = self.app.get_auth_handler()
@@ -142,3 +145,46 @@ class TestAuthView(TestCase):
self.assertIn('form', context)
dform = context['form'].get_deform()
self.assertEqual(dform['new_password'].errormsg, "New password must be different from old password.")
+
+ def test_become_root(self):
+ event = MagicMock(request=self.request)
+ new_request(event) # add request.get_referrer()
+ view = mod.AuthView(self.request)
+
+ # GET not allowed
+ self.request.method = 'GET'
+ self.assertRaises(HTTPForbidden, view.become_root)
+
+ # non-admin users also not allowed
+ self.request.method = 'POST'
+ self.request.is_admin = False
+ self.assertRaises(HTTPForbidden, view.become_root)
+
+ # but admin users can become root
+ self.request.is_admin = True
+ self.assertNotIn('is_root', self.request.session)
+ redirect = view.become_root()
+ self.assertIsInstance(redirect, HTTPFound)
+ self.assertTrue(self.request.session['is_root'])
+
+ def test_stop_root(self):
+ event = MagicMock(request=self.request)
+ new_request(event) # add request.get_referrer()
+ view = mod.AuthView(self.request)
+
+ # GET not allowed
+ self.request.method = 'GET'
+ self.assertRaises(HTTPForbidden, view.stop_root)
+
+ # non-admin users also not allowed
+ self.request.method = 'POST'
+ self.request.is_admin = False
+ self.assertRaises(HTTPForbidden, view.stop_root)
+
+ # but admin users can stop being root
+ # (nb. there is no check whether user is currently root)
+ self.request.is_admin = True
+ self.assertNotIn('is_root', self.request.session)
+ redirect = view.stop_root()
+ self.assertIsInstance(redirect, HTTPFound)
+ self.assertFalse(self.request.session['is_root'])
diff --git a/tests/views/test_base.py b/tests/views/test_base.py
index 52c717a..103e005 100644
--- a/tests/views/test_base.py
+++ b/tests/views/test_base.py
@@ -3,7 +3,7 @@
from unittest import TestCase
from pyramid import testing
-from pyramid.httpexceptions import HTTPFound
+from pyramid.httpexceptions import HTTPFound, HTTPForbidden
from wuttjamaican.conf import WuttaConfig
from wuttaweb.views import base
@@ -23,6 +23,10 @@ class TestView(TestCase):
self.assertIs(self.view.config, self.config)
self.assertIs(self.view.app, self.app)
+ def test_forbidden(self):
+ error = self.view.forbidden()
+ self.assertIsInstance(error, HTTPForbidden)
+
def test_make_form(self):
form = self.view.make_form()
self.assertIsInstance(form, Form)