3
0
Fork 0

Compare commits

...

11 commits

Author SHA1 Message Date
Lance Edgar 17df2c0f56 bump: version 0.2.0 → 0.3.0 2024-08-05 15:32:46 -05:00
Lance Edgar 0e0460b831 fix: allow custom user getter for new_request_set_user() hook 2024-08-05 15:06:55 -05:00
Lance Edgar fc339ba81b feat: add support for admin user to become / stop being root 2024-08-05 14:21:54 -05:00
Lance Edgar a2ba88ca8f feat: add view to change current user password 2024-08-05 11:45:00 -05:00
Lance Edgar 70d13ee1e7 feat: add basic logo, favicon images
definitely should replace these at some point..
2024-08-05 09:31:22 -05:00
Lance Edgar a505ef27fb feat: add auth views, for login/logout 2024-08-05 09:31:19 -05:00
Lance Edgar e296b50aa4 feat: add custom security policy, login/logout for pyramid
aka. the `wuttaweb.auth` module
2024-08-05 00:12:20 -05:00
Lance Edgar c6f0007908 feat: add wuttaweb.views.essential module 2024-08-05 00:12:20 -05:00
Lance Edgar 95d3623a5e feat: add initial/basic forms support 2024-08-05 00:12:17 -05:00
Lance Edgar 0604651be5 feat: add wuttaweb.db module, with Session 2024-08-04 20:32:08 -05:00
Lance Edgar 3b6b317377 feat: add util.get_form_data() convenience function 2024-08-04 20:32:05 -05:00
40 changed files with 2367 additions and 44 deletions

View file

@ -5,6 +5,24 @@ All notable changes to wuttaweb will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
## v0.3.0 (2024-08-05)
### Feat
- add support for admin user to become / stop being root
- add view to change current user password
- add basic logo, favicon images
- add auth views, for login/logout
- add custom security policy, login/logout for pyramid
- add `wuttaweb.views.essential` module
- add initial/basic forms support
- add `wuttaweb.db` module, with `Session`
- add `util.get_form_data()` convenience function
### Fix
- allow custom user getter for `new_request_set_user()` hook
## v0.2.0 (2024-07-14)
### Feat

View file

@ -0,0 +1,6 @@
``wuttaweb.auth``
=================
.. automodule:: wuttaweb.auth
:members:

6
docs/api/wuttaweb/db.rst Normal file
View file

@ -0,0 +1,6 @@
``wuttaweb.db``
===============
.. automodule:: wuttaweb.db
:members:

View file

@ -0,0 +1,6 @@
``wuttaweb.forms.base``
=======================
.. automodule:: wuttaweb.forms.base
:members:

View file

@ -0,0 +1,6 @@
``wuttaweb.forms``
==================
.. automodule:: wuttaweb.forms
:members:

View file

@ -8,6 +8,10 @@
:maxdepth: 1
app
auth
db
forms
forms.base
handler
helpers
menus
@ -15,5 +19,7 @@
subscribers
util
views
views.auth
views.base
views.common
views.essential

View file

@ -0,0 +1,6 @@
``wuttaweb.views.auth``
=======================
.. automodule:: wuttaweb.views.auth
:members:

View file

@ -0,0 +1,6 @@
``wuttaweb.views.essential``
============================
.. automodule:: wuttaweb.views.essential
:members:

View file

@ -20,12 +20,15 @@ extensions = [
'sphinx.ext.autodoc',
'sphinx.ext.intersphinx',
'sphinx.ext.viewcode',
'sphinx.ext.todo',
]
templates_path = ['_templates']
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
intersphinx_mapping = {
'colander': ('https://docs.pylonsproject.org/projects/colander/en/latest/', None),
'deform': ('https://docs.pylonsproject.org/projects/deform/en/latest/', None),
'pyramid': ('https://docs.pylonsproject.org/projects/pyramid/en/latest/', None),
'python': ('https://docs.python.org/3/', None),
'webhelpers2': ('https://webhelpers2.readthedocs.io/en/latest/', None),

View file

@ -6,7 +6,7 @@ build-backend = "hatchling.build"
[project]
name = "WuttaWeb"
version = "0.2.0"
version = "0.3.0"
description = "Web App for Wutta Framework"
readme = "README.md"
authors = [{name = "Lance Edgar", email = "lance@edbob.org"}]
@ -32,10 +32,13 @@ requires-python = ">= 3.8"
dependencies = [
"pyramid>=2",
"pyramid_beaker",
"pyramid_deform",
"pyramid_mako",
"pyramid_tm",
"waitress",
"WebHelpers2",
"WuttJamaican[db]>=0.7.0",
"zope.sqlalchemy>=1.5",
]

View file

@ -31,6 +31,9 @@ from wuttjamaican.conf import make_config
from pyramid.config import Configurator
import wuttaweb.db
from wuttaweb.auth import WuttaSecurityPolicy
class WebAppProvider(AppProvider):
"""
@ -83,17 +86,21 @@ def make_wutta_config(settings):
If this config file path cannot be discovered, an error is raised.
"""
# initialize config and embed in settings dict, to make
# available for web requests later
# validate config file path
path = settings.get('wutta.config')
if not path or not os.path.exists(path):
raise ValueError("Please set 'wutta.config' in [app:main] "
"section of config to the path of your "
"config file. Lame, but necessary.")
# make config per usual, add to settings
wutta_config = make_config(path)
settings['wutta_config'] = wutta_config
# configure database sessions
if hasattr(wutta_config, 'appdb_engine'):
wuttaweb.db.Session.configure(bind=wutta_config.appdb_engine)
return wutta_config
@ -104,10 +111,18 @@ def make_pyramid_config(settings):
The config is initialized with certain features deemed useful for
all apps.
"""
settings.setdefault('pyramid_deform.template_search_path',
'wuttaweb:templates/deform')
pyramid_config = Configurator(settings=settings)
# configure user authorization / authentication
pyramid_config.set_security_policy(WuttaSecurityPolicy())
pyramid_config.include('pyramid_beaker')
pyramid_config.include('pyramid_deform')
pyramid_config.include('pyramid_mako')
pyramid_config.include('pyramid_tm')
return pyramid_config

150
src/wuttaweb/auth.py Normal file
View file

@ -0,0 +1,150 @@
# -*- coding: utf-8; -*-
################################################################################
#
# wuttaweb -- Web App for Wutta Framework
# Copyright © 2024 Lance Edgar
#
# This file is part of Wutta Framework.
#
# Wutta Framework is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# Wutta Framework is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# Wutta Framework. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
Auth Utility Logic
"""
import re
from pyramid.authentication import SessionAuthenticationHelper
from pyramid.request import RequestLocalCache
from pyramid.security import remember, forget
from wuttaweb.db import Session
def login_user(request, user):
"""
Perform the steps necessary to "login" the given user. This
returns a ``headers`` dict which you should pass to the final
redirect, like so::
from pyramid.httpexceptions import HTTPFound
headers = login_user(request, user)
return HTTPFound(location='/', headers=headers)
.. warning::
This logic does not "authenticate" the user! It assumes caller
has already authenticated the user and they are safe to login.
See also :func:`logout_user()`.
"""
headers = remember(request, user.uuid)
return headers
def logout_user(request):
"""
Perform the logout action for the given request. This returns a
``headers`` dict which you should pass to the final redirect, like
so::
from pyramid.httpexceptions import HTTPFound
headers = logout_user(request)
return HTTPFound(location='/', headers=headers)
See also :func:`login_user()`.
"""
request.session.delete()
request.session.invalidate()
headers = forget(request)
return headers
class WuttaSecurityPolicy:
"""
Pyramid :term:`security policy` for WuttaWeb.
For more on the Pyramid details, see :doc:`pyramid:narr/security`.
But the idea here is that you should be able to just use this,
without thinking too hard::
from pyramid.config import Configurator
from wuttaweb.auth import WuttaSecurityPolicy
pyramid_config = Configurator()
pyramid_config.set_security_policy(WuttaSecurityPolicy())
This security policy will then do the following:
* use the request "web session" for auth storage (e.g. current
``user.uuid``)
* check permissions as needed, by calling
:meth:`~wuttjamaican:wuttjamaican.auth.AuthHandler.has_permission()`
for current user
:param db_session: Optional :term:`db session` to use, instead of
:class:`wuttaweb.db.Session`. Probably only useful for tests.
"""
def __init__(self, db_session=None):
self.session_helper = SessionAuthenticationHelper()
self.identity_cache = RequestLocalCache(self.load_identity)
self.db_session = db_session or Session()
def load_identity(self, request):
config = request.registry.settings['wutta_config']
app = config.get_app()
model = app.model
# fetch user uuid from current session
uuid = self.session_helper.authenticated_userid(request)
if not uuid:
return
# fetch user object from db
user = self.db_session.get(model.User, uuid)
if not user:
return
return user
def identity(self, request):
return self.identity_cache.get_or_create(request)
def authenticated_userid(self, request):
user = self.identity(request)
if user is not None:
return user.uuid
def remember(self, request, userid, **kw):
return self.session_helper.remember(request, userid, **kw)
def forget(self, request, **kw):
return self.session_helper.forget(request, **kw)
def permits(self, request, context, permission):
# nb. root user can do anything
if getattr(request, 'is_root', False):
return True
config = request.registry.settings['wutta_config']
app = config.get_app()
auth = app.get_auth_handler()
user = self.identity(request)
return auth.has_permission(self.db_session, user, permission)

66
src/wuttaweb/db.py Normal file
View file

@ -0,0 +1,66 @@
# -*- coding: utf-8; -*-
################################################################################
#
# wuttaweb -- Web App for Wutta Framework
# Copyright © 2024 Lance Edgar
#
# This file is part of Wutta Framework.
#
# Wutta Framework is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# Wutta Framework is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# Wutta Framework. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
Database sessions for web app
The web app uses a different database session than other
(e.g. console) apps. The web session is "registered" to the HTTP
request/response life cycle (aka. transaction) such that the session
is automatically rolled back on error, and automatically committed if
the response is finalized without error.
.. class:: Session
Primary database session class for the web app.
Note that you often do not need to "instantiate" this session, and
can instead call methods directly on the class::
from wuttaweb.db import Session
users = Session.query(model.User).all()
However in certain cases you may still want/need to instantiate it,
e.g. when passing a "true/normal" session to other logic. But you
can always call instance methods as well::
from wuttaweb.db import Session
from some_place import some_func
session = Session()
# nb. assuming func does not expect a "web" session per se, pass instance
some_func(session)
# nb. these behave the same (instance vs. class method)
users = session.query(model.User).all()
users = Session.query(model.User).all()
"""
from sqlalchemy import orm
from zope.sqlalchemy.datamanager import register
Session = orm.scoped_session(orm.sessionmaker())
register(Session)

View file

@ -0,0 +1,31 @@
# -*- coding: utf-8; -*-
################################################################################
#
# wuttaweb -- Web App for Wutta Framework
# Copyright © 2024 Lance Edgar
#
# This file is part of Wutta Framework.
#
# Wutta Framework is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# Wutta Framework is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# Wutta Framework. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
Forms Library
The ``wuttaweb.forms`` namespace contains the following:
* :class:`~wuttaweb.forms.base.Form`
"""
from .base import Form

476
src/wuttaweb/forms/base.py Normal file
View file

@ -0,0 +1,476 @@
# -*- coding: utf-8; -*-
################################################################################
#
# wuttaweb -- Web App for Wutta Framework
# Copyright © 2024 Lance Edgar
#
# This file is part of Wutta Framework.
#
# Wutta Framework is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# Wutta Framework is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# Wutta Framework. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
Base form classes
"""
import json
import logging
import colander
import deform
from pyramid.renderers import render
from webhelpers2.html import HTML
from wuttaweb.util import get_form_data
log = logging.getLogger(__name__)
class FieldList(list):
"""
Convenience wrapper for a form's field list. This is a subclass
of :class:`python:list`.
You normally would not need to instantiate this yourself, but it
is used under the hood for e.g. :attr:`Form.fields`.
"""
def insert_before(self, field, newfield):
"""
Insert a new field, before an existing field.
:param field: String name for the existing field.
:param newfield: String name for the new field, to be inserted
just before the existing ``field``.
"""
if field in self:
i = self.index(field)
self.insert(i, newfield)
else:
log.warning("field '%s' not found, will append new field: %s",
field, newfield)
self.append(newfield)
def insert_after(self, field, newfield):
"""
Insert a new field, after an existing field.
:param field: String name for the existing field.
:param newfield: String name for the new field, to be inserted
just after the existing ``field``.
"""
if field in self:
i = self.index(field)
self.insert(i + 1, newfield)
else:
log.warning("field '%s' not found, will append new field: %s",
field, newfield)
self.append(newfield)
class Form:
"""
Base class for all forms.
:param request: Reference to current :term:`request` object.
:param fields: List of field names for the form. This is
optional; if not specified an attempt will be made to deduce
the list automatically. See also :attr:`fields`.
:param schema: Colander-based schema object for the form. This is
optional; if not specified an attempt will be made to construct
one automatically. See also :meth:`get_schema()`.
:param labels: Optional dict of default field labels.
.. note::
Some parameters are not explicitly described above. However
their corresponding attributes are described below.
Form instances contain the following attributes:
.. attribute:: fields
:class:`FieldList` instance containing string field names for
the form. By default, fields will appear in the same order as
they are in this list.
.. attribute:: request
Reference to current :term:`request` object.
.. attribute:: action_url
String URL to which the form should be submitted, if applicable.
.. attribute:: vue_tagname
String name for Vue component tag. By default this is
``'wutta-form'``. See also :meth:`render_vue_tag()`.
.. attribute:: align_buttons_right
Flag indicating whether the buttons (submit, cancel etc.)
should be aligned to the right of the area below the form. If
not set, the buttons are left-aligned.
.. attribute:: auto_disable_submit
Flag indicating whether the submit button should be
auto-disabled, whenever the form is submitted.
.. attribute:: button_label_submit
String label for the form submit button. Default is ``"Save"``.
.. attribute:: button_icon_submit
String icon name for the form submit button. Default is ``'save'``.
.. attribute:: show_button_reset
Flag indicating whether a Reset button should be shown.
.. attribute:: validated
If the :meth:`validate()` method was called, and it succeeded,
this will be set to the validated data dict.
Note that in all other cases, this attribute may not exist.
"""
def __init__(
self,
request,
fields=None,
schema=None,
labels={},
action_url=None,
vue_tagname='wutta-form',
align_buttons_right=False,
auto_disable_submit=True,
button_label_submit="Save",
button_icon_submit='save',
show_button_reset=False,
):
self.request = request
self.schema = schema
self.labels = labels or {}
self.action_url = action_url
self.vue_tagname = vue_tagname
self.align_buttons_right = align_buttons_right
self.auto_disable_submit = auto_disable_submit
self.button_label_submit = button_label_submit
self.button_icon_submit = button_icon_submit
self.show_button_reset = show_button_reset
self.config = self.request.wutta_config
self.app = self.config.get_app()
if fields is not None:
self.set_fields(fields)
elif self.schema:
self.set_fields([f.name for f in self.schema])
else:
self.fields = None
def __contains__(self, name):
"""
Custom logic for the ``in`` operator, to allow easily checking
if the form contains a given field::
myform = Form()
if 'somefield' in myform:
print("my form has some field")
"""
return bool(self.fields and name in self.fields)
def __iter__(self):
"""
Custom logic to allow iterating over form field names::
myform = Form(fields=['foo', 'bar'])
for fieldname in myform:
print(fieldname)
"""
return iter(self.fields)
@property
def vue_component(self):
"""
String name for the Vue component, e.g. ``'WuttaForm'``.
This is a generated value based on :attr:`vue_tagname`.
"""
words = self.vue_tagname.split('-')
return ''.join([word.capitalize() for word in words])
def set_fields(self, fields):
"""
Explicitly set the list of form fields.
This will overwrite :attr:`fields` with a new
:class:`FieldList` instance.
:param fields: List of string field names.
"""
self.fields = FieldList(fields)
def set_label(self, key, label):
"""
Set the label for given field name.
See also :meth:`get_label()`.
"""
self.labels[key] = label
# update schema if necessary
if self.schema and key in self.schema:
self.schema[key].title = label
def get_label(self, key):
"""
Get the label for given field name.
Note that this will always return a string, auto-generating
the label if needed.
See also :meth:`set_label()`.
"""
return self.labels.get(key, self.app.make_title(key))
def get_schema(self):
"""
Return the :class:`colander:colander.Schema` object for the
form, generating it automatically if necessary.
"""
if not self.schema:
raise NotImplementedError
return self.schema
def get_deform(self):
"""
Return the :class:`deform:deform.Form` instance for the form,
generating it automatically if necessary.
"""
if not hasattr(self, 'deform_form'):
schema = self.get_schema()
form = deform.Form(schema)
self.deform_form = form
return self.deform_form
def render_vue_tag(self, **kwargs):
"""
Render the Vue component tag for the form.
By default this simply returns:
.. code-block:: html
<wutta-form></wutta-form>
The actual output will depend on various form attributes, in
particular :attr:`vue_tagname`.
"""
return HTML.tag(self.vue_tagname, **kwargs)
def render_vue_template(
self,
template='/forms/vue_template.mako',
**context):
"""
Render the Vue template block for the form.
This returns something like:
.. code-block:: none
<script type="text/x-template" id="wutta-form-template">
<form>
<!-- fields etc. -->
</form>
</script>
.. todo::
Why can't Sphinx render the above code block as 'html' ?
It acts like it can't handle a ``<script>`` tag at all?
Actual output will of course depend on form attributes, i.e.
:attr:`vue_tagname` and :attr:`fields` list etc.
:param template: Path to Mako template which is used to render
the output.
"""
context['form'] = self
context.setdefault('form_attrs', {})
# auto disable button on submit
if self.auto_disable_submit:
context['form_attrs']['@submit'] = 'formSubmitting = true'
output = render(template, context)
return HTML.literal(output)
def render_vue_field(self, fieldname):
"""
Render the given field completely, i.e. ``<b-field>`` wrapper
with label and containing a widget.
Actual output will depend on the field attributes etc.
Typical output might look like:
.. code-block:: html
<b-field label="Foo"
horizontal
type="is-danger"
message="something went wrong!">
<!-- widget element(s) -->
</b-field>
"""
dform = self.get_deform()
field = dform[fieldname]
# render the field widget or whatever
html = field.serialize()
html = HTML.literal(html)
# render field label
label = self.get_label(fieldname)
# b-field attrs
attrs = {
':horizontal': 'true',
'label': label,
}
# next we will build array of messages to display..some
# fields always show a "helptext" msg, and some may have
# validation errors..
field_type = None
messages = []
# show errors if present
errors = self.get_field_errors(fieldname)
if errors:
field_type = 'is-danger'
messages.extend(errors)
# ..okay now we can declare the field messages and type
if field_type:
attrs['type'] = field_type
if messages:
if len(messages) == 1:
msg = messages[0]
if msg.startswith('`') and msg.endswith('`'):
attrs[':message'] = msg
else:
attrs['message'] = msg
# TODO
# else:
# # nb. must pass an array as JSON string
# attrs[':message'] = '[{}]'.format(', '.join([
# "'{}'".format(msg.replace("'", r"\'"))
# for msg in messages]))
return HTML.tag('b-field', c=[html], **attrs)
def get_field_errors(self, field):
"""
Return a list of error messages for the given field.
Not useful unless a call to :meth:`validate()` failed.
"""
dform = self.get_deform()
if field in dform:
error = dform[field].errormsg
if error:
return [error]
return []
def get_vue_field_value(self, field):
"""
This method returns a JSON string which will be assigned as
the initial model value for the given field. This JSON will
be written as part of the overall response, to be interpreted
on the client side.
Again, this must return a *string* such as:
* ``'null'``
* ``'{"foo": "bar"}'``
In practice this calls :meth:`jsonify_value()` to convert the
``field.cstruct`` value to string.
"""
if isinstance(field, str):
dform = self.get_deform()
field = dform[field]
return self.jsonify_value(field.cstruct)
def jsonify_value(self, value):
"""
Convert a Python value to JSON string.
See also :meth:`get_vue_field_value()`.
"""
if value is colander.null:
return 'null'
return json.dumps(value)
def validate(self):
"""
Try to validate the form.
This should work whether request data was submitted as classic
POST data, or as JSON body.
If the form data is valid, this method returns the data dict.
This data dict is also then available on the form object via
the :attr:`validated` attribute.
However if the data is not valid, ``False`` is returned, and
there will be no :attr:`validated` attribute. In that case
you should inspect the form errors to learn/display what went
wrong for the user's sake. See also
:meth:`get_field_errors()`.
:returns: Data dict, or ``False``.
"""
if hasattr(self, 'validated'):
del self.validated
if self.request.method != 'POST':
return False
dform = self.get_deform()
controls = get_form_data(self.request).items()
try:
self.validated = dform.validate(controls)
except deform.ValidationFailure:
return False
return self.validated

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.6 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 20 KiB

View file

@ -35,12 +35,14 @@ However some custom apps may need to supplement or replace the event
hooks contained here, depending on the circumstance.
"""
import functools
import json
import logging
from pyramid import threadlocal
from wuttaweb import helpers
from wuttaweb.db import Session
log = logging.getLogger(__name__)
@ -48,7 +50,7 @@ log = logging.getLogger(__name__)
def new_request(event):
"""
Event hook called when processing a new request.
Event hook called when processing a new :term:`request`.
The hook is auto-registered if this module is "included" by
Pyramid config object. Or you can explicitly register it::
@ -56,17 +58,27 @@ def new_request(event):
pyramid_config.add_subscriber('wuttaweb.subscribers.new_request',
'pyramid.events.NewRequest')
This will add some things to the request object:
This will add to the request object:
.. attribute:: request.wutta_config
Reference to the app :term:`config object`.
.. method:: request.get_referrer(default=None)
Request method to get the "canonical" HTTP referrer value.
This has logic to check for referrer in the request params,
user session etc.
:param default: Optional default URL if none is found in
request params/session. If no default is specified,
the ``'home'`` route is used.
.. attribute:: request.use_oruga
Flag indicating whether the frontend should be displayed using
Vue 3 + Oruga (if ``True``), or else Vue 2 + Buefy (if
``False``).
``False``). This flag is ``False`` by default.
"""
request = event.request
config = request.registry.settings['wutta_config']
@ -74,6 +86,19 @@ def new_request(event):
request.wutta_config = config
def get_referrer(default=None):
if request.params.get('referrer'):
return request.params['referrer']
if request.session.get('referrer'):
return request.session.pop('referrer')
referrer = getattr(request, 'referrer', None)
if (not referrer or referrer == request.current_route_url()
or not referrer.startswith(request.host_url)):
referrer = default or request.route_url('home')
return referrer
request.get_referrer = get_referrer
def use_oruga(request):
spec = config.get('wuttaweb.oruga_detector.spec')
if spec:
@ -84,6 +109,89 @@ def new_request(event):
request.set_property(use_oruga, reify=True)
def default_user_getter(request, db_session=None):
"""
This is the default function used to retrieve user object from
database. Result of this is then assigned to :attr:`request.user`
as part of the :func:`new_request_set_user()` hook.
"""
uuid = request.authenticated_userid
if uuid:
config = request.wutta_config
app = config.get_app()
model = app.model
session = db_session or Session()
return session.get(model.User, uuid)
def new_request_set_user(
event,
user_getter=default_user_getter,
db_session=None,
):
"""
Event hook called when processing a new :term:`request`, for sake
of setting the :attr:`request.user` and similar properties.
The hook is auto-registered if this module is "included" by
Pyramid config object. Or you can explicitly register it::
pyramid_config.add_subscriber('wuttaweb.subscribers.new_request_set_user',
'pyramid.events.NewRequest')
This will add to the request object:
.. attribute:: request.user
Reference to the authenticated
:class:`~wuttjamaican:wuttjamaican.db.model.auth.User` instance
(if logged in), or ``None``.
.. attribute:: request.is_admin
Flag indicating whether current user is a member of the
Administrator role.
.. attribute:: request.is_root
Flag indicating whether user is currently elevated to root
privileges. This is only possible if :attr:`request.is_admin`
is also true.
You may wish to "supplement" this hook by registering your own
custom hook and then invoking this one as needed. You can then
pass certain params to override only parts of the logic:
:param user_getter: Optional getter function to retrieve the user
from database, instead of :func:`default_user_getter()`.
:param db_session: Optional :term:`db session` to use,
instead of :class:`wuttaweb.db.Session`.
"""
request = event.request
config = request.registry.settings['wutta_config']
app = config.get_app()
# request.user
if db_session:
user_getter = functools.partial(user_getter, db_session=db_session)
request.set_property(user_getter, name='user', reify=True)
# request.is_admin
def is_admin(request):
auth = app.get_auth_handler()
return auth.user_is_admin(request.user)
request.set_property(is_admin, reify=True)
# request.is_root
def is_root(request):
if request.is_admin:
if request.session.get('is_root', False):
return True
return False
request.set_property(is_root, reify=True)
def before_render(event):
"""
Event hook called just before rendering a template.
@ -151,4 +259,5 @@ def before_render(event):
def includeme(config):
config.add_subscriber(new_request, 'pyramid.events.NewRequest')
config.add_subscriber(new_request_set_user, 'pyramid.events.NewRequest')
config.add_subscriber(before_render, 'pyramid.events.BeforeRender')

View file

@ -0,0 +1,7 @@
## -*- coding: utf-8; -*-
<%inherit file="/form.mako" />
<%def name="title()">Change Password</%def>
${parent.body()}

View file

@ -0,0 +1,48 @@
## -*- coding: utf-8; -*-
<%inherit file="/form.mako" />
<%namespace name="base_meta" file="/base_meta.mako" />
<%def name="title()">Login</%def>
<%def name="render_this_page()">
${self.page_content()}
</%def>
<%def name="page_content()">
<div style="height: 100%; display: flex; flex-direction: column; align-items: center; justify-content: center; gap: 1rem;">
<div>${base_meta.full_logo()}</div>
<div class="card">
<div class="card-content">
${form.render_vue_tag()}
</div>
</div>
</div>
</%def>
<%def name="modify_this_page_vars()">
<script>
${form.vue_component}Data.usernameInput = null
${form.vue_component}.mounted = function() {
this.$refs.username.focus()
this.usernameInput = this.$refs.username.$el.querySelector('input')
this.usernameInput.addEventListener('keydown', this.usernameKeydown)
}
${form.vue_component}.beforeDestroy = function() {
this.usernameInput.removeEventListener('keydown', this.usernameKeydown)
}
${form.vue_component}.methods.usernameKeydown = function(event) {
if (event.which == 13) { // ENTER
event.preventDefault()
this.$refs.password.focus()
}
}
</script>
</%def>
${parent.body()}

View file

@ -151,9 +151,11 @@
<div class="navbar-brand">
<a class="navbar-item" href="${url('home')}">
${base_meta.header_logo()}
<div id="global-header-title">
${base_meta.global_title()}
<div style="display: flex; gap: 0.3rem; align-items: center;">
${base_meta.header_logo()}
<div id="global-header-title">
${base_meta.global_title()}
</div>
</div>
</a>
<a role="button" class="navbar-burger" data-target="navbar-menu" aria-label="menu" aria-expanded="false">
@ -309,7 +311,40 @@
</div>
</%def>
<%def name="render_user_menu()"></%def>
<%def name="render_user_menu()">
% if request.user:
<div class="navbar-item has-dropdown is-hoverable">
<a class="navbar-link ${'has-background-danger has-text-white' if request.is_root else ''}">${request.user}</a>
<div class="navbar-dropdown">
% if request.is_root:
${h.form(url('stop_root'), ref='stopBeingRootForm')}
## TODO
## ${h.csrf_token(request)}
<input type="hidden" name="referrer" value="${request.current_route_url()}" />
<a @click="stopBeingRoot()"
class="navbar-item has-background-danger has-text-white">
Stop being root
</a>
${h.end_form()}
% elif request.is_admin:
${h.form(url('become_root'), ref='startBeingRootForm')}
## TODO
## ${h.csrf_token(request)}
<input type="hidden" name="referrer" value="${request.current_route_url()}" />
<a @click="startBeingRoot()"
class="navbar-item has-background-danger has-text-white">
Become root
</a>
${h.end_form()}
% endif
${h.link_to("Change Password", url('change_password'), class_='navbar-item')}
${h.link_to("Logout", url('logout'), class_='navbar-item')}
</div>
</div>
% else:
${h.link_to("Login", url('login'), class_='navbar-item')}
% endif
</%def>
<%def name="render_instance_header_title_extras()"></%def>
@ -345,6 +380,18 @@
const key = 'menu_' + hash + '_shown'
this[key] = !this[key]
},
% if request.is_admin:
startBeingRoot() {
this.$refs.startBeingRootForm.submit()
},
stopBeingRoot() {
this.$refs.stopBeingRootForm.submit()
},
% endif
},
}

View file

@ -7,11 +7,15 @@
<%def name="extra_styles()"></%def>
<%def name="favicon()">
## <link rel="icon" type="image/x-icon" href="${config.get('tailbone', 'favicon_url', default=request.static_url('wuttaweb:static/img/favicon.ico'))}" />
<link rel="icon" type="image/x-icon" href="${config.get('wuttaweb.favicon_url', default=request.static_url('wuttaweb:static/img/favicon.ico'))}" />
</%def>
<%def name="header_logo()">
## ${h.image(config.get('wuttaweb.header_image_url', default=request.static_url('wuttaweb:static/img/logo.png')), "Header Logo", style="height: 49px;")}
${h.image(config.get('wuttaweb.header_logo_url', default=request.static_url('wuttaweb:static/img/favicon.ico')), "Header Logo", style="height: 49px;")}
</%def>
<%def name="full_logo()">
${h.image(config.get('wuttaweb.logo_url', default=request.static_url('wuttaweb:static/img/logo.png')), f"{app.get_title()} logo")}
</%def>
<%def name="footer()">

View file

@ -0,0 +1,13 @@
<div tal:define="name name|field.name;
vmodel vmodel|'model_'+name;">
${field.start_mapping()}
<b-input name="${name}"
value="${field.widget.redisplay and cstruct or ''}"
type="password"
placeholder="Password" />
<b-input name="${name}-confirm"
value="${field.widget.redisplay and confirm or ''}"
type="password"
placeholder="Confirm Password" />
${field.end_mapping()}
</div>

View file

@ -0,0 +1,8 @@
<div tal:omit-tag=""
tal:define="name name|field.name;
vmodel vmodel|'model_'+name;">
<b-input name="${name}"
v-model="${vmodel}"
type="password"
tal:attributes="attributes|field.widget.attributes|{};" />
</div>

View file

@ -0,0 +1,7 @@
<div tal:omit-tag=""
tal:define="name name|field.name;
vmodel vmodel|'model_'+name;">
<b-input name="${name}"
v-model="${vmodel}"
tal:attributes="attributes|field.widget.attributes|{};" />
</div>

View file

@ -0,0 +1,24 @@
## -*- coding: utf-8; -*-
<%inherit file="/page.mako" />
<%def name="page_content()">
<div style="margin-top: 2rem; width: 50%;">
${form.render_vue_tag()}
</div>
</%def>
<%def name="render_this_page_template()">
${parent.render_this_page_template()}
${form.render_vue_template()}
</%def>
<%def name="finalize_this_page_vars()">
${parent.finalize_this_page_vars()}
<script>
${form.vue_component}.data = function() { return ${form.vue_component}Data }
Vue.component('${form.vue_tagname}', ${form.vue_component})
</script>
</%def>
${parent.body()}

View file

@ -0,0 +1,58 @@
## -*- coding: utf-8; -*-
<script type="text/x-template" id="${form.vue_tagname}-template">
${h.form(form.action_url, method='post', enctype='multipart/form-data', **form_attrs)}
<section>
% for fieldname in form:
${form.render_vue_field(fieldname)}
% endfor
</section>
<div style="margin-top: 1.5rem; display: flex; gap: 0.5rem; justify-content: ${'end' if form.align_buttons_right else 'start'}; width: 100%; padding-left: 10rem;">
% if form.show_button_reset:
<b-button native-type="reset">
Reset
</b-button>
% endif
<b-button type="is-primary"
native-type="submit"
% if form.auto_disable_submit:
:disabled="formSubmitting"
% endif
icon-pack="fas"
icon-left="${form.button_icon_submit}">
% if form.auto_disable_submit:
{{ formSubmitting ? "Working, please wait..." : "${form.button_label_submit}" }}
% else:
${form.button_label_submit}
% endif
</b-button>
</div>
${h.end_form()}
</script>
<script>
let ${form.vue_component} = {
template: '#${form.vue_tagname}-template',
methods: {},
}
let ${form.vue_component}Data = {
## field model values
% for key in form:
model_${key}: ${form.get_vue_field_value(key)|n},
% endfor
% if form.auto_disable_submit:
formSubmitting: false,
% endif
}
</script>

View file

@ -9,11 +9,9 @@
</%def>
<%def name="page_content()">
<div style="height: 100%; display: flex; align-items: center; justify-content: center;">
<div class="logo">
## ${h.image(image_url, "{} logo".format(capture(base_meta.app_title)))}
<h1 class="is-size-1">Welcome to ${base_meta.app_title()}</h1>
</div>
<div style="height: 100%; display: flex; flex-direction: column; align-items: center; justify-content: center; gap: 1rem;">
<div>${base_meta.full_logo()}</div>
<h1 class="is-size-1">Welcome to ${app.get_title()}</h1>
</div>
</%def>

View file

@ -21,12 +21,33 @@
#
################################################################################
"""
Utilities
Web Utilities
"""
import importlib
def get_form_data(request):
"""
Returns the effective form data for the given request.
Mostly this is a convenience, which simply returns one of the
following, depending on various attributes of the request.
* :attr:`pyramid:pyramid.request.Request.POST`
* :attr:`pyramid:pyramid.request.Request.json_body`
"""
# nb. we prefer JSON only if no POST is present
# TODO: this seems to work for our use case at least, but perhaps
# there is a better way? see also
# https://docs.pylonsproject.org/projects/pyramid/en/latest/api/request.html#pyramid.request.Request.is_xhr
if not request.POST and (
getattr(request, 'is_xhr', False)
or getattr(request, 'content_type', None) == 'application/json'):
return request.json_body
return request.POST
def get_libver(
request,
key,

View file

@ -33,4 +33,4 @@ from .base import View
def includeme(config):
config.include('wuttaweb.views.common')
config.include('wuttaweb.views.essential')

288
src/wuttaweb/views/auth.py Normal file
View file

@ -0,0 +1,288 @@
# -*- coding: utf-8; -*-
################################################################################
#
# wuttaweb -- Web App for Wutta Framework
# Copyright © 2024 Lance Edgar
#
# This file is part of Wutta Framework.
#
# Wutta Framework is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# Wutta Framework is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# Wutta Framework. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
Auth Views
"""
import colander
from deform.widget import TextInputWidget, PasswordWidget, CheckedPasswordWidget
from wuttaweb.views import View
from wuttaweb.db import Session
from wuttaweb.auth import login_user, logout_user
class AuthView(View):
"""
Auth views shared by all apps.
"""
def login(self, session=None):
"""
View for user login.
This view shows the login form, and handles its submission.
Upon successful login, user is redirected to home page.
* route: ``login``
* template: ``/auth/login.mako``
"""
auth = self.app.get_auth_handler()
# TODO: should call request.get_referrer()
referrer = self.request.route_url('home')
# redirect if already logged in
if self.request.user:
self.request.session.flash(f"{self.request.user} is already logged in", 'error')
return self.redirect(referrer)
form = self.make_form(schema=self.login_make_schema(),
align_buttons_right=True,
show_button_reset=True,
button_label_submit="Login",
button_icon_submit='user')
# TODO
# form.show_cancel = False
# validate basic form data (sanity check)
data = form.validate()
if data:
# truly validate user credentials
session = session or Session()
user = auth.authenticate_user(session, data['username'], data['password'])
if user:
# okay now they're truly logged in
headers = login_user(self.request, user)
return self.redirect(referrer, headers=headers)
else:
self.request.session.flash("Invalid user credentials", 'error')
return {
'index_title': self.app.get_title(),
'form': form,
# TODO
# 'referrer': referrer,
}
def login_make_schema(self):
schema = colander.Schema()
# nb. we must explicitly declare the widgets in order to also
# specify the ref attribute. this is needed for autofocus and
# keydown behavior for login form.
schema.add(colander.SchemaNode(
colander.String(),
name='username',
widget=TextInputWidget(attributes={
'ref': 'username',
})))
schema.add(colander.SchemaNode(
colander.String(),
name='password',
widget=PasswordWidget(attributes={
'ref': 'password',
})))
return schema
def logout(self):
"""
View for user logout.
This deletes/invalidates the current user session and then
redirects to the login page.
Note that a simple GET is sufficient; POST is not required.
* route: ``logout``
* template: n/a
"""
# truly logout the user
headers = logout_user(self.request)
# TODO
# # redirect to home page after logout, if so configured
# if self.config.get_bool('wuttaweb.home_after_logout', default=False):
# return self.redirect(self.request.route_url('home'), headers=headers)
# otherwise redirect to referrer, with 'login' page as fallback
# TODO: should call request.get_referrer()
# referrer = self.request.get_referrer(default=self.request.route_url('login'))
referrer = self.request.route_url('login')
return self.redirect(referrer, headers=headers)
def change_password(self):
"""
View allowing a user to change their own password.
This view shows a change-password form, and handles its
submission. If successful, user is redirected to home page.
If current user is not authenticated, no form is shown and
user is redirected to home page.
* route: ``change_password``
* template: ``/auth/change_password.mako``
"""
if not self.request.user:
return self.redirect(self.request.route_url('home'))
form = self.make_form(schema=self.change_password_make_schema(),
show_button_reset=True)
data = form.validate()
if data:
auth = self.app.get_auth_handler()
auth.set_user_password(self.request.user, data['new_password'])
self.request.session.flash("Your password has been changed.")
# TODO: should use request.get_referrer() instead
referrer = self.request.route_url('home')
return self.redirect(referrer)
return {'index_title': str(self.request.user),
'form': form}
def change_password_make_schema(self):
schema = colander.Schema()
schema.add(colander.SchemaNode(
colander.String(),
name='current_password',
widget=PasswordWidget(),
validator=self.change_password_validate_current_password))
schema.add(colander.SchemaNode(
colander.String(),
name='new_password',
widget=CheckedPasswordWidget(),
validator=self.change_password_validate_new_password))
return schema
def change_password_validate_current_password(self, node, value):
auth = self.app.get_auth_handler()
user = self.request.user
if not auth.check_user_password(user, value):
node.raise_invalid("Current password is incorrect.")
def change_password_validate_new_password(self, node, value):
auth = self.app.get_auth_handler()
user = self.request.user
if auth.check_user_password(user, value):
node.raise_invalid("New password must be different from old password.")
def become_root(self):
"""
Elevate the current request to 'root' for full system access.
This is only allowed if current (authenticated) user is a
member of the Administrator role. Also note that GET is not
allowed for this view, only POST.
See also :meth:`stop_root()`.
"""
if self.request.method != 'POST':
raise self.forbidden()
if not self.request.is_admin:
raise self.forbidden()
self.request.session['is_root'] = True
self.request.session.flash("You have been elevated to 'root' and now have full system access")
url = self.request.get_referrer()
return self.redirect(url)
def stop_root(self):
"""
Lower the current request from 'root' back to normal access.
Also note that GET is not allowed for this view, only POST.
See also :meth:`become_root()`.
"""
if self.request.method != 'POST':
raise self.forbidden()
if not self.request.is_admin:
raise self.forbidden()
self.request.session['is_root'] = False
self.request.session.flash("Your normal system access has been restored")
url = self.request.get_referrer()
return self.redirect(url)
@classmethod
def defaults(cls, config):
cls._auth_defaults(config)
@classmethod
def _auth_defaults(cls, config):
# login
config.add_route('login', '/login')
config.add_view(cls, attr='login',
route_name='login',
renderer='/auth/login.mako')
# logout
config.add_route('logout', '/logout')
config.add_view(cls, attr='logout',
route_name='logout')
# change password
config.add_route('change_password', '/change-password')
config.add_view(cls, attr='change_password',
route_name='change_password',
renderer='/auth/change_password.mako')
# become root
config.add_route('become_root', '/root/yes',
request_method='POST')
config.add_view(cls, attr='become_root',
route_name='become_root')
# stop root
config.add_route('stop_root', '/root/no',
request_method='POST')
config.add_view(cls, attr='stop_root',
route_name='stop_root')
def defaults(config, **kwargs):
base = globals()
AuthView = kwargs.get('AuthView', base['AuthView'])
AuthView.defaults(config)
def includeme(config):
defaults(config)

View file

@ -24,6 +24,10 @@
Base Logic for Views
"""
from pyramid import httpexceptions
from wuttaweb import forms
class View:
"""
@ -35,8 +39,7 @@ class View:
.. attribute:: request
Reference to the current
:class:`pyramid:pyramid.request.Request` object.
Reference to the current :term:`request` object.
.. attribute:: app
@ -51,3 +54,38 @@ class View:
self.request = request
self.config = self.request.wutta_config
self.app = self.config.get_app()
def forbidden(self):
"""
Convenience method, to raise a HTTP 403 Forbidden exception::
raise self.forbidden()
"""
return httpexceptions.HTTPForbidden()
def make_form(self, **kwargs):
"""
Make and return a new :class:`~wuttaweb.forms.base.Form`
instance, per the given ``kwargs``.
This is the "default" form factory which merely invokes
the constructor.
"""
return forms.Form(self.request, **kwargs)
def redirect(self, url, **kwargs):
"""
Convenience method to return a HTTP 302 response.
Note that this technically returns an "exception" - so in
your code, you can either return that error, or raise it::
return self.redirect('/')
# ..or
raise self.redirect('/')
Which you should do will depend on context, but raising the
error is always "safe" since Pyramid will handle that
correctly no matter what.
"""
return httpexceptions.HTTPFound(location=url, **kwargs)

View file

@ -0,0 +1,45 @@
# -*- coding: utf-8; -*-
################################################################################
#
# wuttaweb -- Web App for Wutta Framework
# Copyright © 2024 Lance Edgar
#
# This file is part of Wutta Framework.
#
# Wutta Framework is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# Wutta Framework is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# Wutta Framework. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
Essential views for convenient includes
Most apps should include this module::
pyramid_config.include('wuttaweb.views.essential')
That will in turn include the following modules:
* :mod:`wuttaweb.views.auth`
* :mod:`wuttaweb.views.common`
"""
def defaults(config, **kwargs):
mod = lambda spec: kwargs.get(spec, spec)
config.include(mod('wuttaweb.views.auth'))
config.include(mod('wuttaweb.views.common'))
def includeme(config):
defaults(config)

0
tests/forms/__init__.py Normal file
View file

271
tests/forms/test_base.py Normal file
View file

@ -0,0 +1,271 @@
# -*- coding: utf-8; -*-
from unittest import TestCase
from unittest.mock import MagicMock
import colander
import deform
from pyramid import testing
from wuttjamaican.conf import WuttaConfig
from wuttaweb.forms import base
from wuttaweb import helpers
class TestFieldList(TestCase):
def test_insert_before(self):
fields = base.FieldList(['f1', 'f2'])
self.assertEqual(fields, ['f1', 'f2'])
# typical
fields.insert_before('f1', 'XXX')
self.assertEqual(fields, ['XXX', 'f1', 'f2'])
fields.insert_before('f2', 'YYY')
self.assertEqual(fields, ['XXX', 'f1', 'YYY', 'f2'])
# appends new field if reference field is invalid
fields.insert_before('f3', 'ZZZ')
self.assertEqual(fields, ['XXX', 'f1', 'YYY', 'f2', 'ZZZ'])
def test_insert_after(self):
fields = base.FieldList(['f1', 'f2'])
self.assertEqual(fields, ['f1', 'f2'])
# typical
fields.insert_after('f1', 'XXX')
self.assertEqual(fields, ['f1', 'XXX', 'f2'])
fields.insert_after('XXX', 'YYY')
self.assertEqual(fields, ['f1', 'XXX', 'YYY', 'f2'])
# appends new field if reference field is invalid
fields.insert_after('f3', 'ZZZ')
self.assertEqual(fields, ['f1', 'XXX', 'YYY', 'f2', 'ZZZ'])
class TestForm(TestCase):
def setUp(self):
self.config = WuttaConfig()
self.request = testing.DummyRequest(wutta_config=self.config)
self.pyramid_config = testing.setUp(request=self.request, settings={
'mako.directories': ['wuttaweb:templates'],
'pyramid_deform.template_search_path': 'wuttaweb:templates/deform',
})
def tearDown(self):
testing.tearDown()
def make_form(self, request=None, **kwargs):
return base.Form(request or self.request, **kwargs)
def make_schema(self):
schema = colander.Schema(children=[
colander.SchemaNode(colander.String(),
name='foo'),
colander.SchemaNode(colander.String(),
name='bar'),
])
return schema
def test_init_with_none(self):
form = self.make_form()
self.assertIsNone(form.fields)
def test_init_with_fields(self):
form = self.make_form(fields=['foo', 'bar'])
self.assertEqual(form.fields, ['foo', 'bar'])
def test_init_with_schema(self):
schema = self.make_schema()
form = self.make_form(schema=schema)
self.assertEqual(form.fields, ['foo', 'bar'])
def test_vue_tagname(self):
form = self.make_form()
self.assertEqual(form.vue_tagname, 'wutta-form')
def test_vue_component(self):
form = self.make_form()
self.assertEqual(form.vue_component, 'WuttaForm')
def test_contains(self):
form = self.make_form(fields=['foo', 'bar'])
self.assertIn('foo', form)
self.assertNotIn('baz', form)
def test_iter(self):
form = self.make_form(fields=['foo', 'bar'])
fields = list(iter(form))
self.assertEqual(fields, ['foo', 'bar'])
fields = []
for field in form:
fields.append(field)
self.assertEqual(fields, ['foo', 'bar'])
def test_set_fields(self):
form = self.make_form(fields=['foo', 'bar'])
self.assertEqual(form.fields, ['foo', 'bar'])
form.set_fields(['baz'])
self.assertEqual(form.fields, ['baz'])
def test_get_schema(self):
form = self.make_form()
self.assertIsNone(form.schema)
# provided schema is returned
schema = self.make_schema()
form = self.make_form(schema=schema)
self.assertIs(form.schema, schema)
self.assertIs(form.get_schema(), schema)
# auto-generating schema not yet supported
form = self.make_form(fields=['foo', 'bar'])
self.assertIsNone(form.schema)
self.assertRaises(NotImplementedError, form.get_schema)
def test_get_deform(self):
schema = self.make_schema()
form = self.make_form(schema=schema)
self.assertFalse(hasattr(form, 'deform_form'))
dform = form.get_deform()
self.assertIsInstance(dform, deform.Form)
self.assertIs(form.deform_form, dform)
def test_get_label(self):
form = self.make_form(fields=['foo', 'bar'])
self.assertEqual(form.get_label('foo'), "Foo")
form.set_label('foo', "Baz")
self.assertEqual(form.get_label('foo'), "Baz")
def test_set_label(self):
form = self.make_form(fields=['foo', 'bar'])
self.assertEqual(form.get_label('foo'), "Foo")
form.set_label('foo', "Baz")
self.assertEqual(form.get_label('foo'), "Baz")
# schema should be updated when setting label
schema = self.make_schema()
form = self.make_form(schema=schema)
form.set_label('foo', "Woohoo")
self.assertEqual(form.get_label('foo'), "Woohoo")
self.assertEqual(schema['foo'].title, "Woohoo")
def test_render_vue_tag(self):
schema = self.make_schema()
form = self.make_form(schema=schema)
html = form.render_vue_tag()
self.assertEqual(html, '<wutta-form></wutta-form>')
def test_render_vue_template(self):
self.pyramid_config.include('pyramid_mako')
self.pyramid_config.add_subscriber('wuttaweb.subscribers.before_render',
'pyramid.events.BeforeRender')
# form button is disabled on @submit by default
schema = self.make_schema()
form = self.make_form(schema=schema)
html = form.render_vue_template()
self.assertIn('<script type="text/x-template" id="wutta-form-template">', html)
self.assertIn('@submit', html)
# but not if form is configured otherwise
form = self.make_form(schema=schema, auto_disable_submit=False)
html = form.render_vue_template()
self.assertIn('<script type="text/x-template" id="wutta-form-template">', html)
self.assertNotIn('@submit', html)
def test_render_vue_field(self):
self.pyramid_config.include('pyramid_deform')
schema = self.make_schema()
form = self.make_form(schema=schema)
dform = form.get_deform()
# typical
html = form.render_vue_field('foo')
self.assertIn('<b-field :horizontal="true" label="Foo">', html)
self.assertIn('<b-input name="foo"', html)
# nb. no error message
self.assertNotIn('message', html)
# with single "static" error
dform['foo'].error = MagicMock(msg="something is wrong")
html = form.render_vue_field('foo')
self.assertIn(' message="something is wrong"', html)
# with single "dynamic" error
dform['foo'].error = MagicMock(msg="`something is wrong`")
html = form.render_vue_field('foo')
self.assertIn(':message="`something is wrong`"', html)
def test_get_field_errors(self):
schema = self.make_schema()
form = self.make_form(schema=schema)
dform = form.get_deform()
# no error
errors = form.get_field_errors('foo')
self.assertEqual(len(errors), 0)
# simple error
dform['foo'].error = MagicMock(msg="something is wrong")
errors = form.get_field_errors('foo')
self.assertEqual(len(errors), 1)
self.assertEqual(errors[0], "something is wrong")
def test_get_vue_field_value(self):
schema = self.make_schema()
form = self.make_form(schema=schema)
# null field value
value = form.get_vue_field_value('foo')
self.assertEqual(value, 'null')
# non-default / explicit value
# TODO: surely need a different approach to set value
dform = form.get_deform()
dform['foo'].cstruct = 'blarg'
value = form.get_vue_field_value('foo')
self.assertEqual(value, '"blarg"')
def test_jsonify_value(self):
form = self.make_form()
# null field value
value = form.jsonify_value(colander.null)
self.assertEqual(value, 'null')
value = form.jsonify_value(None)
self.assertEqual(value, 'null')
# string value
value = form.jsonify_value('blarg')
self.assertEqual(value, '"blarg"')
def test_validate(self):
schema = self.make_schema()
form = self.make_form(schema=schema)
self.assertFalse(hasattr(form, 'validated'))
# will not validate unless request is POST
self.request.POST = {'foo': 'blarg', 'bar': 'baz'}
self.request.method = 'GET'
self.assertFalse(form.validate())
self.request.method = 'POST'
data = form.validate()
self.assertEqual(data, {'foo': 'blarg', 'bar': 'baz'})
# validating a second type updates form.validated
self.request.POST = {'foo': 'BLARG', 'bar': 'BAZ'}
data = form.validate()
self.assertEqual(data, {'foo': 'BLARG', 'bar': 'BAZ'})
self.assertIs(form.validated, data)
# bad data does not validate
self.request.POST = {'foo': 42, 'bar': None}
self.assertFalse(form.validate())
dform = form.get_deform()
self.assertEqual(len(dform.error.children), 2)
self.assertEqual(dform['foo'].errormsg, "Pstruct is not a string")

145
tests/test_auth.py Normal file
View file

@ -0,0 +1,145 @@
# -*- coding: utf-8; -*-
from unittest import TestCase
from unittest.mock import MagicMock
from pyramid import testing
from wuttjamaican.conf import WuttaConfig
from wuttaweb import auth as mod
class TestLoginUser(TestCase):
def test_basic(self):
config = WuttaConfig()
app = config.get_app()
model = app.model
request = testing.DummyRequest(wutta_config=config)
user = model.User(username='barney')
headers = mod.login_user(request, user)
self.assertEqual(headers, [])
class TestLogoutUser(TestCase):
def test_basic(self):
config = WuttaConfig()
request = testing.DummyRequest(wutta_config=config)
request.session.delete = MagicMock()
headers = mod.logout_user(request)
request.session.delete.assert_called_once_with()
self.assertEqual(headers, [])
class TestWuttaSecurityPolicy(TestCase):
def setUp(self):
self.config = WuttaConfig(defaults={
'wutta.db.default.url': 'sqlite://',
})
self.request = testing.DummyRequest()
self.pyramid_config = testing.setUp(request=self.request, settings={
'wutta_config': self.config,
})
self.app = self.config.get_app()
model = self.app.model
model.Base.metadata.create_all(bind=self.config.appdb_engine)
self.session = self.app.make_session()
self.user = model.User(username='barney')
self.session.add(self.user)
self.session.commit()
self.policy = self.make_policy()
def tearDown(self):
testing.tearDown()
def make_policy(self):
return mod.WuttaSecurityPolicy(db_session=self.session)
def test_remember(self):
uuid = self.user.uuid
self.assertIsNotNone(uuid)
self.assertIsNone(self.policy.session_helper.authenticated_userid(self.request))
self.policy.remember(self.request, uuid)
self.assertEqual(self.policy.session_helper.authenticated_userid(self.request), uuid)
def test_forget(self):
uuid = self.user.uuid
self.policy.remember(self.request, uuid)
self.assertEqual(self.policy.session_helper.authenticated_userid(self.request), uuid)
self.policy.forget(self.request)
self.assertIsNone(self.policy.session_helper.authenticated_userid(self.request))
def test_identity(self):
# no identity
user = self.policy.identity(self.request)
self.assertIsNone(user)
# identity is remembered (must use new policy to bust cache)
self.policy = self.make_policy()
uuid = self.user.uuid
self.assertIsNotNone(uuid)
self.policy.remember(self.request, uuid)
user = self.policy.identity(self.request)
self.assertIs(user, self.user)
# invalid identity yields no user
self.policy = self.make_policy()
self.policy.remember(self.request, 'bogus-user-uuid')
user = self.policy.identity(self.request)
self.assertIsNone(user)
def test_authenticated_userid(self):
# no identity
uuid = self.policy.authenticated_userid(self.request)
self.assertIsNone(uuid)
# identity is remembered (must use new policy to bust cache)
self.policy = self.make_policy()
self.policy.remember(self.request, self.user.uuid)
uuid = self.policy.authenticated_userid(self.request)
self.assertEqual(uuid, self.user.uuid)
def test_permits(self):
auth = self.app.get_auth_handler()
model = self.app.model
# anon has no perms
self.assertFalse(self.policy.permits(self.request, None, 'foo.bar'))
# but we can grant it
anons = auth.get_role_anonymous(self.session)
self.user.roles.append(anons)
auth.grant_permission(anons, 'foo.bar')
self.session.commit()
# and then perm check is satisfied
self.assertTrue(self.policy.permits(self.request, None, 'foo.bar'))
# now, create a separate role and grant another perm
# (but user does not yet belong to this role)
role = model.Role(name='whatever')
self.session.add(role)
auth.grant_permission(role, 'baz.edit')
self.session.commit()
# so far then, user does not have the permission
self.policy = self.make_policy()
self.policy.remember(self.request, self.user.uuid)
self.assertFalse(self.policy.permits(self.request, None, 'baz.edit'))
# but if we assign user to role, perm check should pass
self.user.roles.append(role)
self.session.commit()
self.assertTrue(self.policy.permits(self.request, None, 'baz.edit'))
# now let's try another perm - we won't grant it, but will
# confirm user is denied access unless they become root
self.assertFalse(self.policy.permits(self.request, None, 'some-root-perm'))
self.request.is_root = True
self.assertTrue(self.policy.permits(self.request, None, 'some-root-perm'))

View file

@ -7,55 +7,210 @@ from unittest.mock import MagicMock
from wuttjamaican.conf import WuttaConfig
from pyramid import testing
from pyramid.security import remember
from wuttaweb import subscribers
from wuttaweb import helpers
from wuttaweb.auth import WuttaSecurityPolicy
class TestNewRequest(TestCase):
def setUp(self):
self.config = WuttaConfig()
self.request = self.make_request()
self.pyramid_config = testing.setUp(request=self.request, settings={
'wutta_config': self.config,
})
def tearDown(self):
testing.tearDown()
def make_request(self):
request = testing.DummyRequest()
request.registry.settings = {'wutta_config': self.config}
# request.registry.settings = {'wutta_config': self.config}
return request
def test_wutta_config(self):
request = self.make_request()
event = MagicMock(request=request)
event = MagicMock(request=self.request)
# request gets a new attr
self.assertFalse(hasattr(request, 'wutta_config'))
self.assertFalse(hasattr(self.request, 'wutta_config'))
subscribers.new_request(event)
self.assertTrue(hasattr(request, 'wutta_config'))
self.assertIs(request.wutta_config, self.config)
self.assertTrue(hasattr(self.request, 'wutta_config'))
self.assertIs(self.request.wutta_config, self.config)
def test_use_oruga_default(self):
request = self.make_request()
event = MagicMock(request=request)
event = MagicMock(request=self.request)
# request gets a new attr, false by default
self.assertFalse(hasattr(request, 'use_oruga'))
self.assertFalse(hasattr(self.request, 'use_oruga'))
subscribers.new_request(event)
self.assertFalse(request.use_oruga)
self.assertFalse(self.request.use_oruga)
def test_use_oruga_custom(self):
self.config.setdefault('wuttaweb.oruga_detector.spec', 'tests.test_subscribers:custom_oruga_detector')
request = self.make_request()
event = MagicMock(request=request)
event = MagicMock(request=self.request)
# request gets a new attr, which should be true
self.assertFalse(hasattr(request, 'use_oruga'))
self.assertFalse(hasattr(self.request, 'use_oruga'))
subscribers.new_request(event)
self.assertTrue(request.use_oruga)
self.assertTrue(self.request.use_oruga)
def test_get_referrer(self):
event = MagicMock(request=self.request)
def home(request):
pass
self.pyramid_config.add_route('home', '/')
self.pyramid_config.add_view(home, route_name='home')
self.assertFalse(hasattr(self.request, 'get_referrer'))
subscribers.new_request(event)
self.assertTrue(hasattr(self.request, 'get_referrer'))
# default if no referrer, is home route
url = self.request.get_referrer()
self.assertEqual(url, self.request.route_url('home'))
# can specify another default
url = self.request.get_referrer(default='https://wuttaproject.org')
self.assertEqual(url, 'https://wuttaproject.org')
# or referrer can come from user session
self.request.session['referrer'] = 'https://rattailproject.org'
self.assertIn('referrer', self.request.session)
url = self.request.get_referrer()
self.assertEqual(url, 'https://rattailproject.org')
# nb. referrer should also have been removed from user session
self.assertNotIn('referrer', self.request.session)
# or referrer can come from request params
self.request.params['referrer'] = 'https://kernel.org'
url = self.request.get_referrer()
self.assertEqual(url, 'https://kernel.org')
def custom_oruga_detector(request):
return True
class TestNewRequestSetUser(TestCase):
def setUp(self):
self.config = WuttaConfig(defaults={
'wutta.db.default.url': 'sqlite://',
})
self.request = testing.DummyRequest(wutta_config=self.config)
self.pyramid_config = testing.setUp(request=self.request, settings={
'wutta_config': self.config,
})
self.app = self.config.get_app()
model = self.app.model
model.Base.metadata.create_all(bind=self.config.appdb_engine)
self.session = self.app.make_session()
self.user = model.User(username='barney')
self.session.add(self.user)
self.session.commit()
self.pyramid_config.set_security_policy(WuttaSecurityPolicy(db_session=self.session))
def tearDown(self):
testing.tearDown()
def test_anonymous(self):
self.assertFalse(hasattr(self.request, 'user'))
event = MagicMock(request=self.request)
subscribers.new_request_set_user(event)
self.assertIsNone(self.request.user)
def test_authenticated(self):
uuid = self.user.uuid
self.assertIsNotNone(uuid)
remember(self.request, uuid)
event = MagicMock(request=self.request)
subscribers.new_request_set_user(event, db_session=self.session)
self.assertIs(self.request.user, self.user)
def test_is_admin(self):
event = MagicMock(request=self.request)
# anonymous user
self.assertFalse(hasattr(self.request, 'user'))
self.assertFalse(hasattr(self.request, 'is_admin'))
subscribers.new_request_set_user(event, db_session=self.session)
self.assertIsNone(self.request.user)
self.assertFalse(self.request.is_admin)
# reset
del self.request.is_admin
# authenticated user, but still not an admin
self.request.user = self.user
subscribers.new_request_set_user(event, db_session=self.session)
self.assertIs(self.request.user, self.user)
self.assertFalse(self.request.is_admin)
# reset
del self.request.is_admin
# but if we make them an admin, it changes
auth = self.app.get_auth_handler()
admin = auth.get_role_administrator(self.session)
self.user.roles.append(admin)
self.session.commit()
subscribers.new_request_set_user(event, db_session=self.session)
self.assertIs(self.request.user, self.user)
self.assertTrue(self.request.is_admin)
def test_is_root(self):
event = MagicMock(request=self.request)
# anonymous user
self.assertFalse(hasattr(self.request, 'user'))
self.assertFalse(hasattr(self.request, 'is_root'))
subscribers.new_request_set_user(event, db_session=self.session)
self.assertIsNone(self.request.user)
self.assertFalse(self.request.is_root)
# reset
del self.request.is_admin
del self.request.is_root
# authenticated user, but still not an admin
self.request.user = self.user
subscribers.new_request_set_user(event, db_session=self.session)
self.assertIs(self.request.user, self.user)
self.assertFalse(self.request.is_root)
# reset
del self.request.is_admin
del self.request.is_root
# even if we make them an admin, still not yet root
auth = self.app.get_auth_handler()
admin = auth.get_role_administrator(self.session)
self.user.roles.append(admin)
self.session.commit()
subscribers.new_request_set_user(event, db_session=self.session)
self.assertIs(self.request.user, self.user)
self.assertTrue(self.request.is_admin)
self.assertFalse(self.request.is_root)
# reset
del self.request.is_admin
del self.request.is_root
# root status flag lives in user session
self.request.session['is_root'] = True
subscribers.new_request_set_user(event, db_session=self.session)
self.assertTrue(self.request.is_admin)
self.assertTrue(self.request.is_root)
class TestBeforeRender(TestCase):
def setUp(self):

View file

@ -263,3 +263,30 @@ class TestGetLibUrl(TestCase):
self.config.setdefault('wuttaweb.liburl.bb_vue_fontawesome', '/lib/vue-fontawesome.js')
url = util.get_liburl(self.request, 'bb_vue_fontawesome')
self.assertEqual(url, '/lib/vue-fontawesome.js')
class TestGetFormData(TestCase):
def setUp(self):
self.config = WuttaConfig()
def make_request(self, **kwargs):
kwargs.setdefault('wutta_config', self.config)
kwargs.setdefault('POST', {'foo1': 'bar'})
kwargs.setdefault('json_body', {'foo2': 'baz'})
return testing.DummyRequest(**kwargs)
def test_default(self):
request = self.make_request()
data = util.get_form_data(request)
self.assertEqual(data, {'foo1': 'bar'})
def test_is_xhr(self):
request = self.make_request(POST=None, is_xhr=True)
data = util.get_form_data(request)
self.assertEqual(data, {'foo2': 'baz'})
def test_content_type(self):
request = self.make_request(POST=None, content_type='application/json')
data = util.get_form_data(request)
self.assertEqual(data, {'foo2': 'baz'})

190
tests/views/test_auth.py Normal file
View file

@ -0,0 +1,190 @@
# -*- coding: utf-8; -*-
from unittest import TestCase
from unittest.mock import MagicMock
from pyramid import testing
from pyramid.httpexceptions import HTTPFound, HTTPForbidden
from wuttjamaican.conf import WuttaConfig
from wuttaweb.views import auth as mod
from wuttaweb.auth import WuttaSecurityPolicy
from wuttaweb.subscribers import new_request
class TestAuthView(TestCase):
def setUp(self):
self.config = WuttaConfig(defaults={
'wutta.db.default.url': 'sqlite://',
})
self.request = testing.DummyRequest(wutta_config=self.config, user=None)
self.pyramid_config = testing.setUp(request=self.request, settings={
'wutta_config': self.config,
})
self.app = self.config.get_app()
auth = self.app.get_auth_handler()
model = self.app.model
model.Base.metadata.create_all(bind=self.config.appdb_engine)
self.session = self.app.make_session()
self.user = model.User(username='barney')
self.session.add(self.user)
auth.set_user_password(self.user, 'testpass')
self.session.commit()
self.pyramid_config.set_security_policy(WuttaSecurityPolicy(db_session=self.session))
self.pyramid_config.include('wuttaweb.views.auth')
self.pyramid_config.include('wuttaweb.views.common')
def tearDown(self):
testing.tearDown()
def test_login(self):
view = mod.AuthView(self.request)
context = view.login()
self.assertIn('form', context)
# redirect if user already logged in
self.request.user = self.user
view = mod.AuthView(self.request)
redirect = view.login(session=self.session)
self.assertIsInstance(redirect, HTTPFound)
# login fails w/ wrong password
self.request.user = None
self.request.method = 'POST'
self.request.POST = {'username': 'barney', 'password': 'WRONG'}
view = mod.AuthView(self.request)
context = view.login(session=self.session)
self.assertIn('form', context)
# redirect if login succeeds
self.request.method = 'POST'
self.request.POST = {'username': 'barney', 'password': 'testpass'}
view = mod.AuthView(self.request)
redirect = view.login(session=self.session)
self.assertIsInstance(redirect, HTTPFound)
def test_logout(self):
view = mod.AuthView(self.request)
self.request.session.delete = MagicMock()
redirect = view.logout()
self.request.session.delete.assert_called_once_with()
self.assertIsInstance(redirect, HTTPFound)
def test_change_password(self):
view = mod.AuthView(self.request)
auth = self.app.get_auth_handler()
# unauthenticated user is redirected
redirect = view.change_password()
self.assertIsInstance(redirect, HTTPFound)
# now "login" the user, and set initial password
self.request.user = self.user
auth.set_user_password(self.user, 'foo')
self.session.commit()
# view should now return context w/ form
context = view.change_password()
self.assertIn('form', context)
# submit valid form, ensure password is changed
# (nb. this also would redirect user to home page)
self.request.method = 'POST'
self.request.POST = {
'current_password': 'foo',
# nb. new_password requires colander mapping structure
'__start__': 'new_password:mapping',
'new_password': 'bar',
'new_password-confirm': 'bar',
'__end__': 'new_password:mapping',
}
redirect = view.change_password()
self.assertIsInstance(redirect, HTTPFound)
self.session.commit()
self.session.refresh(self.user)
self.assertFalse(auth.check_user_password(self.user, 'foo'))
self.assertTrue(auth.check_user_password(self.user, 'bar'))
# at this point 'foo' is the password, now let's submit some
# invalid forms and make sure we get back a context w/ form
# first try empty data
self.request.POST = {}
context = view.change_password()
self.assertIn('form', context)
dform = context['form'].get_deform()
self.assertEqual(dform['current_password'].errormsg, "Required")
self.assertEqual(dform['new_password'].errormsg, "Required")
# now try bad current password
self.request.POST = {
'current_password': 'blahblah',
'__start__': 'new_password:mapping',
'new_password': 'baz',
'new_password-confirm': 'baz',
'__end__': 'new_password:mapping',
}
context = view.change_password()
self.assertIn('form', context)
dform = context['form'].get_deform()
self.assertEqual(dform['current_password'].errormsg, "Current password is incorrect.")
# now try bad new password
self.request.POST = {
'current_password': 'bar',
'__start__': 'new_password:mapping',
'new_password': 'bar',
'new_password-confirm': 'bar',
'__end__': 'new_password:mapping',
}
context = view.change_password()
self.assertIn('form', context)
dform = context['form'].get_deform()
self.assertEqual(dform['new_password'].errormsg, "New password must be different from old password.")
def test_become_root(self):
event = MagicMock(request=self.request)
new_request(event) # add request.get_referrer()
view = mod.AuthView(self.request)
# GET not allowed
self.request.method = 'GET'
self.assertRaises(HTTPForbidden, view.become_root)
# non-admin users also not allowed
self.request.method = 'POST'
self.request.is_admin = False
self.assertRaises(HTTPForbidden, view.become_root)
# but admin users can become root
self.request.is_admin = True
self.assertNotIn('is_root', self.request.session)
redirect = view.become_root()
self.assertIsInstance(redirect, HTTPFound)
self.assertTrue(self.request.session['is_root'])
def test_stop_root(self):
event = MagicMock(request=self.request)
new_request(event) # add request.get_referrer()
view = mod.AuthView(self.request)
# GET not allowed
self.request.method = 'GET'
self.assertRaises(HTTPForbidden, view.stop_root)
# non-admin users also not allowed
self.request.method = 'POST'
self.request.is_admin = False
self.assertRaises(HTTPForbidden, view.stop_root)
# but admin users can stop being root
# (nb. there is no check whether user is currently root)
self.request.is_admin = True
self.assertNotIn('is_root', self.request.session)
redirect = view.stop_root()
self.assertIsInstance(redirect, HTTPFound)
self.assertFalse(self.request.session['is_root'])

View file

@ -3,19 +3,35 @@
from unittest import TestCase
from pyramid import testing
from pyramid.httpexceptions import HTTPFound, HTTPForbidden
from wuttjamaican.conf import WuttaConfig
from wuttaweb.views import base
from wuttaweb.forms import Form
class TestView(TestCase):
def test_basic(self):
config = WuttaConfig()
request = testing.DummyRequest()
request.wutta_config = config
def setUp(self):
self.config = WuttaConfig()
self.app = self.config.get_app()
self.request = testing.DummyRequest(wutta_config=self.config)
self.view = base.View(self.request)
view = base.View(request)
self.assertIs(view.request, request)
self.assertIs(view.config, config)
self.assertIs(view.app, config.get_app())
def test_basic(self):
self.assertIs(self.view.request, self.request)
self.assertIs(self.view.config, self.config)
self.assertIs(self.view.app, self.app)
def test_forbidden(self):
error = self.view.forbidden()
self.assertIsInstance(error, HTTPForbidden)
def test_make_form(self):
form = self.view.make_form()
self.assertIsInstance(form, Form)
def test_redirect(self):
error = self.view.redirect('/')
self.assertIsInstance(error, HTTPFound)
self.assertEqual(error.location, '/')