feat: add support for admin user to become / stop being root
This commit is contained in:
parent
a2ba88ca8f
commit
fc339ba81b
|
@ -138,9 +138,13 @@ class WuttaSecurityPolicy:
|
|||
return self.session_helper.forget(request, **kw)
|
||||
|
||||
def permits(self, request, context, permission):
|
||||
|
||||
# nb. root user can do anything
|
||||
if getattr(request, 'is_root', False):
|
||||
return True
|
||||
|
||||
config = request.registry.settings['wutta_config']
|
||||
app = config.get_app()
|
||||
auth = app.get_auth_handler()
|
||||
|
||||
user = self.identity(request)
|
||||
return auth.has_permission(self.db_session, user, permission)
|
||||
|
|
|
@ -63,6 +63,16 @@ def new_request(event):
|
|||
|
||||
Reference to the app :term:`config object`.
|
||||
|
||||
.. method:: request.get_referrer(default=None)
|
||||
|
||||
Request method to get the "canonical" HTTP referrer value.
|
||||
This has logic to check for referrer in the request params,
|
||||
user session etc.
|
||||
|
||||
:param default: Optional default URL if none is found in
|
||||
request params/session. If no default is specified,
|
||||
the ``'home'`` route is used.
|
||||
|
||||
.. attribute:: request.use_oruga
|
||||
|
||||
Flag indicating whether the frontend should be displayed using
|
||||
|
@ -75,6 +85,19 @@ def new_request(event):
|
|||
|
||||
request.wutta_config = config
|
||||
|
||||
def get_referrer(default=None):
|
||||
if request.params.get('referrer'):
|
||||
return request.params['referrer']
|
||||
if request.session.get('referrer'):
|
||||
return request.session.pop('referrer')
|
||||
referrer = getattr(request, 'referrer', None)
|
||||
if (not referrer or referrer == request.current_route_url()
|
||||
or not referrer.startswith(request.host_url)):
|
||||
referrer = default or request.route_url('home')
|
||||
return referrer
|
||||
|
||||
request.get_referrer = get_referrer
|
||||
|
||||
def use_oruga(request):
|
||||
spec = config.get('wuttaweb.oruga_detector.spec')
|
||||
if spec:
|
||||
|
@ -104,22 +127,44 @@ def new_request_set_user(event, db_session=None):
|
|||
:class:`~wuttjamaican:wuttjamaican.db.model.auth.User` instance
|
||||
(if logged in), or ``None``.
|
||||
|
||||
:param db_session: Optional :term:`db session` to use, instead of
|
||||
:class:`wuttaweb.db.Session`. Probably only useful for tests.
|
||||
.. attribute:: request.is_admin
|
||||
|
||||
Flag indicating whether current user is a member of the
|
||||
Administrator role.
|
||||
|
||||
.. attribute:: request.is_root
|
||||
|
||||
Flag indicating whether user is currently elevated to root
|
||||
privileges. This is only possible if :attr:`request.is_admin`
|
||||
is also true.
|
||||
"""
|
||||
request = event.request
|
||||
config = request.registry.settings['wutta_config']
|
||||
app = config.get_app()
|
||||
model = app.model
|
||||
|
||||
def user(request):
|
||||
uuid = request.authenticated_userid
|
||||
if uuid:
|
||||
session = db_session or Session()
|
||||
model = app.model
|
||||
return session.get(model.User, uuid)
|
||||
|
||||
request.set_property(user, reify=True)
|
||||
|
||||
def is_admin(request):
|
||||
auth = app.get_auth_handler()
|
||||
return auth.user_is_admin(request.user)
|
||||
|
||||
request.set_property(is_admin, reify=True)
|
||||
|
||||
def is_root(request):
|
||||
if request.is_admin:
|
||||
if request.session.get('is_root', False):
|
||||
return True
|
||||
return False
|
||||
|
||||
request.set_property(is_root, reify=True)
|
||||
|
||||
|
||||
def before_render(event):
|
||||
"""
|
||||
|
|
|
@ -314,8 +314,29 @@
|
|||
<%def name="render_user_menu()">
|
||||
% if request.user:
|
||||
<div class="navbar-item has-dropdown is-hoverable">
|
||||
<a class="navbar-link">${request.user}</a>
|
||||
<a class="navbar-link ${'has-background-danger has-text-white' if request.is_root else ''}">${request.user}</a>
|
||||
<div class="navbar-dropdown">
|
||||
% if request.is_root:
|
||||
${h.form(url('stop_root'), ref='stopBeingRootForm')}
|
||||
## TODO
|
||||
## ${h.csrf_token(request)}
|
||||
<input type="hidden" name="referrer" value="${request.current_route_url()}" />
|
||||
<a @click="stopBeingRoot()"
|
||||
class="navbar-item has-background-danger has-text-white">
|
||||
Stop being root
|
||||
</a>
|
||||
${h.end_form()}
|
||||
% elif request.is_admin:
|
||||
${h.form(url('become_root'), ref='startBeingRootForm')}
|
||||
## TODO
|
||||
## ${h.csrf_token(request)}
|
||||
<input type="hidden" name="referrer" value="${request.current_route_url()}" />
|
||||
<a @click="startBeingRoot()"
|
||||
class="navbar-item has-background-danger has-text-white">
|
||||
Become root
|
||||
</a>
|
||||
${h.end_form()}
|
||||
% endif
|
||||
${h.link_to("Change Password", url('change_password'), class_='navbar-item')}
|
||||
${h.link_to("Logout", url('logout'), class_='navbar-item')}
|
||||
</div>
|
||||
|
@ -359,6 +380,18 @@
|
|||
const key = 'menu_' + hash + '_shown'
|
||||
this[key] = !this[key]
|
||||
},
|
||||
|
||||
% if request.is_admin:
|
||||
|
||||
startBeingRoot() {
|
||||
this.$refs.startBeingRootForm.submit()
|
||||
},
|
||||
|
||||
stopBeingRoot() {
|
||||
this.$refs.stopBeingRootForm.submit()
|
||||
},
|
||||
|
||||
% endif
|
||||
},
|
||||
}
|
||||
|
||||
|
|
|
@ -198,6 +198,48 @@ class AuthView(View):
|
|||
if auth.check_user_password(user, value):
|
||||
node.raise_invalid("New password must be different from old password.")
|
||||
|
||||
def become_root(self):
|
||||
"""
|
||||
Elevate the current request to 'root' for full system access.
|
||||
|
||||
This is only allowed if current (authenticated) user is a
|
||||
member of the Administrator role. Also note that GET is not
|
||||
allowed for this view, only POST.
|
||||
|
||||
See also :meth:`stop_root()`.
|
||||
"""
|
||||
if self.request.method != 'POST':
|
||||
raise self.forbidden()
|
||||
|
||||
if not self.request.is_admin:
|
||||
raise self.forbidden()
|
||||
|
||||
self.request.session['is_root'] = True
|
||||
self.request.session.flash("You have been elevated to 'root' and now have full system access")
|
||||
|
||||
url = self.request.get_referrer()
|
||||
return self.redirect(url)
|
||||
|
||||
def stop_root(self):
|
||||
"""
|
||||
Lower the current request from 'root' back to normal access.
|
||||
|
||||
Also note that GET is not allowed for this view, only POST.
|
||||
|
||||
See also :meth:`become_root()`.
|
||||
"""
|
||||
if self.request.method != 'POST':
|
||||
raise self.forbidden()
|
||||
|
||||
if not self.request.is_admin:
|
||||
raise self.forbidden()
|
||||
|
||||
self.request.session['is_root'] = False
|
||||
self.request.session.flash("Your normal system access has been restored")
|
||||
|
||||
url = self.request.get_referrer()
|
||||
return self.redirect(url)
|
||||
|
||||
@classmethod
|
||||
def defaults(cls, config):
|
||||
cls._auth_defaults(config)
|
||||
|
@ -222,6 +264,18 @@ class AuthView(View):
|
|||
route_name='change_password',
|
||||
renderer='/auth/change_password.mako')
|
||||
|
||||
# become root
|
||||
config.add_route('become_root', '/root/yes',
|
||||
request_method='POST')
|
||||
config.add_view(cls, attr='become_root',
|
||||
route_name='become_root')
|
||||
|
||||
# stop root
|
||||
config.add_route('stop_root', '/root/no',
|
||||
request_method='POST')
|
||||
config.add_view(cls, attr='stop_root',
|
||||
route_name='stop_root')
|
||||
|
||||
|
||||
def defaults(config, **kwargs):
|
||||
base = globals()
|
||||
|
|
|
@ -55,6 +55,14 @@ class View:
|
|||
self.config = self.request.wutta_config
|
||||
self.app = self.config.get_app()
|
||||
|
||||
def forbidden(self):
|
||||
"""
|
||||
Convenience method, to raise a HTTP 403 Forbidden exception::
|
||||
|
||||
raise self.forbidden()
|
||||
"""
|
||||
return httpexceptions.HTTPForbidden()
|
||||
|
||||
def make_form(self, **kwargs):
|
||||
"""
|
||||
Make and return a new :class:`~wuttaweb.forms.base.Form`
|
||||
|
|
|
@ -137,3 +137,9 @@ class TestWuttaSecurityPolicy(TestCase):
|
|||
self.user.roles.append(role)
|
||||
self.session.commit()
|
||||
self.assertTrue(self.policy.permits(self.request, None, 'baz.edit'))
|
||||
|
||||
# now let's try another perm - we won't grant it, but will
|
||||
# confirm user is denied access unless they become root
|
||||
self.assertFalse(self.policy.permits(self.request, None, 'some-root-perm'))
|
||||
self.request.is_root = True
|
||||
self.assertTrue(self.policy.permits(self.request, None, 'some-root-perm'))
|
||||
|
|
|
@ -18,40 +18,78 @@ class TestNewRequest(TestCase):
|
|||
|
||||
def setUp(self):
|
||||
self.config = WuttaConfig()
|
||||
self.request = self.make_request()
|
||||
self.pyramid_config = testing.setUp(request=self.request, settings={
|
||||
'wutta_config': self.config,
|
||||
})
|
||||
|
||||
def tearDown(self):
|
||||
testing.tearDown()
|
||||
|
||||
def make_request(self):
|
||||
request = testing.DummyRequest()
|
||||
request.registry.settings = {'wutta_config': self.config}
|
||||
# request.registry.settings = {'wutta_config': self.config}
|
||||
return request
|
||||
|
||||
def test_wutta_config(self):
|
||||
request = self.make_request()
|
||||
event = MagicMock(request=request)
|
||||
event = MagicMock(request=self.request)
|
||||
|
||||
# request gets a new attr
|
||||
self.assertFalse(hasattr(request, 'wutta_config'))
|
||||
self.assertFalse(hasattr(self.request, 'wutta_config'))
|
||||
subscribers.new_request(event)
|
||||
self.assertTrue(hasattr(request, 'wutta_config'))
|
||||
self.assertIs(request.wutta_config, self.config)
|
||||
self.assertTrue(hasattr(self.request, 'wutta_config'))
|
||||
self.assertIs(self.request.wutta_config, self.config)
|
||||
|
||||
def test_use_oruga_default(self):
|
||||
request = self.make_request()
|
||||
event = MagicMock(request=request)
|
||||
event = MagicMock(request=self.request)
|
||||
|
||||
# request gets a new attr, false by default
|
||||
self.assertFalse(hasattr(request, 'use_oruga'))
|
||||
self.assertFalse(hasattr(self.request, 'use_oruga'))
|
||||
subscribers.new_request(event)
|
||||
self.assertFalse(request.use_oruga)
|
||||
self.assertFalse(self.request.use_oruga)
|
||||
|
||||
def test_use_oruga_custom(self):
|
||||
self.config.setdefault('wuttaweb.oruga_detector.spec', 'tests.test_subscribers:custom_oruga_detector')
|
||||
request = self.make_request()
|
||||
event = MagicMock(request=request)
|
||||
event = MagicMock(request=self.request)
|
||||
|
||||
# request gets a new attr, which should be true
|
||||
self.assertFalse(hasattr(request, 'use_oruga'))
|
||||
self.assertFalse(hasattr(self.request, 'use_oruga'))
|
||||
subscribers.new_request(event)
|
||||
self.assertTrue(request.use_oruga)
|
||||
self.assertTrue(self.request.use_oruga)
|
||||
|
||||
def test_get_referrer(self):
|
||||
event = MagicMock(request=self.request)
|
||||
|
||||
def home(request):
|
||||
pass
|
||||
|
||||
self.pyramid_config.add_route('home', '/')
|
||||
self.pyramid_config.add_view(home, route_name='home')
|
||||
|
||||
self.assertFalse(hasattr(self.request, 'get_referrer'))
|
||||
subscribers.new_request(event)
|
||||
self.assertTrue(hasattr(self.request, 'get_referrer'))
|
||||
|
||||
# default if no referrer, is home route
|
||||
url = self.request.get_referrer()
|
||||
self.assertEqual(url, self.request.route_url('home'))
|
||||
|
||||
# can specify another default
|
||||
url = self.request.get_referrer(default='https://wuttaproject.org')
|
||||
self.assertEqual(url, 'https://wuttaproject.org')
|
||||
|
||||
# or referrer can come from user session
|
||||
self.request.session['referrer'] = 'https://rattailproject.org'
|
||||
self.assertIn('referrer', self.request.session)
|
||||
url = self.request.get_referrer()
|
||||
self.assertEqual(url, 'https://rattailproject.org')
|
||||
# nb. referrer should also have been removed from user session
|
||||
self.assertNotIn('referrer', self.request.session)
|
||||
|
||||
# or referrer can come from request params
|
||||
self.request.params['referrer'] = 'https://kernel.org'
|
||||
url = self.request.get_referrer()
|
||||
self.assertEqual(url, 'https://kernel.org')
|
||||
|
||||
|
||||
def custom_oruga_detector(request):
|
||||
|
@ -97,6 +135,81 @@ class TestNewRequestSetUser(TestCase):
|
|||
subscribers.new_request_set_user(event, db_session=self.session)
|
||||
self.assertIs(self.request.user, self.user)
|
||||
|
||||
def test_is_admin(self):
|
||||
event = MagicMock(request=self.request)
|
||||
|
||||
# anonymous user
|
||||
self.assertFalse(hasattr(self.request, 'user'))
|
||||
self.assertFalse(hasattr(self.request, 'is_admin'))
|
||||
subscribers.new_request_set_user(event, db_session=self.session)
|
||||
self.assertIsNone(self.request.user)
|
||||
self.assertFalse(self.request.is_admin)
|
||||
|
||||
# reset
|
||||
del self.request.is_admin
|
||||
|
||||
# authenticated user, but still not an admin
|
||||
self.request.user = self.user
|
||||
subscribers.new_request_set_user(event, db_session=self.session)
|
||||
self.assertIs(self.request.user, self.user)
|
||||
self.assertFalse(self.request.is_admin)
|
||||
|
||||
# reset
|
||||
del self.request.is_admin
|
||||
|
||||
# but if we make them an admin, it changes
|
||||
auth = self.app.get_auth_handler()
|
||||
admin = auth.get_role_administrator(self.session)
|
||||
self.user.roles.append(admin)
|
||||
self.session.commit()
|
||||
subscribers.new_request_set_user(event, db_session=self.session)
|
||||
self.assertIs(self.request.user, self.user)
|
||||
self.assertTrue(self.request.is_admin)
|
||||
|
||||
def test_is_root(self):
|
||||
event = MagicMock(request=self.request)
|
||||
|
||||
# anonymous user
|
||||
self.assertFalse(hasattr(self.request, 'user'))
|
||||
self.assertFalse(hasattr(self.request, 'is_root'))
|
||||
subscribers.new_request_set_user(event, db_session=self.session)
|
||||
self.assertIsNone(self.request.user)
|
||||
self.assertFalse(self.request.is_root)
|
||||
|
||||
# reset
|
||||
del self.request.is_admin
|
||||
del self.request.is_root
|
||||
|
||||
# authenticated user, but still not an admin
|
||||
self.request.user = self.user
|
||||
subscribers.new_request_set_user(event, db_session=self.session)
|
||||
self.assertIs(self.request.user, self.user)
|
||||
self.assertFalse(self.request.is_root)
|
||||
|
||||
# reset
|
||||
del self.request.is_admin
|
||||
del self.request.is_root
|
||||
|
||||
# even if we make them an admin, still not yet root
|
||||
auth = self.app.get_auth_handler()
|
||||
admin = auth.get_role_administrator(self.session)
|
||||
self.user.roles.append(admin)
|
||||
self.session.commit()
|
||||
subscribers.new_request_set_user(event, db_session=self.session)
|
||||
self.assertIs(self.request.user, self.user)
|
||||
self.assertTrue(self.request.is_admin)
|
||||
self.assertFalse(self.request.is_root)
|
||||
|
||||
# reset
|
||||
del self.request.is_admin
|
||||
del self.request.is_root
|
||||
|
||||
# root status flag lives in user session
|
||||
self.request.session['is_root'] = True
|
||||
subscribers.new_request_set_user(event, db_session=self.session)
|
||||
self.assertTrue(self.request.is_admin)
|
||||
self.assertTrue(self.request.is_root)
|
||||
|
||||
|
||||
class TestBeforeRender(TestCase):
|
||||
|
||||
|
|
|
@ -4,11 +4,12 @@ from unittest import TestCase
|
|||
from unittest.mock import MagicMock
|
||||
|
||||
from pyramid import testing
|
||||
from pyramid.httpexceptions import HTTPFound
|
||||
from pyramid.httpexceptions import HTTPFound, HTTPForbidden
|
||||
|
||||
from wuttjamaican.conf import WuttaConfig
|
||||
from wuttaweb.views import auth as mod
|
||||
from wuttaweb.auth import WuttaSecurityPolicy
|
||||
from wuttaweb.subscribers import new_request
|
||||
|
||||
|
||||
class TestAuthView(TestCase):
|
||||
|
@ -19,7 +20,9 @@ class TestAuthView(TestCase):
|
|||
})
|
||||
|
||||
self.request = testing.DummyRequest(wutta_config=self.config, user=None)
|
||||
self.pyramid_config = testing.setUp(request=self.request)
|
||||
self.pyramid_config = testing.setUp(request=self.request, settings={
|
||||
'wutta_config': self.config,
|
||||
})
|
||||
|
||||
self.app = self.config.get_app()
|
||||
auth = self.app.get_auth_handler()
|
||||
|
@ -142,3 +145,46 @@ class TestAuthView(TestCase):
|
|||
self.assertIn('form', context)
|
||||
dform = context['form'].get_deform()
|
||||
self.assertEqual(dform['new_password'].errormsg, "New password must be different from old password.")
|
||||
|
||||
def test_become_root(self):
|
||||
event = MagicMock(request=self.request)
|
||||
new_request(event) # add request.get_referrer()
|
||||
view = mod.AuthView(self.request)
|
||||
|
||||
# GET not allowed
|
||||
self.request.method = 'GET'
|
||||
self.assertRaises(HTTPForbidden, view.become_root)
|
||||
|
||||
# non-admin users also not allowed
|
||||
self.request.method = 'POST'
|
||||
self.request.is_admin = False
|
||||
self.assertRaises(HTTPForbidden, view.become_root)
|
||||
|
||||
# but admin users can become root
|
||||
self.request.is_admin = True
|
||||
self.assertNotIn('is_root', self.request.session)
|
||||
redirect = view.become_root()
|
||||
self.assertIsInstance(redirect, HTTPFound)
|
||||
self.assertTrue(self.request.session['is_root'])
|
||||
|
||||
def test_stop_root(self):
|
||||
event = MagicMock(request=self.request)
|
||||
new_request(event) # add request.get_referrer()
|
||||
view = mod.AuthView(self.request)
|
||||
|
||||
# GET not allowed
|
||||
self.request.method = 'GET'
|
||||
self.assertRaises(HTTPForbidden, view.stop_root)
|
||||
|
||||
# non-admin users also not allowed
|
||||
self.request.method = 'POST'
|
||||
self.request.is_admin = False
|
||||
self.assertRaises(HTTPForbidden, view.stop_root)
|
||||
|
||||
# but admin users can stop being root
|
||||
# (nb. there is no check whether user is currently root)
|
||||
self.request.is_admin = True
|
||||
self.assertNotIn('is_root', self.request.session)
|
||||
redirect = view.stop_root()
|
||||
self.assertIsInstance(redirect, HTTPFound)
|
||||
self.assertFalse(self.request.session['is_root'])
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
from unittest import TestCase
|
||||
|
||||
from pyramid import testing
|
||||
from pyramid.httpexceptions import HTTPFound
|
||||
from pyramid.httpexceptions import HTTPFound, HTTPForbidden
|
||||
|
||||
from wuttjamaican.conf import WuttaConfig
|
||||
from wuttaweb.views import base
|
||||
|
@ -23,6 +23,10 @@ class TestView(TestCase):
|
|||
self.assertIs(self.view.config, self.config)
|
||||
self.assertIs(self.view.app, self.app)
|
||||
|
||||
def test_forbidden(self):
|
||||
error = self.view.forbidden()
|
||||
self.assertIsInstance(error, HTTPForbidden)
|
||||
|
||||
def test_make_form(self):
|
||||
form = self.view.make_form()
|
||||
self.assertIsInstance(form, Form)
|
||||
|
|
Loading…
Reference in a new issue