feat: add support for association proxy fields in model forms
This commit is contained in:
parent
e1122caa9c
commit
48f96220b6
4 changed files with 338 additions and 4 deletions
|
|
@ -30,6 +30,7 @@ exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"]
|
|||
intersphinx_mapping = {
|
||||
"alembic": ("https://alembic.sqlalchemy.org/en/latest/", None),
|
||||
"colander": ("https://docs.pylonsproject.org/projects/colander/en/latest/", None),
|
||||
"colanderalchemy": ("https://colanderalchemy.readthedocs.io/en/latest/", None),
|
||||
"deform": ("https://docs.pylonsproject.org/projects/deform/en/latest/", None),
|
||||
"fanstatic": ("https://www.fanstatic.org/en/latest/", None),
|
||||
"pyramid": ("https://docs.pylonsproject.org/projects/pyramid/en/latest/", None),
|
||||
|
|
|
|||
|
|
@ -45,7 +45,7 @@ dependencies = [
|
|||
"SQLAlchemy-Utils",
|
||||
"waitress",
|
||||
"WebHelpers2",
|
||||
"WuttJamaican[db]>=0.28.1",
|
||||
"WuttJamaican[db]>=0.28.7",
|
||||
"zope.sqlalchemy>=1.5",
|
||||
|
||||
# nb. this must be pinned for now, until pyramid can remove
|
||||
|
|
|
|||
|
|
@ -827,7 +827,7 @@ class Form: # pylint: disable=too-many-instance-attributes,too-many-public-meth
|
|||
includes.append(key)
|
||||
|
||||
# make initial schema with ColanderAlchemy magic
|
||||
schema = SQLAlchemySchemaNode(self.model_class, includes=includes)
|
||||
schema = WuttaSchemaNode(self.model_class, includes=includes)
|
||||
|
||||
# fill in the blanks if anything got missed
|
||||
for key in fields:
|
||||
|
|
@ -1144,7 +1144,7 @@ class Form: # pylint: disable=too-many-instance-attributes,too-many-public-meth
|
|||
# render static text if field not in deform/schema
|
||||
# TODO: need to abstract this somehow
|
||||
if self.model_instance:
|
||||
value = self.model_instance[fieldname]
|
||||
value = self.app.get_value(self.model_instance, fieldname)
|
||||
html = str(value) if value is not None else ""
|
||||
else:
|
||||
html = ""
|
||||
|
|
@ -1407,3 +1407,160 @@ class Form: # pylint: disable=too-many-instance-attributes,too-many-public-meth
|
|||
if field.error:
|
||||
return field.error.messages()
|
||||
return []
|
||||
|
||||
|
||||
class WuttaSchemaNode(SQLAlchemySchemaNode): # pylint: disable=abstract-method
|
||||
"""
|
||||
Custom schema node type based on ColanderAlchemy, but adding
|
||||
support for association proxy fields.
|
||||
|
||||
This class is used under the hood but you will not normally
|
||||
interact with it directly.
|
||||
|
||||
It's a subclass of
|
||||
:class:`colanderalchemy:colanderalchemy.SQLAlchemySchemaNode`.
|
||||
"""
|
||||
|
||||
def add_nodes(self, includes, excludes, overrides):
|
||||
super().add_nodes(includes, excludes, overrides)
|
||||
|
||||
for name in includes or []:
|
||||
prop = self.inspector.attrs.get(name, name)
|
||||
if isinstance(prop, str):
|
||||
|
||||
# add node for association proxy field
|
||||
if column := get_association_proxy_column(self.inspector, name):
|
||||
name_overrides_copy = overrides.get(name, {}).copy()
|
||||
node = self.get_schema_from_column(column, name_overrides_copy)
|
||||
if node is not None:
|
||||
self.add(node)
|
||||
|
||||
def dictify(self, obj): # pylint: disable=empty-docstring
|
||||
""" """
|
||||
dct = super().dictify(obj)
|
||||
|
||||
# loop thru all fields to add the association proxies
|
||||
for node in self:
|
||||
name = node.name
|
||||
if name in dct:
|
||||
continue # value already set
|
||||
|
||||
# we only care about association proxies here
|
||||
if not get_association_proxy_column(self.inspector, name):
|
||||
continue
|
||||
|
||||
dct[name] = getattr(obj, name)
|
||||
|
||||
# special handling when value is None
|
||||
if dct[name] is None:
|
||||
|
||||
# nb. colander/deform know how to behave when using
|
||||
# their dedicated colander.null value, but ``None``
|
||||
# seems to cause issues for string fields, so swap
|
||||
# that out here if applicable
|
||||
if isinstance(node.typ, colander.String):
|
||||
dct[name] = colander.null
|
||||
|
||||
return dct
|
||||
|
||||
def objectify(self, dict_, context=None): # pylint: disable=empty-docstring
|
||||
""" """
|
||||
context = super().objectify(dict_, context=context)
|
||||
|
||||
for attr in dict_:
|
||||
if self.inspector.has_property(attr):
|
||||
continue # upstream logic handles these
|
||||
|
||||
# try to process association proxy field
|
||||
if get_association_proxy_column(self.inspector, attr):
|
||||
value = dict_[attr]
|
||||
if value is colander.null:
|
||||
# `colander.null` is never an appropriate
|
||||
# value to be placed on an SQLAlchemy object
|
||||
# so we translate it into `None`.
|
||||
value = None
|
||||
setattr(context, attr, value)
|
||||
|
||||
return context
|
||||
|
||||
|
||||
def get_association_proxy(mapper, field):
|
||||
"""
|
||||
Return the association proxy "descriptor" corresponding to the
|
||||
given field name, if it exists.
|
||||
|
||||
:param mapper: SQLAlchemy mapper for the main class.
|
||||
|
||||
:param field: Field name on the main class, which may (or may not)
|
||||
be proxied via association.
|
||||
|
||||
:returns:
|
||||
:class:`~sqlalchemy:sqlalchemy.ext.associationproxy.AssociationProxy`
|
||||
instance, or ``None``.
|
||||
|
||||
Using the ``User.first_name`` (which proxies to
|
||||
``User.person.first_name``) example, this would return the proxy
|
||||
descriptor for ``User.first_name``.
|
||||
"""
|
||||
try:
|
||||
desc = getattr(mapper.all_orm_descriptors, field)
|
||||
except AttributeError:
|
||||
pass
|
||||
else:
|
||||
if desc.extension_type.name == "ASSOCIATION_PROXY":
|
||||
return desc
|
||||
return None
|
||||
|
||||
|
||||
def get_association_proxy_target(mapper, field):
|
||||
"""
|
||||
Return the relationship property involved in the association proxy
|
||||
for the given field, if applicable.
|
||||
|
||||
:param mapper: SQLAlchemy mapper for the main class.
|
||||
|
||||
:param field: Field name on the main class, which may (or may not)
|
||||
be proxied via association.
|
||||
|
||||
:returns: :class:`~sqlalchemy:sqlalchemy.orm.RelationshipProperty`
|
||||
instance, or ``None``.
|
||||
|
||||
Using the ``User.first_name`` (which proxies to
|
||||
``User.person.first_name``) example, this would return the
|
||||
``User.person`` relationship property.
|
||||
"""
|
||||
if proxy := get_association_proxy(mapper, field):
|
||||
proxy_target = mapper.get_property(proxy.target_collection)
|
||||
if (
|
||||
isinstance(proxy_target, orm.RelationshipProperty)
|
||||
and not proxy_target.uselist
|
||||
):
|
||||
return proxy_target
|
||||
return None
|
||||
|
||||
|
||||
def get_association_proxy_column(mapper, field):
|
||||
"""
|
||||
Return the target column property involved in the association
|
||||
proxy for the given field, if applicable.
|
||||
|
||||
:param mapper: SQLAlchemy mapper for the main class.
|
||||
|
||||
:param field: Field name on the main class, which may (or may not)
|
||||
be proxied via association.
|
||||
|
||||
:returns: :class:`~sqlalchemy:sqlalchemy.orm.ColumnProperty`
|
||||
instance, or ``None``.
|
||||
|
||||
Using the ``User.first_name`` (which proxies to
|
||||
``User.person.first_name``) example, this would return the
|
||||
``Person.first_name`` column property.
|
||||
"""
|
||||
if proxy_target := get_association_proxy_target(mapper, field):
|
||||
if proxy_target.mapper.has_property(field):
|
||||
prop = proxy_target.mapper.get_property(field)
|
||||
if isinstance(prop, orm.ColumnProperty) and isinstance(
|
||||
prop.columns[0], sa.Column
|
||||
):
|
||||
return prop
|
||||
return None
|
||||
|
|
|
|||
|
|
@ -4,11 +4,16 @@ import os
|
|||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy import orm
|
||||
from sqlalchemy.ext.associationproxy import AssociationProxy
|
||||
|
||||
import colander
|
||||
import deform
|
||||
import colander
|
||||
from colanderalchemy import SQLAlchemySchemaNode
|
||||
from pyramid.renderers import get_renderer
|
||||
|
||||
from wuttjamaican.testing import DataTestCase
|
||||
|
||||
from wuttaweb.forms import base as mod, widgets
|
||||
from wuttaweb.grids import Grid
|
||||
from wuttaweb.testing import WebTestCase
|
||||
|
|
@ -734,3 +739,174 @@ class TestForm(WebTestCase):
|
|||
self.assertNotIn("foo", dform)
|
||||
self.assertIn("bar", schema)
|
||||
self.assertIn("bar", dform)
|
||||
|
||||
|
||||
class TestWuttaSchemaNode(WebTestCase):
|
||||
|
||||
def test_add_nodes(self):
|
||||
model = self.app.model
|
||||
|
||||
# ColanderAlchemy does not add nodes for association proxy fields
|
||||
schema = SQLAlchemySchemaNode(
|
||||
model.User, includes=["username", "first_name", "last_name"]
|
||||
)
|
||||
self.assertIn("username", schema)
|
||||
self.assertNotIn("first_name", schema)
|
||||
self.assertNotIn("last_name", schema)
|
||||
|
||||
# but our custom class handles them okay
|
||||
schema = mod.WuttaSchemaNode(
|
||||
model.User, includes=["username", "first_name", "last_name"]
|
||||
)
|
||||
self.assertIn("username", schema)
|
||||
self.assertIn("first_name", schema)
|
||||
self.assertIn("last_name", schema)
|
||||
|
||||
def test_dictify(self):
|
||||
model = self.app.model
|
||||
|
||||
person = model.Person(first_name="Fred", last_name="Flintstone")
|
||||
user = model.User(username="freddie", person=person)
|
||||
user.foo_field = "bar"
|
||||
|
||||
foo_field = colander.SchemaNode(colander.String())
|
||||
|
||||
# ColanderAlchemy does not include association proxy fields
|
||||
schema = SQLAlchemySchemaNode(
|
||||
model.User, includes=["username", "first_name", "last_name", foo_field]
|
||||
)
|
||||
dct = schema.dictify(user)
|
||||
self.assertIn("username", dct)
|
||||
self.assertEqual(dct["username"], "freddie")
|
||||
self.assertNotIn("first_name", dct)
|
||||
self.assertNotIn("last_name", dct)
|
||||
self.assertNotIn("foo_field", dct)
|
||||
|
||||
# but our custom class handles them okay
|
||||
schema = mod.WuttaSchemaNode(
|
||||
model.User, includes=["username", "first_name", "last_name", foo_field]
|
||||
)
|
||||
dct = schema.dictify(user)
|
||||
self.assertIn("username", dct)
|
||||
self.assertEqual(dct["username"], "freddie")
|
||||
self.assertIn("first_name", dct)
|
||||
self.assertEqual(dct["first_name"], "Fred")
|
||||
self.assertIn("last_name", dct)
|
||||
self.assertEqual(dct["last_name"], "Flintstone")
|
||||
# nb. foo_field is not assn proxy, so not auto-dictified
|
||||
self.assertNotIn("foo_field", dct)
|
||||
|
||||
# also note special handling for null values on string proxy fields
|
||||
user.last_name = None
|
||||
dct = schema.dictify(user)
|
||||
self.assertIn("first_name", dct)
|
||||
self.assertIs(dct["first_name"], "Fred")
|
||||
self.assertIn("last_name", dct)
|
||||
self.assertIsNotNone(dct["last_name"])
|
||||
self.assertIs(dct["last_name"], colander.null)
|
||||
|
||||
def test_objectify(self):
|
||||
model = self.app.model
|
||||
|
||||
# ColanderAlchemy does not set association proxy fields
|
||||
schema = SQLAlchemySchemaNode(
|
||||
model.User,
|
||||
includes=["username", "first_name", "last_name"],
|
||||
)
|
||||
user = schema.objectify(
|
||||
{
|
||||
"username": "freddie",
|
||||
"first_name": "Fred",
|
||||
"last_name": "Flintstone",
|
||||
}
|
||||
)
|
||||
self.assertIsInstance(user, model.User)
|
||||
self.assertEqual(user.username, "freddie")
|
||||
self.assertIsNone(user.person)
|
||||
self.assertIsNone(user.first_name)
|
||||
self.assertIsNone(user.last_name)
|
||||
|
||||
# but our custom class handles them okay
|
||||
schema = mod.WuttaSchemaNode(
|
||||
model.User,
|
||||
includes=["username", "first_name", "last_name"],
|
||||
)
|
||||
user = schema.objectify(
|
||||
{
|
||||
"username": "freddie",
|
||||
"first_name": "Fred",
|
||||
"last_name": "Flintstone",
|
||||
}
|
||||
)
|
||||
self.assertIsInstance(user, model.User)
|
||||
self.assertEqual(user.username, "freddie")
|
||||
self.assertIsInstance(user.person, model.Person)
|
||||
self.assertEqual(user.person.first_name, "Fred")
|
||||
self.assertEqual(user.person.last_name, "Flintstone")
|
||||
self.assertEqual(user.first_name, "Fred")
|
||||
self.assertEqual(user.last_name, "Flintstone")
|
||||
|
||||
# also note special handling for null values on string proxy fields
|
||||
user = schema.objectify(
|
||||
{
|
||||
"username": "freddie",
|
||||
"first_name": "Fred",
|
||||
"last_name": colander.null,
|
||||
}
|
||||
)
|
||||
self.assertIsInstance(user, model.User)
|
||||
self.assertEqual(user.username, "freddie")
|
||||
self.assertIsInstance(user.person, model.Person)
|
||||
self.assertEqual(user.person.first_name, "Fred")
|
||||
self.assertIsNone(user.person.last_name)
|
||||
self.assertEqual(user.first_name, "Fred")
|
||||
self.assertIsNone(user.last_name)
|
||||
|
||||
|
||||
class TestGetAssociationProxy(DataTestCase):
|
||||
|
||||
def test_basic(self):
|
||||
model = self.app.model
|
||||
mapper = sa.inspect(model.User)
|
||||
|
||||
# typical
|
||||
proxy = mod.get_association_proxy(mapper, "first_name")
|
||||
self.assertIsInstance(proxy, AssociationProxy)
|
||||
self.assertEqual(proxy.target_collection, "person")
|
||||
self.assertEqual(proxy.value_attr, "first_name")
|
||||
|
||||
# no such proxy
|
||||
proxy = mod.get_association_proxy(mapper, "blarg")
|
||||
self.assertIsNone(proxy)
|
||||
|
||||
|
||||
class TestGetAssociationProxyTarget(DataTestCase):
|
||||
|
||||
def test_basic(self):
|
||||
model = self.app.model
|
||||
mapper = sa.inspect(model.User)
|
||||
|
||||
# typical
|
||||
target = mod.get_association_proxy_target(mapper, "first_name")
|
||||
self.assertIsInstance(target, orm.RelationshipProperty)
|
||||
self.assertIs(target, mapper.get_property("person"))
|
||||
|
||||
# no such proxy
|
||||
target = mod.get_association_proxy_target(mapper, "blarg")
|
||||
self.assertIsNone(target)
|
||||
|
||||
|
||||
class TestGetAssociationProxyColumn(DataTestCase):
|
||||
|
||||
def test_basic(self):
|
||||
model = self.app.model
|
||||
mapper = sa.inspect(model.User)
|
||||
|
||||
# typical
|
||||
column = mod.get_association_proxy_column(mapper, "first_name")
|
||||
self.assertIsInstance(column, orm.ColumnProperty)
|
||||
self.assertIs(column, sa.inspect(model.Person).get_property("first_name"))
|
||||
|
||||
# no such proxy
|
||||
column = mod.get_association_proxy_column(mapper, "blarg")
|
||||
self.assertIsNone(column)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue