From 48f96220b69f7e49c031c663ef80e8d35ad123a9 Mon Sep 17 00:00:00 2001 From: Lance Edgar Date: Tue, 17 Feb 2026 14:06:38 -0600 Subject: [PATCH] feat: add support for association proxy fields in model forms --- docs/conf.py | 1 + pyproject.toml | 2 +- src/wuttaweb/forms/base.py | 161 ++++++++++++++++++++++++++++++++- tests/forms/test_base.py | 178 ++++++++++++++++++++++++++++++++++++- 4 files changed, 338 insertions(+), 4 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index 7465596..10dac1c 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -30,6 +30,7 @@ exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"] intersphinx_mapping = { "alembic": ("https://alembic.sqlalchemy.org/en/latest/", None), "colander": ("https://docs.pylonsproject.org/projects/colander/en/latest/", None), + "colanderalchemy": ("https://colanderalchemy.readthedocs.io/en/latest/", None), "deform": ("https://docs.pylonsproject.org/projects/deform/en/latest/", None), "fanstatic": ("https://www.fanstatic.org/en/latest/", None), "pyramid": ("https://docs.pylonsproject.org/projects/pyramid/en/latest/", None), diff --git a/pyproject.toml b/pyproject.toml index bccf7ea..07aca80 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,7 +45,7 @@ dependencies = [ "SQLAlchemy-Utils", "waitress", "WebHelpers2", - "WuttJamaican[db]>=0.28.1", + "WuttJamaican[db]>=0.28.7", "zope.sqlalchemy>=1.5", # nb. this must be pinned for now, until pyramid can remove diff --git a/src/wuttaweb/forms/base.py b/src/wuttaweb/forms/base.py index 0d2a42d..b1e142a 100644 --- a/src/wuttaweb/forms/base.py +++ b/src/wuttaweb/forms/base.py @@ -827,7 +827,7 @@ class Form: # pylint: disable=too-many-instance-attributes,too-many-public-meth includes.append(key) # make initial schema with ColanderAlchemy magic - schema = SQLAlchemySchemaNode(self.model_class, includes=includes) + schema = WuttaSchemaNode(self.model_class, includes=includes) # fill in the blanks if anything got missed for key in fields: @@ -1144,7 +1144,7 @@ class Form: # pylint: disable=too-many-instance-attributes,too-many-public-meth # render static text if field not in deform/schema # TODO: need to abstract this somehow if self.model_instance: - value = self.model_instance[fieldname] + value = self.app.get_value(self.model_instance, fieldname) html = str(value) if value is not None else "" else: html = "" @@ -1407,3 +1407,160 @@ class Form: # pylint: disable=too-many-instance-attributes,too-many-public-meth if field.error: return field.error.messages() return [] + + +class WuttaSchemaNode(SQLAlchemySchemaNode): # pylint: disable=abstract-method + """ + Custom schema node type based on ColanderAlchemy, but adding + support for association proxy fields. + + This class is used under the hood but you will not normally + interact with it directly. + + It's a subclass of + :class:`colanderalchemy:colanderalchemy.SQLAlchemySchemaNode`. + """ + + def add_nodes(self, includes, excludes, overrides): + super().add_nodes(includes, excludes, overrides) + + for name in includes or []: + prop = self.inspector.attrs.get(name, name) + if isinstance(prop, str): + + # add node for association proxy field + if column := get_association_proxy_column(self.inspector, name): + name_overrides_copy = overrides.get(name, {}).copy() + node = self.get_schema_from_column(column, name_overrides_copy) + if node is not None: + self.add(node) + + def dictify(self, obj): # pylint: disable=empty-docstring + """ """ + dct = super().dictify(obj) + + # loop thru all fields to add the association proxies + for node in self: + name = node.name + if name in dct: + continue # value already set + + # we only care about association proxies here + if not get_association_proxy_column(self.inspector, name): + continue + + dct[name] = getattr(obj, name) + + # special handling when value is None + if dct[name] is None: + + # nb. colander/deform know how to behave when using + # their dedicated colander.null value, but ``None`` + # seems to cause issues for string fields, so swap + # that out here if applicable + if isinstance(node.typ, colander.String): + dct[name] = colander.null + + return dct + + def objectify(self, dict_, context=None): # pylint: disable=empty-docstring + """ """ + context = super().objectify(dict_, context=context) + + for attr in dict_: + if self.inspector.has_property(attr): + continue # upstream logic handles these + + # try to process association proxy field + if get_association_proxy_column(self.inspector, attr): + value = dict_[attr] + if value is colander.null: + # `colander.null` is never an appropriate + # value to be placed on an SQLAlchemy object + # so we translate it into `None`. + value = None + setattr(context, attr, value) + + return context + + +def get_association_proxy(mapper, field): + """ + Return the association proxy "descriptor" corresponding to the + given field name, if it exists. + + :param mapper: SQLAlchemy mapper for the main class. + + :param field: Field name on the main class, which may (or may not) + be proxied via association. + + :returns: + :class:`~sqlalchemy:sqlalchemy.ext.associationproxy.AssociationProxy` + instance, or ``None``. + + Using the ``User.first_name`` (which proxies to + ``User.person.first_name``) example, this would return the proxy + descriptor for ``User.first_name``. + """ + try: + desc = getattr(mapper.all_orm_descriptors, field) + except AttributeError: + pass + else: + if desc.extension_type.name == "ASSOCIATION_PROXY": + return desc + return None + + +def get_association_proxy_target(mapper, field): + """ + Return the relationship property involved in the association proxy + for the given field, if applicable. + + :param mapper: SQLAlchemy mapper for the main class. + + :param field: Field name on the main class, which may (or may not) + be proxied via association. + + :returns: :class:`~sqlalchemy:sqlalchemy.orm.RelationshipProperty` + instance, or ``None``. + + Using the ``User.first_name`` (which proxies to + ``User.person.first_name``) example, this would return the + ``User.person`` relationship property. + """ + if proxy := get_association_proxy(mapper, field): + proxy_target = mapper.get_property(proxy.target_collection) + if ( + isinstance(proxy_target, orm.RelationshipProperty) + and not proxy_target.uselist + ): + return proxy_target + return None + + +def get_association_proxy_column(mapper, field): + """ + Return the target column property involved in the association + proxy for the given field, if applicable. + + :param mapper: SQLAlchemy mapper for the main class. + + :param field: Field name on the main class, which may (or may not) + be proxied via association. + + :returns: :class:`~sqlalchemy:sqlalchemy.orm.ColumnProperty` + instance, or ``None``. + + Using the ``User.first_name`` (which proxies to + ``User.person.first_name``) example, this would return the + ``Person.first_name`` column property. + """ + if proxy_target := get_association_proxy_target(mapper, field): + if proxy_target.mapper.has_property(field): + prop = proxy_target.mapper.get_property(field) + if isinstance(prop, orm.ColumnProperty) and isinstance( + prop.columns[0], sa.Column + ): + return prop + return None diff --git a/tests/forms/test_base.py b/tests/forms/test_base.py index f4e998d..804b360 100644 --- a/tests/forms/test_base.py +++ b/tests/forms/test_base.py @@ -4,11 +4,16 @@ import os from unittest.mock import MagicMock, patch import sqlalchemy as sa +from sqlalchemy import orm +from sqlalchemy.ext.associationproxy import AssociationProxy -import colander import deform +import colander +from colanderalchemy import SQLAlchemySchemaNode from pyramid.renderers import get_renderer +from wuttjamaican.testing import DataTestCase + from wuttaweb.forms import base as mod, widgets from wuttaweb.grids import Grid from wuttaweb.testing import WebTestCase @@ -734,3 +739,174 @@ class TestForm(WebTestCase): self.assertNotIn("foo", dform) self.assertIn("bar", schema) self.assertIn("bar", dform) + + +class TestWuttaSchemaNode(WebTestCase): + + def test_add_nodes(self): + model = self.app.model + + # ColanderAlchemy does not add nodes for association proxy fields + schema = SQLAlchemySchemaNode( + model.User, includes=["username", "first_name", "last_name"] + ) + self.assertIn("username", schema) + self.assertNotIn("first_name", schema) + self.assertNotIn("last_name", schema) + + # but our custom class handles them okay + schema = mod.WuttaSchemaNode( + model.User, includes=["username", "first_name", "last_name"] + ) + self.assertIn("username", schema) + self.assertIn("first_name", schema) + self.assertIn("last_name", schema) + + def test_dictify(self): + model = self.app.model + + person = model.Person(first_name="Fred", last_name="Flintstone") + user = model.User(username="freddie", person=person) + user.foo_field = "bar" + + foo_field = colander.SchemaNode(colander.String()) + + # ColanderAlchemy does not include association proxy fields + schema = SQLAlchemySchemaNode( + model.User, includes=["username", "first_name", "last_name", foo_field] + ) + dct = schema.dictify(user) + self.assertIn("username", dct) + self.assertEqual(dct["username"], "freddie") + self.assertNotIn("first_name", dct) + self.assertNotIn("last_name", dct) + self.assertNotIn("foo_field", dct) + + # but our custom class handles them okay + schema = mod.WuttaSchemaNode( + model.User, includes=["username", "first_name", "last_name", foo_field] + ) + dct = schema.dictify(user) + self.assertIn("username", dct) + self.assertEqual(dct["username"], "freddie") + self.assertIn("first_name", dct) + self.assertEqual(dct["first_name"], "Fred") + self.assertIn("last_name", dct) + self.assertEqual(dct["last_name"], "Flintstone") + # nb. foo_field is not assn proxy, so not auto-dictified + self.assertNotIn("foo_field", dct) + + # also note special handling for null values on string proxy fields + user.last_name = None + dct = schema.dictify(user) + self.assertIn("first_name", dct) + self.assertIs(dct["first_name"], "Fred") + self.assertIn("last_name", dct) + self.assertIsNotNone(dct["last_name"]) + self.assertIs(dct["last_name"], colander.null) + + def test_objectify(self): + model = self.app.model + + # ColanderAlchemy does not set association proxy fields + schema = SQLAlchemySchemaNode( + model.User, + includes=["username", "first_name", "last_name"], + ) + user = schema.objectify( + { + "username": "freddie", + "first_name": "Fred", + "last_name": "Flintstone", + } + ) + self.assertIsInstance(user, model.User) + self.assertEqual(user.username, "freddie") + self.assertIsNone(user.person) + self.assertIsNone(user.first_name) + self.assertIsNone(user.last_name) + + # but our custom class handles them okay + schema = mod.WuttaSchemaNode( + model.User, + includes=["username", "first_name", "last_name"], + ) + user = schema.objectify( + { + "username": "freddie", + "first_name": "Fred", + "last_name": "Flintstone", + } + ) + self.assertIsInstance(user, model.User) + self.assertEqual(user.username, "freddie") + self.assertIsInstance(user.person, model.Person) + self.assertEqual(user.person.first_name, "Fred") + self.assertEqual(user.person.last_name, "Flintstone") + self.assertEqual(user.first_name, "Fred") + self.assertEqual(user.last_name, "Flintstone") + + # also note special handling for null values on string proxy fields + user = schema.objectify( + { + "username": "freddie", + "first_name": "Fred", + "last_name": colander.null, + } + ) + self.assertIsInstance(user, model.User) + self.assertEqual(user.username, "freddie") + self.assertIsInstance(user.person, model.Person) + self.assertEqual(user.person.first_name, "Fred") + self.assertIsNone(user.person.last_name) + self.assertEqual(user.first_name, "Fred") + self.assertIsNone(user.last_name) + + +class TestGetAssociationProxy(DataTestCase): + + def test_basic(self): + model = self.app.model + mapper = sa.inspect(model.User) + + # typical + proxy = mod.get_association_proxy(mapper, "first_name") + self.assertIsInstance(proxy, AssociationProxy) + self.assertEqual(proxy.target_collection, "person") + self.assertEqual(proxy.value_attr, "first_name") + + # no such proxy + proxy = mod.get_association_proxy(mapper, "blarg") + self.assertIsNone(proxy) + + +class TestGetAssociationProxyTarget(DataTestCase): + + def test_basic(self): + model = self.app.model + mapper = sa.inspect(model.User) + + # typical + target = mod.get_association_proxy_target(mapper, "first_name") + self.assertIsInstance(target, orm.RelationshipProperty) + self.assertIs(target, mapper.get_property("person")) + + # no such proxy + target = mod.get_association_proxy_target(mapper, "blarg") + self.assertIsNone(target) + + +class TestGetAssociationProxyColumn(DataTestCase): + + def test_basic(self): + model = self.app.model + mapper = sa.inspect(model.User) + + # typical + column = mod.get_association_proxy_column(mapper, "first_name") + self.assertIsInstance(column, orm.ColumnProperty) + self.assertIs(column, sa.inspect(model.Person).get_property("first_name")) + + # no such proxy + column = mod.get_association_proxy_column(mapper, "blarg") + self.assertIsNone(column)