3
0
Fork 0

Compare commits

...

4 commits

Author SHA1 Message Date
Lance Edgar ca997807e4 bump: version 0.7.0 → 0.8.0 2024-07-14 23:22:31 -05:00
Lance Edgar e899d06151 feat: flesh out the auth handler; add people handler
can handle the basics now: authentication, perm checks etc.
2024-07-14 23:22:11 -05:00
Lance Edgar 43ca404837 feat: add model for Person; tie to User 2024-07-14 15:47:39 -05:00
Lance Edgar 60d3fcd13b fix: add migration for auth tables
having now fixed the constraint naming convention
2024-07-14 14:45:52 -05:00
18 changed files with 1322 additions and 61 deletions

View file

@ -5,6 +5,17 @@ All notable changes to WuttJamaican will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
## v0.8.0 (2024-07-14)
### Feat
- flesh out the auth handler; add people handler
- add model for Person; tie to User
### Fix
- add migration for auth tables
## v0.7.0 (2024-07-14)
### Feat

View file

@ -17,5 +17,6 @@
db.model.base
db.sess
exc
people
testing
util

View file

@ -0,0 +1,6 @@
``wuttjamaican.people``
=======================
.. automodule:: wuttjamaican.people
:members:

View file

@ -109,6 +109,13 @@ Glossary
Most :term:`apps<app>` will have at least one :term:`app
database`.
db session
The "session" is a SQLAlchemy abstraction for an open database
connection, essentially.
In practice this generally refers to a
:class:`~wuttjamaican.db.sess.Session` instance.
entry point
This refers to a "setuptools-style" entry point specifically,
which is a mechanism used to register "plugins" and the like.

View file

@ -6,7 +6,7 @@ build-backend = "hatchling.build"
[project]
name = "WuttJamaican"
version = "0.7.0"
version = "0.8.0"
description = "Base package for Wutta Framework"
readme = "README.md"
authors = [{name = "Lance Edgar", email = "lance@edbob.org"}]
@ -32,7 +32,7 @@ dependencies = [
[project.optional-dependencies]
db = ["SQLAlchemy<2", "alembic"]
db = ["SQLAlchemy<2", "alembic", "passlib"]
docs = ["Sphinx", "sphinxcontrib-programoutput", "furo"]
tests = ["pytest-cov", "tox"]

View file

@ -66,6 +66,8 @@ class AppHandler:
"""
default_app_title = "WuttJamaican"
default_model_spec = 'wuttjamaican.db.model'
default_auth_handler_spec = 'wuttjamaican.auth:AuthHandler'
default_people_handler_spec = 'wuttjamaican.people:PeopleHandler'
def __init__(self, config):
self.config = config
@ -241,6 +243,16 @@ class AppHandler:
"""
return make_uuid()
def get_session(self, obj):
"""
Returns the SQLAlchemy session with which the given object is
associated. Simple convenience wrapper around
:func:`sqlalchemy:sqlalchemy.orm.object_session()`.
"""
from sqlalchemy import orm
return orm.object_session(obj)
def short_session(self, **kwargs):
"""
Returns a context manager for a short-lived database session.
@ -279,6 +291,51 @@ class AppHandler:
return get_setting(session, name)
##############################
# getters for other handlers
##############################
def get_auth_handler(self, **kwargs):
"""
Get the configured :term:`auth handler`.
:rtype: :class:`~wuttjamaican.auth.AuthHandler`
"""
if 'auth' not in self.handlers:
spec = self.config.get(f'{self.appname}.auth.handler',
default=self.default_auth_handler_spec)
factory = self.load_object(spec)
self.handlers['auth'] = factory(self.config, **kwargs)
return self.handlers['auth']
def get_people_handler(self, **kwargs):
"""
Get the configured "people" :term:`handler`.
:rtype: :class:`~wuttjamaican.people.PeopleHandler`
"""
if 'people' not in self.handlers:
spec = self.config.get(f'{self.appname}.people.handler',
default=self.default_people_handler_spec)
factory = self.load_object(spec)
self.handlers['people'] = factory(self.config, **kwargs)
return self.handlers['people']
##############################
# convenience delegators
##############################
def get_person(self, obj, **kwargs):
"""
Convenience method to locate a
:class:`~wuttjamaican.db.model.base.Person` for the given
object.
This delegates to the "people" handler method,
:meth:`~wuttjamaican.people.PeopleHandler.get_person()`.
"""
return self.get_people_handler().get_person(obj, **kwargs)
class AppProvider:
"""

View file

@ -29,6 +29,16 @@ This defines the default :term:`auth handler`.
from wuttjamaican.app import GenericHandler
# nb. this only works if passlib is installed (part of 'db' extra)
try:
from passlib.context import CryptContext
except ImportError: # pragma: no cover
pass
else:
password_context = CryptContext(schemes=['bcrypt'])
class AuthHandler(GenericHandler):
"""
Base class and default implementation for the :term:`auth
@ -37,8 +47,436 @@ class AuthHandler(GenericHandler):
This is responsible for "authentication and authorization" - for
instance:
* create new users, roles
* authenticate user from login credentials
* check which permissions a user/role has
* create/modify users, roles
* grant/revoke role permissions
* determine which permissions a user has
* identify user from login credentials
"""
def authenticate_user(self, session, username, password, **kwargs):
"""
Authenticate the given user credentials, and if successful,
return the :class:`~wuttjamaican.db.model.auth.User`.
Default logic will (try to) locate a user with matching
username, then confirm the supplied password is also a match.
Custom handlers can authenticate against anything else, using
the given credentials. But they still must return a "native"
``User`` object for the app to consider the authentication
successful. The handler may auto-create the user if needed.
Generally speaking the credentials will have come directly
from a user login attempt in the web app etc. Again the
default logic assumes a "username" but in practice it may be
an email address etc. - whatever the user entered.
:param session: Open :term:`db session`.
:param username: Usually a string, but also may be a
:class:`~wuttjamaican.db.model.auth.User` instance, in
which case no user lookup will occur. (However the user is
still authenticated otherwise, i.e. the password must be
correct etc.)
:param password: Password as string.
:returns: :class:`~wuttjamaican.db.model.auth.User` instance,
or ``None``.
"""
model = self.app.model
if isinstance(username, model.User):
user = username
else:
user = session.query(model.User)\
.filter_by(username=username)\
.first()
if user and user.active and user.password:
if password_context.verify(password, user.password):
return user
def get_role(self, session, key, **kwargs):
"""
Locate and return a :class:`~wuttjamaican.db.model.auth.Role`
per the given key, if possible.
:param session: Open :term:`db session`.
:param key: Value to use when searching for the role. Can be
a UUID or name of a role.
:returns: :class:`~wuttjamaican.db.model.auth.Role` instance;
or ``None``.
"""
model = self.app.model
if not key:
return
# try to match on Role.uuid
role = session.get(model.Role, key)
if role:
return role
# try to match on Role.name
role = session.query(model.Role)\
.filter_by(name=key)\
.first()
if role:
return role
# try settings; if value then recurse
key = self.config.get(f'{self.appname}.role.{key}',
session=session)
if key:
return self.get_role(session, key)
def get_user(self, obj, **kwargs):
"""
Return the :class:`~wuttjamaican.db.model.auth.User`
associated with the given object, if one can be found.
This method should accept "any" type of ``obj`` and inspect it
to determine if/how a user can be found. It should return the
"first, most obvious" user in the event that the given object
is associated with multiple users.
The default logic only knows how to navigate the Person/User
relationship, so if ``obj`` is a
:class:`~wuttjamaican.db.model.base.Person` then it will
return the "first" user account for the person (according to
:attr:`~wuttjamaican.db.model.base.Person.user`).
:returns: :class:`~wuttjamaican.db.model.auth.User` or ``None``.
"""
model = self.app.model
if isinstance(obj, model.User):
return obj
person = self.app.get_person(obj)
if person:
return person.user
def make_user(self, session=None, **kwargs):
"""
Make and return a new
:class:`~wuttjamaican.db.model.auth.User`.
This is mostly a simple wrapper around the
:class:`~wuttjamaican.db.model.auth.User` constructor. All
``kwargs`` are passed on to the constructor as-is, for
instance. It also will add the user to the session, if
applicable.
This method also adds one other convenience:
If there is no ``username`` specified in the ``kwargs`` then
it will call :meth:`make_unique_username()` to automatically
provide one. (Note that the ``kwargs`` will be passed along
to that call as well.)
:param session: Open :term:`db session`, if applicable.
:returns: The new :class:`~wuttjamaican.db.model.auth.User`
instance.
"""
model = self.app.model
if session and 'username' not in kwargs:
kwargs['username'] = self.make_unique_username(session, **kwargs)
user = model.User(**kwargs)
if session:
session.add(user)
return user
def delete_user(self, user, **kwargs):
"""
Delete the given user account. Use with caution! As this
generally cannot be undone.
Default behavior simply deletes the user account. Depending
on the DB schema and data present, this may cause an error
(i.e. if the user is still referenced by other tables).
:param user: :class:`~wuttjamaican.db.model.auth.User` to
delete.
"""
session = self.app.get_session(user)
session.delete(user)
def make_preferred_username(self, session, **kwargs):
"""
Generate a "preferred" username, using data from ``kwargs`` as
hints.
Note that ``kwargs`` should be of the same sort that might be
passed to the :class:`~wuttjamaican.db.model.auth.User`
constructor.
So far this logic is rather simple:
If ``kwargs`` contains ``person`` then a username will be
constructed using the name data from the person
(e.g. ``'john.doe'``).
In all other cases it will return ``'newuser'``.
.. note::
This method does not confirm if the username it generates
is actually "available" for a new user. See
:meth:`make_unique_username()` for that.
:param session: Open :term:`db session`.
:returns: Generated username as string.
"""
person = kwargs.get('person')
if person:
first = (person.first_name or '').strip().lower()
last = (person.last_name or '').strip().lower()
if first and last:
return f'{first}.{last}'
if first:
return first
if last:
return last
return 'newuser'
def make_unique_username(self, session, **kwargs):
"""
Generate a *unique* username, using data from ``kwargs`` as
hints.
Note that ``kwargs`` should be of the same sort that might be
passed to the :class:`~wuttjamaican.db.model.auth.User`
constructor.
This method is a convenience which does two things:
First it calls :meth:`make_preferred_username()` to obtain the
"preferred" username. (It passes all ``kwargs`` along when it
makes that call.)
Then it checks to see if the resulting username is already
taken. If it is, then a "counter" is appended to the
username, and incremented until a username can be found which
is *not* yet taken.
It returns the first "available" (hence unique) username which
is found. Note that it is considered unique and therefore
available *at the time*; however this method does not
"reserve" the username in any way. It is assumed that you
would create the user yourself once you have the username.
:param session: Open :term:`db session`.
:returns: Username as string.
"""
model = self.app.model
original_username = self.make_preferred_username(session, **kwargs)
username = original_username
# check for unique username
counter = 1
while True:
users = session.query(model.User)\
.filter(model.User.username == username)\
.count()
if not users:
break
username = f"{original_username}{counter:02d}"
counter += 1
return username
def set_user_password(self, user, password, **kwargs):
"""
Set a user's password.
This will update the
:attr:`~wuttjamaican.db.model.auth.User.password` attribute
for the user. The value will be hashed using ``bcrypt``.
:param user: :class:`~wuttjamaican.db.model.auth.User` instance.
:param password: New password in plain text.
"""
user.password = password_context.hash(password)
def get_role_administrator(self, session, **kwargs):
"""
Returns the special "Administrator" role.
"""
return self._special_role(session, 'd937fa8a965611dfa0dd001143047286', "Administrator")
def get_role_anonymous(self, session, **kwargs):
"""
Returns the special "Anonymous" (aka. "Guest") role.
"""
return self._special_role(session, 'f8a27c98965a11dfaff7001143047286', "Anonymous")
def get_role_authenticated(self, session, **kwargs):
"""
Returns the special "Authenticated" role.
"""
return self._special_role(session, 'b765a9cc331a11e6ac2a3ca9f40bc550', "Authenticated")
def get_permissions(self, session, principal,
include_anonymous=True,
include_authenticated=True):
"""
Return a set of permission names, which represents all
permissions effectively granted to the given user or role.
:param session: Open :term:`db session`.
:param principal: :class:`~wuttjamaican.db.model.auth.User` or
:class:`~wuttjamaican.db.model.auth.Role` instance. Can
also be ``None``, in which case the "Anonymous" role will
be assumed.
:param include_anonymous: Whether the "Anonymous" role should
be included when checking permissions. If ``False``, the
Anonymous permissions will *not* be checked.
:param include_authenticated: Whether the "Authenticated" role
should be included when checking permissions.
:returns: Set of permission names.
:rtype: set
"""
# we will use any `roles` attribute which may be present. in
# practice we would be assuming a User in this case
if hasattr(principal, 'roles'):
roles = [role
for role in principal.roles
if self._role_is_pertinent(role)]
# here our User assumption gets a little more explicit
if include_authenticated:
roles.append(self.get_role_authenticated(session))
# otherwise a non-null principal is assumed to be a Role
elif principal is not None:
roles = [principal]
# fallback assumption is "no roles"
else:
roles = []
# maybe include anonymous role
if include_anonymous:
roles.append(self.get_role_anonymous(session))
# build the permissions cache
cache = set()
for role in roles:
if hasattr(role, 'permissions'):
cache.update(role.permissions)
return cache
def has_permission(self, session, principal, permission,
include_anonymous=True,
include_authenticated=True):
"""
Check if the given user or role has been granted the given
permission.
.. note::
While this method is perfectly usable, it is a bit "heavy"
if you need to make multiple permission checks for the same
user. To optimize, call :meth:`get_permissions()` and keep
the result, then instead of calling ``has_permission()``
just check if a given permission is contained in the cached
result set.
(The logic just described is exactly what this method does,
except it will not keep the result set, hence calling it
multiple times for same user is not optimal.)
:param session: Open :term:`db session`.
:param principal: Either a
:class:`~wuttjamaican.db.model.auth.User` or
:class:`~wuttjamaican.db.model.auth.Role` instance. It is
also expected that this may sometimes be ``None``, in which
case the "Anonymous" role will be assumed.
:param permission: Name of the permission for which to check.
:param include_anonymous: Whether the "Anonymous" role should
be included when checking permissions. If ``False``, then
Anonymous permissions will *not* be checked.
:param include_authenticated: Whether the "Authenticated" role
should be included when checking permissions.
:returns: Boolean indicating if the permission is granted.
"""
perms = self.get_permissions(session, principal,
include_anonymous=include_anonymous,
include_authenticated=include_authenticated)
return permission in perms
def grant_permission(self, role, permission, **kwargs):
"""
Grant a permission to the role. If the role already has the
permission, nothing is done.
:param role: :class:`~wuttjamaican.db.model.auth.Role`
instance.
:param permission: Name of the permission as string.
"""
if permission not in role.permissions:
role.permissions.append(permission)
def revoke_permission(self, role, permission, **kwargs):
"""
Revoke a permission from the role. If the role does not have
the permission, nothing is done.
:param role: A :class:`~rattail.db.model.users.Role` instance.
:param permission: Name of the permission as string.
"""
if permission in role.permissions:
role.permissions.remove(permission)
##############################
# internal methods
##############################
def _role_is_pertinent(self, role):
"""
Check the role to ensure it is "pertinent" for the current app.
The idea behind this is for sake of a multi-node system, where
users and roles are synced between nodes. Some roles may be
defined for only certain types of nodes and hence not
"pertinent" for all nodes.
As of now there is no actual support for that, but this stub
method exists for when it will.
"""
return True
def _special_role(self, session, uuid, name):
"""
Fetch a "special" role, creating if needed.
"""
model = self.app.model
role = session.get(model.Role, uuid)
if not role:
role = model.Role(uuid=uuid, name=name)
session.add(role)
return role

View file

@ -0,0 +1,45 @@
"""add people
Revision ID: 3abcc44f7f91
Revises: d686f7abe3e0
Create Date: 2024-07-14 15:14:30.552682
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '3abcc44f7f91'
down_revision: Union[str, None] = 'd686f7abe3e0'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# person
op.create_table('person',
sa.Column('uuid', sa.String(length=32), nullable=False),
sa.Column('full_name', sa.String(length=100), nullable=False),
sa.Column('first_name', sa.String(length=50), nullable=True),
sa.Column('middle_name', sa.String(length=50), nullable=True),
sa.Column('last_name', sa.String(length=50), nullable=True),
sa.PrimaryKeyConstraint('uuid', name=op.f('pk_person'))
)
# user
op.add_column('user', sa.Column('person_uuid', sa.String(length=32), nullable=True))
op.create_foreign_key(op.f('fk_user_person_uuid_person'), 'user', 'person', ['person_uuid'], ['uuid'])
def downgrade() -> None:
# user
op.drop_constraint(op.f('fk_user_person_uuid_person'), 'user', type_='foreignkey')
op.drop_column('user', 'person_uuid')
# person
op.drop_table('person')

View file

@ -0,0 +1,73 @@
"""add users, roles
Revision ID: d686f7abe3e0
Revises: fc3a3bcaa069
Create Date: 2024-07-14 13:27:22.703093
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'd686f7abe3e0'
down_revision: Union[str, None] = 'fc3a3bcaa069'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# role
op.create_table('role',
sa.Column('uuid', sa.String(length=32), nullable=False),
sa.Column('name', sa.String(length=100), nullable=False),
sa.Column('notes', sa.Text(), nullable=True),
sa.PrimaryKeyConstraint('uuid'),
sa.UniqueConstraint('name', name=op.f('uq_role_name'))
)
# user
op.create_table('user',
sa.Column('uuid', sa.String(length=32), nullable=False),
sa.Column('username', sa.String(length=25), nullable=False),
sa.Column('password', sa.String(length=60), nullable=True),
sa.Column('active', sa.Boolean(), nullable=False),
sa.PrimaryKeyConstraint('uuid'),
sa.UniqueConstraint('username', name=op.f('uq_user_username'))
)
# permission
op.create_table('permission',
sa.Column('role_uuid', sa.String(length=32), nullable=False),
sa.Column('permission', sa.String(length=254), nullable=False),
sa.ForeignKeyConstraint(['role_uuid'], ['role.uuid'], name=op.f('fk_permission_role_uuid_role')),
sa.PrimaryKeyConstraint('role_uuid', 'permission')
)
# user_x_role
op.create_table('user_x_role',
sa.Column('uuid', sa.String(length=32), nullable=False),
sa.Column('user_uuid', sa.String(length=32), nullable=False),
sa.Column('role_uuid', sa.String(length=32), nullable=False),
sa.ForeignKeyConstraint(['role_uuid'], ['role.uuid'], name=op.f('fk_user_x_role_role_uuid_role')),
sa.ForeignKeyConstraint(['user_uuid'], ['user.uuid'], name=op.f('fk_user_x_role_user_uuid_user')),
sa.PrimaryKeyConstraint('uuid')
)
def downgrade() -> None:
# user_x_role
op.drop_table('user_x_role')
# permission
op.drop_table('permission')
# user
op.drop_table('user')
# role
op.drop_table('role')

View file

@ -31,11 +31,12 @@ The ``wuttjamaican.db.model`` namespace contains the following:
* :func:`~wuttjamaican.db.model.base.uuid_fk_column()`
* :class:`~wuttjamaican.db.model.base.Base`
* :class:`~wuttjamaican.db.model.base.Setting`
* :class:`~wuttjamaican.db.model.base.Person`
* :class:`~wuttjamaican.db.model.auth.Role`
* :class:`~wuttjamaican.db.model.auth.Permission`
* :class:`~wuttjamaican.db.model.auth.User`
* :class:`~wuttjamaican.db.model.auth.UserRole`
"""
from .base import Base, Setting, uuid_column, uuid_fk_column
from .base import uuid_column, uuid_fk_column, Base, Setting, Person
from .auth import Role, Permission, User, UserRole

View file

@ -65,16 +65,10 @@ class Role(Base):
See also :attr:`user_refs`.
"""
__tablename__ = 'role'
__table_args__ = (
sa.UniqueConstraint('name',
# TODO
# name='role_uq_name',
),
)
uuid = uuid_column()
name = sa.Column(sa.String(length=100), nullable=False, doc="""
name = sa.Column(sa.String(length=100), nullable=False, unique=True, doc="""
Name for the role. Each role must have a name, which must be
unique.
""")
@ -86,6 +80,8 @@ class Role(Base):
permission_refs = orm.relationship(
'Permission',
back_populates='role',
cascade='all, delete-orphan',
cascade_backrefs=False,
doc="""
List of :class:`Permission` references for the role.
@ -101,10 +97,9 @@ class Role(Base):
user_refs = orm.relationship(
'UserRole',
# TODO
# cascade='all, delete-orphan',
# cascade_backrefs=False,
back_populates='role',
cascade='all, delete-orphan',
cascade_backrefs=False,
doc="""
List of :class:`UserRole` instances belonging to the role.
@ -127,17 +122,12 @@ class Permission(Base):
Represents a permission granted to a role.
"""
__tablename__ = 'permission'
__table_args__ = (
sa.ForeignKeyConstraint(['role_uuid'], ['role.uuid'],
# TODO
# name='permission_fk_role',
),
)
role_uuid = uuid_fk_column(primary_key=True, nullable=False)
role_uuid = uuid_fk_column('role.uuid', primary_key=True, nullable=False)
role = orm.relationship(
Role,
back_populates='permission_refs',
cascade_backrefs=False,
doc="""
Reference to the :class:`Role` for which the permission is
granted.
@ -157,18 +147,18 @@ class User(Base):
This may or may not correspond to a real person, i.e. some users
may exist solely for automated tasks.
.. attribute:: roles
List of :class:`Role` instances to which the user belongs.
See also :attr:`role_refs`.
"""
__tablename__ = 'user'
__table_args__ = (
sa.UniqueConstraint('username',
# TODO
# name='user_uq_username',
),
)
uuid = uuid_column()
username = sa.Column(sa.String(length=25), nullable=False, doc="""
username = sa.Column(sa.String(length=25), nullable=False, unique=True, doc="""
Account username. This is required and must be unique.
""")
@ -176,6 +166,18 @@ class User(Base):
Hashed password for login. (The raw password is not stored.)
""")
person_uuid = uuid_fk_column('person.uuid', nullable=True)
person = orm.relationship(
'Person',
# TODO: seems like this is not needed?
# uselist=False,
back_populates='users',
cascade_backrefs=False,
doc="""
Reference to the :class:`~wuttjamaican.db.model.base.Person`
whose user account this is.
""")
active = sa.Column(sa.Boolean(), nullable=False, default=True, doc="""
Flag indicating whether the user account is "active" - it is
``True`` by default.
@ -186,44 +188,53 @@ class User(Base):
role_refs = orm.relationship(
'UserRole',
back_populates='user',
cascade_backrefs=False,
# TODO
# cascade='all, delete-orphan',
doc="""
List of :class:`UserRole` records.
List of :class:`UserRole` instances belonging to the user.
See also :attr:`roles`.
""")
roles = association_proxy(
'role_refs', 'role',
creator=lambda r: UserRole(role=r),
# TODO
# getset_factory=getset_factory,
)
def __str__(self):
if self.person:
name = str(self.person)
if name:
return name
return self.username or ""
class UserRole(Base):
"""
Represents the association between a user and a role.
Represents the association between a user and a role; i.e. the
user "belongs" or "is assigned" to the role.
"""
__tablename__ = 'user_x_role'
__table_args__ = (
sa.ForeignKeyConstraint(['user_uuid'], ['user.uuid'],
# TODO
# name='user_x_role_fk_user',
),
sa.ForeignKeyConstraint(['role_uuid'], ['role.uuid'],
# TODO
# name='user_x_role_fk_role',
),
)
uuid = uuid_column()
user_uuid = uuid_fk_column(nullable=False)
user_uuid = uuid_fk_column('user.uuid', nullable=False)
user = orm.relationship(
User,
back_populates='role_refs',
cascade_backrefs=False,
doc="""
Reference to the :class:`User` involved.
""")
role_uuid = uuid_fk_column(nullable=False)
role_uuid = uuid_fk_column('role.uuid', nullable=False)
role = orm.relationship(
Role,
back_populates='user_refs',
cascade_backrefs=False,
doc="""
Reference to the :class:`Role` involved.
""")

View file

@ -34,7 +34,19 @@ from sqlalchemy import orm
from wuttjamaican.util import make_uuid
Base = orm.declarative_base()
# nb. this convention comes from upstream docs
# https://docs.sqlalchemy.org/en/14/core/constraints.html#constraint-naming-conventions
naming_convention = {
'ix': 'ix_%(column_0_label)s',
'uq': 'uq_%(table_name)s_%(column_0_name)s',
'ck': 'ck_%(table_name)s_%(constraint_name)s',
'fk': 'fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s',
'pk': 'pk_%(table_name)s',
}
metadata = sa.MetaData(naming_convention=naming_convention)
Base = orm.declarative_base(metadata=metadata)
def uuid_column(*args, **kwargs):
@ -47,11 +59,14 @@ def uuid_column(*args, **kwargs):
return sa.Column(sa.String(length=32), *args, **kwargs)
def uuid_fk_column(*args, **kwargs):
def uuid_fk_column(target_column, *args, **kwargs):
"""
Returns a UUID column for use as a foreign key to another table.
:param target_column: Name of the table column on the remote side,
e.g. ``'user.uuid'``.
"""
return sa.Column(sa.String(length=32), *args, **kwargs)
return sa.Column(sa.String(length=32), sa.ForeignKey(target_column), *args, **kwargs)
class Setting(Base):
@ -70,3 +85,77 @@ class Setting(Base):
def __str__(self):
return self.name or ""
class Person(Base):
"""
Represents a person.
The use for this table in the base framework, is to associate with
a :class:`~wuttjamaican.db.model.auth.User` to provide first and
last name etc. (However a user does not have to be associated
with any person.)
But this table could also be used as a basis for a Customer or
Employee relationship etc.
"""
__tablename__ = 'person'
uuid = uuid_column()
full_name = sa.Column(sa.String(length=100), nullable=False, doc="""
Full name for the person. Note that this is *required*.
""")
first_name = sa.Column(sa.String(length=50), nullable=True, doc="""
The person's first name.
""")
middle_name = sa.Column(sa.String(length=50), nullable=True, doc="""
The person's middle name or initial.
""")
last_name = sa.Column(sa.String(length=50), nullable=True, doc="""
The person's last name.
""")
users = orm.relationship(
'User',
back_populates='person',
cascade_backrefs=False,
doc="""
List of :class:`~wuttjamaican.db.model.auth.User` accounts for
the person. Typically there is only one user account per
person, but technically multiple are supported.
""")
def __str__(self):
return self.full_name or ""
@property
def user(self):
"""
Reference to the "first"
:class:`~wuttjamaican.db.model.auth.User` account for the
person, or ``None``.
.. warning::
Note that the database schema supports multiple users per
person, but this property logic ignores that and will only
ever return "one or none". That might be fine in 99% of
cases, but if multiple accounts exist for a person, the one
returned is indeterminate.
See :attr:`users` to access the full list.
"""
# TODO: i'm not crazy about the ambiguity here re: number of
# user accounts a person may have. in particular it's not
# clear *which* user account would be returned, as there is no
# sequence ordinal defined etc. a better approach might be to
# force callers to assume the possibility of multiple
# user accounts per person? (if so, remove this property)
if self.users:
return self.users[0]

View file

@ -0,0 +1,66 @@
# -*- coding: utf-8; -*-
################################################################################
#
# WuttJamaican -- Base package for Wutta Framework
# Copyright © 2023-2024 Lance Edgar
#
# This file is part of Wutta Framework.
#
# Wutta Framework is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# Wutta Framework is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# Wutta Framework. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
People Handler
This is a :term:`handler` to manage "people" in the DB.
"""
from wuttjamaican.app import GenericHandler
class PeopleHandler(GenericHandler):
"""
Base class and default implementation for the "people"
:term:`handler`.
This is responsible for managing
:class:`~wuttjamaican.db.model.base.Person` records, and related
things.
"""
def get_person(self, obj, **kwargs):
"""
Return the :class:`~wuttjamaican.db.model.base.Person`
associated with the given object, if one can be found.
This method should accept "any" type of ``obj`` and inspect it
to determine if/how a person can be found. It should return
the "first, most obvious" person in the event that the object
is associated with multiple people.
This is a rather fundamental method, in that it is called by
several other methods, both within this handler as well as
others. There is also a shortcut to it, accessible via
:meth:`wuttjamaican.app.AppHandler.get_person()`.
"""
model = self.app.model
if isinstance(obj, model.Person):
person = obj
return person
elif isinstance(obj, model.User):
user = obj
if user.person:
return user.person

View file

@ -5,6 +5,7 @@ from unittest import TestCase
try:
import sqlalchemy as sa
from wuttjamaican.db.model import auth as model
from wuttjamaican.db.model.base import Person
except ImportError:
pass
else:
@ -29,8 +30,16 @@ else:
class TestUser(TestCase):
def test_basic(self):
def test_str(self):
user = model.User()
self.assertEqual(str(user), "")
user.username = 'barney'
self.assertEqual(str(user), "barney")
def test_str_with_person(self):
user = model.User()
self.assertEqual(str(user), "")
person = Person(full_name="Barney Rubble")
user.person = person
self.assertEqual(str(user), "Barney Rubble")

View file

@ -5,6 +5,7 @@ from unittest import TestCase
try:
import sqlalchemy as sa
from wuttjamaican.db.model import base as model
from wuttjamaican.db.model.auth import User
except ImportError:
pass
else:
@ -20,7 +21,7 @@ else:
class TestUUIDFKColumn(TestCase):
def test_basic(self):
column = model.uuid_column()
column = model.uuid_fk_column('foo.bar')
self.assertIsInstance(column, sa.Column)
self.assertIsInstance(column.type, sa.String)
self.assertEqual(column.type.length, 32)
@ -32,3 +33,19 @@ else:
self.assertEqual(str(setting), "")
setting.name = 'foo'
self.assertEqual(str(setting), "foo")
class TestPerson(TestCase):
def test_basic(self):
person = model.Person()
self.assertEqual(str(person), "")
person.full_name = "Barney Rubble"
self.assertEqual(str(person), "Barney Rubble")
def test_users(self):
person = model.Person()
self.assertIsNone(person.user)
user = User()
person.users.append(user)
self.assertIs(person.user, user)

View file

@ -101,17 +101,17 @@ class TestAppHandler(TestCase):
from wuttjamaican.db import model
except ImportError:
pytest.skip("test not relevant without sqlalchemy")
else:
self.assertNotIn('model', self.app.__dict__)
self.assertIs(self.app.model, model)
self.assertNotIn('model', self.app.__dict__)
self.assertIs(self.app.model, model)
def test_get_model(self):
try:
from wuttjamaican.db import model
except ImportError:
pytest.skip("test not relevant without sqlalchemy")
else:
self.assertIs(self.app.get_model(), model)
self.assertIs(self.app.get_model(), model)
def test_get_title(self):
self.assertEqual(self.app.get_title(), 'WuttJamaican')
@ -120,6 +120,44 @@ class TestAppHandler(TestCase):
uuid = self.app.make_uuid()
self.assertEqual(len(uuid), 32)
def test_get_session(self):
try:
import sqlalchemy as sa
from sqlalchemy import orm
except ImportError:
pytest.skip("test not relevant without sqlalchemy")
model = self.app.model
user = model.User()
self.assertIsNone(self.app.get_session(user))
Session = orm.sessionmaker()
engine = sa.create_engine('sqlite://')
mysession = Session(bind=engine)
mysession.add(user)
session = self.app.get_session(user)
self.assertIs(session, mysession)
def test_get_person(self):
people = self.app.get_people_handler()
with patch.object(people, 'get_person') as get_person:
get_person.return_value = 'foo'
person = self.app.get_person('bar')
get_person.assert_called_once_with('bar')
self.assertEqual(person, 'foo')
def test_get_auth_handler(self):
from wuttjamaican.auth import AuthHandler
auth = self.app.get_auth_handler()
self.assertIsInstance(auth, AuthHandler)
def test_get_people_handler(self):
from wuttjamaican.people import PeopleHandler
people = self.app.get_people_handler()
self.assertIsInstance(people, PeopleHandler)
class TestAppProvider(TestCase):

View file

@ -5,13 +5,353 @@ from unittest import TestCase
from wuttjamaican import auth as mod
from wuttjamaican.conf import WuttaConfig
try:
import sqlalchemy as sa
except ImportError:
pass
else:
class TestAuthHandler(TestCase):
def setUp(self):
self.config = WuttaConfig()
self.app = self.config.get_app()
class TestAuthHandler(TestCase):
def test_basic(self):
handler = mod.AuthHandler(self.config)
self.assertIs(handler.app, self.app)
def setUp(self):
self.config = WuttaConfig()
self.app = self.config.get_app()
self.handler = mod.AuthHandler(self.config)
self.engine = sa.create_engine('sqlite://')
self.app.model.Base.metadata.create_all(bind=self.engine)
self.session = self.make_session()
def tearDown(self):
self.session.close()
self.app.model.Base.metadata.drop_all(bind=self.engine)
def make_session(self):
return self.app.make_session(bind=self.engine)
def test_authenticate_user(self):
model = self.app.model
barney = model.User(username='barney')
self.handler.set_user_password(barney, 'goodpass')
self.session.add(barney)
self.session.commit()
# login ok
user = self.handler.authenticate_user(self.session, 'barney', 'goodpass')
self.assertIs(user, barney)
# can also pass user instead of username
user = self.handler.authenticate_user(self.session, barney, 'goodpass')
self.assertIs(user, barney)
# bad password
user = self.handler.authenticate_user(self.session, 'barney', 'BADPASS')
self.assertIsNone(user)
# bad username
user = self.handler.authenticate_user(self.session, 'NOBODY', 'goodpass')
self.assertIsNone(user)
# inactive user
user = self.handler.authenticate_user(self.session, 'barney', 'goodpass')
self.assertIs(user, barney)
barney.active = False
user = self.handler.authenticate_user(self.session, 'barney', 'goodpass')
self.assertIsNone(user)
def test_get_role(self):
model = self.app.model
myrole = model.Role(name="My Role")
self.session.add(myrole)
self.session.commit()
# empty key is ignored
role = self.handler.get_role(self.session, None)
self.assertIsNone(role)
# key may be uuid
role = self.handler.get_role(self.session, myrole.uuid)
self.assertIs(role, myrole)
# key may be name
role = self.handler.get_role(self.session, myrole.name)
self.assertIs(role, myrole)
# key may be represented within a setting
self.config.usedb = True
role = self.handler.get_role(self.session, 'mykey')
self.assertIsNone(role)
setting = model.Setting(name='wutta.role.mykey', value=myrole.uuid)
self.session.add(setting)
self.session.commit()
role = self.handler.get_role(self.session, 'mykey')
self.assertIs(role, myrole)
def test_get_user(self):
model = self.app.model
myuser = model.User(username='myuser')
self.session.add(myuser)
self.session.commit()
# empty obj is ignored
user = self.handler.get_user(None)
self.assertIsNone(user)
# user is returned as-is
user = self.handler.get_user(myuser)
self.assertIs(user, myuser)
# find user from person
myperson = model.Person(full_name='My Name')
self.session.add(myperson)
user.person = myperson
self.session.commit()
user = self.handler.get_user(myperson)
self.assertIs(user, myuser)
def test_make_user(self):
model = self.app.model
# empty user
user = self.handler.make_user()
self.assertIsInstance(user, model.User)
self.assertIsNone(user.username)
# user is added to session
user = self.handler.make_user(session=self.session)
self.assertIn(user, self.session)
self.session.rollback()
self.assertNotIn(user, self.session)
# default username
# nb. this behavior requires a session
user = self.handler.make_user(session=self.session)
self.assertEqual(user.username, 'newuser')
def test_delete_user(self):
model = self.app.model
# basics
myuser = model.User(username='myuser')
self.session.add(myuser)
self.session.commit()
user = self.session.query(model.User).one()
self.assertIs(user, myuser)
self.handler.delete_user(user)
self.session.commit()
self.assertEqual(self.session.query(model.User).count(), 0)
def test_make_preferred_username(self):
model = self.app.model
# default
name = self.handler.make_preferred_username(self.session)
self.assertEqual(name, 'newuser')
# person/first+last
person = model.Person(first_name='Barney', last_name='Rubble')
name = self.handler.make_preferred_username(self.session, person=person)
self.assertEqual(name, 'barney.rubble')
# person/first
person = model.Person(first_name='Barney')
name = self.handler.make_preferred_username(self.session, person=person)
self.assertEqual(name, 'barney')
# person/last
person = model.Person(last_name='Rubble')
name = self.handler.make_preferred_username(self.session, person=person)
self.assertEqual(name, 'rubble')
def test_make_unique_username(self):
model = self.app.model
# default
name = self.handler.make_unique_username(self.session)
self.assertEqual(name, 'newuser')
user = model.User(username=name)
self.session.add(user)
self.session.commit()
# counter invoked if name exists
name = self.handler.make_unique_username(self.session)
self.assertEqual(name, 'newuser01')
user = model.User(username=name)
self.session.add(user)
self.session.commit()
# starts by getting preferred name
person = model.Person(first_name='Barney', last_name='Rubble')
name = self.handler.make_unique_username(self.session, person=person)
self.assertEqual(name, 'barney.rubble')
user = model.User(username=name)
self.session.add(user)
self.session.commit()
# counter invoked if name exists
name = self.handler.make_unique_username(self.session, person=person)
self.assertEqual(name, 'barney.rubble01')
def test_set_user_password(self):
model = self.app.model
myuser = model.User(username='myuser')
self.session.add(myuser)
# basics
self.assertIsNone(myuser.password)
self.handler.set_user_password(myuser, 'goodpass')
self.session.commit()
self.assertIsNotNone(myuser.password)
# nb. password is hashed
self.assertNotEqual(myuser.password, 'goodpass')
# confirm login works with new password
user = self.handler.authenticate_user(self.session, 'myuser', 'goodpass')
self.assertIs(user, myuser)
def test_get_role_administrator(self):
model = self.app.model
self.assertEqual(self.session.query(model.Role).count(), 0)
role = self.handler.get_role_administrator(self.session)
self.assertEqual(self.session.query(model.Role).count(), 1)
self.assertEqual(role.name, "Administrator")
def test_get_role_anonymous(self):
model = self.app.model
self.assertEqual(self.session.query(model.Role).count(), 0)
role = self.handler.get_role_anonymous(self.session)
self.assertEqual(self.session.query(model.Role).count(), 1)
self.assertEqual(role.name, "Anonymous")
def test_get_role_authenticated(self):
model = self.app.model
self.assertEqual(self.session.query(model.Role).count(), 0)
role = self.handler.get_role_authenticated(self.session)
self.assertEqual(self.session.query(model.Role).count(), 1)
self.assertEqual(role.name, "Authenticated")
def test_get_permissions(self):
model = self.app.model
# empty default for role
role = model.Role()
perms = self.handler.get_permissions(self.session, role)
self.assertIsInstance(perms, set)
self.assertEqual(len(perms), 0)
# empty default for user
user = model.User()
perms = self.handler.get_permissions(self.session, user)
self.assertIsInstance(perms, set)
self.assertEqual(len(perms), 0)
# role perms
myrole = model.Role(name='My Role')
self.session.add(myrole)
self.handler.grant_permission(myrole, 'foo')
self.session.commit()
perms = self.handler.get_permissions(self.session, myrole)
self.assertEqual(perms, {'foo'})
# user perms
myuser = model.User(username='myuser')
self.session.add(myuser)
self.session.commit()
perms = self.handler.get_permissions(self.session, myuser)
self.assertEqual(len(perms), 0)
myuser.roles.append(myrole)
self.session.commit()
perms = self.handler.get_permissions(self.session, myuser)
self.assertEqual(perms, {'foo'})
# invalid principal
perms = self.handler.get_permissions(self.session, RuntimeError)
self.assertEqual(perms, set())
# missing principal
perms = self.handler.get_permissions(self.session, None)
self.assertEqual(perms, set())
def test_has_permission(self):
model = self.app.model
# false default for role
role = model.Role()
self.assertFalse(self.handler.has_permission(self.session, role, 'foo'))
# empty default for user
user = model.User()
self.assertFalse(self.handler.has_permission(self.session, user, 'foo'))
# role perms
myrole = model.Role(name='My Role')
self.session.add(myrole)
self.session.commit()
self.assertFalse(self.handler.has_permission(self.session, myrole, 'foo'))
self.handler.grant_permission(myrole, 'foo')
self.session.commit()
self.assertTrue(self.handler.has_permission(self.session, myrole, 'foo'))
# user perms
myuser = model.User(username='myuser')
self.session.add(myuser)
self.session.commit()
self.assertFalse(self.handler.has_permission(self.session, myuser, 'foo'))
myuser.roles.append(myrole)
self.session.commit()
self.assertTrue(self.handler.has_permission(self.session, myuser, 'foo'))
# invalid principal
self.assertFalse(self.handler.has_permission(self.session, RuntimeError, 'foo'))
# missing principal
self.assertFalse(self.handler.has_permission(self.session, None, 'foo'))
def test_grant_permission(self):
model = self.app.model
myrole = model.Role(name='My Role')
self.session.add(myrole)
self.session.commit()
# no perms yet
self.assertEqual(self.session.query(model.Permission).count(), 0)
# grant one perm, and confirm
self.handler.grant_permission(myrole, 'foo')
self.session.commit()
self.assertEqual(self.session.query(model.Permission).count(), 1)
perm = self.session.query(model.Permission).one()
self.assertIs(perm.role, myrole)
self.assertEqual(perm.permission, 'foo')
# grant same perm again, confirm just one exists
self.handler.grant_permission(myrole, 'foo')
self.session.commit()
self.assertEqual(self.session.query(model.Permission).count(), 1)
perm = self.session.query(model.Permission).one()
self.assertIs(perm.role, myrole)
self.assertEqual(perm.permission, 'foo')
def test_revoke_permission(self):
model = self.app.model
myrole = model.Role(name='My Role')
self.session.add(myrole)
self.handler.grant_permission(myrole, 'foo')
self.session.commit()
# just the one perm
self.assertEqual(self.session.query(model.Permission).count(), 1)
# revoke it, then confirm
self.handler.revoke_permission(myrole, 'foo')
self.session.commit()
self.assertEqual(self.session.query(model.Permission).count(), 0)
# revoke again, confirm
self.handler.revoke_permission(myrole, 'foo')
self.session.commit()
self.assertEqual(self.session.query(model.Permission).count(), 0)

52
tests/test_people.py Normal file
View file

@ -0,0 +1,52 @@
# -*- coding: utf-8; -*-
from unittest import TestCase
from wuttjamaican import people as mod
from wuttjamaican.conf import WuttaConfig
try:
import sqlalchemy as sa
except ImportError:
pass
else:
class TestPeopleHandler(TestCase):
def setUp(self):
self.config = WuttaConfig()
self.app = self.config.get_app()
self.handler = mod.PeopleHandler(self.config)
self.engine = sa.create_engine('sqlite://')
self.app.model.Base.metadata.create_all(bind=self.engine)
self.session = self.make_session()
def tearDown(self):
self.session.close()
self.app.model.Base.metadata.drop_all(bind=self.engine)
def make_session(self):
return self.app.make_session(bind=self.engine)
def test_get_person(self):
model = self.app.model
myperson = model.Person(full_name='Barny Rubble')
self.session.add(myperson)
self.session.commit()
# empty obj is ignored
person = self.handler.get_person(None)
self.assertIsNone(person)
# person is returned as-is
person = self.handler.get_person(myperson)
self.assertIs(person, myperson)
# find person from user
myuser = model.User(username='barney', person=myperson)
self.session.add(myuser)
self.session.commit()
person = self.handler.get_person(myuser)
self.assertIs(person, myperson)