Compare commits
4 commits
1995095627
...
ca997807e4
Author | SHA1 | Date | |
---|---|---|---|
![]() |
ca997807e4 | ||
![]() |
e899d06151 | ||
![]() |
43ca404837 | ||
![]() |
60d3fcd13b |
11
CHANGELOG.md
11
CHANGELOG.md
|
@ -5,6 +5,17 @@ All notable changes to WuttJamaican will be documented in this file.
|
|||
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
|
||||
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## v0.8.0 (2024-07-14)
|
||||
|
||||
### Feat
|
||||
|
||||
- flesh out the auth handler; add people handler
|
||||
- add model for Person; tie to User
|
||||
|
||||
### Fix
|
||||
|
||||
- add migration for auth tables
|
||||
|
||||
## v0.7.0 (2024-07-14)
|
||||
|
||||
### Feat
|
||||
|
|
|
@ -17,5 +17,6 @@
|
|||
db.model.base
|
||||
db.sess
|
||||
exc
|
||||
people
|
||||
testing
|
||||
util
|
||||
|
|
6
docs/api/wuttjamaican/people.rst
Normal file
6
docs/api/wuttjamaican/people.rst
Normal file
|
@ -0,0 +1,6 @@
|
|||
|
||||
``wuttjamaican.people``
|
||||
=======================
|
||||
|
||||
.. automodule:: wuttjamaican.people
|
||||
:members:
|
|
@ -109,6 +109,13 @@ Glossary
|
|||
Most :term:`apps<app>` will have at least one :term:`app
|
||||
database`.
|
||||
|
||||
db session
|
||||
The "session" is a SQLAlchemy abstraction for an open database
|
||||
connection, essentially.
|
||||
|
||||
In practice this generally refers to a
|
||||
:class:`~wuttjamaican.db.sess.Session` instance.
|
||||
|
||||
entry point
|
||||
This refers to a "setuptools-style" entry point specifically,
|
||||
which is a mechanism used to register "plugins" and the like.
|
||||
|
|
|
@ -6,7 +6,7 @@ build-backend = "hatchling.build"
|
|||
|
||||
[project]
|
||||
name = "WuttJamaican"
|
||||
version = "0.7.0"
|
||||
version = "0.8.0"
|
||||
description = "Base package for Wutta Framework"
|
||||
readme = "README.md"
|
||||
authors = [{name = "Lance Edgar", email = "lance@edbob.org"}]
|
||||
|
@ -32,7 +32,7 @@ dependencies = [
|
|||
|
||||
|
||||
[project.optional-dependencies]
|
||||
db = ["SQLAlchemy<2", "alembic"]
|
||||
db = ["SQLAlchemy<2", "alembic", "passlib"]
|
||||
docs = ["Sphinx", "sphinxcontrib-programoutput", "furo"]
|
||||
tests = ["pytest-cov", "tox"]
|
||||
|
||||
|
|
|
@ -66,6 +66,8 @@ class AppHandler:
|
|||
"""
|
||||
default_app_title = "WuttJamaican"
|
||||
default_model_spec = 'wuttjamaican.db.model'
|
||||
default_auth_handler_spec = 'wuttjamaican.auth:AuthHandler'
|
||||
default_people_handler_spec = 'wuttjamaican.people:PeopleHandler'
|
||||
|
||||
def __init__(self, config):
|
||||
self.config = config
|
||||
|
@ -241,6 +243,16 @@ class AppHandler:
|
|||
"""
|
||||
return make_uuid()
|
||||
|
||||
def get_session(self, obj):
|
||||
"""
|
||||
Returns the SQLAlchemy session with which the given object is
|
||||
associated. Simple convenience wrapper around
|
||||
:func:`sqlalchemy:sqlalchemy.orm.object_session()`.
|
||||
"""
|
||||
from sqlalchemy import orm
|
||||
|
||||
return orm.object_session(obj)
|
||||
|
||||
def short_session(self, **kwargs):
|
||||
"""
|
||||
Returns a context manager for a short-lived database session.
|
||||
|
@ -279,6 +291,51 @@ class AppHandler:
|
|||
|
||||
return get_setting(session, name)
|
||||
|
||||
##############################
|
||||
# getters for other handlers
|
||||
##############################
|
||||
|
||||
def get_auth_handler(self, **kwargs):
|
||||
"""
|
||||
Get the configured :term:`auth handler`.
|
||||
|
||||
:rtype: :class:`~wuttjamaican.auth.AuthHandler`
|
||||
"""
|
||||
if 'auth' not in self.handlers:
|
||||
spec = self.config.get(f'{self.appname}.auth.handler',
|
||||
default=self.default_auth_handler_spec)
|
||||
factory = self.load_object(spec)
|
||||
self.handlers['auth'] = factory(self.config, **kwargs)
|
||||
return self.handlers['auth']
|
||||
|
||||
def get_people_handler(self, **kwargs):
|
||||
"""
|
||||
Get the configured "people" :term:`handler`.
|
||||
|
||||
:rtype: :class:`~wuttjamaican.people.PeopleHandler`
|
||||
"""
|
||||
if 'people' not in self.handlers:
|
||||
spec = self.config.get(f'{self.appname}.people.handler',
|
||||
default=self.default_people_handler_spec)
|
||||
factory = self.load_object(spec)
|
||||
self.handlers['people'] = factory(self.config, **kwargs)
|
||||
return self.handlers['people']
|
||||
|
||||
##############################
|
||||
# convenience delegators
|
||||
##############################
|
||||
|
||||
def get_person(self, obj, **kwargs):
|
||||
"""
|
||||
Convenience method to locate a
|
||||
:class:`~wuttjamaican.db.model.base.Person` for the given
|
||||
object.
|
||||
|
||||
This delegates to the "people" handler method,
|
||||
:meth:`~wuttjamaican.people.PeopleHandler.get_person()`.
|
||||
"""
|
||||
return self.get_people_handler().get_person(obj, **kwargs)
|
||||
|
||||
|
||||
class AppProvider:
|
||||
"""
|
||||
|
|
|
@ -29,6 +29,16 @@ This defines the default :term:`auth handler`.
|
|||
from wuttjamaican.app import GenericHandler
|
||||
|
||||
|
||||
# nb. this only works if passlib is installed (part of 'db' extra)
|
||||
try:
|
||||
from passlib.context import CryptContext
|
||||
except ImportError: # pragma: no cover
|
||||
pass
|
||||
else:
|
||||
password_context = CryptContext(schemes=['bcrypt'])
|
||||
|
||||
|
||||
|
||||
class AuthHandler(GenericHandler):
|
||||
"""
|
||||
Base class and default implementation for the :term:`auth
|
||||
|
@ -37,8 +47,436 @@ class AuthHandler(GenericHandler):
|
|||
This is responsible for "authentication and authorization" - for
|
||||
instance:
|
||||
|
||||
* create new users, roles
|
||||
* authenticate user from login credentials
|
||||
* check which permissions a user/role has
|
||||
* create/modify users, roles
|
||||
* grant/revoke role permissions
|
||||
* determine which permissions a user has
|
||||
* identify user from login credentials
|
||||
"""
|
||||
|
||||
def authenticate_user(self, session, username, password, **kwargs):
|
||||
"""
|
||||
Authenticate the given user credentials, and if successful,
|
||||
return the :class:`~wuttjamaican.db.model.auth.User`.
|
||||
|
||||
Default logic will (try to) locate a user with matching
|
||||
username, then confirm the supplied password is also a match.
|
||||
|
||||
Custom handlers can authenticate against anything else, using
|
||||
the given credentials. But they still must return a "native"
|
||||
``User`` object for the app to consider the authentication
|
||||
successful. The handler may auto-create the user if needed.
|
||||
|
||||
Generally speaking the credentials will have come directly
|
||||
from a user login attempt in the web app etc. Again the
|
||||
default logic assumes a "username" but in practice it may be
|
||||
an email address etc. - whatever the user entered.
|
||||
|
||||
:param session: Open :term:`db session`.
|
||||
|
||||
:param username: Usually a string, but also may be a
|
||||
:class:`~wuttjamaican.db.model.auth.User` instance, in
|
||||
which case no user lookup will occur. (However the user is
|
||||
still authenticated otherwise, i.e. the password must be
|
||||
correct etc.)
|
||||
|
||||
:param password: Password as string.
|
||||
|
||||
:returns: :class:`~wuttjamaican.db.model.auth.User` instance,
|
||||
or ``None``.
|
||||
"""
|
||||
model = self.app.model
|
||||
|
||||
if isinstance(username, model.User):
|
||||
user = username
|
||||
else:
|
||||
user = session.query(model.User)\
|
||||
.filter_by(username=username)\
|
||||
.first()
|
||||
|
||||
if user and user.active and user.password:
|
||||
if password_context.verify(password, user.password):
|
||||
return user
|
||||
|
||||
def get_role(self, session, key, **kwargs):
|
||||
"""
|
||||
Locate and return a :class:`~wuttjamaican.db.model.auth.Role`
|
||||
per the given key, if possible.
|
||||
|
||||
:param session: Open :term:`db session`.
|
||||
|
||||
:param key: Value to use when searching for the role. Can be
|
||||
a UUID or name of a role.
|
||||
|
||||
:returns: :class:`~wuttjamaican.db.model.auth.Role` instance;
|
||||
or ``None``.
|
||||
"""
|
||||
model = self.app.model
|
||||
|
||||
if not key:
|
||||
return
|
||||
|
||||
# try to match on Role.uuid
|
||||
role = session.get(model.Role, key)
|
||||
if role:
|
||||
return role
|
||||
|
||||
# try to match on Role.name
|
||||
role = session.query(model.Role)\
|
||||
.filter_by(name=key)\
|
||||
.first()
|
||||
if role:
|
||||
return role
|
||||
|
||||
# try settings; if value then recurse
|
||||
key = self.config.get(f'{self.appname}.role.{key}',
|
||||
session=session)
|
||||
if key:
|
||||
return self.get_role(session, key)
|
||||
|
||||
def get_user(self, obj, **kwargs):
|
||||
"""
|
||||
Return the :class:`~wuttjamaican.db.model.auth.User`
|
||||
associated with the given object, if one can be found.
|
||||
|
||||
This method should accept "any" type of ``obj`` and inspect it
|
||||
to determine if/how a user can be found. It should return the
|
||||
"first, most obvious" user in the event that the given object
|
||||
is associated with multiple users.
|
||||
|
||||
The default logic only knows how to navigate the Person/User
|
||||
relationship, so if ``obj`` is a
|
||||
:class:`~wuttjamaican.db.model.base.Person` then it will
|
||||
return the "first" user account for the person (according to
|
||||
:attr:`~wuttjamaican.db.model.base.Person.user`).
|
||||
|
||||
:returns: :class:`~wuttjamaican.db.model.auth.User` or ``None``.
|
||||
"""
|
||||
model = self.app.model
|
||||
|
||||
if isinstance(obj, model.User):
|
||||
return obj
|
||||
|
||||
person = self.app.get_person(obj)
|
||||
if person:
|
||||
return person.user
|
||||
|
||||
def make_user(self, session=None, **kwargs):
|
||||
"""
|
||||
Make and return a new
|
||||
:class:`~wuttjamaican.db.model.auth.User`.
|
||||
|
||||
This is mostly a simple wrapper around the
|
||||
:class:`~wuttjamaican.db.model.auth.User` constructor. All
|
||||
``kwargs`` are passed on to the constructor as-is, for
|
||||
instance. It also will add the user to the session, if
|
||||
applicable.
|
||||
|
||||
This method also adds one other convenience:
|
||||
|
||||
If there is no ``username`` specified in the ``kwargs`` then
|
||||
it will call :meth:`make_unique_username()` to automatically
|
||||
provide one. (Note that the ``kwargs`` will be passed along
|
||||
to that call as well.)
|
||||
|
||||
:param session: Open :term:`db session`, if applicable.
|
||||
|
||||
:returns: The new :class:`~wuttjamaican.db.model.auth.User`
|
||||
instance.
|
||||
"""
|
||||
model = self.app.model
|
||||
|
||||
if session and 'username' not in kwargs:
|
||||
kwargs['username'] = self.make_unique_username(session, **kwargs)
|
||||
|
||||
user = model.User(**kwargs)
|
||||
if session:
|
||||
session.add(user)
|
||||
return user
|
||||
|
||||
def delete_user(self, user, **kwargs):
|
||||
"""
|
||||
Delete the given user account. Use with caution! As this
|
||||
generally cannot be undone.
|
||||
|
||||
Default behavior simply deletes the user account. Depending
|
||||
on the DB schema and data present, this may cause an error
|
||||
(i.e. if the user is still referenced by other tables).
|
||||
|
||||
:param user: :class:`~wuttjamaican.db.model.auth.User` to
|
||||
delete.
|
||||
"""
|
||||
session = self.app.get_session(user)
|
||||
session.delete(user)
|
||||
|
||||
def make_preferred_username(self, session, **kwargs):
|
||||
"""
|
||||
Generate a "preferred" username, using data from ``kwargs`` as
|
||||
hints.
|
||||
|
||||
Note that ``kwargs`` should be of the same sort that might be
|
||||
passed to the :class:`~wuttjamaican.db.model.auth.User`
|
||||
constructor.
|
||||
|
||||
So far this logic is rather simple:
|
||||
|
||||
If ``kwargs`` contains ``person`` then a username will be
|
||||
constructed using the name data from the person
|
||||
(e.g. ``'john.doe'``).
|
||||
|
||||
In all other cases it will return ``'newuser'``.
|
||||
|
||||
.. note::
|
||||
|
||||
This method does not confirm if the username it generates
|
||||
is actually "available" for a new user. See
|
||||
:meth:`make_unique_username()` for that.
|
||||
|
||||
:param session: Open :term:`db session`.
|
||||
|
||||
:returns: Generated username as string.
|
||||
"""
|
||||
person = kwargs.get('person')
|
||||
if person:
|
||||
first = (person.first_name or '').strip().lower()
|
||||
last = (person.last_name or '').strip().lower()
|
||||
if first and last:
|
||||
return f'{first}.{last}'
|
||||
if first:
|
||||
return first
|
||||
if last:
|
||||
return last
|
||||
|
||||
return 'newuser'
|
||||
|
||||
def make_unique_username(self, session, **kwargs):
|
||||
"""
|
||||
Generate a *unique* username, using data from ``kwargs`` as
|
||||
hints.
|
||||
|
||||
Note that ``kwargs`` should be of the same sort that might be
|
||||
passed to the :class:`~wuttjamaican.db.model.auth.User`
|
||||
constructor.
|
||||
|
||||
This method is a convenience which does two things:
|
||||
|
||||
First it calls :meth:`make_preferred_username()` to obtain the
|
||||
"preferred" username. (It passes all ``kwargs`` along when it
|
||||
makes that call.)
|
||||
|
||||
Then it checks to see if the resulting username is already
|
||||
taken. If it is, then a "counter" is appended to the
|
||||
username, and incremented until a username can be found which
|
||||
is *not* yet taken.
|
||||
|
||||
It returns the first "available" (hence unique) username which
|
||||
is found. Note that it is considered unique and therefore
|
||||
available *at the time*; however this method does not
|
||||
"reserve" the username in any way. It is assumed that you
|
||||
would create the user yourself once you have the username.
|
||||
|
||||
:param session: Open :term:`db session`.
|
||||
|
||||
:returns: Username as string.
|
||||
"""
|
||||
model = self.app.model
|
||||
|
||||
original_username = self.make_preferred_username(session, **kwargs)
|
||||
username = original_username
|
||||
|
||||
# check for unique username
|
||||
counter = 1
|
||||
while True:
|
||||
users = session.query(model.User)\
|
||||
.filter(model.User.username == username)\
|
||||
.count()
|
||||
if not users:
|
||||
break
|
||||
username = f"{original_username}{counter:02d}"
|
||||
counter += 1
|
||||
|
||||
return username
|
||||
|
||||
def set_user_password(self, user, password, **kwargs):
|
||||
"""
|
||||
Set a user's password.
|
||||
|
||||
This will update the
|
||||
:attr:`~wuttjamaican.db.model.auth.User.password` attribute
|
||||
for the user. The value will be hashed using ``bcrypt``.
|
||||
|
||||
:param user: :class:`~wuttjamaican.db.model.auth.User` instance.
|
||||
|
||||
:param password: New password in plain text.
|
||||
"""
|
||||
user.password = password_context.hash(password)
|
||||
|
||||
def get_role_administrator(self, session, **kwargs):
|
||||
"""
|
||||
Returns the special "Administrator" role.
|
||||
"""
|
||||
return self._special_role(session, 'd937fa8a965611dfa0dd001143047286', "Administrator")
|
||||
|
||||
def get_role_anonymous(self, session, **kwargs):
|
||||
"""
|
||||
Returns the special "Anonymous" (aka. "Guest") role.
|
||||
"""
|
||||
return self._special_role(session, 'f8a27c98965a11dfaff7001143047286', "Anonymous")
|
||||
|
||||
def get_role_authenticated(self, session, **kwargs):
|
||||
"""
|
||||
Returns the special "Authenticated" role.
|
||||
"""
|
||||
return self._special_role(session, 'b765a9cc331a11e6ac2a3ca9f40bc550', "Authenticated")
|
||||
|
||||
def get_permissions(self, session, principal,
|
||||
include_anonymous=True,
|
||||
include_authenticated=True):
|
||||
"""
|
||||
Return a set of permission names, which represents all
|
||||
permissions effectively granted to the given user or role.
|
||||
|
||||
:param session: Open :term:`db session`.
|
||||
|
||||
:param principal: :class:`~wuttjamaican.db.model.auth.User` or
|
||||
:class:`~wuttjamaican.db.model.auth.Role` instance. Can
|
||||
also be ``None``, in which case the "Anonymous" role will
|
||||
be assumed.
|
||||
|
||||
:param include_anonymous: Whether the "Anonymous" role should
|
||||
be included when checking permissions. If ``False``, the
|
||||
Anonymous permissions will *not* be checked.
|
||||
|
||||
:param include_authenticated: Whether the "Authenticated" role
|
||||
should be included when checking permissions.
|
||||
|
||||
:returns: Set of permission names.
|
||||
:rtype: set
|
||||
"""
|
||||
# we will use any `roles` attribute which may be present. in
|
||||
# practice we would be assuming a User in this case
|
||||
if hasattr(principal, 'roles'):
|
||||
roles = [role
|
||||
for role in principal.roles
|
||||
if self._role_is_pertinent(role)]
|
||||
|
||||
# here our User assumption gets a little more explicit
|
||||
if include_authenticated:
|
||||
roles.append(self.get_role_authenticated(session))
|
||||
|
||||
# otherwise a non-null principal is assumed to be a Role
|
||||
elif principal is not None:
|
||||
roles = [principal]
|
||||
|
||||
# fallback assumption is "no roles"
|
||||
else:
|
||||
roles = []
|
||||
|
||||
# maybe include anonymous role
|
||||
if include_anonymous:
|
||||
roles.append(self.get_role_anonymous(session))
|
||||
|
||||
# build the permissions cache
|
||||
cache = set()
|
||||
for role in roles:
|
||||
if hasattr(role, 'permissions'):
|
||||
cache.update(role.permissions)
|
||||
|
||||
return cache
|
||||
|
||||
def has_permission(self, session, principal, permission,
|
||||
include_anonymous=True,
|
||||
include_authenticated=True):
|
||||
"""
|
||||
Check if the given user or role has been granted the given
|
||||
permission.
|
||||
|
||||
.. note::
|
||||
|
||||
While this method is perfectly usable, it is a bit "heavy"
|
||||
if you need to make multiple permission checks for the same
|
||||
user. To optimize, call :meth:`get_permissions()` and keep
|
||||
the result, then instead of calling ``has_permission()``
|
||||
just check if a given permission is contained in the cached
|
||||
result set.
|
||||
|
||||
(The logic just described is exactly what this method does,
|
||||
except it will not keep the result set, hence calling it
|
||||
multiple times for same user is not optimal.)
|
||||
|
||||
:param session: Open :term:`db session`.
|
||||
|
||||
:param principal: Either a
|
||||
:class:`~wuttjamaican.db.model.auth.User` or
|
||||
:class:`~wuttjamaican.db.model.auth.Role` instance. It is
|
||||
also expected that this may sometimes be ``None``, in which
|
||||
case the "Anonymous" role will be assumed.
|
||||
|
||||
:param permission: Name of the permission for which to check.
|
||||
|
||||
:param include_anonymous: Whether the "Anonymous" role should
|
||||
be included when checking permissions. If ``False``, then
|
||||
Anonymous permissions will *not* be checked.
|
||||
|
||||
:param include_authenticated: Whether the "Authenticated" role
|
||||
should be included when checking permissions.
|
||||
|
||||
:returns: Boolean indicating if the permission is granted.
|
||||
"""
|
||||
perms = self.get_permissions(session, principal,
|
||||
include_anonymous=include_anonymous,
|
||||
include_authenticated=include_authenticated)
|
||||
return permission in perms
|
||||
|
||||
def grant_permission(self, role, permission, **kwargs):
|
||||
"""
|
||||
Grant a permission to the role. If the role already has the
|
||||
permission, nothing is done.
|
||||
|
||||
:param role: :class:`~wuttjamaican.db.model.auth.Role`
|
||||
instance.
|
||||
|
||||
:param permission: Name of the permission as string.
|
||||
"""
|
||||
if permission not in role.permissions:
|
||||
role.permissions.append(permission)
|
||||
|
||||
def revoke_permission(self, role, permission, **kwargs):
|
||||
"""
|
||||
Revoke a permission from the role. If the role does not have
|
||||
the permission, nothing is done.
|
||||
|
||||
:param role: A :class:`~rattail.db.model.users.Role` instance.
|
||||
|
||||
:param permission: Name of the permission as string.
|
||||
"""
|
||||
if permission in role.permissions:
|
||||
role.permissions.remove(permission)
|
||||
|
||||
##############################
|
||||
# internal methods
|
||||
##############################
|
||||
|
||||
def _role_is_pertinent(self, role):
|
||||
"""
|
||||
Check the role to ensure it is "pertinent" for the current app.
|
||||
|
||||
The idea behind this is for sake of a multi-node system, where
|
||||
users and roles are synced between nodes. Some roles may be
|
||||
defined for only certain types of nodes and hence not
|
||||
"pertinent" for all nodes.
|
||||
|
||||
As of now there is no actual support for that, but this stub
|
||||
method exists for when it will.
|
||||
"""
|
||||
return True
|
||||
|
||||
def _special_role(self, session, uuid, name):
|
||||
"""
|
||||
Fetch a "special" role, creating if needed.
|
||||
"""
|
||||
model = self.app.model
|
||||
role = session.get(model.Role, uuid)
|
||||
if not role:
|
||||
role = model.Role(uuid=uuid, name=name)
|
||||
session.add(role)
|
||||
return role
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
"""add people
|
||||
|
||||
Revision ID: 3abcc44f7f91
|
||||
Revises: d686f7abe3e0
|
||||
Create Date: 2024-07-14 15:14:30.552682
|
||||
|
||||
"""
|
||||
from typing import Sequence, Union
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = '3abcc44f7f91'
|
||||
down_revision: Union[str, None] = 'd686f7abe3e0'
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
|
||||
# person
|
||||
op.create_table('person',
|
||||
sa.Column('uuid', sa.String(length=32), nullable=False),
|
||||
sa.Column('full_name', sa.String(length=100), nullable=False),
|
||||
sa.Column('first_name', sa.String(length=50), nullable=True),
|
||||
sa.Column('middle_name', sa.String(length=50), nullable=True),
|
||||
sa.Column('last_name', sa.String(length=50), nullable=True),
|
||||
sa.PrimaryKeyConstraint('uuid', name=op.f('pk_person'))
|
||||
)
|
||||
|
||||
# user
|
||||
op.add_column('user', sa.Column('person_uuid', sa.String(length=32), nullable=True))
|
||||
op.create_foreign_key(op.f('fk_user_person_uuid_person'), 'user', 'person', ['person_uuid'], ['uuid'])
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
|
||||
# user
|
||||
op.drop_constraint(op.f('fk_user_person_uuid_person'), 'user', type_='foreignkey')
|
||||
op.drop_column('user', 'person_uuid')
|
||||
|
||||
# person
|
||||
op.drop_table('person')
|
|
@ -0,0 +1,73 @@
|
|||
"""add users, roles
|
||||
|
||||
Revision ID: d686f7abe3e0
|
||||
Revises: fc3a3bcaa069
|
||||
Create Date: 2024-07-14 13:27:22.703093
|
||||
|
||||
"""
|
||||
from typing import Sequence, Union
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = 'd686f7abe3e0'
|
||||
down_revision: Union[str, None] = 'fc3a3bcaa069'
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
|
||||
# role
|
||||
op.create_table('role',
|
||||
sa.Column('uuid', sa.String(length=32), nullable=False),
|
||||
sa.Column('name', sa.String(length=100), nullable=False),
|
||||
sa.Column('notes', sa.Text(), nullable=True),
|
||||
sa.PrimaryKeyConstraint('uuid'),
|
||||
sa.UniqueConstraint('name', name=op.f('uq_role_name'))
|
||||
)
|
||||
|
||||
# user
|
||||
op.create_table('user',
|
||||
sa.Column('uuid', sa.String(length=32), nullable=False),
|
||||
sa.Column('username', sa.String(length=25), nullable=False),
|
||||
sa.Column('password', sa.String(length=60), nullable=True),
|
||||
sa.Column('active', sa.Boolean(), nullable=False),
|
||||
sa.PrimaryKeyConstraint('uuid'),
|
||||
sa.UniqueConstraint('username', name=op.f('uq_user_username'))
|
||||
)
|
||||
|
||||
# permission
|
||||
op.create_table('permission',
|
||||
sa.Column('role_uuid', sa.String(length=32), nullable=False),
|
||||
sa.Column('permission', sa.String(length=254), nullable=False),
|
||||
sa.ForeignKeyConstraint(['role_uuid'], ['role.uuid'], name=op.f('fk_permission_role_uuid_role')),
|
||||
sa.PrimaryKeyConstraint('role_uuid', 'permission')
|
||||
)
|
||||
|
||||
# user_x_role
|
||||
op.create_table('user_x_role',
|
||||
sa.Column('uuid', sa.String(length=32), nullable=False),
|
||||
sa.Column('user_uuid', sa.String(length=32), nullable=False),
|
||||
sa.Column('role_uuid', sa.String(length=32), nullable=False),
|
||||
sa.ForeignKeyConstraint(['role_uuid'], ['role.uuid'], name=op.f('fk_user_x_role_role_uuid_role')),
|
||||
sa.ForeignKeyConstraint(['user_uuid'], ['user.uuid'], name=op.f('fk_user_x_role_user_uuid_user')),
|
||||
sa.PrimaryKeyConstraint('uuid')
|
||||
)
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
|
||||
# user_x_role
|
||||
op.drop_table('user_x_role')
|
||||
|
||||
# permission
|
||||
op.drop_table('permission')
|
||||
|
||||
# user
|
||||
op.drop_table('user')
|
||||
|
||||
# role
|
||||
op.drop_table('role')
|
|
@ -31,11 +31,12 @@ The ``wuttjamaican.db.model`` namespace contains the following:
|
|||
* :func:`~wuttjamaican.db.model.base.uuid_fk_column()`
|
||||
* :class:`~wuttjamaican.db.model.base.Base`
|
||||
* :class:`~wuttjamaican.db.model.base.Setting`
|
||||
* :class:`~wuttjamaican.db.model.base.Person`
|
||||
* :class:`~wuttjamaican.db.model.auth.Role`
|
||||
* :class:`~wuttjamaican.db.model.auth.Permission`
|
||||
* :class:`~wuttjamaican.db.model.auth.User`
|
||||
* :class:`~wuttjamaican.db.model.auth.UserRole`
|
||||
"""
|
||||
|
||||
from .base import Base, Setting, uuid_column, uuid_fk_column
|
||||
from .base import uuid_column, uuid_fk_column, Base, Setting, Person
|
||||
from .auth import Role, Permission, User, UserRole
|
||||
|
|
|
@ -65,16 +65,10 @@ class Role(Base):
|
|||
See also :attr:`user_refs`.
|
||||
"""
|
||||
__tablename__ = 'role'
|
||||
__table_args__ = (
|
||||
sa.UniqueConstraint('name',
|
||||
# TODO
|
||||
# name='role_uq_name',
|
||||
),
|
||||
)
|
||||
|
||||
uuid = uuid_column()
|
||||
|
||||
name = sa.Column(sa.String(length=100), nullable=False, doc="""
|
||||
name = sa.Column(sa.String(length=100), nullable=False, unique=True, doc="""
|
||||
Name for the role. Each role must have a name, which must be
|
||||
unique.
|
||||
""")
|
||||
|
@ -86,6 +80,8 @@ class Role(Base):
|
|||
permission_refs = orm.relationship(
|
||||
'Permission',
|
||||
back_populates='role',
|
||||
cascade='all, delete-orphan',
|
||||
cascade_backrefs=False,
|
||||
doc="""
|
||||
List of :class:`Permission` references for the role.
|
||||
|
||||
|
@ -101,10 +97,9 @@ class Role(Base):
|
|||
|
||||
user_refs = orm.relationship(
|
||||
'UserRole',
|
||||
# TODO
|
||||
# cascade='all, delete-orphan',
|
||||
# cascade_backrefs=False,
|
||||
back_populates='role',
|
||||
cascade='all, delete-orphan',
|
||||
cascade_backrefs=False,
|
||||
doc="""
|
||||
List of :class:`UserRole` instances belonging to the role.
|
||||
|
||||
|
@ -127,17 +122,12 @@ class Permission(Base):
|
|||
Represents a permission granted to a role.
|
||||
"""
|
||||
__tablename__ = 'permission'
|
||||
__table_args__ = (
|
||||
sa.ForeignKeyConstraint(['role_uuid'], ['role.uuid'],
|
||||
# TODO
|
||||
# name='permission_fk_role',
|
||||
),
|
||||
)
|
||||
|
||||
role_uuid = uuid_fk_column(primary_key=True, nullable=False)
|
||||
role_uuid = uuid_fk_column('role.uuid', primary_key=True, nullable=False)
|
||||
role = orm.relationship(
|
||||
Role,
|
||||
back_populates='permission_refs',
|
||||
cascade_backrefs=False,
|
||||
doc="""
|
||||
Reference to the :class:`Role` for which the permission is
|
||||
granted.
|
||||
|
@ -157,18 +147,18 @@ class User(Base):
|
|||
|
||||
This may or may not correspond to a real person, i.e. some users
|
||||
may exist solely for automated tasks.
|
||||
|
||||
.. attribute:: roles
|
||||
|
||||
List of :class:`Role` instances to which the user belongs.
|
||||
|
||||
See also :attr:`role_refs`.
|
||||
"""
|
||||
__tablename__ = 'user'
|
||||
__table_args__ = (
|
||||
sa.UniqueConstraint('username',
|
||||
# TODO
|
||||
# name='user_uq_username',
|
||||
),
|
||||
)
|
||||
|
||||
uuid = uuid_column()
|
||||
|
||||
username = sa.Column(sa.String(length=25), nullable=False, doc="""
|
||||
username = sa.Column(sa.String(length=25), nullable=False, unique=True, doc="""
|
||||
Account username. This is required and must be unique.
|
||||
""")
|
||||
|
||||
|
@ -176,6 +166,18 @@ class User(Base):
|
|||
Hashed password for login. (The raw password is not stored.)
|
||||
""")
|
||||
|
||||
person_uuid = uuid_fk_column('person.uuid', nullable=True)
|
||||
person = orm.relationship(
|
||||
'Person',
|
||||
# TODO: seems like this is not needed?
|
||||
# uselist=False,
|
||||
back_populates='users',
|
||||
cascade_backrefs=False,
|
||||
doc="""
|
||||
Reference to the :class:`~wuttjamaican.db.model.base.Person`
|
||||
whose user account this is.
|
||||
""")
|
||||
|
||||
active = sa.Column(sa.Boolean(), nullable=False, default=True, doc="""
|
||||
Flag indicating whether the user account is "active" - it is
|
||||
``True`` by default.
|
||||
|
@ -186,44 +188,53 @@ class User(Base):
|
|||
role_refs = orm.relationship(
|
||||
'UserRole',
|
||||
back_populates='user',
|
||||
cascade_backrefs=False,
|
||||
# TODO
|
||||
# cascade='all, delete-orphan',
|
||||
doc="""
|
||||
List of :class:`UserRole` records.
|
||||
List of :class:`UserRole` instances belonging to the user.
|
||||
|
||||
See also :attr:`roles`.
|
||||
""")
|
||||
|
||||
roles = association_proxy(
|
||||
'role_refs', 'role',
|
||||
creator=lambda r: UserRole(role=r),
|
||||
# TODO
|
||||
# getset_factory=getset_factory,
|
||||
)
|
||||
|
||||
def __str__(self):
|
||||
if self.person:
|
||||
name = str(self.person)
|
||||
if name:
|
||||
return name
|
||||
return self.username or ""
|
||||
|
||||
|
||||
class UserRole(Base):
|
||||
"""
|
||||
Represents the association between a user and a role.
|
||||
Represents the association between a user and a role; i.e. the
|
||||
user "belongs" or "is assigned" to the role.
|
||||
"""
|
||||
__tablename__ = 'user_x_role'
|
||||
__table_args__ = (
|
||||
sa.ForeignKeyConstraint(['user_uuid'], ['user.uuid'],
|
||||
# TODO
|
||||
# name='user_x_role_fk_user',
|
||||
),
|
||||
sa.ForeignKeyConstraint(['role_uuid'], ['role.uuid'],
|
||||
# TODO
|
||||
# name='user_x_role_fk_role',
|
||||
),
|
||||
)
|
||||
|
||||
uuid = uuid_column()
|
||||
|
||||
user_uuid = uuid_fk_column(nullable=False)
|
||||
user_uuid = uuid_fk_column('user.uuid', nullable=False)
|
||||
user = orm.relationship(
|
||||
User,
|
||||
back_populates='role_refs',
|
||||
cascade_backrefs=False,
|
||||
doc="""
|
||||
Reference to the :class:`User` involved.
|
||||
""")
|
||||
|
||||
role_uuid = uuid_fk_column(nullable=False)
|
||||
role_uuid = uuid_fk_column('role.uuid', nullable=False)
|
||||
role = orm.relationship(
|
||||
Role,
|
||||
back_populates='user_refs',
|
||||
cascade_backrefs=False,
|
||||
doc="""
|
||||
Reference to the :class:`Role` involved.
|
||||
""")
|
||||
|
|
|
@ -34,7 +34,19 @@ from sqlalchemy import orm
|
|||
from wuttjamaican.util import make_uuid
|
||||
|
||||
|
||||
Base = orm.declarative_base()
|
||||
# nb. this convention comes from upstream docs
|
||||
# https://docs.sqlalchemy.org/en/14/core/constraints.html#constraint-naming-conventions
|
||||
naming_convention = {
|
||||
'ix': 'ix_%(column_0_label)s',
|
||||
'uq': 'uq_%(table_name)s_%(column_0_name)s',
|
||||
'ck': 'ck_%(table_name)s_%(constraint_name)s',
|
||||
'fk': 'fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s',
|
||||
'pk': 'pk_%(table_name)s',
|
||||
}
|
||||
|
||||
metadata = sa.MetaData(naming_convention=naming_convention)
|
||||
|
||||
Base = orm.declarative_base(metadata=metadata)
|
||||
|
||||
|
||||
def uuid_column(*args, **kwargs):
|
||||
|
@ -47,11 +59,14 @@ def uuid_column(*args, **kwargs):
|
|||
return sa.Column(sa.String(length=32), *args, **kwargs)
|
||||
|
||||
|
||||
def uuid_fk_column(*args, **kwargs):
|
||||
def uuid_fk_column(target_column, *args, **kwargs):
|
||||
"""
|
||||
Returns a UUID column for use as a foreign key to another table.
|
||||
|
||||
:param target_column: Name of the table column on the remote side,
|
||||
e.g. ``'user.uuid'``.
|
||||
"""
|
||||
return sa.Column(sa.String(length=32), *args, **kwargs)
|
||||
return sa.Column(sa.String(length=32), sa.ForeignKey(target_column), *args, **kwargs)
|
||||
|
||||
|
||||
class Setting(Base):
|
||||
|
@ -70,3 +85,77 @@ class Setting(Base):
|
|||
|
||||
def __str__(self):
|
||||
return self.name or ""
|
||||
|
||||
|
||||
class Person(Base):
|
||||
"""
|
||||
Represents a person.
|
||||
|
||||
The use for this table in the base framework, is to associate with
|
||||
a :class:`~wuttjamaican.db.model.auth.User` to provide first and
|
||||
last name etc. (However a user does not have to be associated
|
||||
with any person.)
|
||||
|
||||
But this table could also be used as a basis for a Customer or
|
||||
Employee relationship etc.
|
||||
"""
|
||||
__tablename__ = 'person'
|
||||
|
||||
uuid = uuid_column()
|
||||
|
||||
full_name = sa.Column(sa.String(length=100), nullable=False, doc="""
|
||||
Full name for the person. Note that this is *required*.
|
||||
""")
|
||||
|
||||
first_name = sa.Column(sa.String(length=50), nullable=True, doc="""
|
||||
The person's first name.
|
||||
""")
|
||||
|
||||
middle_name = sa.Column(sa.String(length=50), nullable=True, doc="""
|
||||
The person's middle name or initial.
|
||||
""")
|
||||
|
||||
last_name = sa.Column(sa.String(length=50), nullable=True, doc="""
|
||||
The person's last name.
|
||||
""")
|
||||
|
||||
users = orm.relationship(
|
||||
'User',
|
||||
back_populates='person',
|
||||
cascade_backrefs=False,
|
||||
doc="""
|
||||
List of :class:`~wuttjamaican.db.model.auth.User` accounts for
|
||||
the person. Typically there is only one user account per
|
||||
person, but technically multiple are supported.
|
||||
""")
|
||||
|
||||
def __str__(self):
|
||||
return self.full_name or ""
|
||||
|
||||
@property
|
||||
def user(self):
|
||||
"""
|
||||
Reference to the "first"
|
||||
:class:`~wuttjamaican.db.model.auth.User` account for the
|
||||
person, or ``None``.
|
||||
|
||||
.. warning::
|
||||
|
||||
Note that the database schema supports multiple users per
|
||||
person, but this property logic ignores that and will only
|
||||
ever return "one or none". That might be fine in 99% of
|
||||
cases, but if multiple accounts exist for a person, the one
|
||||
returned is indeterminate.
|
||||
|
||||
See :attr:`users` to access the full list.
|
||||
"""
|
||||
|
||||
# TODO: i'm not crazy about the ambiguity here re: number of
|
||||
# user accounts a person may have. in particular it's not
|
||||
# clear *which* user account would be returned, as there is no
|
||||
# sequence ordinal defined etc. a better approach might be to
|
||||
# force callers to assume the possibility of multiple
|
||||
# user accounts per person? (if so, remove this property)
|
||||
|
||||
if self.users:
|
||||
return self.users[0]
|
||||
|
|
66
src/wuttjamaican/people.py
Normal file
66
src/wuttjamaican/people.py
Normal file
|
@ -0,0 +1,66 @@
|
|||
# -*- coding: utf-8; -*-
|
||||
################################################################################
|
||||
#
|
||||
# WuttJamaican -- Base package for Wutta Framework
|
||||
# Copyright © 2023-2024 Lance Edgar
|
||||
#
|
||||
# This file is part of Wutta Framework.
|
||||
#
|
||||
# Wutta Framework is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU General Public License as published by the Free
|
||||
# Software Foundation, either version 3 of the License, or (at your option) any
|
||||
# later version.
|
||||
#
|
||||
# Wutta Framework is distributed in the hope that it will be useful, but
|
||||
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
|
||||
# more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License along with
|
||||
# Wutta Framework. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
################################################################################
|
||||
"""
|
||||
People Handler
|
||||
|
||||
This is a :term:`handler` to manage "people" in the DB.
|
||||
"""
|
||||
|
||||
from wuttjamaican.app import GenericHandler
|
||||
|
||||
|
||||
class PeopleHandler(GenericHandler):
|
||||
"""
|
||||
Base class and default implementation for the "people"
|
||||
:term:`handler`.
|
||||
|
||||
This is responsible for managing
|
||||
:class:`~wuttjamaican.db.model.base.Person` records, and related
|
||||
things.
|
||||
"""
|
||||
|
||||
def get_person(self, obj, **kwargs):
|
||||
"""
|
||||
Return the :class:`~wuttjamaican.db.model.base.Person`
|
||||
associated with the given object, if one can be found.
|
||||
|
||||
This method should accept "any" type of ``obj`` and inspect it
|
||||
to determine if/how a person can be found. It should return
|
||||
the "first, most obvious" person in the event that the object
|
||||
is associated with multiple people.
|
||||
|
||||
This is a rather fundamental method, in that it is called by
|
||||
several other methods, both within this handler as well as
|
||||
others. There is also a shortcut to it, accessible via
|
||||
:meth:`wuttjamaican.app.AppHandler.get_person()`.
|
||||
"""
|
||||
model = self.app.model
|
||||
|
||||
if isinstance(obj, model.Person):
|
||||
person = obj
|
||||
return person
|
||||
|
||||
elif isinstance(obj, model.User):
|
||||
user = obj
|
||||
if user.person:
|
||||
return user.person
|
|
@ -5,6 +5,7 @@ from unittest import TestCase
|
|||
try:
|
||||
import sqlalchemy as sa
|
||||
from wuttjamaican.db.model import auth as model
|
||||
from wuttjamaican.db.model.base import Person
|
||||
except ImportError:
|
||||
pass
|
||||
else:
|
||||
|
@ -29,8 +30,16 @@ else:
|
|||
|
||||
class TestUser(TestCase):
|
||||
|
||||
def test_basic(self):
|
||||
def test_str(self):
|
||||
user = model.User()
|
||||
self.assertEqual(str(user), "")
|
||||
user.username = 'barney'
|
||||
self.assertEqual(str(user), "barney")
|
||||
|
||||
def test_str_with_person(self):
|
||||
user = model.User()
|
||||
self.assertEqual(str(user), "")
|
||||
|
||||
person = Person(full_name="Barney Rubble")
|
||||
user.person = person
|
||||
self.assertEqual(str(user), "Barney Rubble")
|
||||
|
|
|
@ -5,6 +5,7 @@ from unittest import TestCase
|
|||
try:
|
||||
import sqlalchemy as sa
|
||||
from wuttjamaican.db.model import base as model
|
||||
from wuttjamaican.db.model.auth import User
|
||||
except ImportError:
|
||||
pass
|
||||
else:
|
||||
|
@ -20,7 +21,7 @@ else:
|
|||
class TestUUIDFKColumn(TestCase):
|
||||
|
||||
def test_basic(self):
|
||||
column = model.uuid_column()
|
||||
column = model.uuid_fk_column('foo.bar')
|
||||
self.assertIsInstance(column, sa.Column)
|
||||
self.assertIsInstance(column.type, sa.String)
|
||||
self.assertEqual(column.type.length, 32)
|
||||
|
@ -32,3 +33,19 @@ else:
|
|||
self.assertEqual(str(setting), "")
|
||||
setting.name = 'foo'
|
||||
self.assertEqual(str(setting), "foo")
|
||||
|
||||
class TestPerson(TestCase):
|
||||
|
||||
def test_basic(self):
|
||||
person = model.Person()
|
||||
self.assertEqual(str(person), "")
|
||||
person.full_name = "Barney Rubble"
|
||||
self.assertEqual(str(person), "Barney Rubble")
|
||||
|
||||
def test_users(self):
|
||||
person = model.Person()
|
||||
self.assertIsNone(person.user)
|
||||
|
||||
user = User()
|
||||
person.users.append(user)
|
||||
self.assertIs(person.user, user)
|
||||
|
|
|
@ -101,17 +101,17 @@ class TestAppHandler(TestCase):
|
|||
from wuttjamaican.db import model
|
||||
except ImportError:
|
||||
pytest.skip("test not relevant without sqlalchemy")
|
||||
else:
|
||||
self.assertNotIn('model', self.app.__dict__)
|
||||
self.assertIs(self.app.model, model)
|
||||
|
||||
self.assertNotIn('model', self.app.__dict__)
|
||||
self.assertIs(self.app.model, model)
|
||||
|
||||
def test_get_model(self):
|
||||
try:
|
||||
from wuttjamaican.db import model
|
||||
except ImportError:
|
||||
pytest.skip("test not relevant without sqlalchemy")
|
||||
else:
|
||||
self.assertIs(self.app.get_model(), model)
|
||||
|
||||
self.assertIs(self.app.get_model(), model)
|
||||
|
||||
def test_get_title(self):
|
||||
self.assertEqual(self.app.get_title(), 'WuttJamaican')
|
||||
|
@ -120,6 +120,44 @@ class TestAppHandler(TestCase):
|
|||
uuid = self.app.make_uuid()
|
||||
self.assertEqual(len(uuid), 32)
|
||||
|
||||
def test_get_session(self):
|
||||
try:
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy import orm
|
||||
except ImportError:
|
||||
pytest.skip("test not relevant without sqlalchemy")
|
||||
|
||||
model = self.app.model
|
||||
user = model.User()
|
||||
self.assertIsNone(self.app.get_session(user))
|
||||
|
||||
Session = orm.sessionmaker()
|
||||
engine = sa.create_engine('sqlite://')
|
||||
mysession = Session(bind=engine)
|
||||
mysession.add(user)
|
||||
session = self.app.get_session(user)
|
||||
self.assertIs(session, mysession)
|
||||
|
||||
def test_get_person(self):
|
||||
people = self.app.get_people_handler()
|
||||
with patch.object(people, 'get_person') as get_person:
|
||||
get_person.return_value = 'foo'
|
||||
person = self.app.get_person('bar')
|
||||
get_person.assert_called_once_with('bar')
|
||||
self.assertEqual(person, 'foo')
|
||||
|
||||
def test_get_auth_handler(self):
|
||||
from wuttjamaican.auth import AuthHandler
|
||||
|
||||
auth = self.app.get_auth_handler()
|
||||
self.assertIsInstance(auth, AuthHandler)
|
||||
|
||||
def test_get_people_handler(self):
|
||||
from wuttjamaican.people import PeopleHandler
|
||||
|
||||
people = self.app.get_people_handler()
|
||||
self.assertIsInstance(people, PeopleHandler)
|
||||
|
||||
|
||||
class TestAppProvider(TestCase):
|
||||
|
||||
|
|
|
@ -5,13 +5,353 @@ from unittest import TestCase
|
|||
from wuttjamaican import auth as mod
|
||||
from wuttjamaican.conf import WuttaConfig
|
||||
|
||||
try:
|
||||
import sqlalchemy as sa
|
||||
except ImportError:
|
||||
pass
|
||||
else:
|
||||
|
||||
class TestAuthHandler(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.config = WuttaConfig()
|
||||
self.app = self.config.get_app()
|
||||
class TestAuthHandler(TestCase):
|
||||
|
||||
def test_basic(self):
|
||||
handler = mod.AuthHandler(self.config)
|
||||
self.assertIs(handler.app, self.app)
|
||||
def setUp(self):
|
||||
self.config = WuttaConfig()
|
||||
self.app = self.config.get_app()
|
||||
self.handler = mod.AuthHandler(self.config)
|
||||
|
||||
self.engine = sa.create_engine('sqlite://')
|
||||
self.app.model.Base.metadata.create_all(bind=self.engine)
|
||||
self.session = self.make_session()
|
||||
|
||||
def tearDown(self):
|
||||
self.session.close()
|
||||
self.app.model.Base.metadata.drop_all(bind=self.engine)
|
||||
|
||||
def make_session(self):
|
||||
return self.app.make_session(bind=self.engine)
|
||||
|
||||
def test_authenticate_user(self):
|
||||
model = self.app.model
|
||||
barney = model.User(username='barney')
|
||||
self.handler.set_user_password(barney, 'goodpass')
|
||||
self.session.add(barney)
|
||||
self.session.commit()
|
||||
|
||||
# login ok
|
||||
user = self.handler.authenticate_user(self.session, 'barney', 'goodpass')
|
||||
self.assertIs(user, barney)
|
||||
|
||||
# can also pass user instead of username
|
||||
user = self.handler.authenticate_user(self.session, barney, 'goodpass')
|
||||
self.assertIs(user, barney)
|
||||
|
||||
# bad password
|
||||
user = self.handler.authenticate_user(self.session, 'barney', 'BADPASS')
|
||||
self.assertIsNone(user)
|
||||
|
||||
# bad username
|
||||
user = self.handler.authenticate_user(self.session, 'NOBODY', 'goodpass')
|
||||
self.assertIsNone(user)
|
||||
|
||||
# inactive user
|
||||
user = self.handler.authenticate_user(self.session, 'barney', 'goodpass')
|
||||
self.assertIs(user, barney)
|
||||
barney.active = False
|
||||
user = self.handler.authenticate_user(self.session, 'barney', 'goodpass')
|
||||
self.assertIsNone(user)
|
||||
|
||||
def test_get_role(self):
|
||||
model = self.app.model
|
||||
myrole = model.Role(name="My Role")
|
||||
self.session.add(myrole)
|
||||
self.session.commit()
|
||||
|
||||
# empty key is ignored
|
||||
role = self.handler.get_role(self.session, None)
|
||||
self.assertIsNone(role)
|
||||
|
||||
# key may be uuid
|
||||
role = self.handler.get_role(self.session, myrole.uuid)
|
||||
self.assertIs(role, myrole)
|
||||
|
||||
# key may be name
|
||||
role = self.handler.get_role(self.session, myrole.name)
|
||||
self.assertIs(role, myrole)
|
||||
|
||||
# key may be represented within a setting
|
||||
self.config.usedb = True
|
||||
role = self.handler.get_role(self.session, 'mykey')
|
||||
self.assertIsNone(role)
|
||||
setting = model.Setting(name='wutta.role.mykey', value=myrole.uuid)
|
||||
self.session.add(setting)
|
||||
self.session.commit()
|
||||
role = self.handler.get_role(self.session, 'mykey')
|
||||
self.assertIs(role, myrole)
|
||||
|
||||
def test_get_user(self):
|
||||
model = self.app.model
|
||||
myuser = model.User(username='myuser')
|
||||
self.session.add(myuser)
|
||||
self.session.commit()
|
||||
|
||||
# empty obj is ignored
|
||||
user = self.handler.get_user(None)
|
||||
self.assertIsNone(user)
|
||||
|
||||
# user is returned as-is
|
||||
user = self.handler.get_user(myuser)
|
||||
self.assertIs(user, myuser)
|
||||
|
||||
# find user from person
|
||||
myperson = model.Person(full_name='My Name')
|
||||
self.session.add(myperson)
|
||||
user.person = myperson
|
||||
self.session.commit()
|
||||
user = self.handler.get_user(myperson)
|
||||
self.assertIs(user, myuser)
|
||||
|
||||
def test_make_user(self):
|
||||
model = self.app.model
|
||||
|
||||
# empty user
|
||||
user = self.handler.make_user()
|
||||
self.assertIsInstance(user, model.User)
|
||||
self.assertIsNone(user.username)
|
||||
|
||||
# user is added to session
|
||||
user = self.handler.make_user(session=self.session)
|
||||
self.assertIn(user, self.session)
|
||||
self.session.rollback()
|
||||
self.assertNotIn(user, self.session)
|
||||
|
||||
# default username
|
||||
# nb. this behavior requires a session
|
||||
user = self.handler.make_user(session=self.session)
|
||||
self.assertEqual(user.username, 'newuser')
|
||||
|
||||
def test_delete_user(self):
|
||||
model = self.app.model
|
||||
|
||||
# basics
|
||||
myuser = model.User(username='myuser')
|
||||
self.session.add(myuser)
|
||||
self.session.commit()
|
||||
user = self.session.query(model.User).one()
|
||||
self.assertIs(user, myuser)
|
||||
self.handler.delete_user(user)
|
||||
self.session.commit()
|
||||
self.assertEqual(self.session.query(model.User).count(), 0)
|
||||
|
||||
def test_make_preferred_username(self):
|
||||
model = self.app.model
|
||||
|
||||
# default
|
||||
name = self.handler.make_preferred_username(self.session)
|
||||
self.assertEqual(name, 'newuser')
|
||||
|
||||
# person/first+last
|
||||
person = model.Person(first_name='Barney', last_name='Rubble')
|
||||
name = self.handler.make_preferred_username(self.session, person=person)
|
||||
self.assertEqual(name, 'barney.rubble')
|
||||
|
||||
# person/first
|
||||
person = model.Person(first_name='Barney')
|
||||
name = self.handler.make_preferred_username(self.session, person=person)
|
||||
self.assertEqual(name, 'barney')
|
||||
|
||||
# person/last
|
||||
person = model.Person(last_name='Rubble')
|
||||
name = self.handler.make_preferred_username(self.session, person=person)
|
||||
self.assertEqual(name, 'rubble')
|
||||
|
||||
def test_make_unique_username(self):
|
||||
model = self.app.model
|
||||
|
||||
# default
|
||||
name = self.handler.make_unique_username(self.session)
|
||||
self.assertEqual(name, 'newuser')
|
||||
user = model.User(username=name)
|
||||
self.session.add(user)
|
||||
self.session.commit()
|
||||
|
||||
# counter invoked if name exists
|
||||
name = self.handler.make_unique_username(self.session)
|
||||
self.assertEqual(name, 'newuser01')
|
||||
user = model.User(username=name)
|
||||
self.session.add(user)
|
||||
self.session.commit()
|
||||
|
||||
# starts by getting preferred name
|
||||
person = model.Person(first_name='Barney', last_name='Rubble')
|
||||
name = self.handler.make_unique_username(self.session, person=person)
|
||||
self.assertEqual(name, 'barney.rubble')
|
||||
user = model.User(username=name)
|
||||
self.session.add(user)
|
||||
self.session.commit()
|
||||
|
||||
# counter invoked if name exists
|
||||
name = self.handler.make_unique_username(self.session, person=person)
|
||||
self.assertEqual(name, 'barney.rubble01')
|
||||
|
||||
def test_set_user_password(self):
|
||||
model = self.app.model
|
||||
myuser = model.User(username='myuser')
|
||||
self.session.add(myuser)
|
||||
|
||||
# basics
|
||||
self.assertIsNone(myuser.password)
|
||||
self.handler.set_user_password(myuser, 'goodpass')
|
||||
self.session.commit()
|
||||
self.assertIsNotNone(myuser.password)
|
||||
# nb. password is hashed
|
||||
self.assertNotEqual(myuser.password, 'goodpass')
|
||||
|
||||
# confirm login works with new password
|
||||
user = self.handler.authenticate_user(self.session, 'myuser', 'goodpass')
|
||||
self.assertIs(user, myuser)
|
||||
|
||||
def test_get_role_administrator(self):
|
||||
model = self.app.model
|
||||
|
||||
self.assertEqual(self.session.query(model.Role).count(), 0)
|
||||
role = self.handler.get_role_administrator(self.session)
|
||||
self.assertEqual(self.session.query(model.Role).count(), 1)
|
||||
self.assertEqual(role.name, "Administrator")
|
||||
|
||||
def test_get_role_anonymous(self):
|
||||
model = self.app.model
|
||||
|
||||
self.assertEqual(self.session.query(model.Role).count(), 0)
|
||||
role = self.handler.get_role_anonymous(self.session)
|
||||
self.assertEqual(self.session.query(model.Role).count(), 1)
|
||||
self.assertEqual(role.name, "Anonymous")
|
||||
|
||||
def test_get_role_authenticated(self):
|
||||
model = self.app.model
|
||||
|
||||
self.assertEqual(self.session.query(model.Role).count(), 0)
|
||||
role = self.handler.get_role_authenticated(self.session)
|
||||
self.assertEqual(self.session.query(model.Role).count(), 1)
|
||||
self.assertEqual(role.name, "Authenticated")
|
||||
|
||||
def test_get_permissions(self):
|
||||
model = self.app.model
|
||||
|
||||
# empty default for role
|
||||
role = model.Role()
|
||||
perms = self.handler.get_permissions(self.session, role)
|
||||
self.assertIsInstance(perms, set)
|
||||
self.assertEqual(len(perms), 0)
|
||||
|
||||
# empty default for user
|
||||
user = model.User()
|
||||
perms = self.handler.get_permissions(self.session, user)
|
||||
self.assertIsInstance(perms, set)
|
||||
self.assertEqual(len(perms), 0)
|
||||
|
||||
# role perms
|
||||
myrole = model.Role(name='My Role')
|
||||
self.session.add(myrole)
|
||||
self.handler.grant_permission(myrole, 'foo')
|
||||
self.session.commit()
|
||||
perms = self.handler.get_permissions(self.session, myrole)
|
||||
self.assertEqual(perms, {'foo'})
|
||||
|
||||
# user perms
|
||||
myuser = model.User(username='myuser')
|
||||
self.session.add(myuser)
|
||||
self.session.commit()
|
||||
perms = self.handler.get_permissions(self.session, myuser)
|
||||
self.assertEqual(len(perms), 0)
|
||||
myuser.roles.append(myrole)
|
||||
self.session.commit()
|
||||
perms = self.handler.get_permissions(self.session, myuser)
|
||||
self.assertEqual(perms, {'foo'})
|
||||
|
||||
# invalid principal
|
||||
perms = self.handler.get_permissions(self.session, RuntimeError)
|
||||
self.assertEqual(perms, set())
|
||||
|
||||
# missing principal
|
||||
perms = self.handler.get_permissions(self.session, None)
|
||||
self.assertEqual(perms, set())
|
||||
|
||||
def test_has_permission(self):
|
||||
model = self.app.model
|
||||
|
||||
# false default for role
|
||||
role = model.Role()
|
||||
self.assertFalse(self.handler.has_permission(self.session, role, 'foo'))
|
||||
|
||||
# empty default for user
|
||||
user = model.User()
|
||||
self.assertFalse(self.handler.has_permission(self.session, user, 'foo'))
|
||||
|
||||
# role perms
|
||||
myrole = model.Role(name='My Role')
|
||||
self.session.add(myrole)
|
||||
self.session.commit()
|
||||
self.assertFalse(self.handler.has_permission(self.session, myrole, 'foo'))
|
||||
self.handler.grant_permission(myrole, 'foo')
|
||||
self.session.commit()
|
||||
self.assertTrue(self.handler.has_permission(self.session, myrole, 'foo'))
|
||||
|
||||
# user perms
|
||||
myuser = model.User(username='myuser')
|
||||
self.session.add(myuser)
|
||||
self.session.commit()
|
||||
self.assertFalse(self.handler.has_permission(self.session, myuser, 'foo'))
|
||||
myuser.roles.append(myrole)
|
||||
self.session.commit()
|
||||
self.assertTrue(self.handler.has_permission(self.session, myuser, 'foo'))
|
||||
|
||||
# invalid principal
|
||||
self.assertFalse(self.handler.has_permission(self.session, RuntimeError, 'foo'))
|
||||
|
||||
# missing principal
|
||||
self.assertFalse(self.handler.has_permission(self.session, None, 'foo'))
|
||||
|
||||
def test_grant_permission(self):
|
||||
model = self.app.model
|
||||
myrole = model.Role(name='My Role')
|
||||
self.session.add(myrole)
|
||||
self.session.commit()
|
||||
|
||||
# no perms yet
|
||||
self.assertEqual(self.session.query(model.Permission).count(), 0)
|
||||
|
||||
# grant one perm, and confirm
|
||||
self.handler.grant_permission(myrole, 'foo')
|
||||
self.session.commit()
|
||||
self.assertEqual(self.session.query(model.Permission).count(), 1)
|
||||
perm = self.session.query(model.Permission).one()
|
||||
self.assertIs(perm.role, myrole)
|
||||
self.assertEqual(perm.permission, 'foo')
|
||||
|
||||
# grant same perm again, confirm just one exists
|
||||
self.handler.grant_permission(myrole, 'foo')
|
||||
self.session.commit()
|
||||
self.assertEqual(self.session.query(model.Permission).count(), 1)
|
||||
perm = self.session.query(model.Permission).one()
|
||||
self.assertIs(perm.role, myrole)
|
||||
self.assertEqual(perm.permission, 'foo')
|
||||
|
||||
def test_revoke_permission(self):
|
||||
model = self.app.model
|
||||
myrole = model.Role(name='My Role')
|
||||
self.session.add(myrole)
|
||||
self.handler.grant_permission(myrole, 'foo')
|
||||
self.session.commit()
|
||||
|
||||
# just the one perm
|
||||
self.assertEqual(self.session.query(model.Permission).count(), 1)
|
||||
|
||||
# revoke it, then confirm
|
||||
self.handler.revoke_permission(myrole, 'foo')
|
||||
self.session.commit()
|
||||
self.assertEqual(self.session.query(model.Permission).count(), 0)
|
||||
|
||||
# revoke again, confirm
|
||||
self.handler.revoke_permission(myrole, 'foo')
|
||||
self.session.commit()
|
||||
self.assertEqual(self.session.query(model.Permission).count(), 0)
|
||||
|
|
52
tests/test_people.py
Normal file
52
tests/test_people.py
Normal file
|
@ -0,0 +1,52 @@
|
|||
# -*- coding: utf-8; -*-
|
||||
|
||||
from unittest import TestCase
|
||||
|
||||
from wuttjamaican import people as mod
|
||||
from wuttjamaican.conf import WuttaConfig
|
||||
|
||||
try:
|
||||
import sqlalchemy as sa
|
||||
except ImportError:
|
||||
pass
|
||||
else:
|
||||
|
||||
|
||||
class TestPeopleHandler(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.config = WuttaConfig()
|
||||
self.app = self.config.get_app()
|
||||
self.handler = mod.PeopleHandler(self.config)
|
||||
|
||||
self.engine = sa.create_engine('sqlite://')
|
||||
self.app.model.Base.metadata.create_all(bind=self.engine)
|
||||
self.session = self.make_session()
|
||||
|
||||
def tearDown(self):
|
||||
self.session.close()
|
||||
self.app.model.Base.metadata.drop_all(bind=self.engine)
|
||||
|
||||
def make_session(self):
|
||||
return self.app.make_session(bind=self.engine)
|
||||
|
||||
def test_get_person(self):
|
||||
model = self.app.model
|
||||
myperson = model.Person(full_name='Barny Rubble')
|
||||
self.session.add(myperson)
|
||||
self.session.commit()
|
||||
|
||||
# empty obj is ignored
|
||||
person = self.handler.get_person(None)
|
||||
self.assertIsNone(person)
|
||||
|
||||
# person is returned as-is
|
||||
person = self.handler.get_person(myperson)
|
||||
self.assertIs(person, myperson)
|
||||
|
||||
# find person from user
|
||||
myuser = model.User(username='barney', person=myperson)
|
||||
self.session.add(myuser)
|
||||
self.session.commit()
|
||||
person = self.handler.get_person(myuser)
|
||||
self.assertIs(person, myperson)
|
Loading…
Reference in a new issue