diff --git a/src/wuttaweb/diffs.py b/src/wuttaweb/diffs.py
index 23eaa7e..f4b4baa 100644
--- a/src/wuttaweb/diffs.py
+++ b/src/wuttaweb/diffs.py
@@ -2,7 +2,7 @@
################################################################################
#
# wuttaweb -- Web App for Wutta Framework
-# Copyright © 2024-2025 Lance Edgar
+# Copyright © 2024-2026 Lance Edgar
#
# This file is part of Wutta Framework.
#
@@ -63,6 +63,80 @@ class WebDiff(Diff):
return HTML.literal(html)
+class MergeDiff(WebDiff):
+ """
+ Special diff class for use when merging 2 records. While based on
+ :class:`WebDiff`, this class uses a different signature for the
+ constructor.
+
+ It shows the "removing" record, the "keeping" record, and also the
+ "final" record showing the calculated result of the merge, with
+ special highlighting where values would change on the kept record.
+
+ :param config: The app :term:`config object`.
+
+ :param removing_data: Dict of data for the "removing" record.
+
+ :param new_data: Dict of data for the "keeping" record.
+
+ :param new_data: Dict of "final" data for the kept record.
+
+ :param \\**kwargs: Remaining kwargs are passed as-is to the
+ :class:`WebDiff` constructor.
+ """
+
+ def __init__(self, config, removing_data, keeping_data, final_data, **kwargs):
+ super().__init__(config, removing_data, keeping_data, **kwargs)
+ self.removing_data = removing_data
+ self.keeping_data = keeping_data
+ self.final_data = final_data
+ self.columns = ["field name", "removing", "keeping", "final"]
+
+ def render_field_row(self, field):
+ keep_diff = self.keeping_data.get(field) != self.removing_data.get(field)
+ final_diff = self.final_data.get(field) != self.keeping_data.get(field)
+
+ # TODO: there is a fair bit of duplication here, compared to
+ # base class. should maybe clean that up someday..
+
+ kw = {}
+ if self.cell_padding:
+ kw["style"] = f"padding: {self.cell_padding}"
+ td_field = HTML.tag("td", class_="field", c=field, **kw)
+
+ td_old_value = HTML.tag(
+ "td",
+ c=self.render_old_value(field),
+ **self.get_old_value_attrs(keep_diff),
+ )
+
+ td_new_value = HTML.tag(
+ "td",
+ c=self.render_new_value(field),
+ **self.get_new_value_attrs(keep_diff),
+ )
+
+ td_final_value = HTML.tag(
+ "td",
+ c=self.render_final_value(field),
+ **self.get_final_value_attrs(final_diff),
+ )
+
+ return HTML.tag("tr", c=[td_field, td_old_value, td_new_value, td_final_value])
+
+ def get_final_value_attrs(
+ self, is_diff
+ ): # pylint: disable=missing-function-docstring
+ attrs = {}
+ if is_diff:
+ attrs["class_"] = "has-background-warning"
+ return self.get_cell_attrs(**attrs)
+
+ def render_final_value(self, field): # pylint: disable=missing-function-docstring
+ value = repr(self.final_data.get(field))
+ return self.render_cell_value(value)
+
+
class VersionDiff(WebDiff):
"""
Special diff class for use with version history views. While
diff --git a/src/wuttaweb/templates/master/index.mako b/src/wuttaweb/templates/master/index.mako
index fbc87a3..9b3c3c7 100644
--- a/src/wuttaweb/templates/master/index.mako
+++ b/src/wuttaweb/templates/master/index.mako
@@ -29,8 +29,9 @@
<%def name="modify_vue_vars()">
${parent.modify_vue_vars()}
- % if master.deletable_bulk and master.has_perm('delete_bulk'):
-
- % endif
+ % endif
+
+ % if master.mergeable and master.has_perm("merge"):
+
+ ${grid.vue_component}Data.mergeSubmitting = false
+
+ % endif
+
+
%def>
<%def name="make_vue_components()">
diff --git a/src/wuttaweb/templates/master/merge.mako b/src/wuttaweb/templates/master/merge.mako
new file mode 100644
index 0000000..790387b
--- /dev/null
+++ b/src/wuttaweb/templates/master/merge.mako
@@ -0,0 +1,88 @@
+## -*- coding: utf-8; -*-
+<%inherit file="/page.mako" />
+
+<%def name="title()">Merge 2 ${model_title_plural}%def>
+
+<%def name="page_content()">
+
+
+
+ You are about to merge
+ two ${model_title} records, possibly updating and/or deleting
+ various other records.
+
+
+
+ This tool can show you some basics but is not able to give you
+ the full picture of the implications of this merge.
+
+
+
+ You are urged to proceed with
+ caution! Ideally try the merge on a test site first.
+
+
+
+
+
+ ${h.link_to(str(removing) or "(no title)", master.get_action_url('view', removing))}
+
+
+
+ ${h.link_to(str(keeping) or "(no title)", master.get_action_url('view', keeping))}
+
+
+
+
+ ${diff.render_html()}
+
+
+
+
+
+
+
+
+ ${h.form(request.current_route_url(), **{"@submit": "swapSubmitting = true"})}
+ ${h.csrf_token(request)}
+ ${h.hidden("uuids", value=f"{keeping.uuid},{removing.uuid}")}
+
+ {{ swapSubmitting ? "Working, please wait..." : "Swap which record is kept/removed" }}
+
+ ${h.end_form()}
+
+
+
+ ${h.form(request.current_route_url(), **{"@submit": "mergeSubmitting = true"})}
+ ${h.csrf_token(request)}
+ ${h.hidden("uuids", value=f"{removing.uuid},{keeping.uuid}")}
+ ${h.hidden("execute-merge", value="true")}
+
+ {{ mergeSubmitting ? "Working, please wait..." : "Yes, perform this merge" }}
+
+ ${h.end_form()}
+
+
+
+
+%def>
+
+<%def name="modify_vue_vars()">
+ ${parent.modify_vue_vars()}
+
+%def>
diff --git a/src/wuttaweb/views/master.py b/src/wuttaweb/views/master.py
index 949a0e1..170afb6 100644
--- a/src/wuttaweb/views/master.py
+++ b/src/wuttaweb/views/master.py
@@ -29,6 +29,7 @@ import logging
import os
import threading
import warnings
+from uuid import UUID
import sqlalchemy as sa
from sqlalchemy import orm
@@ -41,7 +42,7 @@ from wuttaweb.views.base import View
from wuttaweb.util import get_form_data, render_csrf_token
from wuttaweb.db import Session
from wuttaweb.progress import SessionProgress
-from wuttaweb.diffs import VersionDiff
+from wuttaweb.diffs import MergeDiff, VersionDiff
log = logging.getLogger(__name__)
@@ -356,6 +357,35 @@ class MasterView(View): # pylint: disable=too-many-public-methods
This is optional; see also :meth:`get_version_grid_columns()`.
+ .. attribute:: mergeable
+
+ Boolean indicating whether the view model supports "merging two
+ records" - i.e. it should have a :meth:`merge()` view. Default
+ value is ``False``.
+
+ .. attribute:: merge_additive_fields
+
+ Optional list of fields for which values are "additive" in
+ nature when merging two records. Only relevant if
+ :attr:`mergeable` is true.
+
+ See also :meth:`merge_get_additive_fields()`.
+
+ .. attribute:: merge_coalesce_fields
+
+ Optional list of fields for which values should be "coalesced"
+ when merging two records. Only relevant if :attr:`mergeable`
+ is true.
+
+ See also :meth:`merge_get_coalesce_fields()`.
+
+ .. attribute:: merge_simple_fields
+
+ Optional list of "simple" fields when merging two records.
+ Only relevant if :attr:`mergeable` is true.
+
+ See also :meth:`merge_get_simple_fields()`.
+
**ROW FEATURES**
.. attribute:: has_rows
@@ -471,6 +501,12 @@ class MasterView(View): # pylint: disable=too-many-public-methods
execute_progress_template = None
configurable = False
+ # merging
+ mergeable = False
+ merge_additive_fields = None
+ merge_coalesce_fields = None
+ merge_simple_fields = None
+
# row features
has_rows = False
row_model_class = None
@@ -2391,6 +2427,7 @@ class MasterView(View): # pylint: disable=too-many-public-methods
"index_title": self.get_index_title(),
"index_url": self.get_index_url(),
"model_title": self.get_model_title(),
+ "model_title_plural": self.get_model_title_plural(),
"config_title": self.get_config_title(),
}
@@ -2573,7 +2610,9 @@ class MasterView(View): # pylint: disable=too-many-public-methods
labels.update(cls.labels)
return labels
- def make_model_grid(self, session=None, **kwargs):
+ def make_model_grid(
+ self, session=None, **kwargs
+ ): # pylint: disable=too-many-branches
"""
Create and return a :class:`~wuttaweb.grids.base.Grid`
instance for use with the :meth:`index()` view.
@@ -2585,6 +2624,8 @@ class MasterView(View): # pylint: disable=too-many-public-methods
* :meth:`get_grid_data()`
* :meth:`configure_grid()`
"""
+ route_prefix = self.get_route_prefix()
+
if "key" not in kwargs:
kwargs["key"] = self.get_grid_key()
@@ -2632,15 +2673,41 @@ class MasterView(View): # pylint: disable=too-many-public-methods
kwargs["actions"] = actions
+ mergeable = self.mergeable and self.has_perm("merge")
+
if "tools" not in kwargs:
tools = []
+ # delete-bulk
if self.deletable_bulk and self.has_perm("delete_bulk"):
tools.append(("delete-results", self.delete_bulk_make_button()))
+ # merge
+ if mergeable:
+ hidden = tags.hidden("uuids", **{":value": "checkedUUIDs"})
+ button = self.make_button(
+ '{{ mergeSubmitting ? "Working, please wait..." : "Merge 2 records" }}',
+ primary=True,
+ native_type="submit",
+ icon_left="object-ungroup",
+ **{":disabled": "mergeSubmitting || checkedRows.length != 2"},
+ )
+ csrf = render_csrf_token(self.request)
+ html = (
+ tags.form(
+ self.request.route_url(f"{route_prefix}.merge"),
+ **{"@submit": "mergeSubmitting = true"},
+ )
+ + csrf
+ + hidden
+ + button
+ + tags.end_form()
+ )
+ tools.append(("merge", html))
+
kwargs["tools"] = tools
- kwargs.setdefault("checkable", self.checkable)
+ kwargs.setdefault("checkable", self.checkable or mergeable)
if hasattr(self, "grid_row_class"):
kwargs.setdefault("row_class", self.grid_row_class)
kwargs.setdefault("filterable", self.filterable)
@@ -3170,6 +3237,281 @@ class MasterView(View): # pylint: disable=too-many-public-methods
if session:
session.close()
+ ##############################
+ # merge methods
+ ##############################
+
+ def merge(self):
+ """
+ View for merging two records.
+
+ By default, this view is included only if :attr:`mergeable` is
+ true. It usually maps to a URL like ``/widgets/merge``.
+
+ A POST request must be used for this view; otherwise it will
+ redirect to the :meth:`index()` view. The POST data must
+ specify a ``uuids`` param string in
+ ``"removing_uuid,keeping_uuid"`` format.
+
+ The user is first shown a "diff" with the
+ removing/keeping/final data records, as simple preview. They
+ can swap removing vs. keeping if needed, and when satisfied
+ they can "execute" the merge.
+
+ See also related methods, used by this one:
+
+ * :meth:`merge_validate_and_execute()`
+ * :meth:`merge_get_data()`
+ * :meth:`merge_get_final_data()`
+ """
+ if self.request.method != "POST":
+ return self.redirect(self.get_index_url())
+
+ session = self.Session()
+ model_class = self.get_model_class()
+
+ # load records to be kept/removed
+ removing = keeping = None
+ uuids = self.request.POST.get("uuids", "").split(",")
+ if len(uuids) == 2:
+ uuid1, uuid2 = uuids
+ try:
+ uuid1 = UUID(uuid1)
+ uuid2 = UUID(uuid2)
+ except ValueError:
+ pass
+ else:
+ removing = session.get(model_class, uuid1)
+ keeping = session.get(model_class, uuid2)
+
+ # redirect to listing if record(s) not found
+ if not (removing and keeping):
+ raise self.redirect(self.get_index_url())
+
+ # maybe execute merge
+ if self.request.POST.get("execute-merge") == "true":
+ if self.merge_validate_and_execute(removing, keeping):
+ return self.redirect(self.get_action_url("view", keeping))
+
+ removing_data = self.merge_get_data(removing)
+ keeping_data = self.merge_get_data(keeping)
+ diff = MergeDiff(
+ self.config,
+ removing_data,
+ keeping_data,
+ self.merge_get_final_data(removing_data, keeping_data),
+ )
+
+ context = {"removing": removing, "keeping": keeping, "diff": diff}
+ return self.render_to_response("merge", context)
+
+ def merge_get_simple_fields(self):
+ """
+ Return the list of "simple" fields for the merge.
+
+ These "simple" fields will not have any special handling for
+ the merge. In other words the "removing" record values will
+ be ignored and the "keeping" record values will remain in
+ place, without modification.
+
+ If the view class defines :attr:`merge_simple_fields`, that
+ list is returned as-is. Otherwise the list of columns from
+ :attr:`model_class` is returned.
+
+ :returns: List of simple field names.
+ """
+ if self.merge_simple_fields:
+ return list(self.merge_simple_fields)
+
+ mapper = sa.inspect(self.get_model_class())
+ fields = mapper.columns.keys()
+ return fields
+
+ def merge_get_additive_fields(self):
+ """
+ Return the list of "additive" fields for the merge.
+
+ Values from the removing/keeping record will be conceptually
+ added together, for each of these fields.
+
+ If the view class defines :attr:`merge_additive_fields`, that
+ list is returned as-is. Otherwise an empty list is returned.
+
+ :returns: List of additive field names.
+ """
+ if self.merge_additive_fields:
+ return list(self.merge_additive_fields)
+ return []
+
+ def merge_get_coalesce_fields(self):
+ """
+ Return the list of "coalesce" fields for the merge.
+
+ Values from the removing/keeping record will be conceptually
+ "coalesced" for each of these fields.
+
+ If the view class defines :attr:`merge_coalesce_fields`, that
+ list is returned as-is. Otherwise an empty list is returned.
+
+ :returns: List of coalesce field names.
+ """
+ if self.merge_coalesce_fields:
+ return list(self.merge_coalesce_fields)
+ return []
+
+ def merge_get_all_fields(self):
+ """
+ Return the list of *all* fields for the merge.
+
+ This will call each of the following methods to collect all
+ field names, then it returns the full *sorted* list.
+
+ * :meth:`merge_get_additive_fields()`
+ * :meth:`merge_get_coalesce_fields()`
+ * :meth:`merge_get_simple_fields()`
+
+ :returns: Sorted list of all field names.
+ """
+ fields = set()
+ fields.update(self.merge_get_simple_fields())
+ fields.update(self.merge_get_additive_fields())
+ fields.update(self.merge_get_coalesce_fields())
+ return sorted(fields)
+
+ def merge_get_data(self, obj):
+ """
+ Return a data dict for the given object, which will be either
+ the "removing" or "keeping" record for the merge.
+
+ By default this calls :meth:`merge_get_all_fields()` and then
+ for each field, calls ``getattr()`` on the object. Subclass
+ can override as needed for custom logic.
+
+ :param obj: Reference to model/record instance.
+
+ :returns: Data dict with all field values.
+ """
+ return {f: getattr(obj, f, None) for f in self.merge_get_all_fields()}
+
+ def merge_get_final_data(self, removing, keeping):
+ """
+ Return the "final" data dict for the merge.
+
+ The result will be identical to the "keeping" record, for all
+ "simple" fields. However the "additive" and "coalesce" fields
+ are handled specially per their nature, in which case those
+ final values may or may not match the "keeping" record.
+
+ :param removing: Data dict for the "removing" record.
+
+ :param keeping: Data dict for the "keeping" record.
+
+ :returns: Data dict with all "final" field values.
+
+ See also:
+
+ * :meth:`merge()`
+ * :meth:`merge_get_additive_fields()`
+ * :meth:`merge_get_coalesce_fields()`
+ """
+ final = dict(keeping)
+
+ for field in self.merge_get_additive_fields():
+ if isinstance(keeping[field], list):
+ final[field] = sorted(set(removing[field] + keeping[field]))
+ else:
+ final[field] = removing[field] + keeping[field]
+
+ for field in self.merge_get_coalesce_fields():
+ if removing[field] is not None and keeping[field] is None:
+ final[field] = removing[field]
+ elif removing[field] and not keeping[field]:
+ final[field] = removing[field]
+
+ return final
+
+ def merge_validate_and_execute(self, removing, keeping):
+ """
+ Validate and execute a merge for the two given records. It is
+ called from :meth:`merge()`.
+
+ This calls :meth:`merge_why_not()` and if that does not yield
+ a reason to prevent the merge, then calls
+ :meth:`merge_execute()`.
+
+ If there was a reason not to merge, or if an error occurs
+ during merge execution, a flash warning/error message is set
+ to notify the user what happened.
+
+ :param removing: Reference to the "removing" model instance/record.
+
+ :param keeping: Reference to the "keeping" model instance/record.
+
+ :returns: Boolean indicating whether merge execution completed
+ successfully.
+ """
+ session = self.Session()
+
+ # validate the merge
+ if reason := self.merge_why_not(removing, keeping):
+ warning = HTML.tag(
+ "p", class_="block", c="Merge cannot proceed:"
+ ) + HTML.tag("p", class_="block", c=reason)
+ self.request.session.flash(warning, "warning")
+ return False
+
+ # execute the merge
+ removed_title = str(removing)
+ try:
+ self.merge_execute(removing, keeping)
+ session.flush()
+ except Exception as err: # pylint: disable=broad-exception-caught
+ session.rollback()
+ log.warning("merge failed", exc_info=True)
+ warning = HTML.tag("p", class_="block", c="Merge failed:") + HTML.tag(
+ "p", class_="block", c=self.app.render_error(err)
+ )
+ self.request.session.flash(warning, "error")
+ return False
+
+ self.request.session.flash(f"{removed_title} has been merged into {keeping}")
+ return True
+
+ def merge_why_not(self, removing, keeping): # pylint: disable=unused-argument
+ """
+ Can return a "reason" why the two given records should not be merged.
+
+ This returns ``None`` by default, indicating the merge is
+ allowed. Subclass can override as needed for custom logic.
+
+ See also :meth:`merge_validate_and_execute()`.
+
+ :param removing: Reference to the "removing" model instance/record.
+
+ :param keeping: Reference to the "keeping" model instance/record.
+
+ :returns: Reason not to merge (as string), or ``None``.
+ """
+ return None
+
+ def merge_execute(self, removing, keeping): # pylint: disable=unused-argument
+ """
+ Execute the actual merge for the two given objects.
+
+ Default logic simply deletes the "removing" record. Subclass
+ can override as needed for custom logic.
+
+ See also :meth:`merge_validate_and_execute()`.
+
+ :param removing: Reference to the "removing" model instance/record.
+
+ :param keeping: Reference to the "keeping" model instance/record.
+ """
+ session = self.Session()
+
+ # nb. default "merge" does not update kept object!
+ session.delete(removing)
+
##############################
# row methods
##############################
@@ -3934,7 +4276,7 @@ class MasterView(View): # pylint: disable=too-many-public-methods
cls._defaults(config)
@classmethod
- def _defaults(cls, config): # pylint: disable=too-many-statements
+ def _defaults(cls, config): # pylint: disable=too-many-statements,too-many-branches
wutta_config = config.registry.settings.get("wutta_config")
app = wutta_config.get_app()
@@ -4031,6 +4373,21 @@ class MasterView(View): # pylint: disable=too-many-public-methods
f"Delete {model_title_plural} in bulk",
)
+ # merge
+ if cls.mergeable:
+ config.add_wutta_permission(
+ permission_prefix,
+ f"{permission_prefix}.merge",
+ f"Merge 2 {model_title_plural}",
+ )
+ config.add_route(f"{route_prefix}.merge", f"{url_prefix}/merge")
+ config.add_view(
+ cls,
+ attr="merge",
+ route_name=f"{route_prefix}.merge",
+ permission=f"{permission_prefix}.merge",
+ )
+
# autocomplete
if cls.has_autocomplete:
config.add_route(
diff --git a/src/wuttaweb/views/people.py b/src/wuttaweb/views/people.py
index b5d49ba..27e1bab 100644
--- a/src/wuttaweb/views/people.py
+++ b/src/wuttaweb/views/people.py
@@ -2,7 +2,7 @@
################################################################################
#
# wuttaweb -- Web App for Wutta Framework
-# Copyright © 2024-2025 Lance Edgar
+# Copyright © 2024-2026 Lance Edgar
#
# This file is part of Wutta Framework.
#
@@ -71,6 +71,9 @@ class PersonView(MasterView): # pylint: disable=abstract-method
"users",
]
+ mergeable = True
+ merge_additive_fields = ["usernames"]
+
def configure_grid(self, grid): # pylint: disable=empty-docstring
""" """
g = grid
@@ -129,6 +132,32 @@ class PersonView(MasterView): # pylint: disable=abstract-method
return person
+ def merge_get_data(self, obj): # pylint: disable=empty-docstring
+ """ """
+ person = obj
+ data = super().merge_get_data(person)
+
+ data["usernames"] = sorted([u.username for u in person.users])
+
+ return data
+
+ def merge_execute(self, removing, keeping):
+ """
+ We override default merge logic to re-assign users if needed.
+
+ See also parent method:
+ :meth:`~wuttaweb.views.master.MasterView.merge_execute()`
+ """
+ session = self.Session()
+
+ # reassign users
+ for user in list(removing.users):
+ user.person = keeping
+ session.flush()
+
+ # continue default merge
+ super().merge_execute(removing, keeping)
+
def autocomplete_query(self, term): # pylint: disable=empty-docstring
""" """
model = self.app.model
diff --git a/src/wuttaweb/views/users.py b/src/wuttaweb/views/users.py
index 9333b37..684d9c7 100644
--- a/src/wuttaweb/views/users.py
+++ b/src/wuttaweb/views/users.py
@@ -2,7 +2,7 @@
################################################################################
#
# wuttaweb -- Web App for Wutta Framework
-# Copyright © 2024-2025 Lance Edgar
+# Copyright © 2024-2026 Lance Edgar
#
# This file is part of Wutta Framework.
#
@@ -24,6 +24,8 @@
Views for users
"""
+import sqlalchemy as sa
+
from wuttjamaican.db.model import User
from wuttaweb.views import MasterView
from wuttaweb.forms import widgets
@@ -72,6 +74,9 @@ class UserView(MasterView): # pylint: disable=abstract-method
"api_tokens",
]
+ mergeable = True
+ merge_additive_fields = ["roles"]
+
def get_query(self, session=None): # pylint: disable=empty-docstring
""" """
query = super().get_query(session=session)
@@ -360,6 +365,113 @@ class UserView(MasterView): # pylint: disable=abstract-method
auth.delete_api_token(token)
return {}
+ def merge_get_simple_fields(self): # pylint: disable=empty-docstring
+ """ """
+ fields = super().merge_get_simple_fields()
+
+ if "password" in fields:
+ fields.remove("password")
+
+ return fields
+
+ def merge_get_additive_fields(self): # pylint: disable=empty-docstring
+ """ """
+ fields = super().merge_get_additive_fields()
+
+ if self.app.continuum_is_enabled():
+ if "transaction_count" not in fields:
+ fields.append("transaction_count")
+
+ return fields
+
+ def merge_get_data(self, obj): # pylint: disable=empty-docstring
+ """ """
+ data = super().merge_get_data(obj)
+ model_class = self.get_model_class()
+ session = self.Session()
+ user = obj
+
+ data["roles"] = sorted([role.name for role in user.roles])
+
+ if self.app.continuum_is_enabled():
+ import sqlalchemy_continuum as continuum # pylint: disable=import-outside-toplevel
+
+ txncls = continuum.transaction_class(model_class)
+ data["transaction_count"] = (
+ session.query(txncls).filter(txncls.user == user).count()
+ )
+
+ return data
+
+ def merge_why_not(self, removing, keeping):
+ """
+ This checks to ensure the *current* user is not the same as
+ the "removing" user.
+
+ See also parent method:
+ :meth:`~wuttaweb.views.master.MasterView.merge_why_not()`
+ """
+ if removing is self.request.user:
+ return "Cannot remove user who is currently logged in!"
+ return None
+
+ def merge_execute(self, removing, keeping):
+ """
+ The logic to merge 2 users is extended as follows:
+
+ The "keeping" user will be assigned to all roles to which the
+ "removing" user belonged.
+
+ Any upgrades created or executed by the "removing" user will
+ be updated to reference the "keeping" user instead.
+
+ Any versioning (SQLAlchemy-Continuum) transactions created by
+ the "removing" user will be updated to reference the "keeping"
+ user instead.
+
+ See also parent method:
+ :meth:`~wuttaweb.views.master.MasterView.merge_execute()`
+ """
+ model = self.app.model
+ session = self.Session()
+ model_class = self.get_model_class()
+
+ # transfer role membership
+ for role in list(removing.roles):
+ if role not in keeping.roles:
+ keeping.roles.append(role)
+
+ # reassign upgrade "created by"
+ stmt = (
+ sa.update(model.Upgrade)
+ .where(model.Upgrade.created_by_uuid == removing.uuid)
+ .values(created_by_uuid=keeping.uuid)
+ )
+ session.execute(stmt, execution_options={"synchronize_session": False})
+
+ # reassign upgrade "executed by"
+ stmt = (
+ sa.update(model.Upgrade)
+ .where(model.Upgrade.executed_by_uuid == removing.uuid)
+ .values(executed_by_uuid=keeping.uuid)
+ )
+ session.execute(stmt, execution_options={"synchronize_session": False})
+
+ # reassign continuum transactions
+ if self.app.continuum_is_enabled():
+ import sqlalchemy_continuum as continuum # pylint: disable=import-outside-toplevel
+
+ txncls = continuum.transaction_class(model_class)
+ stmt = (
+ sa.update(txncls)
+ .where(txncls.user_id == removing.uuid)
+ .values(user_id=keeping.uuid)
+ )
+ session.execute(stmt, execution_options={"synchronize_session": False})
+
+ # continue default merge
+ super().merge_execute(removing, keeping)
+
@classmethod
def defaults(cls, config): # pylint: disable=empty-docstring
""" """
diff --git a/tests/test_diffs.py b/tests/test_diffs.py
index 6f66893..90d2503 100644
--- a/tests/test_diffs.py
+++ b/tests/test_diffs.py
@@ -33,6 +33,94 @@ class TestWebDiff(WebTestCase):
self.assertIn("", html)
+class TestMergeDiff(WebTestCase):
+
+ def make_diff(self, *args, **kwargs):
+ return mod.MergeDiff(self.config, *args, **kwargs)
+
+ def test_get_final_value_attrs(self):
+
+ # no diff
+ removing = {"foo": "bar"}
+ keeping = {"foo": "bar"}
+ final = {"foo": "bar"}
+ is_diff = False
+ diff = self.make_diff(removing, keeping, final)
+ attrs = diff.get_final_value_attrs(is_diff)
+ self.assertEqual(attrs, {})
+
+ # values differ
+ keeping = {"foo": "baz"}
+ final = {"foo": "baz"}
+ is_diff = True
+ diff = self.make_diff(removing, keeping, final)
+ attrs = diff.get_final_value_attrs(is_diff)
+ self.assertEqual(attrs, {"class_": "has-background-warning"})
+
+ def test_render_field_row(self):
+
+ # no diffs
+ removing = {"foo": "bar"}
+ keeping = {"foo": "bar"}
+ final = {"foo": "bar"}
+ diff = self.make_diff(removing, keeping, final)
+ html = diff.render_field_row("foo")
+ self.assertTrue(html.startswith(""))
+ self.assertNotIn('style="padding: 0.5rem;"', html)
+ self.assertIn("'bar'", html)
+ self.assertNotIn(f'style="background-color: {diff.old_color}"', html)
+ self.assertNotIn("'baz'", html)
+ self.assertNotIn(f'style="background-color: {diff.new_color}"', html)
+ self.assertNotIn("'bar,baz'", html)
+ self.assertNotIn('class="has-background-warning"', html)
+ self.assertTrue(html.endswith("
"))
+
+ # remove vs. keep diffs, but not keep vs. final
+ removing = {"foo": "bar"}
+ keeping = {"foo": "baz"}
+ final = {"foo": "baz"}
+ diff = self.make_diff(removing, keeping, final)
+ html = diff.render_field_row("foo")
+ self.assertTrue(html.startswith(""))
+ self.assertNotIn('style="padding: 0.5rem;"', html)
+ self.assertIn("'bar'", html)
+ self.assertIn(f'style="background-color: {diff.old_color}"', html)
+ self.assertIn("'baz'", html)
+ self.assertIn(f'style="background-color: {diff.new_color}"', html)
+ self.assertNotIn("'bar,baz'", html)
+ self.assertNotIn('class="has-background-warning"', html)
+ self.assertTrue(html.endswith("
"))
+
+ # remove vs. keep diffs, *and* keep vs. final diffs
+ removing = {"foo": "bar"}
+ keeping = {"foo": "baz"}
+ final = {"foo": "bar,baz"}
+ diff = self.make_diff(removing, keeping, final)
+ html = diff.render_field_row("foo")
+ self.assertTrue(html.startswith(""))
+ self.assertNotIn('style="padding: 0.5rem;"', html)
+ self.assertIn("'bar'", html)
+ self.assertIn(f'style="background-color: {diff.old_color}"', html)
+ self.assertIn("'baz'", html)
+ self.assertIn(f'style="background-color: {diff.new_color}"', html)
+ self.assertIn("'bar,baz'", html)
+ self.assertIn('class="has-background-warning"', html)
+ self.assertTrue(html.endswith("
"))
+
+ # again, with cell padding
+ diff.cell_padding = "0.5rem"
+ html = diff.render_field_row("foo")
+ self.assertTrue(html.startswith(""))
+ self.assertIn('style="padding: 0.5rem"', html)
+ self.assertIn("'bar'", html)
+ self.assertIn(f"background-color: {diff.old_color}", html)
+ self.assertIn("'baz'", html)
+ self.assertIn(f"background-color: {diff.new_color}", html)
+ self.assertIn("'bar,baz'", html)
+ self.assertIn('class="has-background-warning"', html)
+ self.assertTrue(html.endswith("
"))
+
+
class TestVersionDiff(VersionWebTestCase):
def make_diff(self, *args, **kwargs):
diff --git a/tests/views/test_master.py b/tests/views/test_master.py
index bb31e8c..c1b493d 100644
--- a/tests/views/test_master.py
+++ b/tests/views/test_master.py
@@ -10,6 +10,7 @@ from sqlalchemy import orm
from pyramid import testing
from pyramid.response import Response
from pyramid.httpexceptions import HTTPNotFound, HTTPFound
+from webhelpers2.html import HTML
from wuttjamaican.conf import WuttaConfig
from wuttaweb.views import master as mod
@@ -36,6 +37,7 @@ class TestMasterView(WebTestCase):
downloadable=True,
executable=True,
configurable=True,
+ mergeable=True,
has_rows=True,
rows_creatable=True,
):
@@ -623,73 +625,122 @@ class TestMasterView(WebTestCase):
def test_make_model_grid(self):
self.pyramid_config.add_route("settings.delete_bulk", "/settings/delete-bulk")
+ self.pyramid_config.add_route("people.merge", "/people/merge")
model = self.app.model
- # no model class
- with patch.multiple(
- mod.MasterView, create=True, model_name="Widget", model_key="uuid"
- ):
- view = mod.MasterView(self.request)
- grid = view.make_model_grid()
- self.assertIsNone(grid.model_class)
+ with patch.object(mod.MasterView, "Session", return_value=self.session):
- # explicit model class
- with patch.multiple(mod.MasterView, create=True, model_class=model.Setting):
- grid = view.make_model_grid(session=self.session)
- self.assertIs(grid.model_class, model.Setting)
+ # no model class
+ with patch.multiple(
+ mod.MasterView, create=True, model_name="Widget", model_key="uuid"
+ ):
+ view = self.make_view()
+ grid = view.make_model_grid()
+ self.assertIsNone(grid.model_class)
- # no row class by default
- with patch.multiple(mod.MasterView, create=True, model_class=model.Setting):
- grid = view.make_model_grid(session=self.session)
- self.assertIsNone(grid.row_class)
-
- # can specify row class
- get_row_class = MagicMock()
- with patch.multiple(
- mod.MasterView,
- create=True,
- model_class=model.Setting,
- grid_row_class=get_row_class,
- ):
- grid = view.make_model_grid(session=self.session)
- self.assertIs(grid.row_class, get_row_class)
-
- # no actions by default
- with patch.multiple(mod.MasterView, create=True, model_class=model.Setting):
- grid = view.make_model_grid(session=self.session)
- self.assertEqual(grid.actions, [])
-
- # now let's test some more actions logic
- with patch.multiple(
- mod.MasterView,
- create=True,
- model_class=model.Setting,
- viewable=True,
- editable=True,
- deletable=True,
- ):
-
- # should have 3 actions now, but for lack of perms
- grid = view.make_model_grid(session=self.session)
- self.assertEqual(len(grid.actions), 0)
-
- # but root user has perms, so gets 3 actions
- with patch.object(self.request, "is_root", new=True):
+ # explicit model class
+ with patch.multiple(mod.MasterView, create=True, model_class=model.Setting):
+ view = self.make_view()
grid = view.make_model_grid(session=self.session)
- self.assertEqual(len(grid.actions), 3)
+ self.assertIs(grid.model_class, model.Setting)
- # no tools by default
- with patch.multiple(mod.MasterView, create=True, model_class=model.Setting):
- grid = view.make_model_grid(session=self.session)
- self.assertEqual(grid.tools, {})
-
- # delete-results tool added if master/perms allow
- with patch.multiple(
- mod.MasterView, create=True, model_class=model.Setting, deletable_bulk=True
- ):
- with patch.object(self.request, "is_root", new=True):
+ # no row class by default
+ with patch.multiple(mod.MasterView, create=True, model_class=model.Setting):
+ view = self.make_view()
grid = view.make_model_grid(session=self.session)
- self.assertIn("delete-results", grid.tools)
+ self.assertIsNone(grid.row_class)
+
+ # can specify row class
+ get_row_class = MagicMock()
+ with patch.multiple(
+ mod.MasterView,
+ create=True,
+ model_class=model.Setting,
+ grid_row_class=get_row_class,
+ ):
+ view = self.make_view()
+ grid = view.make_model_grid(session=self.session)
+ self.assertIs(grid.row_class, get_row_class)
+
+ # no actions by default
+ with patch.multiple(mod.MasterView, create=True, model_class=model.Setting):
+ view = self.make_view()
+ grid = view.make_model_grid(session=self.session)
+ self.assertEqual(grid.actions, [])
+
+ # now let's test some more actions logic
+ with patch.multiple(
+ mod.MasterView,
+ create=True,
+ model_class=model.Setting,
+ viewable=True,
+ editable=True,
+ deletable=True,
+ ):
+ view = self.make_view()
+
+ # should have 3 actions now, but for lack of perms
+ grid = view.make_model_grid(session=self.session)
+ self.assertEqual(len(grid.actions), 0)
+
+ # but root user has perms, so gets 3 actions
+ with patch.object(self.request, "is_root", new=True):
+ grid = view.make_model_grid(session=self.session)
+ self.assertEqual(len(grid.actions), 3)
+
+ # no tools by default
+ with patch.multiple(mod.MasterView, create=True, model_class=model.Setting):
+ view = self.make_view()
+ grid = view.make_model_grid(session=self.session)
+ self.assertEqual(grid.tools, {})
+
+ # delete-results tool added if master/perms allow
+ with patch.multiple(
+ mod.MasterView,
+ create=True,
+ model_class=model.Setting,
+ deletable_bulk=True,
+ ):
+ view = self.make_view()
+ with patch.object(self.request, "is_root", new=True):
+ grid = view.make_model_grid(session=self.session)
+ self.assertIn("delete-results", grid.tools)
+
+ # merge tool added if master/perms allow
+ with patch.multiple(
+ mod.MasterView,
+ model_class=model.Person,
+ route_prefix="people",
+ mergeable=True,
+ create=True,
+ ):
+ view = self.make_view()
+ with patch.object(self.request, "is_root", new=True):
+ grid = view.make_model_grid()
+ self.assertIn("merge", grid.tools)
+
+ # test checkable flag
+ with patch.multiple(
+ mod.MasterView,
+ model_class=model.Person,
+ route_prefix="people",
+ create=True,
+ ):
+ view = self.make_view()
+
+ # not checkable by default
+ grid = view.make_model_grid()
+ self.assertFalse(grid.checkable)
+
+ # but can override
+ grid = view.make_model_grid(checkable=True)
+ self.assertTrue(grid.checkable)
+
+ # checkable is true if merge allowed
+ with patch.object(mod.MasterView, "mergeable", new=True):
+ with patch.object(self.request, "is_root", new=True):
+ grid = view.make_model_grid()
+ self.assertTrue(grid.checkable)
def test_get_grid_data(self):
model = self.app.model
@@ -1604,6 +1655,426 @@ class TestMasterView(WebTestCase):
# nb. nothing was deleted
self.assertEqual(self.session.query(model.Setting).count(), 6)
+ def test_merge_get_simple_fields(self):
+ model = self.app.model
+ with patch.object(mod.MasterView, "model_class", new=model.Person):
+ view = self.make_view()
+
+ # fields include table columns by default
+ fields = view.merge_get_simple_fields()
+ self.assertEqual(
+ fields,
+ ["uuid", "full_name", "first_name", "middle_name", "last_name"],
+ )
+
+ # but class can specify fields
+ view.merge_simple_fields = ["first_name", "last_name"]
+ fields = view.merge_get_simple_fields()
+ self.assertEqual(
+ fields,
+ ["first_name", "last_name"],
+ )
+
+ def test_merge_get_additive_fields(self):
+ model = self.app.model
+ with patch.object(mod.MasterView, "model_class", new=model.Person):
+ view = self.make_view()
+
+ # no additive fields by default
+ fields = view.merge_get_additive_fields()
+ self.assertEqual(fields, [])
+
+ # but class can specify fields
+ view.merge_additive_fields = ["usernames"]
+ fields = view.merge_get_additive_fields()
+ self.assertEqual(fields, ["usernames"])
+
+ def test_merge_get_coalesce_fields(self):
+ model = self.app.model
+ with patch.object(mod.MasterView, "model_class", new=model.Person):
+ view = self.make_view()
+
+ # no coalesce fields by default
+ fields = view.merge_get_coalesce_fields()
+ self.assertEqual(fields, [])
+
+ # but class can specify fields
+ view.merge_coalesce_fields = ["active"]
+ fields = view.merge_get_coalesce_fields()
+ self.assertEqual(fields, ["active"])
+
+ def test_merge_get_all_fields(self):
+ model = self.app.model
+ with patch.object(mod.MasterView, "model_class", new=model.Person):
+ view = self.make_view()
+
+ # nb. "all" fields will be a sorted list
+
+ # only column (simple) fields by default
+ fields = view.merge_get_all_fields()
+ self.assertEqual(
+ fields,
+ ["first_name", "full_name", "last_name", "middle_name", "uuid"],
+ )
+
+ # but class can specify fields
+ view.merge_simple_fields = ["first_name", "last_name"]
+ view.merge_additive_fields = ["usernames"]
+ view.merge_coalesce_fields = ["active"]
+ fields = view.merge_get_all_fields()
+ self.assertEqual(
+ fields,
+ ["active", "first_name", "last_name", "usernames"],
+ )
+
+ def test_merge_get_data(self):
+ model = self.app.model
+ person = model.Person(first_name="Fred", last_name="Flintstone")
+ with patch.object(mod.MasterView, "model_class", new=model.Person):
+ view = self.make_view()
+
+ # data will include "all" fields
+ view.merge_simple_fields = ["first_name", "last_name", "usernames"]
+ data = view.merge_get_data(person)
+ self.assertEqual(
+ data,
+ {
+ "first_name": "Fred",
+ "last_name": "Flintstone",
+ # nb. person has no such attr, so null value
+ "usernames": None,
+ },
+ )
+
+ def test_merge_get_final_data(self):
+ model = self.app.model
+
+ removing = {
+ "first_name": "Freddie",
+ "last_name": "Flintstone",
+ "user_count": 1,
+ "usernames": ["freddie"],
+ "some_value": 42,
+ "active": True,
+ }
+
+ keeping = {
+ "first_name": "Fred",
+ "last_name": "Flintstone",
+ "user_count": 1,
+ "usernames": ["fred"],
+ "some_value": None,
+ "active": False,
+ }
+
+ with patch.object(mod.MasterView, "model_class", new=model.Person):
+ view = self.make_view()
+ view.merge_simple_fields = ["first_name", "last_name"]
+ view.merge_additive_fields = ["user_count", "usernames"]
+ view.merge_coalesce_fields = ["some_value", "active"]
+
+ final = view.merge_get_final_data(removing, keeping)
+ self.assertEqual(
+ final,
+ {
+ "first_name": "Fred",
+ "last_name": "Flintstone",
+ "user_count": 2,
+ "usernames": ["fred", "freddie"],
+ "some_value": 42,
+ "active": True,
+ },
+ )
+
+ def test_merge_execute(self):
+ model = self.app.model
+
+ person1 = model.Person(
+ first_name="Freddie", last_name="Flintstone", full_name="Freddie Flintstone"
+ )
+ self.session.add(person1)
+ person2 = model.Person(
+ first_name="Fred", last_name="Flintstone", full_name="Fred Flintstone"
+ )
+ self.session.add(person2)
+ self.session.commit()
+ self.assertEqual(self.session.query(model.Person).count(), 2)
+
+ with patch.object(mod.MasterView, "Session", return_value=self.session):
+ view = self.make_view()
+
+ # default merge logic just deletes 'removing' person1
+ view.merge_execute(person1, person2)
+ self.assertEqual(self.session.query(model.Person).count(), 1)
+ person = self.session.query(model.Person).one()
+ self.assertIs(person, person2)
+ self.assertEqual(person.first_name, "Fred")
+ self.assertEqual(person.full_name, "Fred Flintstone")
+
+ def test_merge_validate_and_execute(self):
+ model = self.app.model
+
+ person1 = model.Person(
+ first_name="Freddie", last_name="Flintstone", full_name="Freddie Flintstone"
+ )
+ self.session.add(person1)
+ person2 = model.Person(
+ first_name="Fred", last_name="Flintstone", full_name="Fred Flintstone"
+ )
+ self.session.add(person2)
+ self.session.commit()
+ self.assertEqual(self.session.query(model.Person).count(), 2)
+ self.assertIn(person1, self.session)
+
+ with patch.object(mod.MasterView, "Session", return_value=self.session):
+ view = self.make_view()
+
+ # default merge logic just deletes 'removing' person1
+ result = view.merge_validate_and_execute(person1, person2)
+ self.assertTrue(result)
+ self.assertEqual(self.session.query(model.Person).count(), 1)
+ person = self.session.query(model.Person).one()
+ self.assertIs(person, person2)
+ self.assertEqual(person.first_name, "Fred")
+ self.assertEqual(person.full_name, "Fred Flintstone")
+ self.assertFalse(self.request.session.peek_flash("warning"))
+ self.assertFalse(self.request.session.peek_flash("error"))
+ self.assertEqual(
+ self.request.session.pop_flash(),
+ ["Freddie Flintstone has been merged into Fred Flintstone"],
+ )
+
+ # restore Freddie
+ self.assertNotIn(person1, self.session)
+ person1 = self.session.merge(person1)
+ self.session.commit()
+ self.assertEqual(self.session.query(model.Person).count(), 2)
+
+ # merge does not validate
+ with patch.object(view, "merge_why_not", return_value="because i said so"):
+ result = view.merge_validate_and_execute(person1, person2)
+ self.assertFalse(result)
+ self.assertEqual(self.session.query(model.Person).count(), 2)
+ self.assertFalse(self.request.session.peek_flash())
+ self.assertFalse(self.request.session.peek_flash("error"))
+ self.assertEqual(
+ self.request.session.pop_flash("warning"),
+ [
+ HTML.literal(
+ 'Merge cannot proceed:
'
+ 'because i said so
'
+ ),
+ ],
+ )
+
+ # error executing merge
+ with patch.object(view, "merge_execute", side_effect=RuntimeError):
+ result = view.merge_validate_and_execute(person1, person2)
+ self.assertFalse(result)
+ self.assertEqual(self.session.query(model.Person).count(), 2)
+ self.assertFalse(self.request.session.peek_flash())
+ self.assertFalse(self.request.session.peek_flash("warning"))
+ self.assertEqual(
+ self.request.session.pop_flash("error"),
+ [
+ HTML.literal(
+ 'Merge failed:
'
+ 'RuntimeError
'
+ ),
+ ],
+ )
+
+ def test_merge(self):
+ self.pyramid_config.add_route("home", "/")
+ self.pyramid_config.add_route("login", "/auth/login")
+ self.pyramid_config.add_route("people", "/people/")
+ self.pyramid_config.add_route("people.merge", "/people/merge")
+ self.pyramid_config.add_route("people.view", "/people/{uuid}")
+ model = self.app.model
+
+ class MergeRoute:
+ name = "people.merge"
+
+ person1 = model.Person(
+ first_name="Freddie", last_name="Flintstone", full_name="Freddie Flintstone"
+ )
+ self.session.add(person1)
+ person2 = model.Person(
+ first_name="Fred", last_name="Flintstone", full_name="Fred Flintstone"
+ )
+ self.session.add(person2)
+ self.session.commit()
+ self.assertEqual(self.session.query(model.Person).count(), 2)
+ self.assertIn(person1, self.session)
+
+ with patch.multiple(
+ mod.MasterView,
+ Session=MagicMock(return_value=self.session),
+ model_class=model.Person,
+ route_prefix="people",
+ create=True,
+ ):
+ view = self.make_view()
+
+ # GET request will redirect to index
+ result = view.merge()
+ self.assertIsInstance(result, HTTPFound)
+ self.assertEqual(result.location, "http://example.com/people/")
+ self.assertEqual(self.session.query(model.Person).count(), 2)
+
+ # assume POST from now on
+ with patch.multiple(
+ self.request, matched_route=MergeRoute, method="POST", create=True
+ ):
+
+ # POST without 'execute-merge' flag shows user the diff
+ with patch.object(
+ self.request,
+ "POST",
+ new={"uuids": f"{person1.uuid},{person2.uuid}"},
+ ):
+ response = view.merge()
+ self.assertIsInstance(response, Response)
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(self.session.query(model.Person).count(), 2)
+ self.assertFalse(self.request.session.peek_flash())
+ self.assertFalse(self.request.session.peek_flash("warning"))
+ self.assertFalse(self.request.session.peek_flash("error"))
+
+ # default merge logic deletes person1, then redirects
+ with patch.object(
+ self.request,
+ "POST",
+ new={
+ "uuids": f"{person1.uuid},{person2.uuid}",
+ "execute-merge": "true",
+ },
+ ):
+ result = view.merge()
+ self.assertIsInstance(result, HTTPFound)
+ self.assertEqual(
+ result.location, f"http://example.com/people/{person2.uuid}"
+ )
+ self.assertEqual(self.session.query(model.Person).count(), 1)
+ self.assertNotIn(person1, self.session)
+ self.assertIn(person2, self.session)
+ person = self.session.query(model.Person).one()
+ self.assertIs(person, person2)
+ self.assertEqual(person.first_name, "Fred")
+ self.assertEqual(person.full_name, "Fred Flintstone")
+ self.assertFalse(self.request.session.peek_flash("warning"))
+ self.assertFalse(self.request.session.peek_flash("error"))
+ self.assertEqual(
+ self.request.session.pop_flash(),
+ ["Freddie Flintstone has been merged into Fred Flintstone"],
+ )
+
+ # restore Freddie
+ self.assertNotIn(person1, self.session)
+ person1 = self.session.merge(person1)
+ self.session.commit()
+ self.assertEqual(self.session.query(model.Person).count(), 2)
+
+ # simple redirect if invalid uuids specified
+ with patch.object(
+ self.request,
+ "POST",
+ new={
+ "uuids": "bogus1,bogus2",
+ "execute-merge": "true",
+ },
+ ):
+ with self.assertRaises(HTTPFound) as cm:
+ view.merge()
+ self.assertEqual(
+ cm.exception.location, "http://example.com/people/"
+ )
+ self.assertEqual(self.session.query(model.Person).count(), 2)
+ self.assertFalse(self.request.session.peek_flash())
+ self.assertFalse(self.request.session.peek_flash("warning"))
+ self.assertFalse(self.request.session.peek_flash("error"))
+
+ # simple redirect if unknown uuids specified
+ fake1 = self.app.make_true_uuid()
+ fake2 = self.app.make_true_uuid()
+ with patch.object(
+ self.request,
+ "POST",
+ new={
+ "uuids": f"{fake1},{fake2}",
+ "execute-merge": "true",
+ },
+ ):
+ with self.assertRaises(HTTPFound) as cm:
+ view.merge()
+ self.assertEqual(
+ cm.exception.location, "http://example.com/people/"
+ )
+ self.assertEqual(self.session.query(model.Person).count(), 2)
+ self.assertFalse(self.request.session.peek_flash())
+ self.assertFalse(self.request.session.peek_flash("warning"))
+ self.assertFalse(self.request.session.peek_flash("error"))
+
+ # warning redirect if merge does not validate
+ with patch.object(
+ self.request,
+ "POST",
+ new={
+ "uuids": f"{person1.uuid},{person2.uuid}",
+ "execute-merge": "true",
+ },
+ ):
+ with patch.object(
+ view, "merge_why_not", return_value="because i said so"
+ ):
+ response = view.merge()
+ self.assertIsInstance(response, Response)
+ self.assertEqual(self.session.query(model.Person).count(), 2)
+ self.assertFalse(self.request.session.peek_flash())
+ self.assertFalse(self.request.session.peek_flash("error"))
+ # TODO: since response is already rendered, the warning flash
+ # msg has already been popped off the stack..will have to
+ # avoid render_to_response() to properly test that..
+ self.assertFalse(self.request.session.peek_flash("warning"))
+ # self.assertEqual(
+ # self.request.session.pop_flash("warning"),
+ # [
+ # HTML.literal(
+ # 'Merge cannot proceed:
'
+ # 'because i said so
'
+ # ),
+ # ],
+ # )
+
+ # error redirect if merge execution fails
+ with patch.object(
+ self.request,
+ "POST",
+ new={
+ "uuids": f"{person1.uuid},{person2.uuid}",
+ "execute-merge": "true",
+ },
+ ):
+ with patch.object(view, "merge_execute", side_effect=RuntimeError):
+ response = view.merge()
+ self.assertIsInstance(response, Response)
+ self.assertEqual(self.session.query(model.Person).count(), 2)
+ self.assertFalse(self.request.session.peek_flash())
+ self.assertFalse(self.request.session.peek_flash("warning"))
+ # TODO: since response is already rendered, the error flash
+ # msg has already been popped off the stack..will have to
+ # avoid render_to_response() to properly test that..
+ self.assertFalse(self.request.session.peek_flash("error"))
+ # self.assertEqual(
+ # self.request.session.pop_flash("error"),
+ # [
+ # HTML.literal(
+ # 'Merge failed:
'
+ # 'RuntimeError
'
+ # ),
+ # ],
+ # )
+
def test_autocomplete(self):
model = self.app.model
diff --git a/tests/views/test_people.py b/tests/views/test_people.py
index 29f6694..4344589 100644
--- a/tests/views/test_people.py
+++ b/tests/views/test_people.py
@@ -154,3 +154,43 @@ class TestPersonView(WebTestCase):
response = view.make_user()
# nb. this always redirects for now
self.assertEqual(response.status_code, 302)
+
+ def test_merge_get_data(self):
+ model = self.app.model
+
+ person = model.Person(full_name="Fred Flintstone")
+ self.session.add(person)
+ user = model.User(username="fred", person=person)
+ self.session.add(user)
+ self.session.flush()
+
+ view = self.make_view()
+ data = view.merge_get_data(person)
+ self.assertIn("usernames", data)
+ self.assertEqual(data["usernames"], ["fred"])
+
+ def test_merge_execute(self):
+ model = self.app.model
+
+ person1 = model.Person(full_name="Freddie Flintstone")
+ self.session.add(person1)
+ user1 = model.User(username="freddie", person=person1)
+ self.session.add(user1)
+
+ person2 = model.Person(full_name="Fred Flintstone")
+ self.session.add(person2)
+ user2 = model.User(username="fred", person=person2)
+ self.session.add(user2)
+
+ self.session.commit()
+ self.assertEqual(self.session.query(model.Person).count(), 2)
+ self.assertEqual(self.session.query(model.User).count(), 2)
+
+ view = self.make_view()
+ with patch.object(view, "Session", return_value=self.session):
+ view.merge_execute(person1, person2)
+ self.assertEqual(self.session.query(model.Person).count(), 1)
+ self.assertEqual(self.session.query(model.User).count(), 2)
+ person = self.session.query(model.Person).one()
+ self.assertIs(person, person2)
+ self.assertEqual(len(person.users), 2)
diff --git a/tests/views/test_users.py b/tests/views/test_users.py
index 2fe08aa..b4eecb4 100644
--- a/tests/views/test_users.py
+++ b/tests/views/test_users.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8; -*-
-from unittest.mock import patch
+from unittest.mock import patch, MagicMock
from sqlalchemy import orm
@@ -8,7 +8,7 @@ import colander
from wuttaweb.grids import Grid
from wuttaweb.views import users as mod
-from wuttaweb.testing import WebTestCase, FunctionalTestCase
+from wuttaweb.testing import WebTestCase, VersionWebTestCase, FunctionalTestCase
class TestUserView(WebTestCase):
@@ -496,6 +496,220 @@ class TestUserView(WebTestCase):
result = view.delete_api_token()
self.assertEqual(result, {"error": "API token not found"})
+ def test_merge_get_simple_fields(self):
+ view = self.make_view()
+
+ # password field should not be included
+ fields = view.merge_get_simple_fields()
+ self.assertNotIn("password", fields)
+ self.assertIn("username", fields)
+ self.assertIn("active", fields)
+
+ def test_merge_get_additive_fields(self):
+ view = self.make_view()
+
+ # nb. this is not a "versioned" test case, so transaction_count
+ # field will not be included
+ fields = view.merge_get_additive_fields()
+ self.assertNotIn("transaction_count", fields)
+ self.assertIn("roles", fields)
+
+ def test_merge_get_data(self):
+ model = self.app.model
+ auth = self.app.get_auth_handler()
+ view = self.make_view()
+
+ admin = auth.get_role_administrator(self.session)
+ user = model.User(username="fred")
+ user.roles.append(admin)
+ self.session.add(user)
+ self.session.commit()
+
+ # nb. this is not a "versioned" test case, so transaction_count
+ # field will not be included
+ data = view.merge_get_data(user)
+ self.assertEqual(data["username"], "fred")
+ self.assertEqual(data["roles"], [admin.name])
+ self.assertNotIn("transaction_count", data)
+
+ def test_merge_why_not(self):
+ model = self.app.model
+ view = self.make_view()
+
+ user1 = model.User(username="freddie")
+ self.session.add(user1)
+ user2 = model.User(username="fred")
+ self.session.add(user2)
+ self.session.commit()
+
+ # normally no reason not to merge
+ self.assertIsNone(view.merge_why_not(user1, user2))
+
+ # can merge even if current user is involved (being kept)
+ with patch.object(self.request, "user", new=user2):
+ self.assertIsNone(view.merge_why_not(user1, user2))
+
+ # but cannot merge if it means removing current user
+ with patch.object(self.request, "user", new=user1):
+ reason = view.merge_why_not(user1, user2)
+ self.assertEqual(reason, "Cannot remove user who is currently logged in!")
+
+ def test_merge_execute(self):
+ model = self.app.model
+ enum = self.app.enum
+ auth = self.app.get_auth_handler()
+ view = self.make_view()
+
+ admin = auth.get_role_administrator(self.session)
+ user1 = model.User(username="freddie")
+ user1.roles.append(admin)
+ self.session.add(user1)
+ user2 = model.User(username="fred")
+ self.session.add(user2)
+
+ upgrade = model.Upgrade(
+ description="test",
+ created_by=user1,
+ executed_by=user1,
+ status=enum.UpgradeStatus.SUCCESS,
+ )
+ self.session.add(upgrade)
+ self.session.commit()
+
+ with patch.object(view, "Session", return_value=self.session):
+ view.merge_execute(user1, user2)
+ self.session.commit()
+
+ self.assertEqual(self.session.query(model.User).count(), 1)
+ self.assertNotIn(user1, self.session)
+ self.assertIn(user2, self.session)
+
+ self.assertIn(admin, user2.roles)
+
+ self.assertIs(upgrade.created_by, user2)
+ self.assertIs(upgrade.executed_by, user2)
+
+
+class TestVersionedUserView(VersionWebTestCase):
+
+ def make_view(self):
+ return mod.UserView(self.request)
+
+ def test_merge_get_additive_fields(self):
+ view = self.make_view()
+
+ # nb. contrast this to the "non-versioned" test case above
+ fields = view.merge_get_additive_fields()
+ self.assertIn("transaction_count", fields)
+ self.assertIn("roles", fields)
+
+ def test_merge_get_data(self):
+ import sqlalchemy_continuum as continuum
+
+ model = self.app.model
+ auth = self.app.get_auth_handler()
+ txncls = continuum.transaction_class(model.User)
+
+ # nb. must reset the User model reference, due to nature of
+ # test setup/teardown
+ with patch.multiple(
+ mod.UserView,
+ model_class=model.User,
+ Session=MagicMock(return_value=self.session),
+ ):
+ view = self.make_view()
+
+ # make admin user
+ admin = auth.get_role_administrator(self.session)
+ user = model.User(username="fred")
+ user.roles.append(admin)
+ self.session.add(user)
+ self.session.commit()
+ self.assertEqual(self.session.query(txncls).count(), 1)
+
+ # nb. contrast this to the "non-versioned" test case above
+ data = view.merge_get_data(user)
+ self.assertEqual(data["username"], "fred")
+ self.assertEqual(data["roles"], [admin.name])
+ self.assertEqual(data["transaction_count"], 0)
+
+ # admin user then creates 2 records w/ 1 txn
+ # nb. must trick wuttaweb continuum plugin to assign author
+ with patch.object(self.request, "user", new=user):
+ person1 = model.Person(full_name="Barney Rubble")
+ self.session.add(person1)
+ person2 = model.Person(full_name="Betty Rubble")
+ self.session.add(person2)
+ self.session.commit()
+
+ self.assertEqual(self.session.query(txncls).count(), 2)
+ txn1, txn2 = self.session.query(txncls).order_by(txncls.id).all()
+ self.assertIsNone(txn1.user)
+ self.assertIs(txn2.user, user)
+
+ # nb. contrast this to the "non-versioned" test case above
+ data = view.merge_get_data(user)
+ self.assertEqual(data["username"], "fred")
+ self.assertEqual(data["roles"], [admin.name])
+ self.assertEqual(data["transaction_count"], 1)
+
+ def test_merge_execute(self):
+ import sqlalchemy_continuum as continuum
+
+ model = self.app.model
+ auth = self.app.get_auth_handler()
+ txncls = continuum.transaction_class(model.User)
+
+ # nb. must reset the User model reference, due to nature of
+ # test setup/teardown
+ with patch.multiple(
+ mod.UserView,
+ model_class=model.User,
+ Session=MagicMock(return_value=self.session),
+ ):
+ view = self.make_view()
+
+ # make pair of users
+ admin = auth.get_role_administrator(self.session)
+ user1 = model.User(username="freddie")
+ user1.roles.append(admin)
+ self.session.add(user1)
+ user2 = model.User(username="fred")
+ self.session.add(user2)
+ self.session.commit()
+ self.assertEqual(self.session.query(model.User).count(), 2)
+ self.assertEqual(self.session.query(txncls).count(), 1)
+
+ # admin user then creates 2 records w/ 1 txn
+ # nb. must trick wuttaweb continuum plugin to assign author
+ with patch.object(self.request, "user", new=user1):
+ person1 = model.Person(full_name="Barney Rubble")
+ self.session.add(person1)
+ person2 = model.Person(full_name="Betty Rubble")
+ self.session.add(person2)
+ self.session.commit()
+
+ self.assertEqual(self.session.query(txncls).count(), 2)
+ txn1, txn2 = self.session.query(txncls).order_by(txncls.id).all()
+ self.assertIsNone(txn1.user)
+ self.assertIs(txn2.user, user1)
+ self.assertEqual(len(user2.roles), 0)
+
+ # merge user1 => user2 (as user2, for 3rd txn)
+ with patch.object(self.request, "user", new=user2):
+ view.merge_execute(user1, user2)
+ self.session.commit()
+ self.assertEqual(self.session.query(txncls).count(), 3)
+ txn1, txn2, txn3 = self.session.query(txncls).order_by(txncls.id).all()
+ self.assertIs(txn3.user, user2)
+ self.assertEqual(self.session.query(model.User).count(), 1)
+ user = self.session.query(model.User).one()
+ self.assertIs(user, user2)
+
+ # user2 is now admin, and author of txn2
+ self.assertIn(admin, user2.roles)
+ self.assertIs(txn2.user, user2)
+
# TODO: this test seems to work fine on its own, but not in conjunction
# with the next class below. will have to sort this out before adding