3
0
Fork 0

feat: add basic support for merging 2 records, w/ preview

including basic logic for merging Person or User records
This commit is contained in:
Lance Edgar 2026-03-20 17:20:02 -05:00
parent 8bfbf0e570
commit ee3a789682
10 changed files with 1554 additions and 73 deletions

View file

@ -10,6 +10,7 @@ from sqlalchemy import orm
from pyramid import testing
from pyramid.response import Response
from pyramid.httpexceptions import HTTPNotFound, HTTPFound
from webhelpers2.html import HTML
from wuttjamaican.conf import WuttaConfig
from wuttaweb.views import master as mod
@ -36,6 +37,7 @@ class TestMasterView(WebTestCase):
downloadable=True,
executable=True,
configurable=True,
mergeable=True,
has_rows=True,
rows_creatable=True,
):
@ -623,73 +625,122 @@ class TestMasterView(WebTestCase):
def test_make_model_grid(self):
self.pyramid_config.add_route("settings.delete_bulk", "/settings/delete-bulk")
self.pyramid_config.add_route("people.merge", "/people/merge")
model = self.app.model
# no model class
with patch.multiple(
mod.MasterView, create=True, model_name="Widget", model_key="uuid"
):
view = mod.MasterView(self.request)
grid = view.make_model_grid()
self.assertIsNone(grid.model_class)
with patch.object(mod.MasterView, "Session", return_value=self.session):
# explicit model class
with patch.multiple(mod.MasterView, create=True, model_class=model.Setting):
grid = view.make_model_grid(session=self.session)
self.assertIs(grid.model_class, model.Setting)
# no model class
with patch.multiple(
mod.MasterView, create=True, model_name="Widget", model_key="uuid"
):
view = self.make_view()
grid = view.make_model_grid()
self.assertIsNone(grid.model_class)
# no row class by default
with patch.multiple(mod.MasterView, create=True, model_class=model.Setting):
grid = view.make_model_grid(session=self.session)
self.assertIsNone(grid.row_class)
# can specify row class
get_row_class = MagicMock()
with patch.multiple(
mod.MasterView,
create=True,
model_class=model.Setting,
grid_row_class=get_row_class,
):
grid = view.make_model_grid(session=self.session)
self.assertIs(grid.row_class, get_row_class)
# no actions by default
with patch.multiple(mod.MasterView, create=True, model_class=model.Setting):
grid = view.make_model_grid(session=self.session)
self.assertEqual(grid.actions, [])
# now let's test some more actions logic
with patch.multiple(
mod.MasterView,
create=True,
model_class=model.Setting,
viewable=True,
editable=True,
deletable=True,
):
# should have 3 actions now, but for lack of perms
grid = view.make_model_grid(session=self.session)
self.assertEqual(len(grid.actions), 0)
# but root user has perms, so gets 3 actions
with patch.object(self.request, "is_root", new=True):
# explicit model class
with patch.multiple(mod.MasterView, create=True, model_class=model.Setting):
view = self.make_view()
grid = view.make_model_grid(session=self.session)
self.assertEqual(len(grid.actions), 3)
self.assertIs(grid.model_class, model.Setting)
# no tools by default
with patch.multiple(mod.MasterView, create=True, model_class=model.Setting):
grid = view.make_model_grid(session=self.session)
self.assertEqual(grid.tools, {})
# delete-results tool added if master/perms allow
with patch.multiple(
mod.MasterView, create=True, model_class=model.Setting, deletable_bulk=True
):
with patch.object(self.request, "is_root", new=True):
# no row class by default
with patch.multiple(mod.MasterView, create=True, model_class=model.Setting):
view = self.make_view()
grid = view.make_model_grid(session=self.session)
self.assertIn("delete-results", grid.tools)
self.assertIsNone(grid.row_class)
# can specify row class
get_row_class = MagicMock()
with patch.multiple(
mod.MasterView,
create=True,
model_class=model.Setting,
grid_row_class=get_row_class,
):
view = self.make_view()
grid = view.make_model_grid(session=self.session)
self.assertIs(grid.row_class, get_row_class)
# no actions by default
with patch.multiple(mod.MasterView, create=True, model_class=model.Setting):
view = self.make_view()
grid = view.make_model_grid(session=self.session)
self.assertEqual(grid.actions, [])
# now let's test some more actions logic
with patch.multiple(
mod.MasterView,
create=True,
model_class=model.Setting,
viewable=True,
editable=True,
deletable=True,
):
view = self.make_view()
# should have 3 actions now, but for lack of perms
grid = view.make_model_grid(session=self.session)
self.assertEqual(len(grid.actions), 0)
# but root user has perms, so gets 3 actions
with patch.object(self.request, "is_root", new=True):
grid = view.make_model_grid(session=self.session)
self.assertEqual(len(grid.actions), 3)
# no tools by default
with patch.multiple(mod.MasterView, create=True, model_class=model.Setting):
view = self.make_view()
grid = view.make_model_grid(session=self.session)
self.assertEqual(grid.tools, {})
# delete-results tool added if master/perms allow
with patch.multiple(
mod.MasterView,
create=True,
model_class=model.Setting,
deletable_bulk=True,
):
view = self.make_view()
with patch.object(self.request, "is_root", new=True):
grid = view.make_model_grid(session=self.session)
self.assertIn("delete-results", grid.tools)
# merge tool added if master/perms allow
with patch.multiple(
mod.MasterView,
model_class=model.Person,
route_prefix="people",
mergeable=True,
create=True,
):
view = self.make_view()
with patch.object(self.request, "is_root", new=True):
grid = view.make_model_grid()
self.assertIn("merge", grid.tools)
# test checkable flag
with patch.multiple(
mod.MasterView,
model_class=model.Person,
route_prefix="people",
create=True,
):
view = self.make_view()
# not checkable by default
grid = view.make_model_grid()
self.assertFalse(grid.checkable)
# but can override
grid = view.make_model_grid(checkable=True)
self.assertTrue(grid.checkable)
# checkable is true if merge allowed
with patch.object(mod.MasterView, "mergeable", new=True):
with patch.object(self.request, "is_root", new=True):
grid = view.make_model_grid()
self.assertTrue(grid.checkable)
def test_get_grid_data(self):
model = self.app.model
@ -1604,6 +1655,426 @@ class TestMasterView(WebTestCase):
# nb. nothing was deleted
self.assertEqual(self.session.query(model.Setting).count(), 6)
def test_merge_get_simple_fields(self):
model = self.app.model
with patch.object(mod.MasterView, "model_class", new=model.Person):
view = self.make_view()
# fields include table columns by default
fields = view.merge_get_simple_fields()
self.assertEqual(
fields,
["uuid", "full_name", "first_name", "middle_name", "last_name"],
)
# but class can specify fields
view.merge_simple_fields = ["first_name", "last_name"]
fields = view.merge_get_simple_fields()
self.assertEqual(
fields,
["first_name", "last_name"],
)
def test_merge_get_additive_fields(self):
model = self.app.model
with patch.object(mod.MasterView, "model_class", new=model.Person):
view = self.make_view()
# no additive fields by default
fields = view.merge_get_additive_fields()
self.assertEqual(fields, [])
# but class can specify fields
view.merge_additive_fields = ["usernames"]
fields = view.merge_get_additive_fields()
self.assertEqual(fields, ["usernames"])
def test_merge_get_coalesce_fields(self):
model = self.app.model
with patch.object(mod.MasterView, "model_class", new=model.Person):
view = self.make_view()
# no coalesce fields by default
fields = view.merge_get_coalesce_fields()
self.assertEqual(fields, [])
# but class can specify fields
view.merge_coalesce_fields = ["active"]
fields = view.merge_get_coalesce_fields()
self.assertEqual(fields, ["active"])
def test_merge_get_all_fields(self):
model = self.app.model
with patch.object(mod.MasterView, "model_class", new=model.Person):
view = self.make_view()
# nb. "all" fields will be a sorted list
# only column (simple) fields by default
fields = view.merge_get_all_fields()
self.assertEqual(
fields,
["first_name", "full_name", "last_name", "middle_name", "uuid"],
)
# but class can specify fields
view.merge_simple_fields = ["first_name", "last_name"]
view.merge_additive_fields = ["usernames"]
view.merge_coalesce_fields = ["active"]
fields = view.merge_get_all_fields()
self.assertEqual(
fields,
["active", "first_name", "last_name", "usernames"],
)
def test_merge_get_data(self):
model = self.app.model
person = model.Person(first_name="Fred", last_name="Flintstone")
with patch.object(mod.MasterView, "model_class", new=model.Person):
view = self.make_view()
# data will include "all" fields
view.merge_simple_fields = ["first_name", "last_name", "usernames"]
data = view.merge_get_data(person)
self.assertEqual(
data,
{
"first_name": "Fred",
"last_name": "Flintstone",
# nb. person has no such attr, so null value
"usernames": None,
},
)
def test_merge_get_final_data(self):
model = self.app.model
removing = {
"first_name": "Freddie",
"last_name": "Flintstone",
"user_count": 1,
"usernames": ["freddie"],
"some_value": 42,
"active": True,
}
keeping = {
"first_name": "Fred",
"last_name": "Flintstone",
"user_count": 1,
"usernames": ["fred"],
"some_value": None,
"active": False,
}
with patch.object(mod.MasterView, "model_class", new=model.Person):
view = self.make_view()
view.merge_simple_fields = ["first_name", "last_name"]
view.merge_additive_fields = ["user_count", "usernames"]
view.merge_coalesce_fields = ["some_value", "active"]
final = view.merge_get_final_data(removing, keeping)
self.assertEqual(
final,
{
"first_name": "Fred",
"last_name": "Flintstone",
"user_count": 2,
"usernames": ["fred", "freddie"],
"some_value": 42,
"active": True,
},
)
def test_merge_execute(self):
model = self.app.model
person1 = model.Person(
first_name="Freddie", last_name="Flintstone", full_name="Freddie Flintstone"
)
self.session.add(person1)
person2 = model.Person(
first_name="Fred", last_name="Flintstone", full_name="Fred Flintstone"
)
self.session.add(person2)
self.session.commit()
self.assertEqual(self.session.query(model.Person).count(), 2)
with patch.object(mod.MasterView, "Session", return_value=self.session):
view = self.make_view()
# default merge logic just deletes 'removing' person1
view.merge_execute(person1, person2)
self.assertEqual(self.session.query(model.Person).count(), 1)
person = self.session.query(model.Person).one()
self.assertIs(person, person2)
self.assertEqual(person.first_name, "Fred")
self.assertEqual(person.full_name, "Fred Flintstone")
def test_merge_validate_and_execute(self):
model = self.app.model
person1 = model.Person(
first_name="Freddie", last_name="Flintstone", full_name="Freddie Flintstone"
)
self.session.add(person1)
person2 = model.Person(
first_name="Fred", last_name="Flintstone", full_name="Fred Flintstone"
)
self.session.add(person2)
self.session.commit()
self.assertEqual(self.session.query(model.Person).count(), 2)
self.assertIn(person1, self.session)
with patch.object(mod.MasterView, "Session", return_value=self.session):
view = self.make_view()
# default merge logic just deletes 'removing' person1
result = view.merge_validate_and_execute(person1, person2)
self.assertTrue(result)
self.assertEqual(self.session.query(model.Person).count(), 1)
person = self.session.query(model.Person).one()
self.assertIs(person, person2)
self.assertEqual(person.first_name, "Fred")
self.assertEqual(person.full_name, "Fred Flintstone")
self.assertFalse(self.request.session.peek_flash("warning"))
self.assertFalse(self.request.session.peek_flash("error"))
self.assertEqual(
self.request.session.pop_flash(),
["Freddie Flintstone has been merged into Fred Flintstone"],
)
# restore Freddie
self.assertNotIn(person1, self.session)
person1 = self.session.merge(person1)
self.session.commit()
self.assertEqual(self.session.query(model.Person).count(), 2)
# merge does not validate
with patch.object(view, "merge_why_not", return_value="because i said so"):
result = view.merge_validate_and_execute(person1, person2)
self.assertFalse(result)
self.assertEqual(self.session.query(model.Person).count(), 2)
self.assertFalse(self.request.session.peek_flash())
self.assertFalse(self.request.session.peek_flash("error"))
self.assertEqual(
self.request.session.pop_flash("warning"),
[
HTML.literal(
'<p class="block">Merge cannot proceed:</p>'
'<p class="block">because i said so</p>'
),
],
)
# error executing merge
with patch.object(view, "merge_execute", side_effect=RuntimeError):
result = view.merge_validate_and_execute(person1, person2)
self.assertFalse(result)
self.assertEqual(self.session.query(model.Person).count(), 2)
self.assertFalse(self.request.session.peek_flash())
self.assertFalse(self.request.session.peek_flash("warning"))
self.assertEqual(
self.request.session.pop_flash("error"),
[
HTML.literal(
'<p class="block">Merge failed:</p>'
'<p class="block">RuntimeError</p>'
),
],
)
def test_merge(self):
self.pyramid_config.add_route("home", "/")
self.pyramid_config.add_route("login", "/auth/login")
self.pyramid_config.add_route("people", "/people/")
self.pyramid_config.add_route("people.merge", "/people/merge")
self.pyramid_config.add_route("people.view", "/people/{uuid}")
model = self.app.model
class MergeRoute:
name = "people.merge"
person1 = model.Person(
first_name="Freddie", last_name="Flintstone", full_name="Freddie Flintstone"
)
self.session.add(person1)
person2 = model.Person(
first_name="Fred", last_name="Flintstone", full_name="Fred Flintstone"
)
self.session.add(person2)
self.session.commit()
self.assertEqual(self.session.query(model.Person).count(), 2)
self.assertIn(person1, self.session)
with patch.multiple(
mod.MasterView,
Session=MagicMock(return_value=self.session),
model_class=model.Person,
route_prefix="people",
create=True,
):
view = self.make_view()
# GET request will redirect to index
result = view.merge()
self.assertIsInstance(result, HTTPFound)
self.assertEqual(result.location, "http://example.com/people/")
self.assertEqual(self.session.query(model.Person).count(), 2)
# assume POST from now on
with patch.multiple(
self.request, matched_route=MergeRoute, method="POST", create=True
):
# POST without 'execute-merge' flag shows user the diff
with patch.object(
self.request,
"POST",
new={"uuids": f"{person1.uuid},{person2.uuid}"},
):
response = view.merge()
self.assertIsInstance(response, Response)
self.assertEqual(response.status_code, 200)
self.assertEqual(self.session.query(model.Person).count(), 2)
self.assertFalse(self.request.session.peek_flash())
self.assertFalse(self.request.session.peek_flash("warning"))
self.assertFalse(self.request.session.peek_flash("error"))
# default merge logic deletes person1, then redirects
with patch.object(
self.request,
"POST",
new={
"uuids": f"{person1.uuid},{person2.uuid}",
"execute-merge": "true",
},
):
result = view.merge()
self.assertIsInstance(result, HTTPFound)
self.assertEqual(
result.location, f"http://example.com/people/{person2.uuid}"
)
self.assertEqual(self.session.query(model.Person).count(), 1)
self.assertNotIn(person1, self.session)
self.assertIn(person2, self.session)
person = self.session.query(model.Person).one()
self.assertIs(person, person2)
self.assertEqual(person.first_name, "Fred")
self.assertEqual(person.full_name, "Fred Flintstone")
self.assertFalse(self.request.session.peek_flash("warning"))
self.assertFalse(self.request.session.peek_flash("error"))
self.assertEqual(
self.request.session.pop_flash(),
["Freddie Flintstone has been merged into Fred Flintstone"],
)
# restore Freddie
self.assertNotIn(person1, self.session)
person1 = self.session.merge(person1)
self.session.commit()
self.assertEqual(self.session.query(model.Person).count(), 2)
# simple redirect if invalid uuids specified
with patch.object(
self.request,
"POST",
new={
"uuids": "bogus1,bogus2",
"execute-merge": "true",
},
):
with self.assertRaises(HTTPFound) as cm:
view.merge()
self.assertEqual(
cm.exception.location, "http://example.com/people/"
)
self.assertEqual(self.session.query(model.Person).count(), 2)
self.assertFalse(self.request.session.peek_flash())
self.assertFalse(self.request.session.peek_flash("warning"))
self.assertFalse(self.request.session.peek_flash("error"))
# simple redirect if unknown uuids specified
fake1 = self.app.make_true_uuid()
fake2 = self.app.make_true_uuid()
with patch.object(
self.request,
"POST",
new={
"uuids": f"{fake1},{fake2}",
"execute-merge": "true",
},
):
with self.assertRaises(HTTPFound) as cm:
view.merge()
self.assertEqual(
cm.exception.location, "http://example.com/people/"
)
self.assertEqual(self.session.query(model.Person).count(), 2)
self.assertFalse(self.request.session.peek_flash())
self.assertFalse(self.request.session.peek_flash("warning"))
self.assertFalse(self.request.session.peek_flash("error"))
# warning redirect if merge does not validate
with patch.object(
self.request,
"POST",
new={
"uuids": f"{person1.uuid},{person2.uuid}",
"execute-merge": "true",
},
):
with patch.object(
view, "merge_why_not", return_value="because i said so"
):
response = view.merge()
self.assertIsInstance(response, Response)
self.assertEqual(self.session.query(model.Person).count(), 2)
self.assertFalse(self.request.session.peek_flash())
self.assertFalse(self.request.session.peek_flash("error"))
# TODO: since response is already rendered, the warning flash
# msg has already been popped off the stack..will have to
# avoid render_to_response() to properly test that..
self.assertFalse(self.request.session.peek_flash("warning"))
# self.assertEqual(
# self.request.session.pop_flash("warning"),
# [
# HTML.literal(
# '<p class="block">Merge cannot proceed:</p>'
# '<p class="block">because i said so</p>'
# ),
# ],
# )
# error redirect if merge execution fails
with patch.object(
self.request,
"POST",
new={
"uuids": f"{person1.uuid},{person2.uuid}",
"execute-merge": "true",
},
):
with patch.object(view, "merge_execute", side_effect=RuntimeError):
response = view.merge()
self.assertIsInstance(response, Response)
self.assertEqual(self.session.query(model.Person).count(), 2)
self.assertFalse(self.request.session.peek_flash())
self.assertFalse(self.request.session.peek_flash("warning"))
# TODO: since response is already rendered, the error flash
# msg has already been popped off the stack..will have to
# avoid render_to_response() to properly test that..
self.assertFalse(self.request.session.peek_flash("error"))
# self.assertEqual(
# self.request.session.pop_flash("error"),
# [
# HTML.literal(
# '<p class="block">Merge failed:</p>'
# '<p class="block">RuntimeError</p>'
# ),
# ],
# )
def test_autocomplete(self):
model = self.app.model

View file

@ -154,3 +154,43 @@ class TestPersonView(WebTestCase):
response = view.make_user()
# nb. this always redirects for now
self.assertEqual(response.status_code, 302)
def test_merge_get_data(self):
model = self.app.model
person = model.Person(full_name="Fred Flintstone")
self.session.add(person)
user = model.User(username="fred", person=person)
self.session.add(user)
self.session.flush()
view = self.make_view()
data = view.merge_get_data(person)
self.assertIn("usernames", data)
self.assertEqual(data["usernames"], ["fred"])
def test_merge_execute(self):
model = self.app.model
person1 = model.Person(full_name="Freddie Flintstone")
self.session.add(person1)
user1 = model.User(username="freddie", person=person1)
self.session.add(user1)
person2 = model.Person(full_name="Fred Flintstone")
self.session.add(person2)
user2 = model.User(username="fred", person=person2)
self.session.add(user2)
self.session.commit()
self.assertEqual(self.session.query(model.Person).count(), 2)
self.assertEqual(self.session.query(model.User).count(), 2)
view = self.make_view()
with patch.object(view, "Session", return_value=self.session):
view.merge_execute(person1, person2)
self.assertEqual(self.session.query(model.Person).count(), 1)
self.assertEqual(self.session.query(model.User).count(), 2)
person = self.session.query(model.Person).one()
self.assertIs(person, person2)
self.assertEqual(len(person.users), 2)

View file

@ -1,6 +1,6 @@
# -*- coding: utf-8; -*-
from unittest.mock import patch
from unittest.mock import patch, MagicMock
from sqlalchemy import orm
@ -8,7 +8,7 @@ import colander
from wuttaweb.grids import Grid
from wuttaweb.views import users as mod
from wuttaweb.testing import WebTestCase, FunctionalTestCase
from wuttaweb.testing import WebTestCase, VersionWebTestCase, FunctionalTestCase
class TestUserView(WebTestCase):
@ -496,6 +496,220 @@ class TestUserView(WebTestCase):
result = view.delete_api_token()
self.assertEqual(result, {"error": "API token not found"})
def test_merge_get_simple_fields(self):
view = self.make_view()
# password field should not be included
fields = view.merge_get_simple_fields()
self.assertNotIn("password", fields)
self.assertIn("username", fields)
self.assertIn("active", fields)
def test_merge_get_additive_fields(self):
view = self.make_view()
# nb. this is not a "versioned" test case, so transaction_count
# field will not be included
fields = view.merge_get_additive_fields()
self.assertNotIn("transaction_count", fields)
self.assertIn("roles", fields)
def test_merge_get_data(self):
model = self.app.model
auth = self.app.get_auth_handler()
view = self.make_view()
admin = auth.get_role_administrator(self.session)
user = model.User(username="fred")
user.roles.append(admin)
self.session.add(user)
self.session.commit()
# nb. this is not a "versioned" test case, so transaction_count
# field will not be included
data = view.merge_get_data(user)
self.assertEqual(data["username"], "fred")
self.assertEqual(data["roles"], [admin.name])
self.assertNotIn("transaction_count", data)
def test_merge_why_not(self):
model = self.app.model
view = self.make_view()
user1 = model.User(username="freddie")
self.session.add(user1)
user2 = model.User(username="fred")
self.session.add(user2)
self.session.commit()
# normally no reason not to merge
self.assertIsNone(view.merge_why_not(user1, user2))
# can merge even if current user is involved (being kept)
with patch.object(self.request, "user", new=user2):
self.assertIsNone(view.merge_why_not(user1, user2))
# but cannot merge if it means removing current user
with patch.object(self.request, "user", new=user1):
reason = view.merge_why_not(user1, user2)
self.assertEqual(reason, "Cannot remove user who is currently logged in!")
def test_merge_execute(self):
model = self.app.model
enum = self.app.enum
auth = self.app.get_auth_handler()
view = self.make_view()
admin = auth.get_role_administrator(self.session)
user1 = model.User(username="freddie")
user1.roles.append(admin)
self.session.add(user1)
user2 = model.User(username="fred")
self.session.add(user2)
upgrade = model.Upgrade(
description="test",
created_by=user1,
executed_by=user1,
status=enum.UpgradeStatus.SUCCESS,
)
self.session.add(upgrade)
self.session.commit()
with patch.object(view, "Session", return_value=self.session):
view.merge_execute(user1, user2)
self.session.commit()
self.assertEqual(self.session.query(model.User).count(), 1)
self.assertNotIn(user1, self.session)
self.assertIn(user2, self.session)
self.assertIn(admin, user2.roles)
self.assertIs(upgrade.created_by, user2)
self.assertIs(upgrade.executed_by, user2)
class TestVersionedUserView(VersionWebTestCase):
def make_view(self):
return mod.UserView(self.request)
def test_merge_get_additive_fields(self):
view = self.make_view()
# nb. contrast this to the "non-versioned" test case above
fields = view.merge_get_additive_fields()
self.assertIn("transaction_count", fields)
self.assertIn("roles", fields)
def test_merge_get_data(self):
import sqlalchemy_continuum as continuum
model = self.app.model
auth = self.app.get_auth_handler()
txncls = continuum.transaction_class(model.User)
# nb. must reset the User model reference, due to nature of
# test setup/teardown
with patch.multiple(
mod.UserView,
model_class=model.User,
Session=MagicMock(return_value=self.session),
):
view = self.make_view()
# make admin user
admin = auth.get_role_administrator(self.session)
user = model.User(username="fred")
user.roles.append(admin)
self.session.add(user)
self.session.commit()
self.assertEqual(self.session.query(txncls).count(), 1)
# nb. contrast this to the "non-versioned" test case above
data = view.merge_get_data(user)
self.assertEqual(data["username"], "fred")
self.assertEqual(data["roles"], [admin.name])
self.assertEqual(data["transaction_count"], 0)
# admin user then creates 2 records w/ 1 txn
# nb. must trick wuttaweb continuum plugin to assign author
with patch.object(self.request, "user", new=user):
person1 = model.Person(full_name="Barney Rubble")
self.session.add(person1)
person2 = model.Person(full_name="Betty Rubble")
self.session.add(person2)
self.session.commit()
self.assertEqual(self.session.query(txncls).count(), 2)
txn1, txn2 = self.session.query(txncls).order_by(txncls.id).all()
self.assertIsNone(txn1.user)
self.assertIs(txn2.user, user)
# nb. contrast this to the "non-versioned" test case above
data = view.merge_get_data(user)
self.assertEqual(data["username"], "fred")
self.assertEqual(data["roles"], [admin.name])
self.assertEqual(data["transaction_count"], 1)
def test_merge_execute(self):
import sqlalchemy_continuum as continuum
model = self.app.model
auth = self.app.get_auth_handler()
txncls = continuum.transaction_class(model.User)
# nb. must reset the User model reference, due to nature of
# test setup/teardown
with patch.multiple(
mod.UserView,
model_class=model.User,
Session=MagicMock(return_value=self.session),
):
view = self.make_view()
# make pair of users
admin = auth.get_role_administrator(self.session)
user1 = model.User(username="freddie")
user1.roles.append(admin)
self.session.add(user1)
user2 = model.User(username="fred")
self.session.add(user2)
self.session.commit()
self.assertEqual(self.session.query(model.User).count(), 2)
self.assertEqual(self.session.query(txncls).count(), 1)
# admin user then creates 2 records w/ 1 txn
# nb. must trick wuttaweb continuum plugin to assign author
with patch.object(self.request, "user", new=user1):
person1 = model.Person(full_name="Barney Rubble")
self.session.add(person1)
person2 = model.Person(full_name="Betty Rubble")
self.session.add(person2)
self.session.commit()
self.assertEqual(self.session.query(txncls).count(), 2)
txn1, txn2 = self.session.query(txncls).order_by(txncls.id).all()
self.assertIsNone(txn1.user)
self.assertIs(txn2.user, user1)
self.assertEqual(len(user2.roles), 0)
# merge user1 => user2 (as user2, for 3rd txn)
with patch.object(self.request, "user", new=user2):
view.merge_execute(user1, user2)
self.session.commit()
self.assertEqual(self.session.query(txncls).count(), 3)
txn1, txn2, txn3 = self.session.query(txncls).order_by(txncls.id).all()
self.assertIs(txn3.user, user2)
self.assertEqual(self.session.query(model.User).count(), 1)
user = self.session.query(model.User).one()
self.assertIs(user, user2)
# user2 is now admin, and author of txn2
self.assertIn(admin, user2.roles)
self.assertIs(txn2.user, user2)
# TODO: this test seems to work fine on its own, but not in conjunction
# with the next class below. will have to sort this out before adding