3
0
Fork 0

feat: add user API tokens; handler methods to manage/authenticate

This commit is contained in:
Lance Edgar 2025-08-08 22:40:52 -05:00
parent 988749d80e
commit a721e63275
6 changed files with 242 additions and 4 deletions

View file

@ -2,7 +2,7 @@
################################################################################
#
# WuttJamaican -- Base package for Wutta Framework
# Copyright © 2023-2024 Lance Edgar
# Copyright © 2023-2025 Lance Edgar
#
# This file is part of Wutta Framework.
#
@ -26,8 +26,11 @@ Auth Handler
This defines the default :term:`auth handler`.
"""
import secrets
import uuid as _uuid
from sqlalchemy import orm
from wuttjamaican.app import GenericHandler
@ -73,6 +76,8 @@ class AuthHandler(GenericHandler):
default logic assumes a "username" but in practice it may be
an email address etc. - whatever the user entered.
See also :meth:`authenticate_user_token()`.
:param session: Open :term:`db session`.
:param username: Usually a string, but also may be a
@ -91,6 +96,33 @@ class AuthHandler(GenericHandler):
if self.check_user_password(user, password):
return user
def authenticate_user_token(self, session, token):
"""
Authenticate the given user API token string, and if valid,
return the corresponding user.
See also :meth:`authenticate_user()`.
:param session: Open :term:`db session`.
:param token: Raw token string for the user.
:returns: :class:`~wuttjamaican.db.model.auth.User` instance,
or ``None``.
"""
model = self.app.model
try:
token = session.query(model.UserAPIToken)\
.filter(model.UserAPIToken.token_string == token)\
.one()
except orm.exc.NoResultFound:
pass
else:
user = token.user
if user.active:
return user
def check_user_password(self, user, password, **kwargs):
"""
Check a user's password.
@ -545,6 +577,63 @@ class AuthHandler(GenericHandler):
if permission in role.permissions:
role.permissions.remove(permission)
##############################
# API token methods
##############################
def add_api_token(self, user, description):
"""
Add and return a new API token for the user.
This calls :meth:`generate_api_token_string()` to obtain the
actual token string.
See also :meth:`delete_api_token()`.
:param user: :class:`~wuttjamaican.db.model.auth.User`
instance for which to add the token.
:param description: String description for the token.
:rtype: :class:`~wuttjamaican.db.model.auth.UserAPIToken`
"""
model = self.app.model
session = self.app.get_session(user)
# generate raw token
token_string = self.generate_api_token_string()
# persist token in DB
token = model.UserAPIToken(
description=description,
token_string=token_string)
user.api_tokens.append(token)
session.add(token)
return token
def generate_api_token_string(self):
"""
Generate a new *raw* API token string.
This is called by :meth:`add_api_token()`.
:returns: Raw API token string.
"""
return secrets.token_urlsafe()
def delete_api_token(self, token):
"""
Delete the given API token.
See also :meth:`add_api_token()`.
:param token:
:class:`~wuttjamaican.db.model.auth.UserAPIToken` instance.
"""
session = self.app.get_session(token)
session.delete(token)
##############################
# internal methods
##############################

View file

@ -0,0 +1,39 @@
"""add user_api_token
Revision ID: efdcb2c75034
Revises: 6bf900765500
Create Date: 2025-08-08 08:58:19.376105
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
import wuttjamaican.db.util
# revision identifiers, used by Alembic.
revision: str = 'efdcb2c75034'
down_revision: Union[str, None] = '6bf900765500'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# user_api_token
op.create_table('user_api_token',
sa.Column('uuid', wuttjamaican.db.util.UUID(), nullable=False),
sa.Column('user_uuid', wuttjamaican.db.util.UUID(), nullable=False),
sa.Column('description', sa.String(length=255), nullable=False),
sa.Column('token_string', sa.String(length=255), nullable=False),
sa.Column('created', sa.DateTime(timezone=True), nullable=False),
sa.ForeignKeyConstraint(['user_uuid'], ['user.uuid'], name=op.f('fk_user_api_token_user_uuid_user')),
sa.PrimaryKeyConstraint('uuid', name=op.f('pk_user_api_token'))
)
def downgrade() -> None:
# user_api_token
op.drop_table('user_api_token')

View file

@ -2,7 +2,7 @@
################################################################################
#
# WuttJamaican -- Base package for Wutta Framework
# Copyright © 2023-2024 Lance Edgar
# Copyright © 2023-2025 Lance Edgar
#
# This file is part of Wutta Framework.
#
@ -40,6 +40,7 @@ And the :term:`data models <data model>`:
* :class:`~wuttjamaican.db.model.auth.Permission`
* :class:`~wuttjamaican.db.model.auth.User`
* :class:`~wuttjamaican.db.model.auth.UserRole`
* :class:`~wuttjamaican.db.model.auth.UserAPIToken`
* :class:`~wuttjamaican.db.model.upgrades.Upgrade`
And the :term:`batch` model base/mixin classes:
@ -51,6 +52,6 @@ And the :term:`batch` model base/mixin classes:
from wuttjamaican.db.util import uuid_column, uuid_fk_column, UUID
from .base import Base, Setting, Person
from .auth import Role, Permission, User, UserRole
from .auth import Role, Permission, User, UserRole, UserAPIToken
from .upgrades import Upgrade
from .batch import BatchMixin, BatchRowMixin

View file

@ -2,7 +2,7 @@
################################################################################
#
# WuttJamaican -- Base package for Wutta Framework
# Copyright © 2023-2024 Lance Edgar
# Copyright © 2023-2025 Lance Edgar
#
# This file is part of Wutta Framework.
#
@ -39,6 +39,8 @@ So a user's permissions are "inherited" from the role(s) to which they
belong.
"""
import datetime
import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.ext.associationproxy import association_proxy
@ -211,6 +213,16 @@ class User(Base):
# getset_factory=getset_factory,
)
api_tokens = orm.relationship(
'UserAPIToken',
back_populates='user',
order_by='UserAPIToken.created',
cascade='all, delete-orphan',
cascade_backrefs=False,
doc="""
List of :class:`UserAPIToken` instances belonging to the user.
""")
def __str__(self):
if self.person:
name = str(self.person)
@ -246,3 +258,36 @@ class UserRole(Base):
doc="""
Reference to the :class:`Role` involved.
""")
class UserAPIToken(Base):
"""
User authentication token for use with HTTP API
"""
__tablename__ = 'user_api_token'
uuid = uuid_column()
user_uuid = uuid_fk_column('user.uuid', nullable=False)
user = orm.relationship(
User,
back_populates='api_tokens',
cascade_backrefs=False,
doc="""
Reference to the :class:`User` whose token this is.
""")
description = sa.Column(sa.String(length=255), nullable=False, doc="""
Description of the token.
""")
token_string = sa.Column(sa.String(length=255), nullable=False, doc="""
Raw token string, to be used by API clients.
""")
created = sa.Column(sa.DateTime(timezone=True), nullable=False, default=datetime.datetime.now, doc="""
Date/time when the token was created.
""")
def __str__(self):
return self.description or ""

View file

@ -43,3 +43,12 @@ else:
person = Person(full_name="Barney Rubble")
user.person = person
self.assertEqual(str(user), "Barney Rubble")
class TestUserAPIToken(TestCase):
def test_str(self):
token = model.UserAPIToken()
self.assertEqual(str(token), "")
token.description = "test token"
self.assertEqual(str(token), "test token")

View file

@ -63,6 +63,32 @@ else:
user = self.handler.authenticate_user(self.session, 'barney', 'goodpass')
self.assertIsNone(user)
def test_authenticate_user_token(self):
model = self.app.model
barney = model.User(username='barney')
self.session.add(barney)
token = self.handler.add_api_token(barney, "test token")
self.session.commit()
user = self.handler.authenticate_user_token(self.session, None)
self.assertIsNone(user)
user = self.handler.authenticate_user_token(self.session, token.token_string)
self.assertIs(user, barney)
barney.active = False
self.session.flush()
user = self.handler.authenticate_user_token(self.session, token.token_string)
self.assertIsNone(user)
barney.active = True
self.session.flush()
user = self.handler.authenticate_user_token(self.session, token.token_string)
self.assertIs(user, barney)
user = self.handler.authenticate_user_token(self.session, 'bad-token')
self.assertIsNone(user)
def test_check_user_password(self):
model = self.app.model
barney = model.User(username='barney')
@ -416,3 +442,32 @@ else:
self.handler.revoke_permission(myrole, 'foo')
self.session.commit()
self.assertEqual(self.session.query(model.Permission).count(), 0)
def test_generate_api_token_string(self):
token = self.handler.generate_api_token_string()
# TODO: not sure how to propertly test this yet...
self.assertEqual(len(token), 43)
def test_add_api_token(self):
model = self.app.model
barney = model.User(username='barney')
self.session.add(barney)
token = self.handler.add_api_token(barney, "test token")
self.assertIs(token.user, barney)
self.assertEqual(token.description, "test token")
# TODO: not sure how to propertly test this yet...
self.assertEqual(len(token.token_string), 43)
def test_delete_api_token(self):
model = self.app.model
barney = model.User(username='barney')
self.session.add(barney)
token = self.handler.add_api_token(barney, "test token")
self.session.commit()
self.session.refresh(barney)
self.assertEqual(len(barney.api_tokens), 1)
self.handler.delete_api_token(token)
self.session.refresh(barney)
self.assertEqual(len(barney.api_tokens), 0)