Compare commits
4 commits
1995095627
...
ca997807e4
Author | SHA1 | Date | |
---|---|---|---|
![]() |
ca997807e4 | ||
![]() |
e899d06151 | ||
![]() |
43ca404837 | ||
![]() |
60d3fcd13b |
11
CHANGELOG.md
11
CHANGELOG.md
|
@ -5,6 +5,17 @@ All notable changes to WuttJamaican will be documented in this file.
|
||||||
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
|
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
|
||||||
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
|
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
|
||||||
|
|
||||||
|
## v0.8.0 (2024-07-14)
|
||||||
|
|
||||||
|
### Feat
|
||||||
|
|
||||||
|
- flesh out the auth handler; add people handler
|
||||||
|
- add model for Person; tie to User
|
||||||
|
|
||||||
|
### Fix
|
||||||
|
|
||||||
|
- add migration for auth tables
|
||||||
|
|
||||||
## v0.7.0 (2024-07-14)
|
## v0.7.0 (2024-07-14)
|
||||||
|
|
||||||
### Feat
|
### Feat
|
||||||
|
|
|
@ -17,5 +17,6 @@
|
||||||
db.model.base
|
db.model.base
|
||||||
db.sess
|
db.sess
|
||||||
exc
|
exc
|
||||||
|
people
|
||||||
testing
|
testing
|
||||||
util
|
util
|
||||||
|
|
6
docs/api/wuttjamaican/people.rst
Normal file
6
docs/api/wuttjamaican/people.rst
Normal file
|
@ -0,0 +1,6 @@
|
||||||
|
|
||||||
|
``wuttjamaican.people``
|
||||||
|
=======================
|
||||||
|
|
||||||
|
.. automodule:: wuttjamaican.people
|
||||||
|
:members:
|
|
@ -109,6 +109,13 @@ Glossary
|
||||||
Most :term:`apps<app>` will have at least one :term:`app
|
Most :term:`apps<app>` will have at least one :term:`app
|
||||||
database`.
|
database`.
|
||||||
|
|
||||||
|
db session
|
||||||
|
The "session" is a SQLAlchemy abstraction for an open database
|
||||||
|
connection, essentially.
|
||||||
|
|
||||||
|
In practice this generally refers to a
|
||||||
|
:class:`~wuttjamaican.db.sess.Session` instance.
|
||||||
|
|
||||||
entry point
|
entry point
|
||||||
This refers to a "setuptools-style" entry point specifically,
|
This refers to a "setuptools-style" entry point specifically,
|
||||||
which is a mechanism used to register "plugins" and the like.
|
which is a mechanism used to register "plugins" and the like.
|
||||||
|
|
|
@ -6,7 +6,7 @@ build-backend = "hatchling.build"
|
||||||
|
|
||||||
[project]
|
[project]
|
||||||
name = "WuttJamaican"
|
name = "WuttJamaican"
|
||||||
version = "0.7.0"
|
version = "0.8.0"
|
||||||
description = "Base package for Wutta Framework"
|
description = "Base package for Wutta Framework"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
authors = [{name = "Lance Edgar", email = "lance@edbob.org"}]
|
authors = [{name = "Lance Edgar", email = "lance@edbob.org"}]
|
||||||
|
@ -32,7 +32,7 @@ dependencies = [
|
||||||
|
|
||||||
|
|
||||||
[project.optional-dependencies]
|
[project.optional-dependencies]
|
||||||
db = ["SQLAlchemy<2", "alembic"]
|
db = ["SQLAlchemy<2", "alembic", "passlib"]
|
||||||
docs = ["Sphinx", "sphinxcontrib-programoutput", "furo"]
|
docs = ["Sphinx", "sphinxcontrib-programoutput", "furo"]
|
||||||
tests = ["pytest-cov", "tox"]
|
tests = ["pytest-cov", "tox"]
|
||||||
|
|
||||||
|
|
|
@ -66,6 +66,8 @@ class AppHandler:
|
||||||
"""
|
"""
|
||||||
default_app_title = "WuttJamaican"
|
default_app_title = "WuttJamaican"
|
||||||
default_model_spec = 'wuttjamaican.db.model'
|
default_model_spec = 'wuttjamaican.db.model'
|
||||||
|
default_auth_handler_spec = 'wuttjamaican.auth:AuthHandler'
|
||||||
|
default_people_handler_spec = 'wuttjamaican.people:PeopleHandler'
|
||||||
|
|
||||||
def __init__(self, config):
|
def __init__(self, config):
|
||||||
self.config = config
|
self.config = config
|
||||||
|
@ -241,6 +243,16 @@ class AppHandler:
|
||||||
"""
|
"""
|
||||||
return make_uuid()
|
return make_uuid()
|
||||||
|
|
||||||
|
def get_session(self, obj):
|
||||||
|
"""
|
||||||
|
Returns the SQLAlchemy session with which the given object is
|
||||||
|
associated. Simple convenience wrapper around
|
||||||
|
:func:`sqlalchemy:sqlalchemy.orm.object_session()`.
|
||||||
|
"""
|
||||||
|
from sqlalchemy import orm
|
||||||
|
|
||||||
|
return orm.object_session(obj)
|
||||||
|
|
||||||
def short_session(self, **kwargs):
|
def short_session(self, **kwargs):
|
||||||
"""
|
"""
|
||||||
Returns a context manager for a short-lived database session.
|
Returns a context manager for a short-lived database session.
|
||||||
|
@ -279,6 +291,51 @@ class AppHandler:
|
||||||
|
|
||||||
return get_setting(session, name)
|
return get_setting(session, name)
|
||||||
|
|
||||||
|
##############################
|
||||||
|
# getters for other handlers
|
||||||
|
##############################
|
||||||
|
|
||||||
|
def get_auth_handler(self, **kwargs):
|
||||||
|
"""
|
||||||
|
Get the configured :term:`auth handler`.
|
||||||
|
|
||||||
|
:rtype: :class:`~wuttjamaican.auth.AuthHandler`
|
||||||
|
"""
|
||||||
|
if 'auth' not in self.handlers:
|
||||||
|
spec = self.config.get(f'{self.appname}.auth.handler',
|
||||||
|
default=self.default_auth_handler_spec)
|
||||||
|
factory = self.load_object(spec)
|
||||||
|
self.handlers['auth'] = factory(self.config, **kwargs)
|
||||||
|
return self.handlers['auth']
|
||||||
|
|
||||||
|
def get_people_handler(self, **kwargs):
|
||||||
|
"""
|
||||||
|
Get the configured "people" :term:`handler`.
|
||||||
|
|
||||||
|
:rtype: :class:`~wuttjamaican.people.PeopleHandler`
|
||||||
|
"""
|
||||||
|
if 'people' not in self.handlers:
|
||||||
|
spec = self.config.get(f'{self.appname}.people.handler',
|
||||||
|
default=self.default_people_handler_spec)
|
||||||
|
factory = self.load_object(spec)
|
||||||
|
self.handlers['people'] = factory(self.config, **kwargs)
|
||||||
|
return self.handlers['people']
|
||||||
|
|
||||||
|
##############################
|
||||||
|
# convenience delegators
|
||||||
|
##############################
|
||||||
|
|
||||||
|
def get_person(self, obj, **kwargs):
|
||||||
|
"""
|
||||||
|
Convenience method to locate a
|
||||||
|
:class:`~wuttjamaican.db.model.base.Person` for the given
|
||||||
|
object.
|
||||||
|
|
||||||
|
This delegates to the "people" handler method,
|
||||||
|
:meth:`~wuttjamaican.people.PeopleHandler.get_person()`.
|
||||||
|
"""
|
||||||
|
return self.get_people_handler().get_person(obj, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
class AppProvider:
|
class AppProvider:
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -29,6 +29,16 @@ This defines the default :term:`auth handler`.
|
||||||
from wuttjamaican.app import GenericHandler
|
from wuttjamaican.app import GenericHandler
|
||||||
|
|
||||||
|
|
||||||
|
# nb. this only works if passlib is installed (part of 'db' extra)
|
||||||
|
try:
|
||||||
|
from passlib.context import CryptContext
|
||||||
|
except ImportError: # pragma: no cover
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
password_context = CryptContext(schemes=['bcrypt'])
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class AuthHandler(GenericHandler):
|
class AuthHandler(GenericHandler):
|
||||||
"""
|
"""
|
||||||
Base class and default implementation for the :term:`auth
|
Base class and default implementation for the :term:`auth
|
||||||
|
@ -37,8 +47,436 @@ class AuthHandler(GenericHandler):
|
||||||
This is responsible for "authentication and authorization" - for
|
This is responsible for "authentication and authorization" - for
|
||||||
instance:
|
instance:
|
||||||
|
|
||||||
* create new users, roles
|
* authenticate user from login credentials
|
||||||
|
* check which permissions a user/role has
|
||||||
|
* create/modify users, roles
|
||||||
* grant/revoke role permissions
|
* grant/revoke role permissions
|
||||||
* determine which permissions a user has
|
|
||||||
* identify user from login credentials
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
def authenticate_user(self, session, username, password, **kwargs):
|
||||||
|
"""
|
||||||
|
Authenticate the given user credentials, and if successful,
|
||||||
|
return the :class:`~wuttjamaican.db.model.auth.User`.
|
||||||
|
|
||||||
|
Default logic will (try to) locate a user with matching
|
||||||
|
username, then confirm the supplied password is also a match.
|
||||||
|
|
||||||
|
Custom handlers can authenticate against anything else, using
|
||||||
|
the given credentials. But they still must return a "native"
|
||||||
|
``User`` object for the app to consider the authentication
|
||||||
|
successful. The handler may auto-create the user if needed.
|
||||||
|
|
||||||
|
Generally speaking the credentials will have come directly
|
||||||
|
from a user login attempt in the web app etc. Again the
|
||||||
|
default logic assumes a "username" but in practice it may be
|
||||||
|
an email address etc. - whatever the user entered.
|
||||||
|
|
||||||
|
:param session: Open :term:`db session`.
|
||||||
|
|
||||||
|
:param username: Usually a string, but also may be a
|
||||||
|
:class:`~wuttjamaican.db.model.auth.User` instance, in
|
||||||
|
which case no user lookup will occur. (However the user is
|
||||||
|
still authenticated otherwise, i.e. the password must be
|
||||||
|
correct etc.)
|
||||||
|
|
||||||
|
:param password: Password as string.
|
||||||
|
|
||||||
|
:returns: :class:`~wuttjamaican.db.model.auth.User` instance,
|
||||||
|
or ``None``.
|
||||||
|
"""
|
||||||
|
model = self.app.model
|
||||||
|
|
||||||
|
if isinstance(username, model.User):
|
||||||
|
user = username
|
||||||
|
else:
|
||||||
|
user = session.query(model.User)\
|
||||||
|
.filter_by(username=username)\
|
||||||
|
.first()
|
||||||
|
|
||||||
|
if user and user.active and user.password:
|
||||||
|
if password_context.verify(password, user.password):
|
||||||
|
return user
|
||||||
|
|
||||||
|
def get_role(self, session, key, **kwargs):
|
||||||
|
"""
|
||||||
|
Locate and return a :class:`~wuttjamaican.db.model.auth.Role`
|
||||||
|
per the given key, if possible.
|
||||||
|
|
||||||
|
:param session: Open :term:`db session`.
|
||||||
|
|
||||||
|
:param key: Value to use when searching for the role. Can be
|
||||||
|
a UUID or name of a role.
|
||||||
|
|
||||||
|
:returns: :class:`~wuttjamaican.db.model.auth.Role` instance;
|
||||||
|
or ``None``.
|
||||||
|
"""
|
||||||
|
model = self.app.model
|
||||||
|
|
||||||
|
if not key:
|
||||||
|
return
|
||||||
|
|
||||||
|
# try to match on Role.uuid
|
||||||
|
role = session.get(model.Role, key)
|
||||||
|
if role:
|
||||||
|
return role
|
||||||
|
|
||||||
|
# try to match on Role.name
|
||||||
|
role = session.query(model.Role)\
|
||||||
|
.filter_by(name=key)\
|
||||||
|
.first()
|
||||||
|
if role:
|
||||||
|
return role
|
||||||
|
|
||||||
|
# try settings; if value then recurse
|
||||||
|
key = self.config.get(f'{self.appname}.role.{key}',
|
||||||
|
session=session)
|
||||||
|
if key:
|
||||||
|
return self.get_role(session, key)
|
||||||
|
|
||||||
|
def get_user(self, obj, **kwargs):
|
||||||
|
"""
|
||||||
|
Return the :class:`~wuttjamaican.db.model.auth.User`
|
||||||
|
associated with the given object, if one can be found.
|
||||||
|
|
||||||
|
This method should accept "any" type of ``obj`` and inspect it
|
||||||
|
to determine if/how a user can be found. It should return the
|
||||||
|
"first, most obvious" user in the event that the given object
|
||||||
|
is associated with multiple users.
|
||||||
|
|
||||||
|
The default logic only knows how to navigate the Person/User
|
||||||
|
relationship, so if ``obj`` is a
|
||||||
|
:class:`~wuttjamaican.db.model.base.Person` then it will
|
||||||
|
return the "first" user account for the person (according to
|
||||||
|
:attr:`~wuttjamaican.db.model.base.Person.user`).
|
||||||
|
|
||||||
|
:returns: :class:`~wuttjamaican.db.model.auth.User` or ``None``.
|
||||||
|
"""
|
||||||
|
model = self.app.model
|
||||||
|
|
||||||
|
if isinstance(obj, model.User):
|
||||||
|
return obj
|
||||||
|
|
||||||
|
person = self.app.get_person(obj)
|
||||||
|
if person:
|
||||||
|
return person.user
|
||||||
|
|
||||||
|
def make_user(self, session=None, **kwargs):
|
||||||
|
"""
|
||||||
|
Make and return a new
|
||||||
|
:class:`~wuttjamaican.db.model.auth.User`.
|
||||||
|
|
||||||
|
This is mostly a simple wrapper around the
|
||||||
|
:class:`~wuttjamaican.db.model.auth.User` constructor. All
|
||||||
|
``kwargs`` are passed on to the constructor as-is, for
|
||||||
|
instance. It also will add the user to the session, if
|
||||||
|
applicable.
|
||||||
|
|
||||||
|
This method also adds one other convenience:
|
||||||
|
|
||||||
|
If there is no ``username`` specified in the ``kwargs`` then
|
||||||
|
it will call :meth:`make_unique_username()` to automatically
|
||||||
|
provide one. (Note that the ``kwargs`` will be passed along
|
||||||
|
to that call as well.)
|
||||||
|
|
||||||
|
:param session: Open :term:`db session`, if applicable.
|
||||||
|
|
||||||
|
:returns: The new :class:`~wuttjamaican.db.model.auth.User`
|
||||||
|
instance.
|
||||||
|
"""
|
||||||
|
model = self.app.model
|
||||||
|
|
||||||
|
if session and 'username' not in kwargs:
|
||||||
|
kwargs['username'] = self.make_unique_username(session, **kwargs)
|
||||||
|
|
||||||
|
user = model.User(**kwargs)
|
||||||
|
if session:
|
||||||
|
session.add(user)
|
||||||
|
return user
|
||||||
|
|
||||||
|
def delete_user(self, user, **kwargs):
|
||||||
|
"""
|
||||||
|
Delete the given user account. Use with caution! As this
|
||||||
|
generally cannot be undone.
|
||||||
|
|
||||||
|
Default behavior simply deletes the user account. Depending
|
||||||
|
on the DB schema and data present, this may cause an error
|
||||||
|
(i.e. if the user is still referenced by other tables).
|
||||||
|
|
||||||
|
:param user: :class:`~wuttjamaican.db.model.auth.User` to
|
||||||
|
delete.
|
||||||
|
"""
|
||||||
|
session = self.app.get_session(user)
|
||||||
|
session.delete(user)
|
||||||
|
|
||||||
|
def make_preferred_username(self, session, **kwargs):
|
||||||
|
"""
|
||||||
|
Generate a "preferred" username, using data from ``kwargs`` as
|
||||||
|
hints.
|
||||||
|
|
||||||
|
Note that ``kwargs`` should be of the same sort that might be
|
||||||
|
passed to the :class:`~wuttjamaican.db.model.auth.User`
|
||||||
|
constructor.
|
||||||
|
|
||||||
|
So far this logic is rather simple:
|
||||||
|
|
||||||
|
If ``kwargs`` contains ``person`` then a username will be
|
||||||
|
constructed using the name data from the person
|
||||||
|
(e.g. ``'john.doe'``).
|
||||||
|
|
||||||
|
In all other cases it will return ``'newuser'``.
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
|
||||||
|
This method does not confirm if the username it generates
|
||||||
|
is actually "available" for a new user. See
|
||||||
|
:meth:`make_unique_username()` for that.
|
||||||
|
|
||||||
|
:param session: Open :term:`db session`.
|
||||||
|
|
||||||
|
:returns: Generated username as string.
|
||||||
|
"""
|
||||||
|
person = kwargs.get('person')
|
||||||
|
if person:
|
||||||
|
first = (person.first_name or '').strip().lower()
|
||||||
|
last = (person.last_name or '').strip().lower()
|
||||||
|
if first and last:
|
||||||
|
return f'{first}.{last}'
|
||||||
|
if first:
|
||||||
|
return first
|
||||||
|
if last:
|
||||||
|
return last
|
||||||
|
|
||||||
|
return 'newuser'
|
||||||
|
|
||||||
|
def make_unique_username(self, session, **kwargs):
|
||||||
|
"""
|
||||||
|
Generate a *unique* username, using data from ``kwargs`` as
|
||||||
|
hints.
|
||||||
|
|
||||||
|
Note that ``kwargs`` should be of the same sort that might be
|
||||||
|
passed to the :class:`~wuttjamaican.db.model.auth.User`
|
||||||
|
constructor.
|
||||||
|
|
||||||
|
This method is a convenience which does two things:
|
||||||
|
|
||||||
|
First it calls :meth:`make_preferred_username()` to obtain the
|
||||||
|
"preferred" username. (It passes all ``kwargs`` along when it
|
||||||
|
makes that call.)
|
||||||
|
|
||||||
|
Then it checks to see if the resulting username is already
|
||||||
|
taken. If it is, then a "counter" is appended to the
|
||||||
|
username, and incremented until a username can be found which
|
||||||
|
is *not* yet taken.
|
||||||
|
|
||||||
|
It returns the first "available" (hence unique) username which
|
||||||
|
is found. Note that it is considered unique and therefore
|
||||||
|
available *at the time*; however this method does not
|
||||||
|
"reserve" the username in any way. It is assumed that you
|
||||||
|
would create the user yourself once you have the username.
|
||||||
|
|
||||||
|
:param session: Open :term:`db session`.
|
||||||
|
|
||||||
|
:returns: Username as string.
|
||||||
|
"""
|
||||||
|
model = self.app.model
|
||||||
|
|
||||||
|
original_username = self.make_preferred_username(session, **kwargs)
|
||||||
|
username = original_username
|
||||||
|
|
||||||
|
# check for unique username
|
||||||
|
counter = 1
|
||||||
|
while True:
|
||||||
|
users = session.query(model.User)\
|
||||||
|
.filter(model.User.username == username)\
|
||||||
|
.count()
|
||||||
|
if not users:
|
||||||
|
break
|
||||||
|
username = f"{original_username}{counter:02d}"
|
||||||
|
counter += 1
|
||||||
|
|
||||||
|
return username
|
||||||
|
|
||||||
|
def set_user_password(self, user, password, **kwargs):
|
||||||
|
"""
|
||||||
|
Set a user's password.
|
||||||
|
|
||||||
|
This will update the
|
||||||
|
:attr:`~wuttjamaican.db.model.auth.User.password` attribute
|
||||||
|
for the user. The value will be hashed using ``bcrypt``.
|
||||||
|
|
||||||
|
:param user: :class:`~wuttjamaican.db.model.auth.User` instance.
|
||||||
|
|
||||||
|
:param password: New password in plain text.
|
||||||
|
"""
|
||||||
|
user.password = password_context.hash(password)
|
||||||
|
|
||||||
|
def get_role_administrator(self, session, **kwargs):
|
||||||
|
"""
|
||||||
|
Returns the special "Administrator" role.
|
||||||
|
"""
|
||||||
|
return self._special_role(session, 'd937fa8a965611dfa0dd001143047286', "Administrator")
|
||||||
|
|
||||||
|
def get_role_anonymous(self, session, **kwargs):
|
||||||
|
"""
|
||||||
|
Returns the special "Anonymous" (aka. "Guest") role.
|
||||||
|
"""
|
||||||
|
return self._special_role(session, 'f8a27c98965a11dfaff7001143047286', "Anonymous")
|
||||||
|
|
||||||
|
def get_role_authenticated(self, session, **kwargs):
|
||||||
|
"""
|
||||||
|
Returns the special "Authenticated" role.
|
||||||
|
"""
|
||||||
|
return self._special_role(session, 'b765a9cc331a11e6ac2a3ca9f40bc550', "Authenticated")
|
||||||
|
|
||||||
|
def get_permissions(self, session, principal,
|
||||||
|
include_anonymous=True,
|
||||||
|
include_authenticated=True):
|
||||||
|
"""
|
||||||
|
Return a set of permission names, which represents all
|
||||||
|
permissions effectively granted to the given user or role.
|
||||||
|
|
||||||
|
:param session: Open :term:`db session`.
|
||||||
|
|
||||||
|
:param principal: :class:`~wuttjamaican.db.model.auth.User` or
|
||||||
|
:class:`~wuttjamaican.db.model.auth.Role` instance. Can
|
||||||
|
also be ``None``, in which case the "Anonymous" role will
|
||||||
|
be assumed.
|
||||||
|
|
||||||
|
:param include_anonymous: Whether the "Anonymous" role should
|
||||||
|
be included when checking permissions. If ``False``, the
|
||||||
|
Anonymous permissions will *not* be checked.
|
||||||
|
|
||||||
|
:param include_authenticated: Whether the "Authenticated" role
|
||||||
|
should be included when checking permissions.
|
||||||
|
|
||||||
|
:returns: Set of permission names.
|
||||||
|
:rtype: set
|
||||||
|
"""
|
||||||
|
# we will use any `roles` attribute which may be present. in
|
||||||
|
# practice we would be assuming a User in this case
|
||||||
|
if hasattr(principal, 'roles'):
|
||||||
|
roles = [role
|
||||||
|
for role in principal.roles
|
||||||
|
if self._role_is_pertinent(role)]
|
||||||
|
|
||||||
|
# here our User assumption gets a little more explicit
|
||||||
|
if include_authenticated:
|
||||||
|
roles.append(self.get_role_authenticated(session))
|
||||||
|
|
||||||
|
# otherwise a non-null principal is assumed to be a Role
|
||||||
|
elif principal is not None:
|
||||||
|
roles = [principal]
|
||||||
|
|
||||||
|
# fallback assumption is "no roles"
|
||||||
|
else:
|
||||||
|
roles = []
|
||||||
|
|
||||||
|
# maybe include anonymous role
|
||||||
|
if include_anonymous:
|
||||||
|
roles.append(self.get_role_anonymous(session))
|
||||||
|
|
||||||
|
# build the permissions cache
|
||||||
|
cache = set()
|
||||||
|
for role in roles:
|
||||||
|
if hasattr(role, 'permissions'):
|
||||||
|
cache.update(role.permissions)
|
||||||
|
|
||||||
|
return cache
|
||||||
|
|
||||||
|
def has_permission(self, session, principal, permission,
|
||||||
|
include_anonymous=True,
|
||||||
|
include_authenticated=True):
|
||||||
|
"""
|
||||||
|
Check if the given user or role has been granted the given
|
||||||
|
permission.
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
|
||||||
|
While this method is perfectly usable, it is a bit "heavy"
|
||||||
|
if you need to make multiple permission checks for the same
|
||||||
|
user. To optimize, call :meth:`get_permissions()` and keep
|
||||||
|
the result, then instead of calling ``has_permission()``
|
||||||
|
just check if a given permission is contained in the cached
|
||||||
|
result set.
|
||||||
|
|
||||||
|
(The logic just described is exactly what this method does,
|
||||||
|
except it will not keep the result set, hence calling it
|
||||||
|
multiple times for same user is not optimal.)
|
||||||
|
|
||||||
|
:param session: Open :term:`db session`.
|
||||||
|
|
||||||
|
:param principal: Either a
|
||||||
|
:class:`~wuttjamaican.db.model.auth.User` or
|
||||||
|
:class:`~wuttjamaican.db.model.auth.Role` instance. It is
|
||||||
|
also expected that this may sometimes be ``None``, in which
|
||||||
|
case the "Anonymous" role will be assumed.
|
||||||
|
|
||||||
|
:param permission: Name of the permission for which to check.
|
||||||
|
|
||||||
|
:param include_anonymous: Whether the "Anonymous" role should
|
||||||
|
be included when checking permissions. If ``False``, then
|
||||||
|
Anonymous permissions will *not* be checked.
|
||||||
|
|
||||||
|
:param include_authenticated: Whether the "Authenticated" role
|
||||||
|
should be included when checking permissions.
|
||||||
|
|
||||||
|
:returns: Boolean indicating if the permission is granted.
|
||||||
|
"""
|
||||||
|
perms = self.get_permissions(session, principal,
|
||||||
|
include_anonymous=include_anonymous,
|
||||||
|
include_authenticated=include_authenticated)
|
||||||
|
return permission in perms
|
||||||
|
|
||||||
|
def grant_permission(self, role, permission, **kwargs):
|
||||||
|
"""
|
||||||
|
Grant a permission to the role. If the role already has the
|
||||||
|
permission, nothing is done.
|
||||||
|
|
||||||
|
:param role: :class:`~wuttjamaican.db.model.auth.Role`
|
||||||
|
instance.
|
||||||
|
|
||||||
|
:param permission: Name of the permission as string.
|
||||||
|
"""
|
||||||
|
if permission not in role.permissions:
|
||||||
|
role.permissions.append(permission)
|
||||||
|
|
||||||
|
def revoke_permission(self, role, permission, **kwargs):
|
||||||
|
"""
|
||||||
|
Revoke a permission from the role. If the role does not have
|
||||||
|
the permission, nothing is done.
|
||||||
|
|
||||||
|
:param role: A :class:`~rattail.db.model.users.Role` instance.
|
||||||
|
|
||||||
|
:param permission: Name of the permission as string.
|
||||||
|
"""
|
||||||
|
if permission in role.permissions:
|
||||||
|
role.permissions.remove(permission)
|
||||||
|
|
||||||
|
##############################
|
||||||
|
# internal methods
|
||||||
|
##############################
|
||||||
|
|
||||||
|
def _role_is_pertinent(self, role):
|
||||||
|
"""
|
||||||
|
Check the role to ensure it is "pertinent" for the current app.
|
||||||
|
|
||||||
|
The idea behind this is for sake of a multi-node system, where
|
||||||
|
users and roles are synced between nodes. Some roles may be
|
||||||
|
defined for only certain types of nodes and hence not
|
||||||
|
"pertinent" for all nodes.
|
||||||
|
|
||||||
|
As of now there is no actual support for that, but this stub
|
||||||
|
method exists for when it will.
|
||||||
|
"""
|
||||||
|
return True
|
||||||
|
|
||||||
|
def _special_role(self, session, uuid, name):
|
||||||
|
"""
|
||||||
|
Fetch a "special" role, creating if needed.
|
||||||
|
"""
|
||||||
|
model = self.app.model
|
||||||
|
role = session.get(model.Role, uuid)
|
||||||
|
if not role:
|
||||||
|
role = model.Role(uuid=uuid, name=name)
|
||||||
|
session.add(role)
|
||||||
|
return role
|
||||||
|
|
|
@ -0,0 +1,45 @@
|
||||||
|
"""add people
|
||||||
|
|
||||||
|
Revision ID: 3abcc44f7f91
|
||||||
|
Revises: d686f7abe3e0
|
||||||
|
Create Date: 2024-07-14 15:14:30.552682
|
||||||
|
|
||||||
|
"""
|
||||||
|
from typing import Sequence, Union
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision: str = '3abcc44f7f91'
|
||||||
|
down_revision: Union[str, None] = 'd686f7abe3e0'
|
||||||
|
branch_labels: Union[str, Sequence[str], None] = None
|
||||||
|
depends_on: Union[str, Sequence[str], None] = None
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade() -> None:
|
||||||
|
|
||||||
|
# person
|
||||||
|
op.create_table('person',
|
||||||
|
sa.Column('uuid', sa.String(length=32), nullable=False),
|
||||||
|
sa.Column('full_name', sa.String(length=100), nullable=False),
|
||||||
|
sa.Column('first_name', sa.String(length=50), nullable=True),
|
||||||
|
sa.Column('middle_name', sa.String(length=50), nullable=True),
|
||||||
|
sa.Column('last_name', sa.String(length=50), nullable=True),
|
||||||
|
sa.PrimaryKeyConstraint('uuid', name=op.f('pk_person'))
|
||||||
|
)
|
||||||
|
|
||||||
|
# user
|
||||||
|
op.add_column('user', sa.Column('person_uuid', sa.String(length=32), nullable=True))
|
||||||
|
op.create_foreign_key(op.f('fk_user_person_uuid_person'), 'user', 'person', ['person_uuid'], ['uuid'])
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade() -> None:
|
||||||
|
|
||||||
|
# user
|
||||||
|
op.drop_constraint(op.f('fk_user_person_uuid_person'), 'user', type_='foreignkey')
|
||||||
|
op.drop_column('user', 'person_uuid')
|
||||||
|
|
||||||
|
# person
|
||||||
|
op.drop_table('person')
|
|
@ -0,0 +1,73 @@
|
||||||
|
"""add users, roles
|
||||||
|
|
||||||
|
Revision ID: d686f7abe3e0
|
||||||
|
Revises: fc3a3bcaa069
|
||||||
|
Create Date: 2024-07-14 13:27:22.703093
|
||||||
|
|
||||||
|
"""
|
||||||
|
from typing import Sequence, Union
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision: str = 'd686f7abe3e0'
|
||||||
|
down_revision: Union[str, None] = 'fc3a3bcaa069'
|
||||||
|
branch_labels: Union[str, Sequence[str], None] = None
|
||||||
|
depends_on: Union[str, Sequence[str], None] = None
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade() -> None:
|
||||||
|
|
||||||
|
# role
|
||||||
|
op.create_table('role',
|
||||||
|
sa.Column('uuid', sa.String(length=32), nullable=False),
|
||||||
|
sa.Column('name', sa.String(length=100), nullable=False),
|
||||||
|
sa.Column('notes', sa.Text(), nullable=True),
|
||||||
|
sa.PrimaryKeyConstraint('uuid'),
|
||||||
|
sa.UniqueConstraint('name', name=op.f('uq_role_name'))
|
||||||
|
)
|
||||||
|
|
||||||
|
# user
|
||||||
|
op.create_table('user',
|
||||||
|
sa.Column('uuid', sa.String(length=32), nullable=False),
|
||||||
|
sa.Column('username', sa.String(length=25), nullable=False),
|
||||||
|
sa.Column('password', sa.String(length=60), nullable=True),
|
||||||
|
sa.Column('active', sa.Boolean(), nullable=False),
|
||||||
|
sa.PrimaryKeyConstraint('uuid'),
|
||||||
|
sa.UniqueConstraint('username', name=op.f('uq_user_username'))
|
||||||
|
)
|
||||||
|
|
||||||
|
# permission
|
||||||
|
op.create_table('permission',
|
||||||
|
sa.Column('role_uuid', sa.String(length=32), nullable=False),
|
||||||
|
sa.Column('permission', sa.String(length=254), nullable=False),
|
||||||
|
sa.ForeignKeyConstraint(['role_uuid'], ['role.uuid'], name=op.f('fk_permission_role_uuid_role')),
|
||||||
|
sa.PrimaryKeyConstraint('role_uuid', 'permission')
|
||||||
|
)
|
||||||
|
|
||||||
|
# user_x_role
|
||||||
|
op.create_table('user_x_role',
|
||||||
|
sa.Column('uuid', sa.String(length=32), nullable=False),
|
||||||
|
sa.Column('user_uuid', sa.String(length=32), nullable=False),
|
||||||
|
sa.Column('role_uuid', sa.String(length=32), nullable=False),
|
||||||
|
sa.ForeignKeyConstraint(['role_uuid'], ['role.uuid'], name=op.f('fk_user_x_role_role_uuid_role')),
|
||||||
|
sa.ForeignKeyConstraint(['user_uuid'], ['user.uuid'], name=op.f('fk_user_x_role_user_uuid_user')),
|
||||||
|
sa.PrimaryKeyConstraint('uuid')
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade() -> None:
|
||||||
|
|
||||||
|
# user_x_role
|
||||||
|
op.drop_table('user_x_role')
|
||||||
|
|
||||||
|
# permission
|
||||||
|
op.drop_table('permission')
|
||||||
|
|
||||||
|
# user
|
||||||
|
op.drop_table('user')
|
||||||
|
|
||||||
|
# role
|
||||||
|
op.drop_table('role')
|
|
@ -31,11 +31,12 @@ The ``wuttjamaican.db.model`` namespace contains the following:
|
||||||
* :func:`~wuttjamaican.db.model.base.uuid_fk_column()`
|
* :func:`~wuttjamaican.db.model.base.uuid_fk_column()`
|
||||||
* :class:`~wuttjamaican.db.model.base.Base`
|
* :class:`~wuttjamaican.db.model.base.Base`
|
||||||
* :class:`~wuttjamaican.db.model.base.Setting`
|
* :class:`~wuttjamaican.db.model.base.Setting`
|
||||||
|
* :class:`~wuttjamaican.db.model.base.Person`
|
||||||
* :class:`~wuttjamaican.db.model.auth.Role`
|
* :class:`~wuttjamaican.db.model.auth.Role`
|
||||||
* :class:`~wuttjamaican.db.model.auth.Permission`
|
* :class:`~wuttjamaican.db.model.auth.Permission`
|
||||||
* :class:`~wuttjamaican.db.model.auth.User`
|
* :class:`~wuttjamaican.db.model.auth.User`
|
||||||
* :class:`~wuttjamaican.db.model.auth.UserRole`
|
* :class:`~wuttjamaican.db.model.auth.UserRole`
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from .base import Base, Setting, uuid_column, uuid_fk_column
|
from .base import uuid_column, uuid_fk_column, Base, Setting, Person
|
||||||
from .auth import Role, Permission, User, UserRole
|
from .auth import Role, Permission, User, UserRole
|
||||||
|
|
|
@ -65,16 +65,10 @@ class Role(Base):
|
||||||
See also :attr:`user_refs`.
|
See also :attr:`user_refs`.
|
||||||
"""
|
"""
|
||||||
__tablename__ = 'role'
|
__tablename__ = 'role'
|
||||||
__table_args__ = (
|
|
||||||
sa.UniqueConstraint('name',
|
|
||||||
# TODO
|
|
||||||
# name='role_uq_name',
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
uuid = uuid_column()
|
uuid = uuid_column()
|
||||||
|
|
||||||
name = sa.Column(sa.String(length=100), nullable=False, doc="""
|
name = sa.Column(sa.String(length=100), nullable=False, unique=True, doc="""
|
||||||
Name for the role. Each role must have a name, which must be
|
Name for the role. Each role must have a name, which must be
|
||||||
unique.
|
unique.
|
||||||
""")
|
""")
|
||||||
|
@ -86,6 +80,8 @@ class Role(Base):
|
||||||
permission_refs = orm.relationship(
|
permission_refs = orm.relationship(
|
||||||
'Permission',
|
'Permission',
|
||||||
back_populates='role',
|
back_populates='role',
|
||||||
|
cascade='all, delete-orphan',
|
||||||
|
cascade_backrefs=False,
|
||||||
doc="""
|
doc="""
|
||||||
List of :class:`Permission` references for the role.
|
List of :class:`Permission` references for the role.
|
||||||
|
|
||||||
|
@ -101,10 +97,9 @@ class Role(Base):
|
||||||
|
|
||||||
user_refs = orm.relationship(
|
user_refs = orm.relationship(
|
||||||
'UserRole',
|
'UserRole',
|
||||||
# TODO
|
|
||||||
# cascade='all, delete-orphan',
|
|
||||||
# cascade_backrefs=False,
|
|
||||||
back_populates='role',
|
back_populates='role',
|
||||||
|
cascade='all, delete-orphan',
|
||||||
|
cascade_backrefs=False,
|
||||||
doc="""
|
doc="""
|
||||||
List of :class:`UserRole` instances belonging to the role.
|
List of :class:`UserRole` instances belonging to the role.
|
||||||
|
|
||||||
|
@ -127,17 +122,12 @@ class Permission(Base):
|
||||||
Represents a permission granted to a role.
|
Represents a permission granted to a role.
|
||||||
"""
|
"""
|
||||||
__tablename__ = 'permission'
|
__tablename__ = 'permission'
|
||||||
__table_args__ = (
|
|
||||||
sa.ForeignKeyConstraint(['role_uuid'], ['role.uuid'],
|
|
||||||
# TODO
|
|
||||||
# name='permission_fk_role',
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
role_uuid = uuid_fk_column(primary_key=True, nullable=False)
|
role_uuid = uuid_fk_column('role.uuid', primary_key=True, nullable=False)
|
||||||
role = orm.relationship(
|
role = orm.relationship(
|
||||||
Role,
|
Role,
|
||||||
back_populates='permission_refs',
|
back_populates='permission_refs',
|
||||||
|
cascade_backrefs=False,
|
||||||
doc="""
|
doc="""
|
||||||
Reference to the :class:`Role` for which the permission is
|
Reference to the :class:`Role` for which the permission is
|
||||||
granted.
|
granted.
|
||||||
|
@ -157,18 +147,18 @@ class User(Base):
|
||||||
|
|
||||||
This may or may not correspond to a real person, i.e. some users
|
This may or may not correspond to a real person, i.e. some users
|
||||||
may exist solely for automated tasks.
|
may exist solely for automated tasks.
|
||||||
|
|
||||||
|
.. attribute:: roles
|
||||||
|
|
||||||
|
List of :class:`Role` instances to which the user belongs.
|
||||||
|
|
||||||
|
See also :attr:`role_refs`.
|
||||||
"""
|
"""
|
||||||
__tablename__ = 'user'
|
__tablename__ = 'user'
|
||||||
__table_args__ = (
|
|
||||||
sa.UniqueConstraint('username',
|
|
||||||
# TODO
|
|
||||||
# name='user_uq_username',
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
uuid = uuid_column()
|
uuid = uuid_column()
|
||||||
|
|
||||||
username = sa.Column(sa.String(length=25), nullable=False, doc="""
|
username = sa.Column(sa.String(length=25), nullable=False, unique=True, doc="""
|
||||||
Account username. This is required and must be unique.
|
Account username. This is required and must be unique.
|
||||||
""")
|
""")
|
||||||
|
|
||||||
|
@ -176,6 +166,18 @@ class User(Base):
|
||||||
Hashed password for login. (The raw password is not stored.)
|
Hashed password for login. (The raw password is not stored.)
|
||||||
""")
|
""")
|
||||||
|
|
||||||
|
person_uuid = uuid_fk_column('person.uuid', nullable=True)
|
||||||
|
person = orm.relationship(
|
||||||
|
'Person',
|
||||||
|
# TODO: seems like this is not needed?
|
||||||
|
# uselist=False,
|
||||||
|
back_populates='users',
|
||||||
|
cascade_backrefs=False,
|
||||||
|
doc="""
|
||||||
|
Reference to the :class:`~wuttjamaican.db.model.base.Person`
|
||||||
|
whose user account this is.
|
||||||
|
""")
|
||||||
|
|
||||||
active = sa.Column(sa.Boolean(), nullable=False, default=True, doc="""
|
active = sa.Column(sa.Boolean(), nullable=False, default=True, doc="""
|
||||||
Flag indicating whether the user account is "active" - it is
|
Flag indicating whether the user account is "active" - it is
|
||||||
``True`` by default.
|
``True`` by default.
|
||||||
|
@ -186,44 +188,53 @@ class User(Base):
|
||||||
role_refs = orm.relationship(
|
role_refs = orm.relationship(
|
||||||
'UserRole',
|
'UserRole',
|
||||||
back_populates='user',
|
back_populates='user',
|
||||||
|
cascade_backrefs=False,
|
||||||
|
# TODO
|
||||||
|
# cascade='all, delete-orphan',
|
||||||
doc="""
|
doc="""
|
||||||
List of :class:`UserRole` records.
|
List of :class:`UserRole` instances belonging to the user.
|
||||||
|
|
||||||
|
See also :attr:`roles`.
|
||||||
""")
|
""")
|
||||||
|
|
||||||
|
roles = association_proxy(
|
||||||
|
'role_refs', 'role',
|
||||||
|
creator=lambda r: UserRole(role=r),
|
||||||
|
# TODO
|
||||||
|
# getset_factory=getset_factory,
|
||||||
|
)
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
|
if self.person:
|
||||||
|
name = str(self.person)
|
||||||
|
if name:
|
||||||
|
return name
|
||||||
return self.username or ""
|
return self.username or ""
|
||||||
|
|
||||||
|
|
||||||
class UserRole(Base):
|
class UserRole(Base):
|
||||||
"""
|
"""
|
||||||
Represents the association between a user and a role.
|
Represents the association between a user and a role; i.e. the
|
||||||
|
user "belongs" or "is assigned" to the role.
|
||||||
"""
|
"""
|
||||||
__tablename__ = 'user_x_role'
|
__tablename__ = 'user_x_role'
|
||||||
__table_args__ = (
|
|
||||||
sa.ForeignKeyConstraint(['user_uuid'], ['user.uuid'],
|
|
||||||
# TODO
|
|
||||||
# name='user_x_role_fk_user',
|
|
||||||
),
|
|
||||||
sa.ForeignKeyConstraint(['role_uuid'], ['role.uuid'],
|
|
||||||
# TODO
|
|
||||||
# name='user_x_role_fk_role',
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
uuid = uuid_column()
|
uuid = uuid_column()
|
||||||
|
|
||||||
user_uuid = uuid_fk_column(nullable=False)
|
user_uuid = uuid_fk_column('user.uuid', nullable=False)
|
||||||
user = orm.relationship(
|
user = orm.relationship(
|
||||||
User,
|
User,
|
||||||
back_populates='role_refs',
|
back_populates='role_refs',
|
||||||
|
cascade_backrefs=False,
|
||||||
doc="""
|
doc="""
|
||||||
Reference to the :class:`User` involved.
|
Reference to the :class:`User` involved.
|
||||||
""")
|
""")
|
||||||
|
|
||||||
role_uuid = uuid_fk_column(nullable=False)
|
role_uuid = uuid_fk_column('role.uuid', nullable=False)
|
||||||
role = orm.relationship(
|
role = orm.relationship(
|
||||||
Role,
|
Role,
|
||||||
back_populates='user_refs',
|
back_populates='user_refs',
|
||||||
|
cascade_backrefs=False,
|
||||||
doc="""
|
doc="""
|
||||||
Reference to the :class:`Role` involved.
|
Reference to the :class:`Role` involved.
|
||||||
""")
|
""")
|
||||||
|
|
|
@ -34,7 +34,19 @@ from sqlalchemy import orm
|
||||||
from wuttjamaican.util import make_uuid
|
from wuttjamaican.util import make_uuid
|
||||||
|
|
||||||
|
|
||||||
Base = orm.declarative_base()
|
# nb. this convention comes from upstream docs
|
||||||
|
# https://docs.sqlalchemy.org/en/14/core/constraints.html#constraint-naming-conventions
|
||||||
|
naming_convention = {
|
||||||
|
'ix': 'ix_%(column_0_label)s',
|
||||||
|
'uq': 'uq_%(table_name)s_%(column_0_name)s',
|
||||||
|
'ck': 'ck_%(table_name)s_%(constraint_name)s',
|
||||||
|
'fk': 'fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s',
|
||||||
|
'pk': 'pk_%(table_name)s',
|
||||||
|
}
|
||||||
|
|
||||||
|
metadata = sa.MetaData(naming_convention=naming_convention)
|
||||||
|
|
||||||
|
Base = orm.declarative_base(metadata=metadata)
|
||||||
|
|
||||||
|
|
||||||
def uuid_column(*args, **kwargs):
|
def uuid_column(*args, **kwargs):
|
||||||
|
@ -47,11 +59,14 @@ def uuid_column(*args, **kwargs):
|
||||||
return sa.Column(sa.String(length=32), *args, **kwargs)
|
return sa.Column(sa.String(length=32), *args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
def uuid_fk_column(*args, **kwargs):
|
def uuid_fk_column(target_column, *args, **kwargs):
|
||||||
"""
|
"""
|
||||||
Returns a UUID column for use as a foreign key to another table.
|
Returns a UUID column for use as a foreign key to another table.
|
||||||
|
|
||||||
|
:param target_column: Name of the table column on the remote side,
|
||||||
|
e.g. ``'user.uuid'``.
|
||||||
"""
|
"""
|
||||||
return sa.Column(sa.String(length=32), *args, **kwargs)
|
return sa.Column(sa.String(length=32), sa.ForeignKey(target_column), *args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
class Setting(Base):
|
class Setting(Base):
|
||||||
|
@ -70,3 +85,77 @@ class Setting(Base):
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return self.name or ""
|
return self.name or ""
|
||||||
|
|
||||||
|
|
||||||
|
class Person(Base):
|
||||||
|
"""
|
||||||
|
Represents a person.
|
||||||
|
|
||||||
|
The use for this table in the base framework, is to associate with
|
||||||
|
a :class:`~wuttjamaican.db.model.auth.User` to provide first and
|
||||||
|
last name etc. (However a user does not have to be associated
|
||||||
|
with any person.)
|
||||||
|
|
||||||
|
But this table could also be used as a basis for a Customer or
|
||||||
|
Employee relationship etc.
|
||||||
|
"""
|
||||||
|
__tablename__ = 'person'
|
||||||
|
|
||||||
|
uuid = uuid_column()
|
||||||
|
|
||||||
|
full_name = sa.Column(sa.String(length=100), nullable=False, doc="""
|
||||||
|
Full name for the person. Note that this is *required*.
|
||||||
|
""")
|
||||||
|
|
||||||
|
first_name = sa.Column(sa.String(length=50), nullable=True, doc="""
|
||||||
|
The person's first name.
|
||||||
|
""")
|
||||||
|
|
||||||
|
middle_name = sa.Column(sa.String(length=50), nullable=True, doc="""
|
||||||
|
The person's middle name or initial.
|
||||||
|
""")
|
||||||
|
|
||||||
|
last_name = sa.Column(sa.String(length=50), nullable=True, doc="""
|
||||||
|
The person's last name.
|
||||||
|
""")
|
||||||
|
|
||||||
|
users = orm.relationship(
|
||||||
|
'User',
|
||||||
|
back_populates='person',
|
||||||
|
cascade_backrefs=False,
|
||||||
|
doc="""
|
||||||
|
List of :class:`~wuttjamaican.db.model.auth.User` accounts for
|
||||||
|
the person. Typically there is only one user account per
|
||||||
|
person, but technically multiple are supported.
|
||||||
|
""")
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return self.full_name or ""
|
||||||
|
|
||||||
|
@property
|
||||||
|
def user(self):
|
||||||
|
"""
|
||||||
|
Reference to the "first"
|
||||||
|
:class:`~wuttjamaican.db.model.auth.User` account for the
|
||||||
|
person, or ``None``.
|
||||||
|
|
||||||
|
.. warning::
|
||||||
|
|
||||||
|
Note that the database schema supports multiple users per
|
||||||
|
person, but this property logic ignores that and will only
|
||||||
|
ever return "one or none". That might be fine in 99% of
|
||||||
|
cases, but if multiple accounts exist for a person, the one
|
||||||
|
returned is indeterminate.
|
||||||
|
|
||||||
|
See :attr:`users` to access the full list.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# TODO: i'm not crazy about the ambiguity here re: number of
|
||||||
|
# user accounts a person may have. in particular it's not
|
||||||
|
# clear *which* user account would be returned, as there is no
|
||||||
|
# sequence ordinal defined etc. a better approach might be to
|
||||||
|
# force callers to assume the possibility of multiple
|
||||||
|
# user accounts per person? (if so, remove this property)
|
||||||
|
|
||||||
|
if self.users:
|
||||||
|
return self.users[0]
|
||||||
|
|
66
src/wuttjamaican/people.py
Normal file
66
src/wuttjamaican/people.py
Normal file
|
@ -0,0 +1,66 @@
|
||||||
|
# -*- coding: utf-8; -*-
|
||||||
|
################################################################################
|
||||||
|
#
|
||||||
|
# WuttJamaican -- Base package for Wutta Framework
|
||||||
|
# Copyright © 2023-2024 Lance Edgar
|
||||||
|
#
|
||||||
|
# This file is part of Wutta Framework.
|
||||||
|
#
|
||||||
|
# Wutta Framework is free software: you can redistribute it and/or modify it
|
||||||
|
# under the terms of the GNU General Public License as published by the Free
|
||||||
|
# Software Foundation, either version 3 of the License, or (at your option) any
|
||||||
|
# later version.
|
||||||
|
#
|
||||||
|
# Wutta Framework is distributed in the hope that it will be useful, but
|
||||||
|
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
|
||||||
|
# more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License along with
|
||||||
|
# Wutta Framework. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
#
|
||||||
|
################################################################################
|
||||||
|
"""
|
||||||
|
People Handler
|
||||||
|
|
||||||
|
This is a :term:`handler` to manage "people" in the DB.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from wuttjamaican.app import GenericHandler
|
||||||
|
|
||||||
|
|
||||||
|
class PeopleHandler(GenericHandler):
|
||||||
|
"""
|
||||||
|
Base class and default implementation for the "people"
|
||||||
|
:term:`handler`.
|
||||||
|
|
||||||
|
This is responsible for managing
|
||||||
|
:class:`~wuttjamaican.db.model.base.Person` records, and related
|
||||||
|
things.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def get_person(self, obj, **kwargs):
|
||||||
|
"""
|
||||||
|
Return the :class:`~wuttjamaican.db.model.base.Person`
|
||||||
|
associated with the given object, if one can be found.
|
||||||
|
|
||||||
|
This method should accept "any" type of ``obj`` and inspect it
|
||||||
|
to determine if/how a person can be found. It should return
|
||||||
|
the "first, most obvious" person in the event that the object
|
||||||
|
is associated with multiple people.
|
||||||
|
|
||||||
|
This is a rather fundamental method, in that it is called by
|
||||||
|
several other methods, both within this handler as well as
|
||||||
|
others. There is also a shortcut to it, accessible via
|
||||||
|
:meth:`wuttjamaican.app.AppHandler.get_person()`.
|
||||||
|
"""
|
||||||
|
model = self.app.model
|
||||||
|
|
||||||
|
if isinstance(obj, model.Person):
|
||||||
|
person = obj
|
||||||
|
return person
|
||||||
|
|
||||||
|
elif isinstance(obj, model.User):
|
||||||
|
user = obj
|
||||||
|
if user.person:
|
||||||
|
return user.person
|
|
@ -5,6 +5,7 @@ from unittest import TestCase
|
||||||
try:
|
try:
|
||||||
import sqlalchemy as sa
|
import sqlalchemy as sa
|
||||||
from wuttjamaican.db.model import auth as model
|
from wuttjamaican.db.model import auth as model
|
||||||
|
from wuttjamaican.db.model.base import Person
|
||||||
except ImportError:
|
except ImportError:
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
|
@ -29,8 +30,16 @@ else:
|
||||||
|
|
||||||
class TestUser(TestCase):
|
class TestUser(TestCase):
|
||||||
|
|
||||||
def test_basic(self):
|
def test_str(self):
|
||||||
user = model.User()
|
user = model.User()
|
||||||
self.assertEqual(str(user), "")
|
self.assertEqual(str(user), "")
|
||||||
user.username = 'barney'
|
user.username = 'barney'
|
||||||
self.assertEqual(str(user), "barney")
|
self.assertEqual(str(user), "barney")
|
||||||
|
|
||||||
|
def test_str_with_person(self):
|
||||||
|
user = model.User()
|
||||||
|
self.assertEqual(str(user), "")
|
||||||
|
|
||||||
|
person = Person(full_name="Barney Rubble")
|
||||||
|
user.person = person
|
||||||
|
self.assertEqual(str(user), "Barney Rubble")
|
||||||
|
|
|
@ -5,6 +5,7 @@ from unittest import TestCase
|
||||||
try:
|
try:
|
||||||
import sqlalchemy as sa
|
import sqlalchemy as sa
|
||||||
from wuttjamaican.db.model import base as model
|
from wuttjamaican.db.model import base as model
|
||||||
|
from wuttjamaican.db.model.auth import User
|
||||||
except ImportError:
|
except ImportError:
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
|
@ -20,7 +21,7 @@ else:
|
||||||
class TestUUIDFKColumn(TestCase):
|
class TestUUIDFKColumn(TestCase):
|
||||||
|
|
||||||
def test_basic(self):
|
def test_basic(self):
|
||||||
column = model.uuid_column()
|
column = model.uuid_fk_column('foo.bar')
|
||||||
self.assertIsInstance(column, sa.Column)
|
self.assertIsInstance(column, sa.Column)
|
||||||
self.assertIsInstance(column.type, sa.String)
|
self.assertIsInstance(column.type, sa.String)
|
||||||
self.assertEqual(column.type.length, 32)
|
self.assertEqual(column.type.length, 32)
|
||||||
|
@ -32,3 +33,19 @@ else:
|
||||||
self.assertEqual(str(setting), "")
|
self.assertEqual(str(setting), "")
|
||||||
setting.name = 'foo'
|
setting.name = 'foo'
|
||||||
self.assertEqual(str(setting), "foo")
|
self.assertEqual(str(setting), "foo")
|
||||||
|
|
||||||
|
class TestPerson(TestCase):
|
||||||
|
|
||||||
|
def test_basic(self):
|
||||||
|
person = model.Person()
|
||||||
|
self.assertEqual(str(person), "")
|
||||||
|
person.full_name = "Barney Rubble"
|
||||||
|
self.assertEqual(str(person), "Barney Rubble")
|
||||||
|
|
||||||
|
def test_users(self):
|
||||||
|
person = model.Person()
|
||||||
|
self.assertIsNone(person.user)
|
||||||
|
|
||||||
|
user = User()
|
||||||
|
person.users.append(user)
|
||||||
|
self.assertIs(person.user, user)
|
||||||
|
|
|
@ -101,17 +101,17 @@ class TestAppHandler(TestCase):
|
||||||
from wuttjamaican.db import model
|
from wuttjamaican.db import model
|
||||||
except ImportError:
|
except ImportError:
|
||||||
pytest.skip("test not relevant without sqlalchemy")
|
pytest.skip("test not relevant without sqlalchemy")
|
||||||
else:
|
|
||||||
self.assertNotIn('model', self.app.__dict__)
|
self.assertNotIn('model', self.app.__dict__)
|
||||||
self.assertIs(self.app.model, model)
|
self.assertIs(self.app.model, model)
|
||||||
|
|
||||||
def test_get_model(self):
|
def test_get_model(self):
|
||||||
try:
|
try:
|
||||||
from wuttjamaican.db import model
|
from wuttjamaican.db import model
|
||||||
except ImportError:
|
except ImportError:
|
||||||
pytest.skip("test not relevant without sqlalchemy")
|
pytest.skip("test not relevant without sqlalchemy")
|
||||||
else:
|
|
||||||
self.assertIs(self.app.get_model(), model)
|
self.assertIs(self.app.get_model(), model)
|
||||||
|
|
||||||
def test_get_title(self):
|
def test_get_title(self):
|
||||||
self.assertEqual(self.app.get_title(), 'WuttJamaican')
|
self.assertEqual(self.app.get_title(), 'WuttJamaican')
|
||||||
|
@ -120,6 +120,44 @@ class TestAppHandler(TestCase):
|
||||||
uuid = self.app.make_uuid()
|
uuid = self.app.make_uuid()
|
||||||
self.assertEqual(len(uuid), 32)
|
self.assertEqual(len(uuid), 32)
|
||||||
|
|
||||||
|
def test_get_session(self):
|
||||||
|
try:
|
||||||
|
import sqlalchemy as sa
|
||||||
|
from sqlalchemy import orm
|
||||||
|
except ImportError:
|
||||||
|
pytest.skip("test not relevant without sqlalchemy")
|
||||||
|
|
||||||
|
model = self.app.model
|
||||||
|
user = model.User()
|
||||||
|
self.assertIsNone(self.app.get_session(user))
|
||||||
|
|
||||||
|
Session = orm.sessionmaker()
|
||||||
|
engine = sa.create_engine('sqlite://')
|
||||||
|
mysession = Session(bind=engine)
|
||||||
|
mysession.add(user)
|
||||||
|
session = self.app.get_session(user)
|
||||||
|
self.assertIs(session, mysession)
|
||||||
|
|
||||||
|
def test_get_person(self):
|
||||||
|
people = self.app.get_people_handler()
|
||||||
|
with patch.object(people, 'get_person') as get_person:
|
||||||
|
get_person.return_value = 'foo'
|
||||||
|
person = self.app.get_person('bar')
|
||||||
|
get_person.assert_called_once_with('bar')
|
||||||
|
self.assertEqual(person, 'foo')
|
||||||
|
|
||||||
|
def test_get_auth_handler(self):
|
||||||
|
from wuttjamaican.auth import AuthHandler
|
||||||
|
|
||||||
|
auth = self.app.get_auth_handler()
|
||||||
|
self.assertIsInstance(auth, AuthHandler)
|
||||||
|
|
||||||
|
def test_get_people_handler(self):
|
||||||
|
from wuttjamaican.people import PeopleHandler
|
||||||
|
|
||||||
|
people = self.app.get_people_handler()
|
||||||
|
self.assertIsInstance(people, PeopleHandler)
|
||||||
|
|
||||||
|
|
||||||
class TestAppProvider(TestCase):
|
class TestAppProvider(TestCase):
|
||||||
|
|
||||||
|
|
|
@ -5,13 +5,353 @@ from unittest import TestCase
|
||||||
from wuttjamaican import auth as mod
|
from wuttjamaican import auth as mod
|
||||||
from wuttjamaican.conf import WuttaConfig
|
from wuttjamaican.conf import WuttaConfig
|
||||||
|
|
||||||
|
try:
|
||||||
|
import sqlalchemy as sa
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
|
||||||
class TestAuthHandler(TestCase):
|
|
||||||
|
|
||||||
def setUp(self):
|
class TestAuthHandler(TestCase):
|
||||||
self.config = WuttaConfig()
|
|
||||||
self.app = self.config.get_app()
|
|
||||||
|
|
||||||
def test_basic(self):
|
def setUp(self):
|
||||||
handler = mod.AuthHandler(self.config)
|
self.config = WuttaConfig()
|
||||||
self.assertIs(handler.app, self.app)
|
self.app = self.config.get_app()
|
||||||
|
self.handler = mod.AuthHandler(self.config)
|
||||||
|
|
||||||
|
self.engine = sa.create_engine('sqlite://')
|
||||||
|
self.app.model.Base.metadata.create_all(bind=self.engine)
|
||||||
|
self.session = self.make_session()
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
self.session.close()
|
||||||
|
self.app.model.Base.metadata.drop_all(bind=self.engine)
|
||||||
|
|
||||||
|
def make_session(self):
|
||||||
|
return self.app.make_session(bind=self.engine)
|
||||||
|
|
||||||
|
def test_authenticate_user(self):
|
||||||
|
model = self.app.model
|
||||||
|
barney = model.User(username='barney')
|
||||||
|
self.handler.set_user_password(barney, 'goodpass')
|
||||||
|
self.session.add(barney)
|
||||||
|
self.session.commit()
|
||||||
|
|
||||||
|
# login ok
|
||||||
|
user = self.handler.authenticate_user(self.session, 'barney', 'goodpass')
|
||||||
|
self.assertIs(user, barney)
|
||||||
|
|
||||||
|
# can also pass user instead of username
|
||||||
|
user = self.handler.authenticate_user(self.session, barney, 'goodpass')
|
||||||
|
self.assertIs(user, barney)
|
||||||
|
|
||||||
|
# bad password
|
||||||
|
user = self.handler.authenticate_user(self.session, 'barney', 'BADPASS')
|
||||||
|
self.assertIsNone(user)
|
||||||
|
|
||||||
|
# bad username
|
||||||
|
user = self.handler.authenticate_user(self.session, 'NOBODY', 'goodpass')
|
||||||
|
self.assertIsNone(user)
|
||||||
|
|
||||||
|
# inactive user
|
||||||
|
user = self.handler.authenticate_user(self.session, 'barney', 'goodpass')
|
||||||
|
self.assertIs(user, barney)
|
||||||
|
barney.active = False
|
||||||
|
user = self.handler.authenticate_user(self.session, 'barney', 'goodpass')
|
||||||
|
self.assertIsNone(user)
|
||||||
|
|
||||||
|
def test_get_role(self):
|
||||||
|
model = self.app.model
|
||||||
|
myrole = model.Role(name="My Role")
|
||||||
|
self.session.add(myrole)
|
||||||
|
self.session.commit()
|
||||||
|
|
||||||
|
# empty key is ignored
|
||||||
|
role = self.handler.get_role(self.session, None)
|
||||||
|
self.assertIsNone(role)
|
||||||
|
|
||||||
|
# key may be uuid
|
||||||
|
role = self.handler.get_role(self.session, myrole.uuid)
|
||||||
|
self.assertIs(role, myrole)
|
||||||
|
|
||||||
|
# key may be name
|
||||||
|
role = self.handler.get_role(self.session, myrole.name)
|
||||||
|
self.assertIs(role, myrole)
|
||||||
|
|
||||||
|
# key may be represented within a setting
|
||||||
|
self.config.usedb = True
|
||||||
|
role = self.handler.get_role(self.session, 'mykey')
|
||||||
|
self.assertIsNone(role)
|
||||||
|
setting = model.Setting(name='wutta.role.mykey', value=myrole.uuid)
|
||||||
|
self.session.add(setting)
|
||||||
|
self.session.commit()
|
||||||
|
role = self.handler.get_role(self.session, 'mykey')
|
||||||
|
self.assertIs(role, myrole)
|
||||||
|
|
||||||
|
def test_get_user(self):
|
||||||
|
model = self.app.model
|
||||||
|
myuser = model.User(username='myuser')
|
||||||
|
self.session.add(myuser)
|
||||||
|
self.session.commit()
|
||||||
|
|
||||||
|
# empty obj is ignored
|
||||||
|
user = self.handler.get_user(None)
|
||||||
|
self.assertIsNone(user)
|
||||||
|
|
||||||
|
# user is returned as-is
|
||||||
|
user = self.handler.get_user(myuser)
|
||||||
|
self.assertIs(user, myuser)
|
||||||
|
|
||||||
|
# find user from person
|
||||||
|
myperson = model.Person(full_name='My Name')
|
||||||
|
self.session.add(myperson)
|
||||||
|
user.person = myperson
|
||||||
|
self.session.commit()
|
||||||
|
user = self.handler.get_user(myperson)
|
||||||
|
self.assertIs(user, myuser)
|
||||||
|
|
||||||
|
def test_make_user(self):
|
||||||
|
model = self.app.model
|
||||||
|
|
||||||
|
# empty user
|
||||||
|
user = self.handler.make_user()
|
||||||
|
self.assertIsInstance(user, model.User)
|
||||||
|
self.assertIsNone(user.username)
|
||||||
|
|
||||||
|
# user is added to session
|
||||||
|
user = self.handler.make_user(session=self.session)
|
||||||
|
self.assertIn(user, self.session)
|
||||||
|
self.session.rollback()
|
||||||
|
self.assertNotIn(user, self.session)
|
||||||
|
|
||||||
|
# default username
|
||||||
|
# nb. this behavior requires a session
|
||||||
|
user = self.handler.make_user(session=self.session)
|
||||||
|
self.assertEqual(user.username, 'newuser')
|
||||||
|
|
||||||
|
def test_delete_user(self):
|
||||||
|
model = self.app.model
|
||||||
|
|
||||||
|
# basics
|
||||||
|
myuser = model.User(username='myuser')
|
||||||
|
self.session.add(myuser)
|
||||||
|
self.session.commit()
|
||||||
|
user = self.session.query(model.User).one()
|
||||||
|
self.assertIs(user, myuser)
|
||||||
|
self.handler.delete_user(user)
|
||||||
|
self.session.commit()
|
||||||
|
self.assertEqual(self.session.query(model.User).count(), 0)
|
||||||
|
|
||||||
|
def test_make_preferred_username(self):
|
||||||
|
model = self.app.model
|
||||||
|
|
||||||
|
# default
|
||||||
|
name = self.handler.make_preferred_username(self.session)
|
||||||
|
self.assertEqual(name, 'newuser')
|
||||||
|
|
||||||
|
# person/first+last
|
||||||
|
person = model.Person(first_name='Barney', last_name='Rubble')
|
||||||
|
name = self.handler.make_preferred_username(self.session, person=person)
|
||||||
|
self.assertEqual(name, 'barney.rubble')
|
||||||
|
|
||||||
|
# person/first
|
||||||
|
person = model.Person(first_name='Barney')
|
||||||
|
name = self.handler.make_preferred_username(self.session, person=person)
|
||||||
|
self.assertEqual(name, 'barney')
|
||||||
|
|
||||||
|
# person/last
|
||||||
|
person = model.Person(last_name='Rubble')
|
||||||
|
name = self.handler.make_preferred_username(self.session, person=person)
|
||||||
|
self.assertEqual(name, 'rubble')
|
||||||
|
|
||||||
|
def test_make_unique_username(self):
|
||||||
|
model = self.app.model
|
||||||
|
|
||||||
|
# default
|
||||||
|
name = self.handler.make_unique_username(self.session)
|
||||||
|
self.assertEqual(name, 'newuser')
|
||||||
|
user = model.User(username=name)
|
||||||
|
self.session.add(user)
|
||||||
|
self.session.commit()
|
||||||
|
|
||||||
|
# counter invoked if name exists
|
||||||
|
name = self.handler.make_unique_username(self.session)
|
||||||
|
self.assertEqual(name, 'newuser01')
|
||||||
|
user = model.User(username=name)
|
||||||
|
self.session.add(user)
|
||||||
|
self.session.commit()
|
||||||
|
|
||||||
|
# starts by getting preferred name
|
||||||
|
person = model.Person(first_name='Barney', last_name='Rubble')
|
||||||
|
name = self.handler.make_unique_username(self.session, person=person)
|
||||||
|
self.assertEqual(name, 'barney.rubble')
|
||||||
|
user = model.User(username=name)
|
||||||
|
self.session.add(user)
|
||||||
|
self.session.commit()
|
||||||
|
|
||||||
|
# counter invoked if name exists
|
||||||
|
name = self.handler.make_unique_username(self.session, person=person)
|
||||||
|
self.assertEqual(name, 'barney.rubble01')
|
||||||
|
|
||||||
|
def test_set_user_password(self):
|
||||||
|
model = self.app.model
|
||||||
|
myuser = model.User(username='myuser')
|
||||||
|
self.session.add(myuser)
|
||||||
|
|
||||||
|
# basics
|
||||||
|
self.assertIsNone(myuser.password)
|
||||||
|
self.handler.set_user_password(myuser, 'goodpass')
|
||||||
|
self.session.commit()
|
||||||
|
self.assertIsNotNone(myuser.password)
|
||||||
|
# nb. password is hashed
|
||||||
|
self.assertNotEqual(myuser.password, 'goodpass')
|
||||||
|
|
||||||
|
# confirm login works with new password
|
||||||
|
user = self.handler.authenticate_user(self.session, 'myuser', 'goodpass')
|
||||||
|
self.assertIs(user, myuser)
|
||||||
|
|
||||||
|
def test_get_role_administrator(self):
|
||||||
|
model = self.app.model
|
||||||
|
|
||||||
|
self.assertEqual(self.session.query(model.Role).count(), 0)
|
||||||
|
role = self.handler.get_role_administrator(self.session)
|
||||||
|
self.assertEqual(self.session.query(model.Role).count(), 1)
|
||||||
|
self.assertEqual(role.name, "Administrator")
|
||||||
|
|
||||||
|
def test_get_role_anonymous(self):
|
||||||
|
model = self.app.model
|
||||||
|
|
||||||
|
self.assertEqual(self.session.query(model.Role).count(), 0)
|
||||||
|
role = self.handler.get_role_anonymous(self.session)
|
||||||
|
self.assertEqual(self.session.query(model.Role).count(), 1)
|
||||||
|
self.assertEqual(role.name, "Anonymous")
|
||||||
|
|
||||||
|
def test_get_role_authenticated(self):
|
||||||
|
model = self.app.model
|
||||||
|
|
||||||
|
self.assertEqual(self.session.query(model.Role).count(), 0)
|
||||||
|
role = self.handler.get_role_authenticated(self.session)
|
||||||
|
self.assertEqual(self.session.query(model.Role).count(), 1)
|
||||||
|
self.assertEqual(role.name, "Authenticated")
|
||||||
|
|
||||||
|
def test_get_permissions(self):
|
||||||
|
model = self.app.model
|
||||||
|
|
||||||
|
# empty default for role
|
||||||
|
role = model.Role()
|
||||||
|
perms = self.handler.get_permissions(self.session, role)
|
||||||
|
self.assertIsInstance(perms, set)
|
||||||
|
self.assertEqual(len(perms), 0)
|
||||||
|
|
||||||
|
# empty default for user
|
||||||
|
user = model.User()
|
||||||
|
perms = self.handler.get_permissions(self.session, user)
|
||||||
|
self.assertIsInstance(perms, set)
|
||||||
|
self.assertEqual(len(perms), 0)
|
||||||
|
|
||||||
|
# role perms
|
||||||
|
myrole = model.Role(name='My Role')
|
||||||
|
self.session.add(myrole)
|
||||||
|
self.handler.grant_permission(myrole, 'foo')
|
||||||
|
self.session.commit()
|
||||||
|
perms = self.handler.get_permissions(self.session, myrole)
|
||||||
|
self.assertEqual(perms, {'foo'})
|
||||||
|
|
||||||
|
# user perms
|
||||||
|
myuser = model.User(username='myuser')
|
||||||
|
self.session.add(myuser)
|
||||||
|
self.session.commit()
|
||||||
|
perms = self.handler.get_permissions(self.session, myuser)
|
||||||
|
self.assertEqual(len(perms), 0)
|
||||||
|
myuser.roles.append(myrole)
|
||||||
|
self.session.commit()
|
||||||
|
perms = self.handler.get_permissions(self.session, myuser)
|
||||||
|
self.assertEqual(perms, {'foo'})
|
||||||
|
|
||||||
|
# invalid principal
|
||||||
|
perms = self.handler.get_permissions(self.session, RuntimeError)
|
||||||
|
self.assertEqual(perms, set())
|
||||||
|
|
||||||
|
# missing principal
|
||||||
|
perms = self.handler.get_permissions(self.session, None)
|
||||||
|
self.assertEqual(perms, set())
|
||||||
|
|
||||||
|
def test_has_permission(self):
|
||||||
|
model = self.app.model
|
||||||
|
|
||||||
|
# false default for role
|
||||||
|
role = model.Role()
|
||||||
|
self.assertFalse(self.handler.has_permission(self.session, role, 'foo'))
|
||||||
|
|
||||||
|
# empty default for user
|
||||||
|
user = model.User()
|
||||||
|
self.assertFalse(self.handler.has_permission(self.session, user, 'foo'))
|
||||||
|
|
||||||
|
# role perms
|
||||||
|
myrole = model.Role(name='My Role')
|
||||||
|
self.session.add(myrole)
|
||||||
|
self.session.commit()
|
||||||
|
self.assertFalse(self.handler.has_permission(self.session, myrole, 'foo'))
|
||||||
|
self.handler.grant_permission(myrole, 'foo')
|
||||||
|
self.session.commit()
|
||||||
|
self.assertTrue(self.handler.has_permission(self.session, myrole, 'foo'))
|
||||||
|
|
||||||
|
# user perms
|
||||||
|
myuser = model.User(username='myuser')
|
||||||
|
self.session.add(myuser)
|
||||||
|
self.session.commit()
|
||||||
|
self.assertFalse(self.handler.has_permission(self.session, myuser, 'foo'))
|
||||||
|
myuser.roles.append(myrole)
|
||||||
|
self.session.commit()
|
||||||
|
self.assertTrue(self.handler.has_permission(self.session, myuser, 'foo'))
|
||||||
|
|
||||||
|
# invalid principal
|
||||||
|
self.assertFalse(self.handler.has_permission(self.session, RuntimeError, 'foo'))
|
||||||
|
|
||||||
|
# missing principal
|
||||||
|
self.assertFalse(self.handler.has_permission(self.session, None, 'foo'))
|
||||||
|
|
||||||
|
def test_grant_permission(self):
|
||||||
|
model = self.app.model
|
||||||
|
myrole = model.Role(name='My Role')
|
||||||
|
self.session.add(myrole)
|
||||||
|
self.session.commit()
|
||||||
|
|
||||||
|
# no perms yet
|
||||||
|
self.assertEqual(self.session.query(model.Permission).count(), 0)
|
||||||
|
|
||||||
|
# grant one perm, and confirm
|
||||||
|
self.handler.grant_permission(myrole, 'foo')
|
||||||
|
self.session.commit()
|
||||||
|
self.assertEqual(self.session.query(model.Permission).count(), 1)
|
||||||
|
perm = self.session.query(model.Permission).one()
|
||||||
|
self.assertIs(perm.role, myrole)
|
||||||
|
self.assertEqual(perm.permission, 'foo')
|
||||||
|
|
||||||
|
# grant same perm again, confirm just one exists
|
||||||
|
self.handler.grant_permission(myrole, 'foo')
|
||||||
|
self.session.commit()
|
||||||
|
self.assertEqual(self.session.query(model.Permission).count(), 1)
|
||||||
|
perm = self.session.query(model.Permission).one()
|
||||||
|
self.assertIs(perm.role, myrole)
|
||||||
|
self.assertEqual(perm.permission, 'foo')
|
||||||
|
|
||||||
|
def test_revoke_permission(self):
|
||||||
|
model = self.app.model
|
||||||
|
myrole = model.Role(name='My Role')
|
||||||
|
self.session.add(myrole)
|
||||||
|
self.handler.grant_permission(myrole, 'foo')
|
||||||
|
self.session.commit()
|
||||||
|
|
||||||
|
# just the one perm
|
||||||
|
self.assertEqual(self.session.query(model.Permission).count(), 1)
|
||||||
|
|
||||||
|
# revoke it, then confirm
|
||||||
|
self.handler.revoke_permission(myrole, 'foo')
|
||||||
|
self.session.commit()
|
||||||
|
self.assertEqual(self.session.query(model.Permission).count(), 0)
|
||||||
|
|
||||||
|
# revoke again, confirm
|
||||||
|
self.handler.revoke_permission(myrole, 'foo')
|
||||||
|
self.session.commit()
|
||||||
|
self.assertEqual(self.session.query(model.Permission).count(), 0)
|
||||||
|
|
52
tests/test_people.py
Normal file
52
tests/test_people.py
Normal file
|
@ -0,0 +1,52 @@
|
||||||
|
# -*- coding: utf-8; -*-
|
||||||
|
|
||||||
|
from unittest import TestCase
|
||||||
|
|
||||||
|
from wuttjamaican import people as mod
|
||||||
|
from wuttjamaican.conf import WuttaConfig
|
||||||
|
|
||||||
|
try:
|
||||||
|
import sqlalchemy as sa
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
|
||||||
|
|
||||||
|
class TestPeopleHandler(TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.config = WuttaConfig()
|
||||||
|
self.app = self.config.get_app()
|
||||||
|
self.handler = mod.PeopleHandler(self.config)
|
||||||
|
|
||||||
|
self.engine = sa.create_engine('sqlite://')
|
||||||
|
self.app.model.Base.metadata.create_all(bind=self.engine)
|
||||||
|
self.session = self.make_session()
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
self.session.close()
|
||||||
|
self.app.model.Base.metadata.drop_all(bind=self.engine)
|
||||||
|
|
||||||
|
def make_session(self):
|
||||||
|
return self.app.make_session(bind=self.engine)
|
||||||
|
|
||||||
|
def test_get_person(self):
|
||||||
|
model = self.app.model
|
||||||
|
myperson = model.Person(full_name='Barny Rubble')
|
||||||
|
self.session.add(myperson)
|
||||||
|
self.session.commit()
|
||||||
|
|
||||||
|
# empty obj is ignored
|
||||||
|
person = self.handler.get_person(None)
|
||||||
|
self.assertIsNone(person)
|
||||||
|
|
||||||
|
# person is returned as-is
|
||||||
|
person = self.handler.get_person(myperson)
|
||||||
|
self.assertIs(person, myperson)
|
||||||
|
|
||||||
|
# find person from user
|
||||||
|
myuser = model.User(username='barney', person=myperson)
|
||||||
|
self.session.add(myuser)
|
||||||
|
self.session.commit()
|
||||||
|
person = self.handler.get_person(myuser)
|
||||||
|
self.assertIs(person, myperson)
|
Loading…
Reference in a new issue