Compare commits
No commits in common. "ca997807e46518a53539ace73afb9815100ea4b1" and "19950956272ec1cc6f5d2fa874692a4166adbad0" have entirely different histories.
ca997807e4
...
1995095627
11
CHANGELOG.md
11
CHANGELOG.md
|
@ -5,17 +5,6 @@ All notable changes to WuttJamaican will be documented in this file.
|
||||||
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
|
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
|
||||||
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
|
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
|
||||||
|
|
||||||
## v0.8.0 (2024-07-14)
|
|
||||||
|
|
||||||
### Feat
|
|
||||||
|
|
||||||
- flesh out the auth handler; add people handler
|
|
||||||
- add model for Person; tie to User
|
|
||||||
|
|
||||||
### Fix
|
|
||||||
|
|
||||||
- add migration for auth tables
|
|
||||||
|
|
||||||
## v0.7.0 (2024-07-14)
|
## v0.7.0 (2024-07-14)
|
||||||
|
|
||||||
### Feat
|
### Feat
|
||||||
|
|
|
@ -17,6 +17,5 @@
|
||||||
db.model.base
|
db.model.base
|
||||||
db.sess
|
db.sess
|
||||||
exc
|
exc
|
||||||
people
|
|
||||||
testing
|
testing
|
||||||
util
|
util
|
||||||
|
|
|
@ -1,6 +0,0 @@
|
||||||
|
|
||||||
``wuttjamaican.people``
|
|
||||||
=======================
|
|
||||||
|
|
||||||
.. automodule:: wuttjamaican.people
|
|
||||||
:members:
|
|
|
@ -109,13 +109,6 @@ Glossary
|
||||||
Most :term:`apps<app>` will have at least one :term:`app
|
Most :term:`apps<app>` will have at least one :term:`app
|
||||||
database`.
|
database`.
|
||||||
|
|
||||||
db session
|
|
||||||
The "session" is a SQLAlchemy abstraction for an open database
|
|
||||||
connection, essentially.
|
|
||||||
|
|
||||||
In practice this generally refers to a
|
|
||||||
:class:`~wuttjamaican.db.sess.Session` instance.
|
|
||||||
|
|
||||||
entry point
|
entry point
|
||||||
This refers to a "setuptools-style" entry point specifically,
|
This refers to a "setuptools-style" entry point specifically,
|
||||||
which is a mechanism used to register "plugins" and the like.
|
which is a mechanism used to register "plugins" and the like.
|
||||||
|
|
|
@ -6,7 +6,7 @@ build-backend = "hatchling.build"
|
||||||
|
|
||||||
[project]
|
[project]
|
||||||
name = "WuttJamaican"
|
name = "WuttJamaican"
|
||||||
version = "0.8.0"
|
version = "0.7.0"
|
||||||
description = "Base package for Wutta Framework"
|
description = "Base package for Wutta Framework"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
authors = [{name = "Lance Edgar", email = "lance@edbob.org"}]
|
authors = [{name = "Lance Edgar", email = "lance@edbob.org"}]
|
||||||
|
@ -32,7 +32,7 @@ dependencies = [
|
||||||
|
|
||||||
|
|
||||||
[project.optional-dependencies]
|
[project.optional-dependencies]
|
||||||
db = ["SQLAlchemy<2", "alembic", "passlib"]
|
db = ["SQLAlchemy<2", "alembic"]
|
||||||
docs = ["Sphinx", "sphinxcontrib-programoutput", "furo"]
|
docs = ["Sphinx", "sphinxcontrib-programoutput", "furo"]
|
||||||
tests = ["pytest-cov", "tox"]
|
tests = ["pytest-cov", "tox"]
|
||||||
|
|
||||||
|
|
|
@ -66,8 +66,6 @@ class AppHandler:
|
||||||
"""
|
"""
|
||||||
default_app_title = "WuttJamaican"
|
default_app_title = "WuttJamaican"
|
||||||
default_model_spec = 'wuttjamaican.db.model'
|
default_model_spec = 'wuttjamaican.db.model'
|
||||||
default_auth_handler_spec = 'wuttjamaican.auth:AuthHandler'
|
|
||||||
default_people_handler_spec = 'wuttjamaican.people:PeopleHandler'
|
|
||||||
|
|
||||||
def __init__(self, config):
|
def __init__(self, config):
|
||||||
self.config = config
|
self.config = config
|
||||||
|
@ -243,16 +241,6 @@ class AppHandler:
|
||||||
"""
|
"""
|
||||||
return make_uuid()
|
return make_uuid()
|
||||||
|
|
||||||
def get_session(self, obj):
|
|
||||||
"""
|
|
||||||
Returns the SQLAlchemy session with which the given object is
|
|
||||||
associated. Simple convenience wrapper around
|
|
||||||
:func:`sqlalchemy:sqlalchemy.orm.object_session()`.
|
|
||||||
"""
|
|
||||||
from sqlalchemy import orm
|
|
||||||
|
|
||||||
return orm.object_session(obj)
|
|
||||||
|
|
||||||
def short_session(self, **kwargs):
|
def short_session(self, **kwargs):
|
||||||
"""
|
"""
|
||||||
Returns a context manager for a short-lived database session.
|
Returns a context manager for a short-lived database session.
|
||||||
|
@ -291,51 +279,6 @@ class AppHandler:
|
||||||
|
|
||||||
return get_setting(session, name)
|
return get_setting(session, name)
|
||||||
|
|
||||||
##############################
|
|
||||||
# getters for other handlers
|
|
||||||
##############################
|
|
||||||
|
|
||||||
def get_auth_handler(self, **kwargs):
|
|
||||||
"""
|
|
||||||
Get the configured :term:`auth handler`.
|
|
||||||
|
|
||||||
:rtype: :class:`~wuttjamaican.auth.AuthHandler`
|
|
||||||
"""
|
|
||||||
if 'auth' not in self.handlers:
|
|
||||||
spec = self.config.get(f'{self.appname}.auth.handler',
|
|
||||||
default=self.default_auth_handler_spec)
|
|
||||||
factory = self.load_object(spec)
|
|
||||||
self.handlers['auth'] = factory(self.config, **kwargs)
|
|
||||||
return self.handlers['auth']
|
|
||||||
|
|
||||||
def get_people_handler(self, **kwargs):
|
|
||||||
"""
|
|
||||||
Get the configured "people" :term:`handler`.
|
|
||||||
|
|
||||||
:rtype: :class:`~wuttjamaican.people.PeopleHandler`
|
|
||||||
"""
|
|
||||||
if 'people' not in self.handlers:
|
|
||||||
spec = self.config.get(f'{self.appname}.people.handler',
|
|
||||||
default=self.default_people_handler_spec)
|
|
||||||
factory = self.load_object(spec)
|
|
||||||
self.handlers['people'] = factory(self.config, **kwargs)
|
|
||||||
return self.handlers['people']
|
|
||||||
|
|
||||||
##############################
|
|
||||||
# convenience delegators
|
|
||||||
##############################
|
|
||||||
|
|
||||||
def get_person(self, obj, **kwargs):
|
|
||||||
"""
|
|
||||||
Convenience method to locate a
|
|
||||||
:class:`~wuttjamaican.db.model.base.Person` for the given
|
|
||||||
object.
|
|
||||||
|
|
||||||
This delegates to the "people" handler method,
|
|
||||||
:meth:`~wuttjamaican.people.PeopleHandler.get_person()`.
|
|
||||||
"""
|
|
||||||
return self.get_people_handler().get_person(obj, **kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
class AppProvider:
|
class AppProvider:
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -29,16 +29,6 @@ This defines the default :term:`auth handler`.
|
||||||
from wuttjamaican.app import GenericHandler
|
from wuttjamaican.app import GenericHandler
|
||||||
|
|
||||||
|
|
||||||
# nb. this only works if passlib is installed (part of 'db' extra)
|
|
||||||
try:
|
|
||||||
from passlib.context import CryptContext
|
|
||||||
except ImportError: # pragma: no cover
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
password_context = CryptContext(schemes=['bcrypt'])
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class AuthHandler(GenericHandler):
|
class AuthHandler(GenericHandler):
|
||||||
"""
|
"""
|
||||||
Base class and default implementation for the :term:`auth
|
Base class and default implementation for the :term:`auth
|
||||||
|
@ -47,436 +37,8 @@ class AuthHandler(GenericHandler):
|
||||||
This is responsible for "authentication and authorization" - for
|
This is responsible for "authentication and authorization" - for
|
||||||
instance:
|
instance:
|
||||||
|
|
||||||
* authenticate user from login credentials
|
* create new users, roles
|
||||||
* check which permissions a user/role has
|
|
||||||
* create/modify users, roles
|
|
||||||
* grant/revoke role permissions
|
* grant/revoke role permissions
|
||||||
|
* determine which permissions a user has
|
||||||
|
* identify user from login credentials
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def authenticate_user(self, session, username, password, **kwargs):
|
|
||||||
"""
|
|
||||||
Authenticate the given user credentials, and if successful,
|
|
||||||
return the :class:`~wuttjamaican.db.model.auth.User`.
|
|
||||||
|
|
||||||
Default logic will (try to) locate a user with matching
|
|
||||||
username, then confirm the supplied password is also a match.
|
|
||||||
|
|
||||||
Custom handlers can authenticate against anything else, using
|
|
||||||
the given credentials. But they still must return a "native"
|
|
||||||
``User`` object for the app to consider the authentication
|
|
||||||
successful. The handler may auto-create the user if needed.
|
|
||||||
|
|
||||||
Generally speaking the credentials will have come directly
|
|
||||||
from a user login attempt in the web app etc. Again the
|
|
||||||
default logic assumes a "username" but in practice it may be
|
|
||||||
an email address etc. - whatever the user entered.
|
|
||||||
|
|
||||||
:param session: Open :term:`db session`.
|
|
||||||
|
|
||||||
:param username: Usually a string, but also may be a
|
|
||||||
:class:`~wuttjamaican.db.model.auth.User` instance, in
|
|
||||||
which case no user lookup will occur. (However the user is
|
|
||||||
still authenticated otherwise, i.e. the password must be
|
|
||||||
correct etc.)
|
|
||||||
|
|
||||||
:param password: Password as string.
|
|
||||||
|
|
||||||
:returns: :class:`~wuttjamaican.db.model.auth.User` instance,
|
|
||||||
or ``None``.
|
|
||||||
"""
|
|
||||||
model = self.app.model
|
|
||||||
|
|
||||||
if isinstance(username, model.User):
|
|
||||||
user = username
|
|
||||||
else:
|
|
||||||
user = session.query(model.User)\
|
|
||||||
.filter_by(username=username)\
|
|
||||||
.first()
|
|
||||||
|
|
||||||
if user and user.active and user.password:
|
|
||||||
if password_context.verify(password, user.password):
|
|
||||||
return user
|
|
||||||
|
|
||||||
def get_role(self, session, key, **kwargs):
|
|
||||||
"""
|
|
||||||
Locate and return a :class:`~wuttjamaican.db.model.auth.Role`
|
|
||||||
per the given key, if possible.
|
|
||||||
|
|
||||||
:param session: Open :term:`db session`.
|
|
||||||
|
|
||||||
:param key: Value to use when searching for the role. Can be
|
|
||||||
a UUID or name of a role.
|
|
||||||
|
|
||||||
:returns: :class:`~wuttjamaican.db.model.auth.Role` instance;
|
|
||||||
or ``None``.
|
|
||||||
"""
|
|
||||||
model = self.app.model
|
|
||||||
|
|
||||||
if not key:
|
|
||||||
return
|
|
||||||
|
|
||||||
# try to match on Role.uuid
|
|
||||||
role = session.get(model.Role, key)
|
|
||||||
if role:
|
|
||||||
return role
|
|
||||||
|
|
||||||
# try to match on Role.name
|
|
||||||
role = session.query(model.Role)\
|
|
||||||
.filter_by(name=key)\
|
|
||||||
.first()
|
|
||||||
if role:
|
|
||||||
return role
|
|
||||||
|
|
||||||
# try settings; if value then recurse
|
|
||||||
key = self.config.get(f'{self.appname}.role.{key}',
|
|
||||||
session=session)
|
|
||||||
if key:
|
|
||||||
return self.get_role(session, key)
|
|
||||||
|
|
||||||
def get_user(self, obj, **kwargs):
|
|
||||||
"""
|
|
||||||
Return the :class:`~wuttjamaican.db.model.auth.User`
|
|
||||||
associated with the given object, if one can be found.
|
|
||||||
|
|
||||||
This method should accept "any" type of ``obj`` and inspect it
|
|
||||||
to determine if/how a user can be found. It should return the
|
|
||||||
"first, most obvious" user in the event that the given object
|
|
||||||
is associated with multiple users.
|
|
||||||
|
|
||||||
The default logic only knows how to navigate the Person/User
|
|
||||||
relationship, so if ``obj`` is a
|
|
||||||
:class:`~wuttjamaican.db.model.base.Person` then it will
|
|
||||||
return the "first" user account for the person (according to
|
|
||||||
:attr:`~wuttjamaican.db.model.base.Person.user`).
|
|
||||||
|
|
||||||
:returns: :class:`~wuttjamaican.db.model.auth.User` or ``None``.
|
|
||||||
"""
|
|
||||||
model = self.app.model
|
|
||||||
|
|
||||||
if isinstance(obj, model.User):
|
|
||||||
return obj
|
|
||||||
|
|
||||||
person = self.app.get_person(obj)
|
|
||||||
if person:
|
|
||||||
return person.user
|
|
||||||
|
|
||||||
def make_user(self, session=None, **kwargs):
|
|
||||||
"""
|
|
||||||
Make and return a new
|
|
||||||
:class:`~wuttjamaican.db.model.auth.User`.
|
|
||||||
|
|
||||||
This is mostly a simple wrapper around the
|
|
||||||
:class:`~wuttjamaican.db.model.auth.User` constructor. All
|
|
||||||
``kwargs`` are passed on to the constructor as-is, for
|
|
||||||
instance. It also will add the user to the session, if
|
|
||||||
applicable.
|
|
||||||
|
|
||||||
This method also adds one other convenience:
|
|
||||||
|
|
||||||
If there is no ``username`` specified in the ``kwargs`` then
|
|
||||||
it will call :meth:`make_unique_username()` to automatically
|
|
||||||
provide one. (Note that the ``kwargs`` will be passed along
|
|
||||||
to that call as well.)
|
|
||||||
|
|
||||||
:param session: Open :term:`db session`, if applicable.
|
|
||||||
|
|
||||||
:returns: The new :class:`~wuttjamaican.db.model.auth.User`
|
|
||||||
instance.
|
|
||||||
"""
|
|
||||||
model = self.app.model
|
|
||||||
|
|
||||||
if session and 'username' not in kwargs:
|
|
||||||
kwargs['username'] = self.make_unique_username(session, **kwargs)
|
|
||||||
|
|
||||||
user = model.User(**kwargs)
|
|
||||||
if session:
|
|
||||||
session.add(user)
|
|
||||||
return user
|
|
||||||
|
|
||||||
def delete_user(self, user, **kwargs):
|
|
||||||
"""
|
|
||||||
Delete the given user account. Use with caution! As this
|
|
||||||
generally cannot be undone.
|
|
||||||
|
|
||||||
Default behavior simply deletes the user account. Depending
|
|
||||||
on the DB schema and data present, this may cause an error
|
|
||||||
(i.e. if the user is still referenced by other tables).
|
|
||||||
|
|
||||||
:param user: :class:`~wuttjamaican.db.model.auth.User` to
|
|
||||||
delete.
|
|
||||||
"""
|
|
||||||
session = self.app.get_session(user)
|
|
||||||
session.delete(user)
|
|
||||||
|
|
||||||
def make_preferred_username(self, session, **kwargs):
|
|
||||||
"""
|
|
||||||
Generate a "preferred" username, using data from ``kwargs`` as
|
|
||||||
hints.
|
|
||||||
|
|
||||||
Note that ``kwargs`` should be of the same sort that might be
|
|
||||||
passed to the :class:`~wuttjamaican.db.model.auth.User`
|
|
||||||
constructor.
|
|
||||||
|
|
||||||
So far this logic is rather simple:
|
|
||||||
|
|
||||||
If ``kwargs`` contains ``person`` then a username will be
|
|
||||||
constructed using the name data from the person
|
|
||||||
(e.g. ``'john.doe'``).
|
|
||||||
|
|
||||||
In all other cases it will return ``'newuser'``.
|
|
||||||
|
|
||||||
.. note::
|
|
||||||
|
|
||||||
This method does not confirm if the username it generates
|
|
||||||
is actually "available" for a new user. See
|
|
||||||
:meth:`make_unique_username()` for that.
|
|
||||||
|
|
||||||
:param session: Open :term:`db session`.
|
|
||||||
|
|
||||||
:returns: Generated username as string.
|
|
||||||
"""
|
|
||||||
person = kwargs.get('person')
|
|
||||||
if person:
|
|
||||||
first = (person.first_name or '').strip().lower()
|
|
||||||
last = (person.last_name or '').strip().lower()
|
|
||||||
if first and last:
|
|
||||||
return f'{first}.{last}'
|
|
||||||
if first:
|
|
||||||
return first
|
|
||||||
if last:
|
|
||||||
return last
|
|
||||||
|
|
||||||
return 'newuser'
|
|
||||||
|
|
||||||
def make_unique_username(self, session, **kwargs):
|
|
||||||
"""
|
|
||||||
Generate a *unique* username, using data from ``kwargs`` as
|
|
||||||
hints.
|
|
||||||
|
|
||||||
Note that ``kwargs`` should be of the same sort that might be
|
|
||||||
passed to the :class:`~wuttjamaican.db.model.auth.User`
|
|
||||||
constructor.
|
|
||||||
|
|
||||||
This method is a convenience which does two things:
|
|
||||||
|
|
||||||
First it calls :meth:`make_preferred_username()` to obtain the
|
|
||||||
"preferred" username. (It passes all ``kwargs`` along when it
|
|
||||||
makes that call.)
|
|
||||||
|
|
||||||
Then it checks to see if the resulting username is already
|
|
||||||
taken. If it is, then a "counter" is appended to the
|
|
||||||
username, and incremented until a username can be found which
|
|
||||||
is *not* yet taken.
|
|
||||||
|
|
||||||
It returns the first "available" (hence unique) username which
|
|
||||||
is found. Note that it is considered unique and therefore
|
|
||||||
available *at the time*; however this method does not
|
|
||||||
"reserve" the username in any way. It is assumed that you
|
|
||||||
would create the user yourself once you have the username.
|
|
||||||
|
|
||||||
:param session: Open :term:`db session`.
|
|
||||||
|
|
||||||
:returns: Username as string.
|
|
||||||
"""
|
|
||||||
model = self.app.model
|
|
||||||
|
|
||||||
original_username = self.make_preferred_username(session, **kwargs)
|
|
||||||
username = original_username
|
|
||||||
|
|
||||||
# check for unique username
|
|
||||||
counter = 1
|
|
||||||
while True:
|
|
||||||
users = session.query(model.User)\
|
|
||||||
.filter(model.User.username == username)\
|
|
||||||
.count()
|
|
||||||
if not users:
|
|
||||||
break
|
|
||||||
username = f"{original_username}{counter:02d}"
|
|
||||||
counter += 1
|
|
||||||
|
|
||||||
return username
|
|
||||||
|
|
||||||
def set_user_password(self, user, password, **kwargs):
|
|
||||||
"""
|
|
||||||
Set a user's password.
|
|
||||||
|
|
||||||
This will update the
|
|
||||||
:attr:`~wuttjamaican.db.model.auth.User.password` attribute
|
|
||||||
for the user. The value will be hashed using ``bcrypt``.
|
|
||||||
|
|
||||||
:param user: :class:`~wuttjamaican.db.model.auth.User` instance.
|
|
||||||
|
|
||||||
:param password: New password in plain text.
|
|
||||||
"""
|
|
||||||
user.password = password_context.hash(password)
|
|
||||||
|
|
||||||
def get_role_administrator(self, session, **kwargs):
|
|
||||||
"""
|
|
||||||
Returns the special "Administrator" role.
|
|
||||||
"""
|
|
||||||
return self._special_role(session, 'd937fa8a965611dfa0dd001143047286', "Administrator")
|
|
||||||
|
|
||||||
def get_role_anonymous(self, session, **kwargs):
|
|
||||||
"""
|
|
||||||
Returns the special "Anonymous" (aka. "Guest") role.
|
|
||||||
"""
|
|
||||||
return self._special_role(session, 'f8a27c98965a11dfaff7001143047286', "Anonymous")
|
|
||||||
|
|
||||||
def get_role_authenticated(self, session, **kwargs):
|
|
||||||
"""
|
|
||||||
Returns the special "Authenticated" role.
|
|
||||||
"""
|
|
||||||
return self._special_role(session, 'b765a9cc331a11e6ac2a3ca9f40bc550', "Authenticated")
|
|
||||||
|
|
||||||
def get_permissions(self, session, principal,
|
|
||||||
include_anonymous=True,
|
|
||||||
include_authenticated=True):
|
|
||||||
"""
|
|
||||||
Return a set of permission names, which represents all
|
|
||||||
permissions effectively granted to the given user or role.
|
|
||||||
|
|
||||||
:param session: Open :term:`db session`.
|
|
||||||
|
|
||||||
:param principal: :class:`~wuttjamaican.db.model.auth.User` or
|
|
||||||
:class:`~wuttjamaican.db.model.auth.Role` instance. Can
|
|
||||||
also be ``None``, in which case the "Anonymous" role will
|
|
||||||
be assumed.
|
|
||||||
|
|
||||||
:param include_anonymous: Whether the "Anonymous" role should
|
|
||||||
be included when checking permissions. If ``False``, the
|
|
||||||
Anonymous permissions will *not* be checked.
|
|
||||||
|
|
||||||
:param include_authenticated: Whether the "Authenticated" role
|
|
||||||
should be included when checking permissions.
|
|
||||||
|
|
||||||
:returns: Set of permission names.
|
|
||||||
:rtype: set
|
|
||||||
"""
|
|
||||||
# we will use any `roles` attribute which may be present. in
|
|
||||||
# practice we would be assuming a User in this case
|
|
||||||
if hasattr(principal, 'roles'):
|
|
||||||
roles = [role
|
|
||||||
for role in principal.roles
|
|
||||||
if self._role_is_pertinent(role)]
|
|
||||||
|
|
||||||
# here our User assumption gets a little more explicit
|
|
||||||
if include_authenticated:
|
|
||||||
roles.append(self.get_role_authenticated(session))
|
|
||||||
|
|
||||||
# otherwise a non-null principal is assumed to be a Role
|
|
||||||
elif principal is not None:
|
|
||||||
roles = [principal]
|
|
||||||
|
|
||||||
# fallback assumption is "no roles"
|
|
||||||
else:
|
|
||||||
roles = []
|
|
||||||
|
|
||||||
# maybe include anonymous role
|
|
||||||
if include_anonymous:
|
|
||||||
roles.append(self.get_role_anonymous(session))
|
|
||||||
|
|
||||||
# build the permissions cache
|
|
||||||
cache = set()
|
|
||||||
for role in roles:
|
|
||||||
if hasattr(role, 'permissions'):
|
|
||||||
cache.update(role.permissions)
|
|
||||||
|
|
||||||
return cache
|
|
||||||
|
|
||||||
def has_permission(self, session, principal, permission,
|
|
||||||
include_anonymous=True,
|
|
||||||
include_authenticated=True):
|
|
||||||
"""
|
|
||||||
Check if the given user or role has been granted the given
|
|
||||||
permission.
|
|
||||||
|
|
||||||
.. note::
|
|
||||||
|
|
||||||
While this method is perfectly usable, it is a bit "heavy"
|
|
||||||
if you need to make multiple permission checks for the same
|
|
||||||
user. To optimize, call :meth:`get_permissions()` and keep
|
|
||||||
the result, then instead of calling ``has_permission()``
|
|
||||||
just check if a given permission is contained in the cached
|
|
||||||
result set.
|
|
||||||
|
|
||||||
(The logic just described is exactly what this method does,
|
|
||||||
except it will not keep the result set, hence calling it
|
|
||||||
multiple times for same user is not optimal.)
|
|
||||||
|
|
||||||
:param session: Open :term:`db session`.
|
|
||||||
|
|
||||||
:param principal: Either a
|
|
||||||
:class:`~wuttjamaican.db.model.auth.User` or
|
|
||||||
:class:`~wuttjamaican.db.model.auth.Role` instance. It is
|
|
||||||
also expected that this may sometimes be ``None``, in which
|
|
||||||
case the "Anonymous" role will be assumed.
|
|
||||||
|
|
||||||
:param permission: Name of the permission for which to check.
|
|
||||||
|
|
||||||
:param include_anonymous: Whether the "Anonymous" role should
|
|
||||||
be included when checking permissions. If ``False``, then
|
|
||||||
Anonymous permissions will *not* be checked.
|
|
||||||
|
|
||||||
:param include_authenticated: Whether the "Authenticated" role
|
|
||||||
should be included when checking permissions.
|
|
||||||
|
|
||||||
:returns: Boolean indicating if the permission is granted.
|
|
||||||
"""
|
|
||||||
perms = self.get_permissions(session, principal,
|
|
||||||
include_anonymous=include_anonymous,
|
|
||||||
include_authenticated=include_authenticated)
|
|
||||||
return permission in perms
|
|
||||||
|
|
||||||
def grant_permission(self, role, permission, **kwargs):
|
|
||||||
"""
|
|
||||||
Grant a permission to the role. If the role already has the
|
|
||||||
permission, nothing is done.
|
|
||||||
|
|
||||||
:param role: :class:`~wuttjamaican.db.model.auth.Role`
|
|
||||||
instance.
|
|
||||||
|
|
||||||
:param permission: Name of the permission as string.
|
|
||||||
"""
|
|
||||||
if permission not in role.permissions:
|
|
||||||
role.permissions.append(permission)
|
|
||||||
|
|
||||||
def revoke_permission(self, role, permission, **kwargs):
|
|
||||||
"""
|
|
||||||
Revoke a permission from the role. If the role does not have
|
|
||||||
the permission, nothing is done.
|
|
||||||
|
|
||||||
:param role: A :class:`~rattail.db.model.users.Role` instance.
|
|
||||||
|
|
||||||
:param permission: Name of the permission as string.
|
|
||||||
"""
|
|
||||||
if permission in role.permissions:
|
|
||||||
role.permissions.remove(permission)
|
|
||||||
|
|
||||||
##############################
|
|
||||||
# internal methods
|
|
||||||
##############################
|
|
||||||
|
|
||||||
def _role_is_pertinent(self, role):
|
|
||||||
"""
|
|
||||||
Check the role to ensure it is "pertinent" for the current app.
|
|
||||||
|
|
||||||
The idea behind this is for sake of a multi-node system, where
|
|
||||||
users and roles are synced between nodes. Some roles may be
|
|
||||||
defined for only certain types of nodes and hence not
|
|
||||||
"pertinent" for all nodes.
|
|
||||||
|
|
||||||
As of now there is no actual support for that, but this stub
|
|
||||||
method exists for when it will.
|
|
||||||
"""
|
|
||||||
return True
|
|
||||||
|
|
||||||
def _special_role(self, session, uuid, name):
|
|
||||||
"""
|
|
||||||
Fetch a "special" role, creating if needed.
|
|
||||||
"""
|
|
||||||
model = self.app.model
|
|
||||||
role = session.get(model.Role, uuid)
|
|
||||||
if not role:
|
|
||||||
role = model.Role(uuid=uuid, name=name)
|
|
||||||
session.add(role)
|
|
||||||
return role
|
|
||||||
|
|
|
@ -1,45 +0,0 @@
|
||||||
"""add people
|
|
||||||
|
|
||||||
Revision ID: 3abcc44f7f91
|
|
||||||
Revises: d686f7abe3e0
|
|
||||||
Create Date: 2024-07-14 15:14:30.552682
|
|
||||||
|
|
||||||
"""
|
|
||||||
from typing import Sequence, Union
|
|
||||||
|
|
||||||
from alembic import op
|
|
||||||
import sqlalchemy as sa
|
|
||||||
|
|
||||||
|
|
||||||
# revision identifiers, used by Alembic.
|
|
||||||
revision: str = '3abcc44f7f91'
|
|
||||||
down_revision: Union[str, None] = 'd686f7abe3e0'
|
|
||||||
branch_labels: Union[str, Sequence[str], None] = None
|
|
||||||
depends_on: Union[str, Sequence[str], None] = None
|
|
||||||
|
|
||||||
|
|
||||||
def upgrade() -> None:
|
|
||||||
|
|
||||||
# person
|
|
||||||
op.create_table('person',
|
|
||||||
sa.Column('uuid', sa.String(length=32), nullable=False),
|
|
||||||
sa.Column('full_name', sa.String(length=100), nullable=False),
|
|
||||||
sa.Column('first_name', sa.String(length=50), nullable=True),
|
|
||||||
sa.Column('middle_name', sa.String(length=50), nullable=True),
|
|
||||||
sa.Column('last_name', sa.String(length=50), nullable=True),
|
|
||||||
sa.PrimaryKeyConstraint('uuid', name=op.f('pk_person'))
|
|
||||||
)
|
|
||||||
|
|
||||||
# user
|
|
||||||
op.add_column('user', sa.Column('person_uuid', sa.String(length=32), nullable=True))
|
|
||||||
op.create_foreign_key(op.f('fk_user_person_uuid_person'), 'user', 'person', ['person_uuid'], ['uuid'])
|
|
||||||
|
|
||||||
|
|
||||||
def downgrade() -> None:
|
|
||||||
|
|
||||||
# user
|
|
||||||
op.drop_constraint(op.f('fk_user_person_uuid_person'), 'user', type_='foreignkey')
|
|
||||||
op.drop_column('user', 'person_uuid')
|
|
||||||
|
|
||||||
# person
|
|
||||||
op.drop_table('person')
|
|
|
@ -1,73 +0,0 @@
|
||||||
"""add users, roles
|
|
||||||
|
|
||||||
Revision ID: d686f7abe3e0
|
|
||||||
Revises: fc3a3bcaa069
|
|
||||||
Create Date: 2024-07-14 13:27:22.703093
|
|
||||||
|
|
||||||
"""
|
|
||||||
from typing import Sequence, Union
|
|
||||||
|
|
||||||
from alembic import op
|
|
||||||
import sqlalchemy as sa
|
|
||||||
|
|
||||||
|
|
||||||
# revision identifiers, used by Alembic.
|
|
||||||
revision: str = 'd686f7abe3e0'
|
|
||||||
down_revision: Union[str, None] = 'fc3a3bcaa069'
|
|
||||||
branch_labels: Union[str, Sequence[str], None] = None
|
|
||||||
depends_on: Union[str, Sequence[str], None] = None
|
|
||||||
|
|
||||||
|
|
||||||
def upgrade() -> None:
|
|
||||||
|
|
||||||
# role
|
|
||||||
op.create_table('role',
|
|
||||||
sa.Column('uuid', sa.String(length=32), nullable=False),
|
|
||||||
sa.Column('name', sa.String(length=100), nullable=False),
|
|
||||||
sa.Column('notes', sa.Text(), nullable=True),
|
|
||||||
sa.PrimaryKeyConstraint('uuid'),
|
|
||||||
sa.UniqueConstraint('name', name=op.f('uq_role_name'))
|
|
||||||
)
|
|
||||||
|
|
||||||
# user
|
|
||||||
op.create_table('user',
|
|
||||||
sa.Column('uuid', sa.String(length=32), nullable=False),
|
|
||||||
sa.Column('username', sa.String(length=25), nullable=False),
|
|
||||||
sa.Column('password', sa.String(length=60), nullable=True),
|
|
||||||
sa.Column('active', sa.Boolean(), nullable=False),
|
|
||||||
sa.PrimaryKeyConstraint('uuid'),
|
|
||||||
sa.UniqueConstraint('username', name=op.f('uq_user_username'))
|
|
||||||
)
|
|
||||||
|
|
||||||
# permission
|
|
||||||
op.create_table('permission',
|
|
||||||
sa.Column('role_uuid', sa.String(length=32), nullable=False),
|
|
||||||
sa.Column('permission', sa.String(length=254), nullable=False),
|
|
||||||
sa.ForeignKeyConstraint(['role_uuid'], ['role.uuid'], name=op.f('fk_permission_role_uuid_role')),
|
|
||||||
sa.PrimaryKeyConstraint('role_uuid', 'permission')
|
|
||||||
)
|
|
||||||
|
|
||||||
# user_x_role
|
|
||||||
op.create_table('user_x_role',
|
|
||||||
sa.Column('uuid', sa.String(length=32), nullable=False),
|
|
||||||
sa.Column('user_uuid', sa.String(length=32), nullable=False),
|
|
||||||
sa.Column('role_uuid', sa.String(length=32), nullable=False),
|
|
||||||
sa.ForeignKeyConstraint(['role_uuid'], ['role.uuid'], name=op.f('fk_user_x_role_role_uuid_role')),
|
|
||||||
sa.ForeignKeyConstraint(['user_uuid'], ['user.uuid'], name=op.f('fk_user_x_role_user_uuid_user')),
|
|
||||||
sa.PrimaryKeyConstraint('uuid')
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def downgrade() -> None:
|
|
||||||
|
|
||||||
# user_x_role
|
|
||||||
op.drop_table('user_x_role')
|
|
||||||
|
|
||||||
# permission
|
|
||||||
op.drop_table('permission')
|
|
||||||
|
|
||||||
# user
|
|
||||||
op.drop_table('user')
|
|
||||||
|
|
||||||
# role
|
|
||||||
op.drop_table('role')
|
|
|
@ -31,12 +31,11 @@ The ``wuttjamaican.db.model`` namespace contains the following:
|
||||||
* :func:`~wuttjamaican.db.model.base.uuid_fk_column()`
|
* :func:`~wuttjamaican.db.model.base.uuid_fk_column()`
|
||||||
* :class:`~wuttjamaican.db.model.base.Base`
|
* :class:`~wuttjamaican.db.model.base.Base`
|
||||||
* :class:`~wuttjamaican.db.model.base.Setting`
|
* :class:`~wuttjamaican.db.model.base.Setting`
|
||||||
* :class:`~wuttjamaican.db.model.base.Person`
|
|
||||||
* :class:`~wuttjamaican.db.model.auth.Role`
|
* :class:`~wuttjamaican.db.model.auth.Role`
|
||||||
* :class:`~wuttjamaican.db.model.auth.Permission`
|
* :class:`~wuttjamaican.db.model.auth.Permission`
|
||||||
* :class:`~wuttjamaican.db.model.auth.User`
|
* :class:`~wuttjamaican.db.model.auth.User`
|
||||||
* :class:`~wuttjamaican.db.model.auth.UserRole`
|
* :class:`~wuttjamaican.db.model.auth.UserRole`
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from .base import uuid_column, uuid_fk_column, Base, Setting, Person
|
from .base import Base, Setting, uuid_column, uuid_fk_column
|
||||||
from .auth import Role, Permission, User, UserRole
|
from .auth import Role, Permission, User, UserRole
|
||||||
|
|
|
@ -65,10 +65,16 @@ class Role(Base):
|
||||||
See also :attr:`user_refs`.
|
See also :attr:`user_refs`.
|
||||||
"""
|
"""
|
||||||
__tablename__ = 'role'
|
__tablename__ = 'role'
|
||||||
|
__table_args__ = (
|
||||||
|
sa.UniqueConstraint('name',
|
||||||
|
# TODO
|
||||||
|
# name='role_uq_name',
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
uuid = uuid_column()
|
uuid = uuid_column()
|
||||||
|
|
||||||
name = sa.Column(sa.String(length=100), nullable=False, unique=True, doc="""
|
name = sa.Column(sa.String(length=100), nullable=False, doc="""
|
||||||
Name for the role. Each role must have a name, which must be
|
Name for the role. Each role must have a name, which must be
|
||||||
unique.
|
unique.
|
||||||
""")
|
""")
|
||||||
|
@ -80,8 +86,6 @@ class Role(Base):
|
||||||
permission_refs = orm.relationship(
|
permission_refs = orm.relationship(
|
||||||
'Permission',
|
'Permission',
|
||||||
back_populates='role',
|
back_populates='role',
|
||||||
cascade='all, delete-orphan',
|
|
||||||
cascade_backrefs=False,
|
|
||||||
doc="""
|
doc="""
|
||||||
List of :class:`Permission` references for the role.
|
List of :class:`Permission` references for the role.
|
||||||
|
|
||||||
|
@ -97,9 +101,10 @@ class Role(Base):
|
||||||
|
|
||||||
user_refs = orm.relationship(
|
user_refs = orm.relationship(
|
||||||
'UserRole',
|
'UserRole',
|
||||||
|
# TODO
|
||||||
|
# cascade='all, delete-orphan',
|
||||||
|
# cascade_backrefs=False,
|
||||||
back_populates='role',
|
back_populates='role',
|
||||||
cascade='all, delete-orphan',
|
|
||||||
cascade_backrefs=False,
|
|
||||||
doc="""
|
doc="""
|
||||||
List of :class:`UserRole` instances belonging to the role.
|
List of :class:`UserRole` instances belonging to the role.
|
||||||
|
|
||||||
|
@ -122,12 +127,17 @@ class Permission(Base):
|
||||||
Represents a permission granted to a role.
|
Represents a permission granted to a role.
|
||||||
"""
|
"""
|
||||||
__tablename__ = 'permission'
|
__tablename__ = 'permission'
|
||||||
|
__table_args__ = (
|
||||||
|
sa.ForeignKeyConstraint(['role_uuid'], ['role.uuid'],
|
||||||
|
# TODO
|
||||||
|
# name='permission_fk_role',
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
role_uuid = uuid_fk_column('role.uuid', primary_key=True, nullable=False)
|
role_uuid = uuid_fk_column(primary_key=True, nullable=False)
|
||||||
role = orm.relationship(
|
role = orm.relationship(
|
||||||
Role,
|
Role,
|
||||||
back_populates='permission_refs',
|
back_populates='permission_refs',
|
||||||
cascade_backrefs=False,
|
|
||||||
doc="""
|
doc="""
|
||||||
Reference to the :class:`Role` for which the permission is
|
Reference to the :class:`Role` for which the permission is
|
||||||
granted.
|
granted.
|
||||||
|
@ -147,18 +157,18 @@ class User(Base):
|
||||||
|
|
||||||
This may or may not correspond to a real person, i.e. some users
|
This may or may not correspond to a real person, i.e. some users
|
||||||
may exist solely for automated tasks.
|
may exist solely for automated tasks.
|
||||||
|
|
||||||
.. attribute:: roles
|
|
||||||
|
|
||||||
List of :class:`Role` instances to which the user belongs.
|
|
||||||
|
|
||||||
See also :attr:`role_refs`.
|
|
||||||
"""
|
"""
|
||||||
__tablename__ = 'user'
|
__tablename__ = 'user'
|
||||||
|
__table_args__ = (
|
||||||
|
sa.UniqueConstraint('username',
|
||||||
|
# TODO
|
||||||
|
# name='user_uq_username',
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
uuid = uuid_column()
|
uuid = uuid_column()
|
||||||
|
|
||||||
username = sa.Column(sa.String(length=25), nullable=False, unique=True, doc="""
|
username = sa.Column(sa.String(length=25), nullable=False, doc="""
|
||||||
Account username. This is required and must be unique.
|
Account username. This is required and must be unique.
|
||||||
""")
|
""")
|
||||||
|
|
||||||
|
@ -166,18 +176,6 @@ class User(Base):
|
||||||
Hashed password for login. (The raw password is not stored.)
|
Hashed password for login. (The raw password is not stored.)
|
||||||
""")
|
""")
|
||||||
|
|
||||||
person_uuid = uuid_fk_column('person.uuid', nullable=True)
|
|
||||||
person = orm.relationship(
|
|
||||||
'Person',
|
|
||||||
# TODO: seems like this is not needed?
|
|
||||||
# uselist=False,
|
|
||||||
back_populates='users',
|
|
||||||
cascade_backrefs=False,
|
|
||||||
doc="""
|
|
||||||
Reference to the :class:`~wuttjamaican.db.model.base.Person`
|
|
||||||
whose user account this is.
|
|
||||||
""")
|
|
||||||
|
|
||||||
active = sa.Column(sa.Boolean(), nullable=False, default=True, doc="""
|
active = sa.Column(sa.Boolean(), nullable=False, default=True, doc="""
|
||||||
Flag indicating whether the user account is "active" - it is
|
Flag indicating whether the user account is "active" - it is
|
||||||
``True`` by default.
|
``True`` by default.
|
||||||
|
@ -188,53 +186,44 @@ class User(Base):
|
||||||
role_refs = orm.relationship(
|
role_refs = orm.relationship(
|
||||||
'UserRole',
|
'UserRole',
|
||||||
back_populates='user',
|
back_populates='user',
|
||||||
cascade_backrefs=False,
|
|
||||||
# TODO
|
|
||||||
# cascade='all, delete-orphan',
|
|
||||||
doc="""
|
doc="""
|
||||||
List of :class:`UserRole` instances belonging to the user.
|
List of :class:`UserRole` records.
|
||||||
|
|
||||||
See also :attr:`roles`.
|
|
||||||
""")
|
""")
|
||||||
|
|
||||||
roles = association_proxy(
|
|
||||||
'role_refs', 'role',
|
|
||||||
creator=lambda r: UserRole(role=r),
|
|
||||||
# TODO
|
|
||||||
# getset_factory=getset_factory,
|
|
||||||
)
|
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
if self.person:
|
|
||||||
name = str(self.person)
|
|
||||||
if name:
|
|
||||||
return name
|
|
||||||
return self.username or ""
|
return self.username or ""
|
||||||
|
|
||||||
|
|
||||||
class UserRole(Base):
|
class UserRole(Base):
|
||||||
"""
|
"""
|
||||||
Represents the association between a user and a role; i.e. the
|
Represents the association between a user and a role.
|
||||||
user "belongs" or "is assigned" to the role.
|
|
||||||
"""
|
"""
|
||||||
__tablename__ = 'user_x_role'
|
__tablename__ = 'user_x_role'
|
||||||
|
__table_args__ = (
|
||||||
|
sa.ForeignKeyConstraint(['user_uuid'], ['user.uuid'],
|
||||||
|
# TODO
|
||||||
|
# name='user_x_role_fk_user',
|
||||||
|
),
|
||||||
|
sa.ForeignKeyConstraint(['role_uuid'], ['role.uuid'],
|
||||||
|
# TODO
|
||||||
|
# name='user_x_role_fk_role',
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
uuid = uuid_column()
|
uuid = uuid_column()
|
||||||
|
|
||||||
user_uuid = uuid_fk_column('user.uuid', nullable=False)
|
user_uuid = uuid_fk_column(nullable=False)
|
||||||
user = orm.relationship(
|
user = orm.relationship(
|
||||||
User,
|
User,
|
||||||
back_populates='role_refs',
|
back_populates='role_refs',
|
||||||
cascade_backrefs=False,
|
|
||||||
doc="""
|
doc="""
|
||||||
Reference to the :class:`User` involved.
|
Reference to the :class:`User` involved.
|
||||||
""")
|
""")
|
||||||
|
|
||||||
role_uuid = uuid_fk_column('role.uuid', nullable=False)
|
role_uuid = uuid_fk_column(nullable=False)
|
||||||
role = orm.relationship(
|
role = orm.relationship(
|
||||||
Role,
|
Role,
|
||||||
back_populates='user_refs',
|
back_populates='user_refs',
|
||||||
cascade_backrefs=False,
|
|
||||||
doc="""
|
doc="""
|
||||||
Reference to the :class:`Role` involved.
|
Reference to the :class:`Role` involved.
|
||||||
""")
|
""")
|
||||||
|
|
|
@ -34,19 +34,7 @@ from sqlalchemy import orm
|
||||||
from wuttjamaican.util import make_uuid
|
from wuttjamaican.util import make_uuid
|
||||||
|
|
||||||
|
|
||||||
# nb. this convention comes from upstream docs
|
Base = orm.declarative_base()
|
||||||
# https://docs.sqlalchemy.org/en/14/core/constraints.html#constraint-naming-conventions
|
|
||||||
naming_convention = {
|
|
||||||
'ix': 'ix_%(column_0_label)s',
|
|
||||||
'uq': 'uq_%(table_name)s_%(column_0_name)s',
|
|
||||||
'ck': 'ck_%(table_name)s_%(constraint_name)s',
|
|
||||||
'fk': 'fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s',
|
|
||||||
'pk': 'pk_%(table_name)s',
|
|
||||||
}
|
|
||||||
|
|
||||||
metadata = sa.MetaData(naming_convention=naming_convention)
|
|
||||||
|
|
||||||
Base = orm.declarative_base(metadata=metadata)
|
|
||||||
|
|
||||||
|
|
||||||
def uuid_column(*args, **kwargs):
|
def uuid_column(*args, **kwargs):
|
||||||
|
@ -59,14 +47,11 @@ def uuid_column(*args, **kwargs):
|
||||||
return sa.Column(sa.String(length=32), *args, **kwargs)
|
return sa.Column(sa.String(length=32), *args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
def uuid_fk_column(target_column, *args, **kwargs):
|
def uuid_fk_column(*args, **kwargs):
|
||||||
"""
|
"""
|
||||||
Returns a UUID column for use as a foreign key to another table.
|
Returns a UUID column for use as a foreign key to another table.
|
||||||
|
|
||||||
:param target_column: Name of the table column on the remote side,
|
|
||||||
e.g. ``'user.uuid'``.
|
|
||||||
"""
|
"""
|
||||||
return sa.Column(sa.String(length=32), sa.ForeignKey(target_column), *args, **kwargs)
|
return sa.Column(sa.String(length=32), *args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
class Setting(Base):
|
class Setting(Base):
|
||||||
|
@ -85,77 +70,3 @@ class Setting(Base):
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return self.name or ""
|
return self.name or ""
|
||||||
|
|
||||||
|
|
||||||
class Person(Base):
|
|
||||||
"""
|
|
||||||
Represents a person.
|
|
||||||
|
|
||||||
The use for this table in the base framework, is to associate with
|
|
||||||
a :class:`~wuttjamaican.db.model.auth.User` to provide first and
|
|
||||||
last name etc. (However a user does not have to be associated
|
|
||||||
with any person.)
|
|
||||||
|
|
||||||
But this table could also be used as a basis for a Customer or
|
|
||||||
Employee relationship etc.
|
|
||||||
"""
|
|
||||||
__tablename__ = 'person'
|
|
||||||
|
|
||||||
uuid = uuid_column()
|
|
||||||
|
|
||||||
full_name = sa.Column(sa.String(length=100), nullable=False, doc="""
|
|
||||||
Full name for the person. Note that this is *required*.
|
|
||||||
""")
|
|
||||||
|
|
||||||
first_name = sa.Column(sa.String(length=50), nullable=True, doc="""
|
|
||||||
The person's first name.
|
|
||||||
""")
|
|
||||||
|
|
||||||
middle_name = sa.Column(sa.String(length=50), nullable=True, doc="""
|
|
||||||
The person's middle name or initial.
|
|
||||||
""")
|
|
||||||
|
|
||||||
last_name = sa.Column(sa.String(length=50), nullable=True, doc="""
|
|
||||||
The person's last name.
|
|
||||||
""")
|
|
||||||
|
|
||||||
users = orm.relationship(
|
|
||||||
'User',
|
|
||||||
back_populates='person',
|
|
||||||
cascade_backrefs=False,
|
|
||||||
doc="""
|
|
||||||
List of :class:`~wuttjamaican.db.model.auth.User` accounts for
|
|
||||||
the person. Typically there is only one user account per
|
|
||||||
person, but technically multiple are supported.
|
|
||||||
""")
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
return self.full_name or ""
|
|
||||||
|
|
||||||
@property
|
|
||||||
def user(self):
|
|
||||||
"""
|
|
||||||
Reference to the "first"
|
|
||||||
:class:`~wuttjamaican.db.model.auth.User` account for the
|
|
||||||
person, or ``None``.
|
|
||||||
|
|
||||||
.. warning::
|
|
||||||
|
|
||||||
Note that the database schema supports multiple users per
|
|
||||||
person, but this property logic ignores that and will only
|
|
||||||
ever return "one or none". That might be fine in 99% of
|
|
||||||
cases, but if multiple accounts exist for a person, the one
|
|
||||||
returned is indeterminate.
|
|
||||||
|
|
||||||
See :attr:`users` to access the full list.
|
|
||||||
"""
|
|
||||||
|
|
||||||
# TODO: i'm not crazy about the ambiguity here re: number of
|
|
||||||
# user accounts a person may have. in particular it's not
|
|
||||||
# clear *which* user account would be returned, as there is no
|
|
||||||
# sequence ordinal defined etc. a better approach might be to
|
|
||||||
# force callers to assume the possibility of multiple
|
|
||||||
# user accounts per person? (if so, remove this property)
|
|
||||||
|
|
||||||
if self.users:
|
|
||||||
return self.users[0]
|
|
||||||
|
|
|
@ -1,66 +0,0 @@
|
||||||
# -*- coding: utf-8; -*-
|
|
||||||
################################################################################
|
|
||||||
#
|
|
||||||
# WuttJamaican -- Base package for Wutta Framework
|
|
||||||
# Copyright © 2023-2024 Lance Edgar
|
|
||||||
#
|
|
||||||
# This file is part of Wutta Framework.
|
|
||||||
#
|
|
||||||
# Wutta Framework is free software: you can redistribute it and/or modify it
|
|
||||||
# under the terms of the GNU General Public License as published by the Free
|
|
||||||
# Software Foundation, either version 3 of the License, or (at your option) any
|
|
||||||
# later version.
|
|
||||||
#
|
|
||||||
# Wutta Framework is distributed in the hope that it will be useful, but
|
|
||||||
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
|
|
||||||
# more details.
|
|
||||||
#
|
|
||||||
# You should have received a copy of the GNU General Public License along with
|
|
||||||
# Wutta Framework. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
#
|
|
||||||
################################################################################
|
|
||||||
"""
|
|
||||||
People Handler
|
|
||||||
|
|
||||||
This is a :term:`handler` to manage "people" in the DB.
|
|
||||||
"""
|
|
||||||
|
|
||||||
from wuttjamaican.app import GenericHandler
|
|
||||||
|
|
||||||
|
|
||||||
class PeopleHandler(GenericHandler):
|
|
||||||
"""
|
|
||||||
Base class and default implementation for the "people"
|
|
||||||
:term:`handler`.
|
|
||||||
|
|
||||||
This is responsible for managing
|
|
||||||
:class:`~wuttjamaican.db.model.base.Person` records, and related
|
|
||||||
things.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def get_person(self, obj, **kwargs):
|
|
||||||
"""
|
|
||||||
Return the :class:`~wuttjamaican.db.model.base.Person`
|
|
||||||
associated with the given object, if one can be found.
|
|
||||||
|
|
||||||
This method should accept "any" type of ``obj`` and inspect it
|
|
||||||
to determine if/how a person can be found. It should return
|
|
||||||
the "first, most obvious" person in the event that the object
|
|
||||||
is associated with multiple people.
|
|
||||||
|
|
||||||
This is a rather fundamental method, in that it is called by
|
|
||||||
several other methods, both within this handler as well as
|
|
||||||
others. There is also a shortcut to it, accessible via
|
|
||||||
:meth:`wuttjamaican.app.AppHandler.get_person()`.
|
|
||||||
"""
|
|
||||||
model = self.app.model
|
|
||||||
|
|
||||||
if isinstance(obj, model.Person):
|
|
||||||
person = obj
|
|
||||||
return person
|
|
||||||
|
|
||||||
elif isinstance(obj, model.User):
|
|
||||||
user = obj
|
|
||||||
if user.person:
|
|
||||||
return user.person
|
|
|
@ -5,7 +5,6 @@ from unittest import TestCase
|
||||||
try:
|
try:
|
||||||
import sqlalchemy as sa
|
import sqlalchemy as sa
|
||||||
from wuttjamaican.db.model import auth as model
|
from wuttjamaican.db.model import auth as model
|
||||||
from wuttjamaican.db.model.base import Person
|
|
||||||
except ImportError:
|
except ImportError:
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
|
@ -30,16 +29,8 @@ else:
|
||||||
|
|
||||||
class TestUser(TestCase):
|
class TestUser(TestCase):
|
||||||
|
|
||||||
def test_str(self):
|
def test_basic(self):
|
||||||
user = model.User()
|
user = model.User()
|
||||||
self.assertEqual(str(user), "")
|
self.assertEqual(str(user), "")
|
||||||
user.username = 'barney'
|
user.username = 'barney'
|
||||||
self.assertEqual(str(user), "barney")
|
self.assertEqual(str(user), "barney")
|
||||||
|
|
||||||
def test_str_with_person(self):
|
|
||||||
user = model.User()
|
|
||||||
self.assertEqual(str(user), "")
|
|
||||||
|
|
||||||
person = Person(full_name="Barney Rubble")
|
|
||||||
user.person = person
|
|
||||||
self.assertEqual(str(user), "Barney Rubble")
|
|
||||||
|
|
|
@ -5,7 +5,6 @@ from unittest import TestCase
|
||||||
try:
|
try:
|
||||||
import sqlalchemy as sa
|
import sqlalchemy as sa
|
||||||
from wuttjamaican.db.model import base as model
|
from wuttjamaican.db.model import base as model
|
||||||
from wuttjamaican.db.model.auth import User
|
|
||||||
except ImportError:
|
except ImportError:
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
|
@ -21,7 +20,7 @@ else:
|
||||||
class TestUUIDFKColumn(TestCase):
|
class TestUUIDFKColumn(TestCase):
|
||||||
|
|
||||||
def test_basic(self):
|
def test_basic(self):
|
||||||
column = model.uuid_fk_column('foo.bar')
|
column = model.uuid_column()
|
||||||
self.assertIsInstance(column, sa.Column)
|
self.assertIsInstance(column, sa.Column)
|
||||||
self.assertIsInstance(column.type, sa.String)
|
self.assertIsInstance(column.type, sa.String)
|
||||||
self.assertEqual(column.type.length, 32)
|
self.assertEqual(column.type.length, 32)
|
||||||
|
@ -33,19 +32,3 @@ else:
|
||||||
self.assertEqual(str(setting), "")
|
self.assertEqual(str(setting), "")
|
||||||
setting.name = 'foo'
|
setting.name = 'foo'
|
||||||
self.assertEqual(str(setting), "foo")
|
self.assertEqual(str(setting), "foo")
|
||||||
|
|
||||||
class TestPerson(TestCase):
|
|
||||||
|
|
||||||
def test_basic(self):
|
|
||||||
person = model.Person()
|
|
||||||
self.assertEqual(str(person), "")
|
|
||||||
person.full_name = "Barney Rubble"
|
|
||||||
self.assertEqual(str(person), "Barney Rubble")
|
|
||||||
|
|
||||||
def test_users(self):
|
|
||||||
person = model.Person()
|
|
||||||
self.assertIsNone(person.user)
|
|
||||||
|
|
||||||
user = User()
|
|
||||||
person.users.append(user)
|
|
||||||
self.assertIs(person.user, user)
|
|
||||||
|
|
|
@ -101,17 +101,17 @@ class TestAppHandler(TestCase):
|
||||||
from wuttjamaican.db import model
|
from wuttjamaican.db import model
|
||||||
except ImportError:
|
except ImportError:
|
||||||
pytest.skip("test not relevant without sqlalchemy")
|
pytest.skip("test not relevant without sqlalchemy")
|
||||||
|
else:
|
||||||
self.assertNotIn('model', self.app.__dict__)
|
self.assertNotIn('model', self.app.__dict__)
|
||||||
self.assertIs(self.app.model, model)
|
self.assertIs(self.app.model, model)
|
||||||
|
|
||||||
def test_get_model(self):
|
def test_get_model(self):
|
||||||
try:
|
try:
|
||||||
from wuttjamaican.db import model
|
from wuttjamaican.db import model
|
||||||
except ImportError:
|
except ImportError:
|
||||||
pytest.skip("test not relevant without sqlalchemy")
|
pytest.skip("test not relevant without sqlalchemy")
|
||||||
|
else:
|
||||||
self.assertIs(self.app.get_model(), model)
|
self.assertIs(self.app.get_model(), model)
|
||||||
|
|
||||||
def test_get_title(self):
|
def test_get_title(self):
|
||||||
self.assertEqual(self.app.get_title(), 'WuttJamaican')
|
self.assertEqual(self.app.get_title(), 'WuttJamaican')
|
||||||
|
@ -120,44 +120,6 @@ class TestAppHandler(TestCase):
|
||||||
uuid = self.app.make_uuid()
|
uuid = self.app.make_uuid()
|
||||||
self.assertEqual(len(uuid), 32)
|
self.assertEqual(len(uuid), 32)
|
||||||
|
|
||||||
def test_get_session(self):
|
|
||||||
try:
|
|
||||||
import sqlalchemy as sa
|
|
||||||
from sqlalchemy import orm
|
|
||||||
except ImportError:
|
|
||||||
pytest.skip("test not relevant without sqlalchemy")
|
|
||||||
|
|
||||||
model = self.app.model
|
|
||||||
user = model.User()
|
|
||||||
self.assertIsNone(self.app.get_session(user))
|
|
||||||
|
|
||||||
Session = orm.sessionmaker()
|
|
||||||
engine = sa.create_engine('sqlite://')
|
|
||||||
mysession = Session(bind=engine)
|
|
||||||
mysession.add(user)
|
|
||||||
session = self.app.get_session(user)
|
|
||||||
self.assertIs(session, mysession)
|
|
||||||
|
|
||||||
def test_get_person(self):
|
|
||||||
people = self.app.get_people_handler()
|
|
||||||
with patch.object(people, 'get_person') as get_person:
|
|
||||||
get_person.return_value = 'foo'
|
|
||||||
person = self.app.get_person('bar')
|
|
||||||
get_person.assert_called_once_with('bar')
|
|
||||||
self.assertEqual(person, 'foo')
|
|
||||||
|
|
||||||
def test_get_auth_handler(self):
|
|
||||||
from wuttjamaican.auth import AuthHandler
|
|
||||||
|
|
||||||
auth = self.app.get_auth_handler()
|
|
||||||
self.assertIsInstance(auth, AuthHandler)
|
|
||||||
|
|
||||||
def test_get_people_handler(self):
|
|
||||||
from wuttjamaican.people import PeopleHandler
|
|
||||||
|
|
||||||
people = self.app.get_people_handler()
|
|
||||||
self.assertIsInstance(people, PeopleHandler)
|
|
||||||
|
|
||||||
|
|
||||||
class TestAppProvider(TestCase):
|
class TestAppProvider(TestCase):
|
||||||
|
|
||||||
|
|
|
@ -5,353 +5,13 @@ from unittest import TestCase
|
||||||
from wuttjamaican import auth as mod
|
from wuttjamaican import auth as mod
|
||||||
from wuttjamaican.conf import WuttaConfig
|
from wuttjamaican.conf import WuttaConfig
|
||||||
|
|
||||||
try:
|
|
||||||
import sqlalchemy as sa
|
|
||||||
except ImportError:
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
|
|
||||||
|
class TestAuthHandler(TestCase):
|
||||||
|
|
||||||
class TestAuthHandler(TestCase):
|
def setUp(self):
|
||||||
|
self.config = WuttaConfig()
|
||||||
|
self.app = self.config.get_app()
|
||||||
|
|
||||||
def setUp(self):
|
def test_basic(self):
|
||||||
self.config = WuttaConfig()
|
handler = mod.AuthHandler(self.config)
|
||||||
self.app = self.config.get_app()
|
self.assertIs(handler.app, self.app)
|
||||||
self.handler = mod.AuthHandler(self.config)
|
|
||||||
|
|
||||||
self.engine = sa.create_engine('sqlite://')
|
|
||||||
self.app.model.Base.metadata.create_all(bind=self.engine)
|
|
||||||
self.session = self.make_session()
|
|
||||||
|
|
||||||
def tearDown(self):
|
|
||||||
self.session.close()
|
|
||||||
self.app.model.Base.metadata.drop_all(bind=self.engine)
|
|
||||||
|
|
||||||
def make_session(self):
|
|
||||||
return self.app.make_session(bind=self.engine)
|
|
||||||
|
|
||||||
def test_authenticate_user(self):
|
|
||||||
model = self.app.model
|
|
||||||
barney = model.User(username='barney')
|
|
||||||
self.handler.set_user_password(barney, 'goodpass')
|
|
||||||
self.session.add(barney)
|
|
||||||
self.session.commit()
|
|
||||||
|
|
||||||
# login ok
|
|
||||||
user = self.handler.authenticate_user(self.session, 'barney', 'goodpass')
|
|
||||||
self.assertIs(user, barney)
|
|
||||||
|
|
||||||
# can also pass user instead of username
|
|
||||||
user = self.handler.authenticate_user(self.session, barney, 'goodpass')
|
|
||||||
self.assertIs(user, barney)
|
|
||||||
|
|
||||||
# bad password
|
|
||||||
user = self.handler.authenticate_user(self.session, 'barney', 'BADPASS')
|
|
||||||
self.assertIsNone(user)
|
|
||||||
|
|
||||||
# bad username
|
|
||||||
user = self.handler.authenticate_user(self.session, 'NOBODY', 'goodpass')
|
|
||||||
self.assertIsNone(user)
|
|
||||||
|
|
||||||
# inactive user
|
|
||||||
user = self.handler.authenticate_user(self.session, 'barney', 'goodpass')
|
|
||||||
self.assertIs(user, barney)
|
|
||||||
barney.active = False
|
|
||||||
user = self.handler.authenticate_user(self.session, 'barney', 'goodpass')
|
|
||||||
self.assertIsNone(user)
|
|
||||||
|
|
||||||
def test_get_role(self):
|
|
||||||
model = self.app.model
|
|
||||||
myrole = model.Role(name="My Role")
|
|
||||||
self.session.add(myrole)
|
|
||||||
self.session.commit()
|
|
||||||
|
|
||||||
# empty key is ignored
|
|
||||||
role = self.handler.get_role(self.session, None)
|
|
||||||
self.assertIsNone(role)
|
|
||||||
|
|
||||||
# key may be uuid
|
|
||||||
role = self.handler.get_role(self.session, myrole.uuid)
|
|
||||||
self.assertIs(role, myrole)
|
|
||||||
|
|
||||||
# key may be name
|
|
||||||
role = self.handler.get_role(self.session, myrole.name)
|
|
||||||
self.assertIs(role, myrole)
|
|
||||||
|
|
||||||
# key may be represented within a setting
|
|
||||||
self.config.usedb = True
|
|
||||||
role = self.handler.get_role(self.session, 'mykey')
|
|
||||||
self.assertIsNone(role)
|
|
||||||
setting = model.Setting(name='wutta.role.mykey', value=myrole.uuid)
|
|
||||||
self.session.add(setting)
|
|
||||||
self.session.commit()
|
|
||||||
role = self.handler.get_role(self.session, 'mykey')
|
|
||||||
self.assertIs(role, myrole)
|
|
||||||
|
|
||||||
def test_get_user(self):
|
|
||||||
model = self.app.model
|
|
||||||
myuser = model.User(username='myuser')
|
|
||||||
self.session.add(myuser)
|
|
||||||
self.session.commit()
|
|
||||||
|
|
||||||
# empty obj is ignored
|
|
||||||
user = self.handler.get_user(None)
|
|
||||||
self.assertIsNone(user)
|
|
||||||
|
|
||||||
# user is returned as-is
|
|
||||||
user = self.handler.get_user(myuser)
|
|
||||||
self.assertIs(user, myuser)
|
|
||||||
|
|
||||||
# find user from person
|
|
||||||
myperson = model.Person(full_name='My Name')
|
|
||||||
self.session.add(myperson)
|
|
||||||
user.person = myperson
|
|
||||||
self.session.commit()
|
|
||||||
user = self.handler.get_user(myperson)
|
|
||||||
self.assertIs(user, myuser)
|
|
||||||
|
|
||||||
def test_make_user(self):
|
|
||||||
model = self.app.model
|
|
||||||
|
|
||||||
# empty user
|
|
||||||
user = self.handler.make_user()
|
|
||||||
self.assertIsInstance(user, model.User)
|
|
||||||
self.assertIsNone(user.username)
|
|
||||||
|
|
||||||
# user is added to session
|
|
||||||
user = self.handler.make_user(session=self.session)
|
|
||||||
self.assertIn(user, self.session)
|
|
||||||
self.session.rollback()
|
|
||||||
self.assertNotIn(user, self.session)
|
|
||||||
|
|
||||||
# default username
|
|
||||||
# nb. this behavior requires a session
|
|
||||||
user = self.handler.make_user(session=self.session)
|
|
||||||
self.assertEqual(user.username, 'newuser')
|
|
||||||
|
|
||||||
def test_delete_user(self):
|
|
||||||
model = self.app.model
|
|
||||||
|
|
||||||
# basics
|
|
||||||
myuser = model.User(username='myuser')
|
|
||||||
self.session.add(myuser)
|
|
||||||
self.session.commit()
|
|
||||||
user = self.session.query(model.User).one()
|
|
||||||
self.assertIs(user, myuser)
|
|
||||||
self.handler.delete_user(user)
|
|
||||||
self.session.commit()
|
|
||||||
self.assertEqual(self.session.query(model.User).count(), 0)
|
|
||||||
|
|
||||||
def test_make_preferred_username(self):
|
|
||||||
model = self.app.model
|
|
||||||
|
|
||||||
# default
|
|
||||||
name = self.handler.make_preferred_username(self.session)
|
|
||||||
self.assertEqual(name, 'newuser')
|
|
||||||
|
|
||||||
# person/first+last
|
|
||||||
person = model.Person(first_name='Barney', last_name='Rubble')
|
|
||||||
name = self.handler.make_preferred_username(self.session, person=person)
|
|
||||||
self.assertEqual(name, 'barney.rubble')
|
|
||||||
|
|
||||||
# person/first
|
|
||||||
person = model.Person(first_name='Barney')
|
|
||||||
name = self.handler.make_preferred_username(self.session, person=person)
|
|
||||||
self.assertEqual(name, 'barney')
|
|
||||||
|
|
||||||
# person/last
|
|
||||||
person = model.Person(last_name='Rubble')
|
|
||||||
name = self.handler.make_preferred_username(self.session, person=person)
|
|
||||||
self.assertEqual(name, 'rubble')
|
|
||||||
|
|
||||||
def test_make_unique_username(self):
|
|
||||||
model = self.app.model
|
|
||||||
|
|
||||||
# default
|
|
||||||
name = self.handler.make_unique_username(self.session)
|
|
||||||
self.assertEqual(name, 'newuser')
|
|
||||||
user = model.User(username=name)
|
|
||||||
self.session.add(user)
|
|
||||||
self.session.commit()
|
|
||||||
|
|
||||||
# counter invoked if name exists
|
|
||||||
name = self.handler.make_unique_username(self.session)
|
|
||||||
self.assertEqual(name, 'newuser01')
|
|
||||||
user = model.User(username=name)
|
|
||||||
self.session.add(user)
|
|
||||||
self.session.commit()
|
|
||||||
|
|
||||||
# starts by getting preferred name
|
|
||||||
person = model.Person(first_name='Barney', last_name='Rubble')
|
|
||||||
name = self.handler.make_unique_username(self.session, person=person)
|
|
||||||
self.assertEqual(name, 'barney.rubble')
|
|
||||||
user = model.User(username=name)
|
|
||||||
self.session.add(user)
|
|
||||||
self.session.commit()
|
|
||||||
|
|
||||||
# counter invoked if name exists
|
|
||||||
name = self.handler.make_unique_username(self.session, person=person)
|
|
||||||
self.assertEqual(name, 'barney.rubble01')
|
|
||||||
|
|
||||||
def test_set_user_password(self):
|
|
||||||
model = self.app.model
|
|
||||||
myuser = model.User(username='myuser')
|
|
||||||
self.session.add(myuser)
|
|
||||||
|
|
||||||
# basics
|
|
||||||
self.assertIsNone(myuser.password)
|
|
||||||
self.handler.set_user_password(myuser, 'goodpass')
|
|
||||||
self.session.commit()
|
|
||||||
self.assertIsNotNone(myuser.password)
|
|
||||||
# nb. password is hashed
|
|
||||||
self.assertNotEqual(myuser.password, 'goodpass')
|
|
||||||
|
|
||||||
# confirm login works with new password
|
|
||||||
user = self.handler.authenticate_user(self.session, 'myuser', 'goodpass')
|
|
||||||
self.assertIs(user, myuser)
|
|
||||||
|
|
||||||
def test_get_role_administrator(self):
|
|
||||||
model = self.app.model
|
|
||||||
|
|
||||||
self.assertEqual(self.session.query(model.Role).count(), 0)
|
|
||||||
role = self.handler.get_role_administrator(self.session)
|
|
||||||
self.assertEqual(self.session.query(model.Role).count(), 1)
|
|
||||||
self.assertEqual(role.name, "Administrator")
|
|
||||||
|
|
||||||
def test_get_role_anonymous(self):
|
|
||||||
model = self.app.model
|
|
||||||
|
|
||||||
self.assertEqual(self.session.query(model.Role).count(), 0)
|
|
||||||
role = self.handler.get_role_anonymous(self.session)
|
|
||||||
self.assertEqual(self.session.query(model.Role).count(), 1)
|
|
||||||
self.assertEqual(role.name, "Anonymous")
|
|
||||||
|
|
||||||
def test_get_role_authenticated(self):
|
|
||||||
model = self.app.model
|
|
||||||
|
|
||||||
self.assertEqual(self.session.query(model.Role).count(), 0)
|
|
||||||
role = self.handler.get_role_authenticated(self.session)
|
|
||||||
self.assertEqual(self.session.query(model.Role).count(), 1)
|
|
||||||
self.assertEqual(role.name, "Authenticated")
|
|
||||||
|
|
||||||
def test_get_permissions(self):
|
|
||||||
model = self.app.model
|
|
||||||
|
|
||||||
# empty default for role
|
|
||||||
role = model.Role()
|
|
||||||
perms = self.handler.get_permissions(self.session, role)
|
|
||||||
self.assertIsInstance(perms, set)
|
|
||||||
self.assertEqual(len(perms), 0)
|
|
||||||
|
|
||||||
# empty default for user
|
|
||||||
user = model.User()
|
|
||||||
perms = self.handler.get_permissions(self.session, user)
|
|
||||||
self.assertIsInstance(perms, set)
|
|
||||||
self.assertEqual(len(perms), 0)
|
|
||||||
|
|
||||||
# role perms
|
|
||||||
myrole = model.Role(name='My Role')
|
|
||||||
self.session.add(myrole)
|
|
||||||
self.handler.grant_permission(myrole, 'foo')
|
|
||||||
self.session.commit()
|
|
||||||
perms = self.handler.get_permissions(self.session, myrole)
|
|
||||||
self.assertEqual(perms, {'foo'})
|
|
||||||
|
|
||||||
# user perms
|
|
||||||
myuser = model.User(username='myuser')
|
|
||||||
self.session.add(myuser)
|
|
||||||
self.session.commit()
|
|
||||||
perms = self.handler.get_permissions(self.session, myuser)
|
|
||||||
self.assertEqual(len(perms), 0)
|
|
||||||
myuser.roles.append(myrole)
|
|
||||||
self.session.commit()
|
|
||||||
perms = self.handler.get_permissions(self.session, myuser)
|
|
||||||
self.assertEqual(perms, {'foo'})
|
|
||||||
|
|
||||||
# invalid principal
|
|
||||||
perms = self.handler.get_permissions(self.session, RuntimeError)
|
|
||||||
self.assertEqual(perms, set())
|
|
||||||
|
|
||||||
# missing principal
|
|
||||||
perms = self.handler.get_permissions(self.session, None)
|
|
||||||
self.assertEqual(perms, set())
|
|
||||||
|
|
||||||
def test_has_permission(self):
|
|
||||||
model = self.app.model
|
|
||||||
|
|
||||||
# false default for role
|
|
||||||
role = model.Role()
|
|
||||||
self.assertFalse(self.handler.has_permission(self.session, role, 'foo'))
|
|
||||||
|
|
||||||
# empty default for user
|
|
||||||
user = model.User()
|
|
||||||
self.assertFalse(self.handler.has_permission(self.session, user, 'foo'))
|
|
||||||
|
|
||||||
# role perms
|
|
||||||
myrole = model.Role(name='My Role')
|
|
||||||
self.session.add(myrole)
|
|
||||||
self.session.commit()
|
|
||||||
self.assertFalse(self.handler.has_permission(self.session, myrole, 'foo'))
|
|
||||||
self.handler.grant_permission(myrole, 'foo')
|
|
||||||
self.session.commit()
|
|
||||||
self.assertTrue(self.handler.has_permission(self.session, myrole, 'foo'))
|
|
||||||
|
|
||||||
# user perms
|
|
||||||
myuser = model.User(username='myuser')
|
|
||||||
self.session.add(myuser)
|
|
||||||
self.session.commit()
|
|
||||||
self.assertFalse(self.handler.has_permission(self.session, myuser, 'foo'))
|
|
||||||
myuser.roles.append(myrole)
|
|
||||||
self.session.commit()
|
|
||||||
self.assertTrue(self.handler.has_permission(self.session, myuser, 'foo'))
|
|
||||||
|
|
||||||
# invalid principal
|
|
||||||
self.assertFalse(self.handler.has_permission(self.session, RuntimeError, 'foo'))
|
|
||||||
|
|
||||||
# missing principal
|
|
||||||
self.assertFalse(self.handler.has_permission(self.session, None, 'foo'))
|
|
||||||
|
|
||||||
def test_grant_permission(self):
|
|
||||||
model = self.app.model
|
|
||||||
myrole = model.Role(name='My Role')
|
|
||||||
self.session.add(myrole)
|
|
||||||
self.session.commit()
|
|
||||||
|
|
||||||
# no perms yet
|
|
||||||
self.assertEqual(self.session.query(model.Permission).count(), 0)
|
|
||||||
|
|
||||||
# grant one perm, and confirm
|
|
||||||
self.handler.grant_permission(myrole, 'foo')
|
|
||||||
self.session.commit()
|
|
||||||
self.assertEqual(self.session.query(model.Permission).count(), 1)
|
|
||||||
perm = self.session.query(model.Permission).one()
|
|
||||||
self.assertIs(perm.role, myrole)
|
|
||||||
self.assertEqual(perm.permission, 'foo')
|
|
||||||
|
|
||||||
# grant same perm again, confirm just one exists
|
|
||||||
self.handler.grant_permission(myrole, 'foo')
|
|
||||||
self.session.commit()
|
|
||||||
self.assertEqual(self.session.query(model.Permission).count(), 1)
|
|
||||||
perm = self.session.query(model.Permission).one()
|
|
||||||
self.assertIs(perm.role, myrole)
|
|
||||||
self.assertEqual(perm.permission, 'foo')
|
|
||||||
|
|
||||||
def test_revoke_permission(self):
|
|
||||||
model = self.app.model
|
|
||||||
myrole = model.Role(name='My Role')
|
|
||||||
self.session.add(myrole)
|
|
||||||
self.handler.grant_permission(myrole, 'foo')
|
|
||||||
self.session.commit()
|
|
||||||
|
|
||||||
# just the one perm
|
|
||||||
self.assertEqual(self.session.query(model.Permission).count(), 1)
|
|
||||||
|
|
||||||
# revoke it, then confirm
|
|
||||||
self.handler.revoke_permission(myrole, 'foo')
|
|
||||||
self.session.commit()
|
|
||||||
self.assertEqual(self.session.query(model.Permission).count(), 0)
|
|
||||||
|
|
||||||
# revoke again, confirm
|
|
||||||
self.handler.revoke_permission(myrole, 'foo')
|
|
||||||
self.session.commit()
|
|
||||||
self.assertEqual(self.session.query(model.Permission).count(), 0)
|
|
||||||
|
|
|
@ -1,52 +0,0 @@
|
||||||
# -*- coding: utf-8; -*-
|
|
||||||
|
|
||||||
from unittest import TestCase
|
|
||||||
|
|
||||||
from wuttjamaican import people as mod
|
|
||||||
from wuttjamaican.conf import WuttaConfig
|
|
||||||
|
|
||||||
try:
|
|
||||||
import sqlalchemy as sa
|
|
||||||
except ImportError:
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
|
|
||||||
|
|
||||||
class TestPeopleHandler(TestCase):
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
self.config = WuttaConfig()
|
|
||||||
self.app = self.config.get_app()
|
|
||||||
self.handler = mod.PeopleHandler(self.config)
|
|
||||||
|
|
||||||
self.engine = sa.create_engine('sqlite://')
|
|
||||||
self.app.model.Base.metadata.create_all(bind=self.engine)
|
|
||||||
self.session = self.make_session()
|
|
||||||
|
|
||||||
def tearDown(self):
|
|
||||||
self.session.close()
|
|
||||||
self.app.model.Base.metadata.drop_all(bind=self.engine)
|
|
||||||
|
|
||||||
def make_session(self):
|
|
||||||
return self.app.make_session(bind=self.engine)
|
|
||||||
|
|
||||||
def test_get_person(self):
|
|
||||||
model = self.app.model
|
|
||||||
myperson = model.Person(full_name='Barny Rubble')
|
|
||||||
self.session.add(myperson)
|
|
||||||
self.session.commit()
|
|
||||||
|
|
||||||
# empty obj is ignored
|
|
||||||
person = self.handler.get_person(None)
|
|
||||||
self.assertIsNone(person)
|
|
||||||
|
|
||||||
# person is returned as-is
|
|
||||||
person = self.handler.get_person(myperson)
|
|
||||||
self.assertIs(person, myperson)
|
|
||||||
|
|
||||||
# find person from user
|
|
||||||
myuser = model.User(username='barney', person=myperson)
|
|
||||||
self.session.add(myuser)
|
|
||||||
self.session.commit()
|
|
||||||
person = self.handler.get_person(myuser)
|
|
||||||
self.assertIs(person, myperson)
|
|
Loading…
Reference in a new issue