From ee3a789682c2b53537f626a6afb907bdc6533ef5 Mon Sep 17 00:00:00 2001 From: Lance Edgar Date: Fri, 20 Mar 2026 17:20:02 -0500 Subject: [PATCH] feat: add basic support for merging 2 records, w/ preview including basic logic for merging Person or User records --- src/wuttaweb/diffs.py | 76 ++- src/wuttaweb/templates/master/index.mako | 16 +- src/wuttaweb/templates/master/merge.mako | 88 ++++ src/wuttaweb/views/master.py | 365 +++++++++++++- src/wuttaweb/views/people.py | 31 +- src/wuttaweb/views/users.py | 114 ++++- tests/test_diffs.py | 88 ++++ tests/views/test_master.py | 591 ++++++++++++++++++++--- tests/views/test_people.py | 40 ++ tests/views/test_users.py | 218 ++++++++- 10 files changed, 1554 insertions(+), 73 deletions(-) create mode 100644 src/wuttaweb/templates/master/merge.mako diff --git a/src/wuttaweb/diffs.py b/src/wuttaweb/diffs.py index 23eaa7e..f4b4baa 100644 --- a/src/wuttaweb/diffs.py +++ b/src/wuttaweb/diffs.py @@ -2,7 +2,7 @@ ################################################################################ # # wuttaweb -- Web App for Wutta Framework -# Copyright © 2024-2025 Lance Edgar +# Copyright © 2024-2026 Lance Edgar # # This file is part of Wutta Framework. # @@ -63,6 +63,80 @@ class WebDiff(Diff): return HTML.literal(html) +class MergeDiff(WebDiff): + """ + Special diff class for use when merging 2 records. While based on + :class:`WebDiff`, this class uses a different signature for the + constructor. + + It shows the "removing" record, the "keeping" record, and also the + "final" record showing the calculated result of the merge, with + special highlighting where values would change on the kept record. + + :param config: The app :term:`config object`. + + :param removing_data: Dict of data for the "removing" record. + + :param new_data: Dict of data for the "keeping" record. + + :param new_data: Dict of "final" data for the kept record. + + :param \\**kwargs: Remaining kwargs are passed as-is to the + :class:`WebDiff` constructor. + """ + + def __init__(self, config, removing_data, keeping_data, final_data, **kwargs): + super().__init__(config, removing_data, keeping_data, **kwargs) + self.removing_data = removing_data + self.keeping_data = keeping_data + self.final_data = final_data + self.columns = ["field name", "removing", "keeping", "final"] + + def render_field_row(self, field): + keep_diff = self.keeping_data.get(field) != self.removing_data.get(field) + final_diff = self.final_data.get(field) != self.keeping_data.get(field) + + # TODO: there is a fair bit of duplication here, compared to + # base class. should maybe clean that up someday.. + + kw = {} + if self.cell_padding: + kw["style"] = f"padding: {self.cell_padding}" + td_field = HTML.tag("td", class_="field", c=field, **kw) + + td_old_value = HTML.tag( + "td", + c=self.render_old_value(field), + **self.get_old_value_attrs(keep_diff), + ) + + td_new_value = HTML.tag( + "td", + c=self.render_new_value(field), + **self.get_new_value_attrs(keep_diff), + ) + + td_final_value = HTML.tag( + "td", + c=self.render_final_value(field), + **self.get_final_value_attrs(final_diff), + ) + + return HTML.tag("tr", c=[td_field, td_old_value, td_new_value, td_final_value]) + + def get_final_value_attrs( + self, is_diff + ): # pylint: disable=missing-function-docstring + attrs = {} + if is_diff: + attrs["class_"] = "has-background-warning" + return self.get_cell_attrs(**attrs) + + def render_final_value(self, field): # pylint: disable=missing-function-docstring + value = repr(self.final_data.get(field)) + return self.render_cell_value(value) + + class VersionDiff(WebDiff): """ Special diff class for use with version history views. While diff --git a/src/wuttaweb/templates/master/index.mako b/src/wuttaweb/templates/master/index.mako index fbc87a3..9b3c3c7 100644 --- a/src/wuttaweb/templates/master/index.mako +++ b/src/wuttaweb/templates/master/index.mako @@ -29,8 +29,9 @@ <%def name="modify_vue_vars()"> ${parent.modify_vue_vars()} - % if master.deletable_bulk and master.has_perm('delete_bulk'): - - % endif + % endif + + % if master.mergeable and master.has_perm("merge"): + + ${grid.vue_component}Data.mergeSubmitting = false + + % endif + + <%def name="make_vue_components()"> diff --git a/src/wuttaweb/templates/master/merge.mako b/src/wuttaweb/templates/master/merge.mako new file mode 100644 index 0000000..790387b --- /dev/null +++ b/src/wuttaweb/templates/master/merge.mako @@ -0,0 +1,88 @@ +## -*- coding: utf-8; -*- +<%inherit file="/page.mako" /> + +<%def name="title()">Merge 2 ${model_title_plural} + +<%def name="page_content()"> +
+ +

+ You are about to merge + two ${model_title} records, possibly updating and/or deleting + various other records. +

+ +

+ This tool can show you some basics but is not able to give you + the full picture of the implications of this merge. +

+ +

+ You are urged to proceed with + caution! Ideally try the merge on a test site first. +

+ +
+ + + ${h.link_to(str(removing) or "(no title)", master.get_action_url('view', removing))} + + + + ${h.link_to(str(keeping) or "(no title)", master.get_action_url('view', keeping))} + + +
+ + ${diff.render_html()} + + + +
+ +
+ +
+ ${h.form(request.current_route_url(), **{"@submit": "swapSubmitting = true"})} + ${h.csrf_token(request)} + ${h.hidden("uuids", value=f"{keeping.uuid},{removing.uuid}")} + + {{ swapSubmitting ? "Working, please wait..." : "Swap which record is kept/removed" }} + + ${h.end_form()} +
+ +
+ ${h.form(request.current_route_url(), **{"@submit": "mergeSubmitting = true"})} + ${h.csrf_token(request)} + ${h.hidden("uuids", value=f"{removing.uuid},{keeping.uuid}")} + ${h.hidden("execute-merge", value="true")} + + {{ mergeSubmitting ? "Working, please wait..." : "Yes, perform this merge" }} + + ${h.end_form()} +
+ +
+
+ + +<%def name="modify_vue_vars()"> + ${parent.modify_vue_vars()} + + diff --git a/src/wuttaweb/views/master.py b/src/wuttaweb/views/master.py index 949a0e1..170afb6 100644 --- a/src/wuttaweb/views/master.py +++ b/src/wuttaweb/views/master.py @@ -29,6 +29,7 @@ import logging import os import threading import warnings +from uuid import UUID import sqlalchemy as sa from sqlalchemy import orm @@ -41,7 +42,7 @@ from wuttaweb.views.base import View from wuttaweb.util import get_form_data, render_csrf_token from wuttaweb.db import Session from wuttaweb.progress import SessionProgress -from wuttaweb.diffs import VersionDiff +from wuttaweb.diffs import MergeDiff, VersionDiff log = logging.getLogger(__name__) @@ -356,6 +357,35 @@ class MasterView(View): # pylint: disable=too-many-public-methods This is optional; see also :meth:`get_version_grid_columns()`. + .. attribute:: mergeable + + Boolean indicating whether the view model supports "merging two + records" - i.e. it should have a :meth:`merge()` view. Default + value is ``False``. + + .. attribute:: merge_additive_fields + + Optional list of fields for which values are "additive" in + nature when merging two records. Only relevant if + :attr:`mergeable` is true. + + See also :meth:`merge_get_additive_fields()`. + + .. attribute:: merge_coalesce_fields + + Optional list of fields for which values should be "coalesced" + when merging two records. Only relevant if :attr:`mergeable` + is true. + + See also :meth:`merge_get_coalesce_fields()`. + + .. attribute:: merge_simple_fields + + Optional list of "simple" fields when merging two records. + Only relevant if :attr:`mergeable` is true. + + See also :meth:`merge_get_simple_fields()`. + **ROW FEATURES** .. attribute:: has_rows @@ -471,6 +501,12 @@ class MasterView(View): # pylint: disable=too-many-public-methods execute_progress_template = None configurable = False + # merging + mergeable = False + merge_additive_fields = None + merge_coalesce_fields = None + merge_simple_fields = None + # row features has_rows = False row_model_class = None @@ -2391,6 +2427,7 @@ class MasterView(View): # pylint: disable=too-many-public-methods "index_title": self.get_index_title(), "index_url": self.get_index_url(), "model_title": self.get_model_title(), + "model_title_plural": self.get_model_title_plural(), "config_title": self.get_config_title(), } @@ -2573,7 +2610,9 @@ class MasterView(View): # pylint: disable=too-many-public-methods labels.update(cls.labels) return labels - def make_model_grid(self, session=None, **kwargs): + def make_model_grid( + self, session=None, **kwargs + ): # pylint: disable=too-many-branches """ Create and return a :class:`~wuttaweb.grids.base.Grid` instance for use with the :meth:`index()` view. @@ -2585,6 +2624,8 @@ class MasterView(View): # pylint: disable=too-many-public-methods * :meth:`get_grid_data()` * :meth:`configure_grid()` """ + route_prefix = self.get_route_prefix() + if "key" not in kwargs: kwargs["key"] = self.get_grid_key() @@ -2632,15 +2673,41 @@ class MasterView(View): # pylint: disable=too-many-public-methods kwargs["actions"] = actions + mergeable = self.mergeable and self.has_perm("merge") + if "tools" not in kwargs: tools = [] + # delete-bulk if self.deletable_bulk and self.has_perm("delete_bulk"): tools.append(("delete-results", self.delete_bulk_make_button())) + # merge + if mergeable: + hidden = tags.hidden("uuids", **{":value": "checkedUUIDs"}) + button = self.make_button( + '{{ mergeSubmitting ? "Working, please wait..." : "Merge 2 records" }}', + primary=True, + native_type="submit", + icon_left="object-ungroup", + **{":disabled": "mergeSubmitting || checkedRows.length != 2"}, + ) + csrf = render_csrf_token(self.request) + html = ( + tags.form( + self.request.route_url(f"{route_prefix}.merge"), + **{"@submit": "mergeSubmitting = true"}, + ) + + csrf + + hidden + + button + + tags.end_form() + ) + tools.append(("merge", html)) + kwargs["tools"] = tools - kwargs.setdefault("checkable", self.checkable) + kwargs.setdefault("checkable", self.checkable or mergeable) if hasattr(self, "grid_row_class"): kwargs.setdefault("row_class", self.grid_row_class) kwargs.setdefault("filterable", self.filterable) @@ -3170,6 +3237,281 @@ class MasterView(View): # pylint: disable=too-many-public-methods if session: session.close() + ############################## + # merge methods + ############################## + + def merge(self): + """ + View for merging two records. + + By default, this view is included only if :attr:`mergeable` is + true. It usually maps to a URL like ``/widgets/merge``. + + A POST request must be used for this view; otherwise it will + redirect to the :meth:`index()` view. The POST data must + specify a ``uuids`` param string in + ``"removing_uuid,keeping_uuid"`` format. + + The user is first shown a "diff" with the + removing/keeping/final data records, as simple preview. They + can swap removing vs. keeping if needed, and when satisfied + they can "execute" the merge. + + See also related methods, used by this one: + + * :meth:`merge_validate_and_execute()` + * :meth:`merge_get_data()` + * :meth:`merge_get_final_data()` + """ + if self.request.method != "POST": + return self.redirect(self.get_index_url()) + + session = self.Session() + model_class = self.get_model_class() + + # load records to be kept/removed + removing = keeping = None + uuids = self.request.POST.get("uuids", "").split(",") + if len(uuids) == 2: + uuid1, uuid2 = uuids + try: + uuid1 = UUID(uuid1) + uuid2 = UUID(uuid2) + except ValueError: + pass + else: + removing = session.get(model_class, uuid1) + keeping = session.get(model_class, uuid2) + + # redirect to listing if record(s) not found + if not (removing and keeping): + raise self.redirect(self.get_index_url()) + + # maybe execute merge + if self.request.POST.get("execute-merge") == "true": + if self.merge_validate_and_execute(removing, keeping): + return self.redirect(self.get_action_url("view", keeping)) + + removing_data = self.merge_get_data(removing) + keeping_data = self.merge_get_data(keeping) + diff = MergeDiff( + self.config, + removing_data, + keeping_data, + self.merge_get_final_data(removing_data, keeping_data), + ) + + context = {"removing": removing, "keeping": keeping, "diff": diff} + return self.render_to_response("merge", context) + + def merge_get_simple_fields(self): + """ + Return the list of "simple" fields for the merge. + + These "simple" fields will not have any special handling for + the merge. In other words the "removing" record values will + be ignored and the "keeping" record values will remain in + place, without modification. + + If the view class defines :attr:`merge_simple_fields`, that + list is returned as-is. Otherwise the list of columns from + :attr:`model_class` is returned. + + :returns: List of simple field names. + """ + if self.merge_simple_fields: + return list(self.merge_simple_fields) + + mapper = sa.inspect(self.get_model_class()) + fields = mapper.columns.keys() + return fields + + def merge_get_additive_fields(self): + """ + Return the list of "additive" fields for the merge. + + Values from the removing/keeping record will be conceptually + added together, for each of these fields. + + If the view class defines :attr:`merge_additive_fields`, that + list is returned as-is. Otherwise an empty list is returned. + + :returns: List of additive field names. + """ + if self.merge_additive_fields: + return list(self.merge_additive_fields) + return [] + + def merge_get_coalesce_fields(self): + """ + Return the list of "coalesce" fields for the merge. + + Values from the removing/keeping record will be conceptually + "coalesced" for each of these fields. + + If the view class defines :attr:`merge_coalesce_fields`, that + list is returned as-is. Otherwise an empty list is returned. + + :returns: List of coalesce field names. + """ + if self.merge_coalesce_fields: + return list(self.merge_coalesce_fields) + return [] + + def merge_get_all_fields(self): + """ + Return the list of *all* fields for the merge. + + This will call each of the following methods to collect all + field names, then it returns the full *sorted* list. + + * :meth:`merge_get_additive_fields()` + * :meth:`merge_get_coalesce_fields()` + * :meth:`merge_get_simple_fields()` + + :returns: Sorted list of all field names. + """ + fields = set() + fields.update(self.merge_get_simple_fields()) + fields.update(self.merge_get_additive_fields()) + fields.update(self.merge_get_coalesce_fields()) + return sorted(fields) + + def merge_get_data(self, obj): + """ + Return a data dict for the given object, which will be either + the "removing" or "keeping" record for the merge. + + By default this calls :meth:`merge_get_all_fields()` and then + for each field, calls ``getattr()`` on the object. Subclass + can override as needed for custom logic. + + :param obj: Reference to model/record instance. + + :returns: Data dict with all field values. + """ + return {f: getattr(obj, f, None) for f in self.merge_get_all_fields()} + + def merge_get_final_data(self, removing, keeping): + """ + Return the "final" data dict for the merge. + + The result will be identical to the "keeping" record, for all + "simple" fields. However the "additive" and "coalesce" fields + are handled specially per their nature, in which case those + final values may or may not match the "keeping" record. + + :param removing: Data dict for the "removing" record. + + :param keeping: Data dict for the "keeping" record. + + :returns: Data dict with all "final" field values. + + See also: + + * :meth:`merge()` + * :meth:`merge_get_additive_fields()` + * :meth:`merge_get_coalesce_fields()` + """ + final = dict(keeping) + + for field in self.merge_get_additive_fields(): + if isinstance(keeping[field], list): + final[field] = sorted(set(removing[field] + keeping[field])) + else: + final[field] = removing[field] + keeping[field] + + for field in self.merge_get_coalesce_fields(): + if removing[field] is not None and keeping[field] is None: + final[field] = removing[field] + elif removing[field] and not keeping[field]: + final[field] = removing[field] + + return final + + def merge_validate_and_execute(self, removing, keeping): + """ + Validate and execute a merge for the two given records. It is + called from :meth:`merge()`. + + This calls :meth:`merge_why_not()` and if that does not yield + a reason to prevent the merge, then calls + :meth:`merge_execute()`. + + If there was a reason not to merge, or if an error occurs + during merge execution, a flash warning/error message is set + to notify the user what happened. + + :param removing: Reference to the "removing" model instance/record. + + :param keeping: Reference to the "keeping" model instance/record. + + :returns: Boolean indicating whether merge execution completed + successfully. + """ + session = self.Session() + + # validate the merge + if reason := self.merge_why_not(removing, keeping): + warning = HTML.tag( + "p", class_="block", c="Merge cannot proceed:" + ) + HTML.tag("p", class_="block", c=reason) + self.request.session.flash(warning, "warning") + return False + + # execute the merge + removed_title = str(removing) + try: + self.merge_execute(removing, keeping) + session.flush() + except Exception as err: # pylint: disable=broad-exception-caught + session.rollback() + log.warning("merge failed", exc_info=True) + warning = HTML.tag("p", class_="block", c="Merge failed:") + HTML.tag( + "p", class_="block", c=self.app.render_error(err) + ) + self.request.session.flash(warning, "error") + return False + + self.request.session.flash(f"{removed_title} has been merged into {keeping}") + return True + + def merge_why_not(self, removing, keeping): # pylint: disable=unused-argument + """ + Can return a "reason" why the two given records should not be merged. + + This returns ``None`` by default, indicating the merge is + allowed. Subclass can override as needed for custom logic. + + See also :meth:`merge_validate_and_execute()`. + + :param removing: Reference to the "removing" model instance/record. + + :param keeping: Reference to the "keeping" model instance/record. + + :returns: Reason not to merge (as string), or ``None``. + """ + return None + + def merge_execute(self, removing, keeping): # pylint: disable=unused-argument + """ + Execute the actual merge for the two given objects. + + Default logic simply deletes the "removing" record. Subclass + can override as needed for custom logic. + + See also :meth:`merge_validate_and_execute()`. + + :param removing: Reference to the "removing" model instance/record. + + :param keeping: Reference to the "keeping" model instance/record. + """ + session = self.Session() + + # nb. default "merge" does not update kept object! + session.delete(removing) + ############################## # row methods ############################## @@ -3934,7 +4276,7 @@ class MasterView(View): # pylint: disable=too-many-public-methods cls._defaults(config) @classmethod - def _defaults(cls, config): # pylint: disable=too-many-statements + def _defaults(cls, config): # pylint: disable=too-many-statements,too-many-branches wutta_config = config.registry.settings.get("wutta_config") app = wutta_config.get_app() @@ -4031,6 +4373,21 @@ class MasterView(View): # pylint: disable=too-many-public-methods f"Delete {model_title_plural} in bulk", ) + # merge + if cls.mergeable: + config.add_wutta_permission( + permission_prefix, + f"{permission_prefix}.merge", + f"Merge 2 {model_title_plural}", + ) + config.add_route(f"{route_prefix}.merge", f"{url_prefix}/merge") + config.add_view( + cls, + attr="merge", + route_name=f"{route_prefix}.merge", + permission=f"{permission_prefix}.merge", + ) + # autocomplete if cls.has_autocomplete: config.add_route( diff --git a/src/wuttaweb/views/people.py b/src/wuttaweb/views/people.py index b5d49ba..27e1bab 100644 --- a/src/wuttaweb/views/people.py +++ b/src/wuttaweb/views/people.py @@ -2,7 +2,7 @@ ################################################################################ # # wuttaweb -- Web App for Wutta Framework -# Copyright © 2024-2025 Lance Edgar +# Copyright © 2024-2026 Lance Edgar # # This file is part of Wutta Framework. # @@ -71,6 +71,9 @@ class PersonView(MasterView): # pylint: disable=abstract-method "users", ] + mergeable = True + merge_additive_fields = ["usernames"] + def configure_grid(self, grid): # pylint: disable=empty-docstring """ """ g = grid @@ -129,6 +132,32 @@ class PersonView(MasterView): # pylint: disable=abstract-method return person + def merge_get_data(self, obj): # pylint: disable=empty-docstring + """ """ + person = obj + data = super().merge_get_data(person) + + data["usernames"] = sorted([u.username for u in person.users]) + + return data + + def merge_execute(self, removing, keeping): + """ + We override default merge logic to re-assign users if needed. + + See also parent method: + :meth:`~wuttaweb.views.master.MasterView.merge_execute()` + """ + session = self.Session() + + # reassign users + for user in list(removing.users): + user.person = keeping + session.flush() + + # continue default merge + super().merge_execute(removing, keeping) + def autocomplete_query(self, term): # pylint: disable=empty-docstring """ """ model = self.app.model diff --git a/src/wuttaweb/views/users.py b/src/wuttaweb/views/users.py index 9333b37..684d9c7 100644 --- a/src/wuttaweb/views/users.py +++ b/src/wuttaweb/views/users.py @@ -2,7 +2,7 @@ ################################################################################ # # wuttaweb -- Web App for Wutta Framework -# Copyright © 2024-2025 Lance Edgar +# Copyright © 2024-2026 Lance Edgar # # This file is part of Wutta Framework. # @@ -24,6 +24,8 @@ Views for users """ +import sqlalchemy as sa + from wuttjamaican.db.model import User from wuttaweb.views import MasterView from wuttaweb.forms import widgets @@ -72,6 +74,9 @@ class UserView(MasterView): # pylint: disable=abstract-method "api_tokens", ] + mergeable = True + merge_additive_fields = ["roles"] + def get_query(self, session=None): # pylint: disable=empty-docstring """ """ query = super().get_query(session=session) @@ -360,6 +365,113 @@ class UserView(MasterView): # pylint: disable=abstract-method auth.delete_api_token(token) return {} + def merge_get_simple_fields(self): # pylint: disable=empty-docstring + """ """ + fields = super().merge_get_simple_fields() + + if "password" in fields: + fields.remove("password") + + return fields + + def merge_get_additive_fields(self): # pylint: disable=empty-docstring + """ """ + fields = super().merge_get_additive_fields() + + if self.app.continuum_is_enabled(): + if "transaction_count" not in fields: + fields.append("transaction_count") + + return fields + + def merge_get_data(self, obj): # pylint: disable=empty-docstring + """ """ + data = super().merge_get_data(obj) + model_class = self.get_model_class() + session = self.Session() + user = obj + + data["roles"] = sorted([role.name for role in user.roles]) + + if self.app.continuum_is_enabled(): + import sqlalchemy_continuum as continuum # pylint: disable=import-outside-toplevel + + txncls = continuum.transaction_class(model_class) + data["transaction_count"] = ( + session.query(txncls).filter(txncls.user == user).count() + ) + + return data + + def merge_why_not(self, removing, keeping): + """ + This checks to ensure the *current* user is not the same as + the "removing" user. + + See also parent method: + :meth:`~wuttaweb.views.master.MasterView.merge_why_not()` + """ + if removing is self.request.user: + return "Cannot remove user who is currently logged in!" + return None + + def merge_execute(self, removing, keeping): + """ + The logic to merge 2 users is extended as follows: + + The "keeping" user will be assigned to all roles to which the + "removing" user belonged. + + Any upgrades created or executed by the "removing" user will + be updated to reference the "keeping" user instead. + + Any versioning (SQLAlchemy-Continuum) transactions created by + the "removing" user will be updated to reference the "keeping" + user instead. + + See also parent method: + :meth:`~wuttaweb.views.master.MasterView.merge_execute()` + """ + model = self.app.model + session = self.Session() + model_class = self.get_model_class() + + # transfer role membership + for role in list(removing.roles): + if role not in keeping.roles: + keeping.roles.append(role) + + # reassign upgrade "created by" + stmt = ( + sa.update(model.Upgrade) + .where(model.Upgrade.created_by_uuid == removing.uuid) + .values(created_by_uuid=keeping.uuid) + ) + session.execute(stmt, execution_options={"synchronize_session": False}) + + # reassign upgrade "executed by" + stmt = ( + sa.update(model.Upgrade) + .where(model.Upgrade.executed_by_uuid == removing.uuid) + .values(executed_by_uuid=keeping.uuid) + ) + session.execute(stmt, execution_options={"synchronize_session": False}) + + # reassign continuum transactions + if self.app.continuum_is_enabled(): + import sqlalchemy_continuum as continuum # pylint: disable=import-outside-toplevel + + txncls = continuum.transaction_class(model_class) + stmt = ( + sa.update(txncls) + .where(txncls.user_id == removing.uuid) + .values(user_id=keeping.uuid) + ) + session.execute(stmt, execution_options={"synchronize_session": False}) + + # continue default merge + super().merge_execute(removing, keeping) + @classmethod def defaults(cls, config): # pylint: disable=empty-docstring """ """ diff --git a/tests/test_diffs.py b/tests/test_diffs.py index 6f66893..90d2503 100644 --- a/tests/test_diffs.py +++ b/tests/test_diffs.py @@ -33,6 +33,94 @@ class TestWebDiff(WebTestCase): self.assertIn("", html) +class TestMergeDiff(WebTestCase): + + def make_diff(self, *args, **kwargs): + return mod.MergeDiff(self.config, *args, **kwargs) + + def test_get_final_value_attrs(self): + + # no diff + removing = {"foo": "bar"} + keeping = {"foo": "bar"} + final = {"foo": "bar"} + is_diff = False + diff = self.make_diff(removing, keeping, final) + attrs = diff.get_final_value_attrs(is_diff) + self.assertEqual(attrs, {}) + + # values differ + keeping = {"foo": "baz"} + final = {"foo": "baz"} + is_diff = True + diff = self.make_diff(removing, keeping, final) + attrs = diff.get_final_value_attrs(is_diff) + self.assertEqual(attrs, {"class_": "has-background-warning"}) + + def test_render_field_row(self): + + # no diffs + removing = {"foo": "bar"} + keeping = {"foo": "bar"} + final = {"foo": "bar"} + diff = self.make_diff(removing, keeping, final) + html = diff.render_field_row("foo") + self.assertTrue(html.startswith("")) + self.assertNotIn('style="padding: 0.5rem;"', html) + self.assertIn("'bar'", html) + self.assertNotIn(f'style="background-color: {diff.old_color}"', html) + self.assertNotIn("'baz'", html) + self.assertNotIn(f'style="background-color: {diff.new_color}"', html) + self.assertNotIn("'bar,baz'", html) + self.assertNotIn('class="has-background-warning"', html) + self.assertTrue(html.endswith("")) + + # remove vs. keep diffs, but not keep vs. final + removing = {"foo": "bar"} + keeping = {"foo": "baz"} + final = {"foo": "baz"} + diff = self.make_diff(removing, keeping, final) + html = diff.render_field_row("foo") + self.assertTrue(html.startswith("")) + self.assertNotIn('style="padding: 0.5rem;"', html) + self.assertIn("'bar'", html) + self.assertIn(f'style="background-color: {diff.old_color}"', html) + self.assertIn("'baz'", html) + self.assertIn(f'style="background-color: {diff.new_color}"', html) + self.assertNotIn("'bar,baz'", html) + self.assertNotIn('class="has-background-warning"', html) + self.assertTrue(html.endswith("")) + + # remove vs. keep diffs, *and* keep vs. final diffs + removing = {"foo": "bar"} + keeping = {"foo": "baz"} + final = {"foo": "bar,baz"} + diff = self.make_diff(removing, keeping, final) + html = diff.render_field_row("foo") + self.assertTrue(html.startswith("")) + self.assertNotIn('style="padding: 0.5rem;"', html) + self.assertIn("'bar'", html) + self.assertIn(f'style="background-color: {diff.old_color}"', html) + self.assertIn("'baz'", html) + self.assertIn(f'style="background-color: {diff.new_color}"', html) + self.assertIn("'bar,baz'", html) + self.assertIn('class="has-background-warning"', html) + self.assertTrue(html.endswith("")) + + # again, with cell padding + diff.cell_padding = "0.5rem" + html = diff.render_field_row("foo") + self.assertTrue(html.startswith("")) + self.assertIn('style="padding: 0.5rem"', html) + self.assertIn("'bar'", html) + self.assertIn(f"background-color: {diff.old_color}", html) + self.assertIn("'baz'", html) + self.assertIn(f"background-color: {diff.new_color}", html) + self.assertIn("'bar,baz'", html) + self.assertIn('class="has-background-warning"', html) + self.assertTrue(html.endswith("")) + + class TestVersionDiff(VersionWebTestCase): def make_diff(self, *args, **kwargs): diff --git a/tests/views/test_master.py b/tests/views/test_master.py index bb31e8c..c1b493d 100644 --- a/tests/views/test_master.py +++ b/tests/views/test_master.py @@ -10,6 +10,7 @@ from sqlalchemy import orm from pyramid import testing from pyramid.response import Response from pyramid.httpexceptions import HTTPNotFound, HTTPFound +from webhelpers2.html import HTML from wuttjamaican.conf import WuttaConfig from wuttaweb.views import master as mod @@ -36,6 +37,7 @@ class TestMasterView(WebTestCase): downloadable=True, executable=True, configurable=True, + mergeable=True, has_rows=True, rows_creatable=True, ): @@ -623,73 +625,122 @@ class TestMasterView(WebTestCase): def test_make_model_grid(self): self.pyramid_config.add_route("settings.delete_bulk", "/settings/delete-bulk") + self.pyramid_config.add_route("people.merge", "/people/merge") model = self.app.model - # no model class - with patch.multiple( - mod.MasterView, create=True, model_name="Widget", model_key="uuid" - ): - view = mod.MasterView(self.request) - grid = view.make_model_grid() - self.assertIsNone(grid.model_class) + with patch.object(mod.MasterView, "Session", return_value=self.session): - # explicit model class - with patch.multiple(mod.MasterView, create=True, model_class=model.Setting): - grid = view.make_model_grid(session=self.session) - self.assertIs(grid.model_class, model.Setting) + # no model class + with patch.multiple( + mod.MasterView, create=True, model_name="Widget", model_key="uuid" + ): + view = self.make_view() + grid = view.make_model_grid() + self.assertIsNone(grid.model_class) - # no row class by default - with patch.multiple(mod.MasterView, create=True, model_class=model.Setting): - grid = view.make_model_grid(session=self.session) - self.assertIsNone(grid.row_class) - - # can specify row class - get_row_class = MagicMock() - with patch.multiple( - mod.MasterView, - create=True, - model_class=model.Setting, - grid_row_class=get_row_class, - ): - grid = view.make_model_grid(session=self.session) - self.assertIs(grid.row_class, get_row_class) - - # no actions by default - with patch.multiple(mod.MasterView, create=True, model_class=model.Setting): - grid = view.make_model_grid(session=self.session) - self.assertEqual(grid.actions, []) - - # now let's test some more actions logic - with patch.multiple( - mod.MasterView, - create=True, - model_class=model.Setting, - viewable=True, - editable=True, - deletable=True, - ): - - # should have 3 actions now, but for lack of perms - grid = view.make_model_grid(session=self.session) - self.assertEqual(len(grid.actions), 0) - - # but root user has perms, so gets 3 actions - with patch.object(self.request, "is_root", new=True): + # explicit model class + with patch.multiple(mod.MasterView, create=True, model_class=model.Setting): + view = self.make_view() grid = view.make_model_grid(session=self.session) - self.assertEqual(len(grid.actions), 3) + self.assertIs(grid.model_class, model.Setting) - # no tools by default - with patch.multiple(mod.MasterView, create=True, model_class=model.Setting): - grid = view.make_model_grid(session=self.session) - self.assertEqual(grid.tools, {}) - - # delete-results tool added if master/perms allow - with patch.multiple( - mod.MasterView, create=True, model_class=model.Setting, deletable_bulk=True - ): - with patch.object(self.request, "is_root", new=True): + # no row class by default + with patch.multiple(mod.MasterView, create=True, model_class=model.Setting): + view = self.make_view() grid = view.make_model_grid(session=self.session) - self.assertIn("delete-results", grid.tools) + self.assertIsNone(grid.row_class) + + # can specify row class + get_row_class = MagicMock() + with patch.multiple( + mod.MasterView, + create=True, + model_class=model.Setting, + grid_row_class=get_row_class, + ): + view = self.make_view() + grid = view.make_model_grid(session=self.session) + self.assertIs(grid.row_class, get_row_class) + + # no actions by default + with patch.multiple(mod.MasterView, create=True, model_class=model.Setting): + view = self.make_view() + grid = view.make_model_grid(session=self.session) + self.assertEqual(grid.actions, []) + + # now let's test some more actions logic + with patch.multiple( + mod.MasterView, + create=True, + model_class=model.Setting, + viewable=True, + editable=True, + deletable=True, + ): + view = self.make_view() + + # should have 3 actions now, but for lack of perms + grid = view.make_model_grid(session=self.session) + self.assertEqual(len(grid.actions), 0) + + # but root user has perms, so gets 3 actions + with patch.object(self.request, "is_root", new=True): + grid = view.make_model_grid(session=self.session) + self.assertEqual(len(grid.actions), 3) + + # no tools by default + with patch.multiple(mod.MasterView, create=True, model_class=model.Setting): + view = self.make_view() + grid = view.make_model_grid(session=self.session) + self.assertEqual(grid.tools, {}) + + # delete-results tool added if master/perms allow + with patch.multiple( + mod.MasterView, + create=True, + model_class=model.Setting, + deletable_bulk=True, + ): + view = self.make_view() + with patch.object(self.request, "is_root", new=True): + grid = view.make_model_grid(session=self.session) + self.assertIn("delete-results", grid.tools) + + # merge tool added if master/perms allow + with patch.multiple( + mod.MasterView, + model_class=model.Person, + route_prefix="people", + mergeable=True, + create=True, + ): + view = self.make_view() + with patch.object(self.request, "is_root", new=True): + grid = view.make_model_grid() + self.assertIn("merge", grid.tools) + + # test checkable flag + with patch.multiple( + mod.MasterView, + model_class=model.Person, + route_prefix="people", + create=True, + ): + view = self.make_view() + + # not checkable by default + grid = view.make_model_grid() + self.assertFalse(grid.checkable) + + # but can override + grid = view.make_model_grid(checkable=True) + self.assertTrue(grid.checkable) + + # checkable is true if merge allowed + with patch.object(mod.MasterView, "mergeable", new=True): + with patch.object(self.request, "is_root", new=True): + grid = view.make_model_grid() + self.assertTrue(grid.checkable) def test_get_grid_data(self): model = self.app.model @@ -1604,6 +1655,426 @@ class TestMasterView(WebTestCase): # nb. nothing was deleted self.assertEqual(self.session.query(model.Setting).count(), 6) + def test_merge_get_simple_fields(self): + model = self.app.model + with patch.object(mod.MasterView, "model_class", new=model.Person): + view = self.make_view() + + # fields include table columns by default + fields = view.merge_get_simple_fields() + self.assertEqual( + fields, + ["uuid", "full_name", "first_name", "middle_name", "last_name"], + ) + + # but class can specify fields + view.merge_simple_fields = ["first_name", "last_name"] + fields = view.merge_get_simple_fields() + self.assertEqual( + fields, + ["first_name", "last_name"], + ) + + def test_merge_get_additive_fields(self): + model = self.app.model + with patch.object(mod.MasterView, "model_class", new=model.Person): + view = self.make_view() + + # no additive fields by default + fields = view.merge_get_additive_fields() + self.assertEqual(fields, []) + + # but class can specify fields + view.merge_additive_fields = ["usernames"] + fields = view.merge_get_additive_fields() + self.assertEqual(fields, ["usernames"]) + + def test_merge_get_coalesce_fields(self): + model = self.app.model + with patch.object(mod.MasterView, "model_class", new=model.Person): + view = self.make_view() + + # no coalesce fields by default + fields = view.merge_get_coalesce_fields() + self.assertEqual(fields, []) + + # but class can specify fields + view.merge_coalesce_fields = ["active"] + fields = view.merge_get_coalesce_fields() + self.assertEqual(fields, ["active"]) + + def test_merge_get_all_fields(self): + model = self.app.model + with patch.object(mod.MasterView, "model_class", new=model.Person): + view = self.make_view() + + # nb. "all" fields will be a sorted list + + # only column (simple) fields by default + fields = view.merge_get_all_fields() + self.assertEqual( + fields, + ["first_name", "full_name", "last_name", "middle_name", "uuid"], + ) + + # but class can specify fields + view.merge_simple_fields = ["first_name", "last_name"] + view.merge_additive_fields = ["usernames"] + view.merge_coalesce_fields = ["active"] + fields = view.merge_get_all_fields() + self.assertEqual( + fields, + ["active", "first_name", "last_name", "usernames"], + ) + + def test_merge_get_data(self): + model = self.app.model + person = model.Person(first_name="Fred", last_name="Flintstone") + with patch.object(mod.MasterView, "model_class", new=model.Person): + view = self.make_view() + + # data will include "all" fields + view.merge_simple_fields = ["first_name", "last_name", "usernames"] + data = view.merge_get_data(person) + self.assertEqual( + data, + { + "first_name": "Fred", + "last_name": "Flintstone", + # nb. person has no such attr, so null value + "usernames": None, + }, + ) + + def test_merge_get_final_data(self): + model = self.app.model + + removing = { + "first_name": "Freddie", + "last_name": "Flintstone", + "user_count": 1, + "usernames": ["freddie"], + "some_value": 42, + "active": True, + } + + keeping = { + "first_name": "Fred", + "last_name": "Flintstone", + "user_count": 1, + "usernames": ["fred"], + "some_value": None, + "active": False, + } + + with patch.object(mod.MasterView, "model_class", new=model.Person): + view = self.make_view() + view.merge_simple_fields = ["first_name", "last_name"] + view.merge_additive_fields = ["user_count", "usernames"] + view.merge_coalesce_fields = ["some_value", "active"] + + final = view.merge_get_final_data(removing, keeping) + self.assertEqual( + final, + { + "first_name": "Fred", + "last_name": "Flintstone", + "user_count": 2, + "usernames": ["fred", "freddie"], + "some_value": 42, + "active": True, + }, + ) + + def test_merge_execute(self): + model = self.app.model + + person1 = model.Person( + first_name="Freddie", last_name="Flintstone", full_name="Freddie Flintstone" + ) + self.session.add(person1) + person2 = model.Person( + first_name="Fred", last_name="Flintstone", full_name="Fred Flintstone" + ) + self.session.add(person2) + self.session.commit() + self.assertEqual(self.session.query(model.Person).count(), 2) + + with patch.object(mod.MasterView, "Session", return_value=self.session): + view = self.make_view() + + # default merge logic just deletes 'removing' person1 + view.merge_execute(person1, person2) + self.assertEqual(self.session.query(model.Person).count(), 1) + person = self.session.query(model.Person).one() + self.assertIs(person, person2) + self.assertEqual(person.first_name, "Fred") + self.assertEqual(person.full_name, "Fred Flintstone") + + def test_merge_validate_and_execute(self): + model = self.app.model + + person1 = model.Person( + first_name="Freddie", last_name="Flintstone", full_name="Freddie Flintstone" + ) + self.session.add(person1) + person2 = model.Person( + first_name="Fred", last_name="Flintstone", full_name="Fred Flintstone" + ) + self.session.add(person2) + self.session.commit() + self.assertEqual(self.session.query(model.Person).count(), 2) + self.assertIn(person1, self.session) + + with patch.object(mod.MasterView, "Session", return_value=self.session): + view = self.make_view() + + # default merge logic just deletes 'removing' person1 + result = view.merge_validate_and_execute(person1, person2) + self.assertTrue(result) + self.assertEqual(self.session.query(model.Person).count(), 1) + person = self.session.query(model.Person).one() + self.assertIs(person, person2) + self.assertEqual(person.first_name, "Fred") + self.assertEqual(person.full_name, "Fred Flintstone") + self.assertFalse(self.request.session.peek_flash("warning")) + self.assertFalse(self.request.session.peek_flash("error")) + self.assertEqual( + self.request.session.pop_flash(), + ["Freddie Flintstone has been merged into Fred Flintstone"], + ) + + # restore Freddie + self.assertNotIn(person1, self.session) + person1 = self.session.merge(person1) + self.session.commit() + self.assertEqual(self.session.query(model.Person).count(), 2) + + # merge does not validate + with patch.object(view, "merge_why_not", return_value="because i said so"): + result = view.merge_validate_and_execute(person1, person2) + self.assertFalse(result) + self.assertEqual(self.session.query(model.Person).count(), 2) + self.assertFalse(self.request.session.peek_flash()) + self.assertFalse(self.request.session.peek_flash("error")) + self.assertEqual( + self.request.session.pop_flash("warning"), + [ + HTML.literal( + '

Merge cannot proceed:

' + '

because i said so

' + ), + ], + ) + + # error executing merge + with patch.object(view, "merge_execute", side_effect=RuntimeError): + result = view.merge_validate_and_execute(person1, person2) + self.assertFalse(result) + self.assertEqual(self.session.query(model.Person).count(), 2) + self.assertFalse(self.request.session.peek_flash()) + self.assertFalse(self.request.session.peek_flash("warning")) + self.assertEqual( + self.request.session.pop_flash("error"), + [ + HTML.literal( + '

Merge failed:

' + '

RuntimeError

' + ), + ], + ) + + def test_merge(self): + self.pyramid_config.add_route("home", "/") + self.pyramid_config.add_route("login", "/auth/login") + self.pyramid_config.add_route("people", "/people/") + self.pyramid_config.add_route("people.merge", "/people/merge") + self.pyramid_config.add_route("people.view", "/people/{uuid}") + model = self.app.model + + class MergeRoute: + name = "people.merge" + + person1 = model.Person( + first_name="Freddie", last_name="Flintstone", full_name="Freddie Flintstone" + ) + self.session.add(person1) + person2 = model.Person( + first_name="Fred", last_name="Flintstone", full_name="Fred Flintstone" + ) + self.session.add(person2) + self.session.commit() + self.assertEqual(self.session.query(model.Person).count(), 2) + self.assertIn(person1, self.session) + + with patch.multiple( + mod.MasterView, + Session=MagicMock(return_value=self.session), + model_class=model.Person, + route_prefix="people", + create=True, + ): + view = self.make_view() + + # GET request will redirect to index + result = view.merge() + self.assertIsInstance(result, HTTPFound) + self.assertEqual(result.location, "http://example.com/people/") + self.assertEqual(self.session.query(model.Person).count(), 2) + + # assume POST from now on + with patch.multiple( + self.request, matched_route=MergeRoute, method="POST", create=True + ): + + # POST without 'execute-merge' flag shows user the diff + with patch.object( + self.request, + "POST", + new={"uuids": f"{person1.uuid},{person2.uuid}"}, + ): + response = view.merge() + self.assertIsInstance(response, Response) + self.assertEqual(response.status_code, 200) + self.assertEqual(self.session.query(model.Person).count(), 2) + self.assertFalse(self.request.session.peek_flash()) + self.assertFalse(self.request.session.peek_flash("warning")) + self.assertFalse(self.request.session.peek_flash("error")) + + # default merge logic deletes person1, then redirects + with patch.object( + self.request, + "POST", + new={ + "uuids": f"{person1.uuid},{person2.uuid}", + "execute-merge": "true", + }, + ): + result = view.merge() + self.assertIsInstance(result, HTTPFound) + self.assertEqual( + result.location, f"http://example.com/people/{person2.uuid}" + ) + self.assertEqual(self.session.query(model.Person).count(), 1) + self.assertNotIn(person1, self.session) + self.assertIn(person2, self.session) + person = self.session.query(model.Person).one() + self.assertIs(person, person2) + self.assertEqual(person.first_name, "Fred") + self.assertEqual(person.full_name, "Fred Flintstone") + self.assertFalse(self.request.session.peek_flash("warning")) + self.assertFalse(self.request.session.peek_flash("error")) + self.assertEqual( + self.request.session.pop_flash(), + ["Freddie Flintstone has been merged into Fred Flintstone"], + ) + + # restore Freddie + self.assertNotIn(person1, self.session) + person1 = self.session.merge(person1) + self.session.commit() + self.assertEqual(self.session.query(model.Person).count(), 2) + + # simple redirect if invalid uuids specified + with patch.object( + self.request, + "POST", + new={ + "uuids": "bogus1,bogus2", + "execute-merge": "true", + }, + ): + with self.assertRaises(HTTPFound) as cm: + view.merge() + self.assertEqual( + cm.exception.location, "http://example.com/people/" + ) + self.assertEqual(self.session.query(model.Person).count(), 2) + self.assertFalse(self.request.session.peek_flash()) + self.assertFalse(self.request.session.peek_flash("warning")) + self.assertFalse(self.request.session.peek_flash("error")) + + # simple redirect if unknown uuids specified + fake1 = self.app.make_true_uuid() + fake2 = self.app.make_true_uuid() + with patch.object( + self.request, + "POST", + new={ + "uuids": f"{fake1},{fake2}", + "execute-merge": "true", + }, + ): + with self.assertRaises(HTTPFound) as cm: + view.merge() + self.assertEqual( + cm.exception.location, "http://example.com/people/" + ) + self.assertEqual(self.session.query(model.Person).count(), 2) + self.assertFalse(self.request.session.peek_flash()) + self.assertFalse(self.request.session.peek_flash("warning")) + self.assertFalse(self.request.session.peek_flash("error")) + + # warning redirect if merge does not validate + with patch.object( + self.request, + "POST", + new={ + "uuids": f"{person1.uuid},{person2.uuid}", + "execute-merge": "true", + }, + ): + with patch.object( + view, "merge_why_not", return_value="because i said so" + ): + response = view.merge() + self.assertIsInstance(response, Response) + self.assertEqual(self.session.query(model.Person).count(), 2) + self.assertFalse(self.request.session.peek_flash()) + self.assertFalse(self.request.session.peek_flash("error")) + # TODO: since response is already rendered, the warning flash + # msg has already been popped off the stack..will have to + # avoid render_to_response() to properly test that.. + self.assertFalse(self.request.session.peek_flash("warning")) + # self.assertEqual( + # self.request.session.pop_flash("warning"), + # [ + # HTML.literal( + # '

Merge cannot proceed:

' + # '

because i said so

' + # ), + # ], + # ) + + # error redirect if merge execution fails + with patch.object( + self.request, + "POST", + new={ + "uuids": f"{person1.uuid},{person2.uuid}", + "execute-merge": "true", + }, + ): + with patch.object(view, "merge_execute", side_effect=RuntimeError): + response = view.merge() + self.assertIsInstance(response, Response) + self.assertEqual(self.session.query(model.Person).count(), 2) + self.assertFalse(self.request.session.peek_flash()) + self.assertFalse(self.request.session.peek_flash("warning")) + # TODO: since response is already rendered, the error flash + # msg has already been popped off the stack..will have to + # avoid render_to_response() to properly test that.. + self.assertFalse(self.request.session.peek_flash("error")) + # self.assertEqual( + # self.request.session.pop_flash("error"), + # [ + # HTML.literal( + # '

Merge failed:

' + # '

RuntimeError

' + # ), + # ], + # ) + def test_autocomplete(self): model = self.app.model diff --git a/tests/views/test_people.py b/tests/views/test_people.py index 29f6694..4344589 100644 --- a/tests/views/test_people.py +++ b/tests/views/test_people.py @@ -154,3 +154,43 @@ class TestPersonView(WebTestCase): response = view.make_user() # nb. this always redirects for now self.assertEqual(response.status_code, 302) + + def test_merge_get_data(self): + model = self.app.model + + person = model.Person(full_name="Fred Flintstone") + self.session.add(person) + user = model.User(username="fred", person=person) + self.session.add(user) + self.session.flush() + + view = self.make_view() + data = view.merge_get_data(person) + self.assertIn("usernames", data) + self.assertEqual(data["usernames"], ["fred"]) + + def test_merge_execute(self): + model = self.app.model + + person1 = model.Person(full_name="Freddie Flintstone") + self.session.add(person1) + user1 = model.User(username="freddie", person=person1) + self.session.add(user1) + + person2 = model.Person(full_name="Fred Flintstone") + self.session.add(person2) + user2 = model.User(username="fred", person=person2) + self.session.add(user2) + + self.session.commit() + self.assertEqual(self.session.query(model.Person).count(), 2) + self.assertEqual(self.session.query(model.User).count(), 2) + + view = self.make_view() + with patch.object(view, "Session", return_value=self.session): + view.merge_execute(person1, person2) + self.assertEqual(self.session.query(model.Person).count(), 1) + self.assertEqual(self.session.query(model.User).count(), 2) + person = self.session.query(model.Person).one() + self.assertIs(person, person2) + self.assertEqual(len(person.users), 2) diff --git a/tests/views/test_users.py b/tests/views/test_users.py index 2fe08aa..b4eecb4 100644 --- a/tests/views/test_users.py +++ b/tests/views/test_users.py @@ -1,6 +1,6 @@ # -*- coding: utf-8; -*- -from unittest.mock import patch +from unittest.mock import patch, MagicMock from sqlalchemy import orm @@ -8,7 +8,7 @@ import colander from wuttaweb.grids import Grid from wuttaweb.views import users as mod -from wuttaweb.testing import WebTestCase, FunctionalTestCase +from wuttaweb.testing import WebTestCase, VersionWebTestCase, FunctionalTestCase class TestUserView(WebTestCase): @@ -496,6 +496,220 @@ class TestUserView(WebTestCase): result = view.delete_api_token() self.assertEqual(result, {"error": "API token not found"}) + def test_merge_get_simple_fields(self): + view = self.make_view() + + # password field should not be included + fields = view.merge_get_simple_fields() + self.assertNotIn("password", fields) + self.assertIn("username", fields) + self.assertIn("active", fields) + + def test_merge_get_additive_fields(self): + view = self.make_view() + + # nb. this is not a "versioned" test case, so transaction_count + # field will not be included + fields = view.merge_get_additive_fields() + self.assertNotIn("transaction_count", fields) + self.assertIn("roles", fields) + + def test_merge_get_data(self): + model = self.app.model + auth = self.app.get_auth_handler() + view = self.make_view() + + admin = auth.get_role_administrator(self.session) + user = model.User(username="fred") + user.roles.append(admin) + self.session.add(user) + self.session.commit() + + # nb. this is not a "versioned" test case, so transaction_count + # field will not be included + data = view.merge_get_data(user) + self.assertEqual(data["username"], "fred") + self.assertEqual(data["roles"], [admin.name]) + self.assertNotIn("transaction_count", data) + + def test_merge_why_not(self): + model = self.app.model + view = self.make_view() + + user1 = model.User(username="freddie") + self.session.add(user1) + user2 = model.User(username="fred") + self.session.add(user2) + self.session.commit() + + # normally no reason not to merge + self.assertIsNone(view.merge_why_not(user1, user2)) + + # can merge even if current user is involved (being kept) + with patch.object(self.request, "user", new=user2): + self.assertIsNone(view.merge_why_not(user1, user2)) + + # but cannot merge if it means removing current user + with patch.object(self.request, "user", new=user1): + reason = view.merge_why_not(user1, user2) + self.assertEqual(reason, "Cannot remove user who is currently logged in!") + + def test_merge_execute(self): + model = self.app.model + enum = self.app.enum + auth = self.app.get_auth_handler() + view = self.make_view() + + admin = auth.get_role_administrator(self.session) + user1 = model.User(username="freddie") + user1.roles.append(admin) + self.session.add(user1) + user2 = model.User(username="fred") + self.session.add(user2) + + upgrade = model.Upgrade( + description="test", + created_by=user1, + executed_by=user1, + status=enum.UpgradeStatus.SUCCESS, + ) + self.session.add(upgrade) + self.session.commit() + + with patch.object(view, "Session", return_value=self.session): + view.merge_execute(user1, user2) + self.session.commit() + + self.assertEqual(self.session.query(model.User).count(), 1) + self.assertNotIn(user1, self.session) + self.assertIn(user2, self.session) + + self.assertIn(admin, user2.roles) + + self.assertIs(upgrade.created_by, user2) + self.assertIs(upgrade.executed_by, user2) + + +class TestVersionedUserView(VersionWebTestCase): + + def make_view(self): + return mod.UserView(self.request) + + def test_merge_get_additive_fields(self): + view = self.make_view() + + # nb. contrast this to the "non-versioned" test case above + fields = view.merge_get_additive_fields() + self.assertIn("transaction_count", fields) + self.assertIn("roles", fields) + + def test_merge_get_data(self): + import sqlalchemy_continuum as continuum + + model = self.app.model + auth = self.app.get_auth_handler() + txncls = continuum.transaction_class(model.User) + + # nb. must reset the User model reference, due to nature of + # test setup/teardown + with patch.multiple( + mod.UserView, + model_class=model.User, + Session=MagicMock(return_value=self.session), + ): + view = self.make_view() + + # make admin user + admin = auth.get_role_administrator(self.session) + user = model.User(username="fred") + user.roles.append(admin) + self.session.add(user) + self.session.commit() + self.assertEqual(self.session.query(txncls).count(), 1) + + # nb. contrast this to the "non-versioned" test case above + data = view.merge_get_data(user) + self.assertEqual(data["username"], "fred") + self.assertEqual(data["roles"], [admin.name]) + self.assertEqual(data["transaction_count"], 0) + + # admin user then creates 2 records w/ 1 txn + # nb. must trick wuttaweb continuum plugin to assign author + with patch.object(self.request, "user", new=user): + person1 = model.Person(full_name="Barney Rubble") + self.session.add(person1) + person2 = model.Person(full_name="Betty Rubble") + self.session.add(person2) + self.session.commit() + + self.assertEqual(self.session.query(txncls).count(), 2) + txn1, txn2 = self.session.query(txncls).order_by(txncls.id).all() + self.assertIsNone(txn1.user) + self.assertIs(txn2.user, user) + + # nb. contrast this to the "non-versioned" test case above + data = view.merge_get_data(user) + self.assertEqual(data["username"], "fred") + self.assertEqual(data["roles"], [admin.name]) + self.assertEqual(data["transaction_count"], 1) + + def test_merge_execute(self): + import sqlalchemy_continuum as continuum + + model = self.app.model + auth = self.app.get_auth_handler() + txncls = continuum.transaction_class(model.User) + + # nb. must reset the User model reference, due to nature of + # test setup/teardown + with patch.multiple( + mod.UserView, + model_class=model.User, + Session=MagicMock(return_value=self.session), + ): + view = self.make_view() + + # make pair of users + admin = auth.get_role_administrator(self.session) + user1 = model.User(username="freddie") + user1.roles.append(admin) + self.session.add(user1) + user2 = model.User(username="fred") + self.session.add(user2) + self.session.commit() + self.assertEqual(self.session.query(model.User).count(), 2) + self.assertEqual(self.session.query(txncls).count(), 1) + + # admin user then creates 2 records w/ 1 txn + # nb. must trick wuttaweb continuum plugin to assign author + with patch.object(self.request, "user", new=user1): + person1 = model.Person(full_name="Barney Rubble") + self.session.add(person1) + person2 = model.Person(full_name="Betty Rubble") + self.session.add(person2) + self.session.commit() + + self.assertEqual(self.session.query(txncls).count(), 2) + txn1, txn2 = self.session.query(txncls).order_by(txncls.id).all() + self.assertIsNone(txn1.user) + self.assertIs(txn2.user, user1) + self.assertEqual(len(user2.roles), 0) + + # merge user1 => user2 (as user2, for 3rd txn) + with patch.object(self.request, "user", new=user2): + view.merge_execute(user1, user2) + self.session.commit() + self.assertEqual(self.session.query(txncls).count(), 3) + txn1, txn2, txn3 = self.session.query(txncls).order_by(txncls.id).all() + self.assertIs(txn3.user, user2) + self.assertEqual(self.session.query(model.User).count(), 1) + user = self.session.query(model.User).one() + self.assertIs(user, user2) + + # user2 is now admin, and author of txn2 + self.assertIn(admin, user2.roles) + self.assertIs(txn2.user, user2) + # TODO: this test seems to work fine on its own, but not in conjunction # with the next class below. will have to sort this out before adding