+ You are about to merge + two ${model_title} records, possibly updating and/or deleting + various other records. +
+ ++ This tool can show you some basics but is not able to give you + the full picture of the implications of this merge. +
+ ++ You are urged to proceed with + caution! Ideally try the merge on a test site first. +
+ +Merge cannot proceed:
' + 'because i said so
' + ), + ], + ) + + # error executing merge + with patch.object(view, "merge_execute", side_effect=RuntimeError): + result = view.merge_validate_and_execute(person1, person2) + self.assertFalse(result) + self.assertEqual(self.session.query(model.Person).count(), 2) + self.assertFalse(self.request.session.peek_flash()) + self.assertFalse(self.request.session.peek_flash("warning")) + self.assertEqual( + self.request.session.pop_flash("error"), + [ + HTML.literal( + 'Merge failed:
' + 'RuntimeError
' + ), + ], + ) + + def test_merge(self): + self.pyramid_config.add_route("home", "/") + self.pyramid_config.add_route("login", "/auth/login") + self.pyramid_config.add_route("people", "/people/") + self.pyramid_config.add_route("people.merge", "/people/merge") + self.pyramid_config.add_route("people.view", "/people/{uuid}") + model = self.app.model + + class MergeRoute: + name = "people.merge" + + person1 = model.Person( + first_name="Freddie", last_name="Flintstone", full_name="Freddie Flintstone" + ) + self.session.add(person1) + person2 = model.Person( + first_name="Fred", last_name="Flintstone", full_name="Fred Flintstone" + ) + self.session.add(person2) + self.session.commit() + self.assertEqual(self.session.query(model.Person).count(), 2) + self.assertIn(person1, self.session) + + with patch.multiple( + mod.MasterView, + Session=MagicMock(return_value=self.session), + model_class=model.Person, + route_prefix="people", + create=True, + ): + view = self.make_view() + + # GET request will redirect to index + result = view.merge() + self.assertIsInstance(result, HTTPFound) + self.assertEqual(result.location, "http://example.com/people/") + self.assertEqual(self.session.query(model.Person).count(), 2) + + # assume POST from now on + with patch.multiple( + self.request, matched_route=MergeRoute, method="POST", create=True + ): + + # POST without 'execute-merge' flag shows user the diff + with patch.object( + self.request, + "POST", + new={"uuids": f"{person1.uuid},{person2.uuid}"}, + ): + response = view.merge() + self.assertIsInstance(response, Response) + self.assertEqual(response.status_code, 200) + self.assertEqual(self.session.query(model.Person).count(), 2) + self.assertFalse(self.request.session.peek_flash()) + self.assertFalse(self.request.session.peek_flash("warning")) + self.assertFalse(self.request.session.peek_flash("error")) + + # default merge logic deletes person1, then redirects + with patch.object( + self.request, + "POST", + new={ + "uuids": f"{person1.uuid},{person2.uuid}", + "execute-merge": "true", + }, + ): + result = view.merge() + self.assertIsInstance(result, HTTPFound) + self.assertEqual( + result.location, f"http://example.com/people/{person2.uuid}" + ) + self.assertEqual(self.session.query(model.Person).count(), 1) + self.assertNotIn(person1, self.session) + self.assertIn(person2, self.session) + person = self.session.query(model.Person).one() + self.assertIs(person, person2) + self.assertEqual(person.first_name, "Fred") + self.assertEqual(person.full_name, "Fred Flintstone") + self.assertFalse(self.request.session.peek_flash("warning")) + self.assertFalse(self.request.session.peek_flash("error")) + self.assertEqual( + self.request.session.pop_flash(), + ["Freddie Flintstone has been merged into Fred Flintstone"], + ) + + # restore Freddie + self.assertNotIn(person1, self.session) + person1 = self.session.merge(person1) + self.session.commit() + self.assertEqual(self.session.query(model.Person).count(), 2) + + # simple redirect if invalid uuids specified + with patch.object( + self.request, + "POST", + new={ + "uuids": "bogus1,bogus2", + "execute-merge": "true", + }, + ): + with self.assertRaises(HTTPFound) as cm: + view.merge() + self.assertEqual( + cm.exception.location, "http://example.com/people/" + ) + self.assertEqual(self.session.query(model.Person).count(), 2) + self.assertFalse(self.request.session.peek_flash()) + self.assertFalse(self.request.session.peek_flash("warning")) + self.assertFalse(self.request.session.peek_flash("error")) + + # simple redirect if unknown uuids specified + fake1 = self.app.make_true_uuid() + fake2 = self.app.make_true_uuid() + with patch.object( + self.request, + "POST", + new={ + "uuids": f"{fake1},{fake2}", + "execute-merge": "true", + }, + ): + with self.assertRaises(HTTPFound) as cm: + view.merge() + self.assertEqual( + cm.exception.location, "http://example.com/people/" + ) + self.assertEqual(self.session.query(model.Person).count(), 2) + self.assertFalse(self.request.session.peek_flash()) + self.assertFalse(self.request.session.peek_flash("warning")) + self.assertFalse(self.request.session.peek_flash("error")) + + # warning redirect if merge does not validate + with patch.object( + self.request, + "POST", + new={ + "uuids": f"{person1.uuid},{person2.uuid}", + "execute-merge": "true", + }, + ): + with patch.object( + view, "merge_why_not", return_value="because i said so" + ): + response = view.merge() + self.assertIsInstance(response, Response) + self.assertEqual(self.session.query(model.Person).count(), 2) + self.assertFalse(self.request.session.peek_flash()) + self.assertFalse(self.request.session.peek_flash("error")) + # TODO: since response is already rendered, the warning flash + # msg has already been popped off the stack..will have to + # avoid render_to_response() to properly test that.. + self.assertFalse(self.request.session.peek_flash("warning")) + # self.assertEqual( + # self.request.session.pop_flash("warning"), + # [ + # HTML.literal( + # 'Merge cannot proceed:
' + # 'because i said so
' + # ), + # ], + # ) + + # error redirect if merge execution fails + with patch.object( + self.request, + "POST", + new={ + "uuids": f"{person1.uuid},{person2.uuid}", + "execute-merge": "true", + }, + ): + with patch.object(view, "merge_execute", side_effect=RuntimeError): + response = view.merge() + self.assertIsInstance(response, Response) + self.assertEqual(self.session.query(model.Person).count(), 2) + self.assertFalse(self.request.session.peek_flash()) + self.assertFalse(self.request.session.peek_flash("warning")) + # TODO: since response is already rendered, the error flash + # msg has already been popped off the stack..will have to + # avoid render_to_response() to properly test that.. + self.assertFalse(self.request.session.peek_flash("error")) + # self.assertEqual( + # self.request.session.pop_flash("error"), + # [ + # HTML.literal( + # 'Merge failed:
' + # 'RuntimeError
' + # ), + # ], + # ) + def test_autocomplete(self): model = self.app.model diff --git a/tests/views/test_people.py b/tests/views/test_people.py index 29f6694..4344589 100644 --- a/tests/views/test_people.py +++ b/tests/views/test_people.py @@ -154,3 +154,43 @@ class TestPersonView(WebTestCase): response = view.make_user() # nb. this always redirects for now self.assertEqual(response.status_code, 302) + + def test_merge_get_data(self): + model = self.app.model + + person = model.Person(full_name="Fred Flintstone") + self.session.add(person) + user = model.User(username="fred", person=person) + self.session.add(user) + self.session.flush() + + view = self.make_view() + data = view.merge_get_data(person) + self.assertIn("usernames", data) + self.assertEqual(data["usernames"], ["fred"]) + + def test_merge_execute(self): + model = self.app.model + + person1 = model.Person(full_name="Freddie Flintstone") + self.session.add(person1) + user1 = model.User(username="freddie", person=person1) + self.session.add(user1) + + person2 = model.Person(full_name="Fred Flintstone") + self.session.add(person2) + user2 = model.User(username="fred", person=person2) + self.session.add(user2) + + self.session.commit() + self.assertEqual(self.session.query(model.Person).count(), 2) + self.assertEqual(self.session.query(model.User).count(), 2) + + view = self.make_view() + with patch.object(view, "Session", return_value=self.session): + view.merge_execute(person1, person2) + self.assertEqual(self.session.query(model.Person).count(), 1) + self.assertEqual(self.session.query(model.User).count(), 2) + person = self.session.query(model.Person).one() + self.assertIs(person, person2) + self.assertEqual(len(person.users), 2) diff --git a/tests/views/test_users.py b/tests/views/test_users.py index 2fe08aa..b4eecb4 100644 --- a/tests/views/test_users.py +++ b/tests/views/test_users.py @@ -1,6 +1,6 @@ # -*- coding: utf-8; -*- -from unittest.mock import patch +from unittest.mock import patch, MagicMock from sqlalchemy import orm @@ -8,7 +8,7 @@ import colander from wuttaweb.grids import Grid from wuttaweb.views import users as mod -from wuttaweb.testing import WebTestCase, FunctionalTestCase +from wuttaweb.testing import WebTestCase, VersionWebTestCase, FunctionalTestCase class TestUserView(WebTestCase): @@ -496,6 +496,220 @@ class TestUserView(WebTestCase): result = view.delete_api_token() self.assertEqual(result, {"error": "API token not found"}) + def test_merge_get_simple_fields(self): + view = self.make_view() + + # password field should not be included + fields = view.merge_get_simple_fields() + self.assertNotIn("password", fields) + self.assertIn("username", fields) + self.assertIn("active", fields) + + def test_merge_get_additive_fields(self): + view = self.make_view() + + # nb. this is not a "versioned" test case, so transaction_count + # field will not be included + fields = view.merge_get_additive_fields() + self.assertNotIn("transaction_count", fields) + self.assertIn("roles", fields) + + def test_merge_get_data(self): + model = self.app.model + auth = self.app.get_auth_handler() + view = self.make_view() + + admin = auth.get_role_administrator(self.session) + user = model.User(username="fred") + user.roles.append(admin) + self.session.add(user) + self.session.commit() + + # nb. this is not a "versioned" test case, so transaction_count + # field will not be included + data = view.merge_get_data(user) + self.assertEqual(data["username"], "fred") + self.assertEqual(data["roles"], [admin.name]) + self.assertNotIn("transaction_count", data) + + def test_merge_why_not(self): + model = self.app.model + view = self.make_view() + + user1 = model.User(username="freddie") + self.session.add(user1) + user2 = model.User(username="fred") + self.session.add(user2) + self.session.commit() + + # normally no reason not to merge + self.assertIsNone(view.merge_why_not(user1, user2)) + + # can merge even if current user is involved (being kept) + with patch.object(self.request, "user", new=user2): + self.assertIsNone(view.merge_why_not(user1, user2)) + + # but cannot merge if it means removing current user + with patch.object(self.request, "user", new=user1): + reason = view.merge_why_not(user1, user2) + self.assertEqual(reason, "Cannot remove user who is currently logged in!") + + def test_merge_execute(self): + model = self.app.model + enum = self.app.enum + auth = self.app.get_auth_handler() + view = self.make_view() + + admin = auth.get_role_administrator(self.session) + user1 = model.User(username="freddie") + user1.roles.append(admin) + self.session.add(user1) + user2 = model.User(username="fred") + self.session.add(user2) + + upgrade = model.Upgrade( + description="test", + created_by=user1, + executed_by=user1, + status=enum.UpgradeStatus.SUCCESS, + ) + self.session.add(upgrade) + self.session.commit() + + with patch.object(view, "Session", return_value=self.session): + view.merge_execute(user1, user2) + self.session.commit() + + self.assertEqual(self.session.query(model.User).count(), 1) + self.assertNotIn(user1, self.session) + self.assertIn(user2, self.session) + + self.assertIn(admin, user2.roles) + + self.assertIs(upgrade.created_by, user2) + self.assertIs(upgrade.executed_by, user2) + + +class TestVersionedUserView(VersionWebTestCase): + + def make_view(self): + return mod.UserView(self.request) + + def test_merge_get_additive_fields(self): + view = self.make_view() + + # nb. contrast this to the "non-versioned" test case above + fields = view.merge_get_additive_fields() + self.assertIn("transaction_count", fields) + self.assertIn("roles", fields) + + def test_merge_get_data(self): + import sqlalchemy_continuum as continuum + + model = self.app.model + auth = self.app.get_auth_handler() + txncls = continuum.transaction_class(model.User) + + # nb. must reset the User model reference, due to nature of + # test setup/teardown + with patch.multiple( + mod.UserView, + model_class=model.User, + Session=MagicMock(return_value=self.session), + ): + view = self.make_view() + + # make admin user + admin = auth.get_role_administrator(self.session) + user = model.User(username="fred") + user.roles.append(admin) + self.session.add(user) + self.session.commit() + self.assertEqual(self.session.query(txncls).count(), 1) + + # nb. contrast this to the "non-versioned" test case above + data = view.merge_get_data(user) + self.assertEqual(data["username"], "fred") + self.assertEqual(data["roles"], [admin.name]) + self.assertEqual(data["transaction_count"], 0) + + # admin user then creates 2 records w/ 1 txn + # nb. must trick wuttaweb continuum plugin to assign author + with patch.object(self.request, "user", new=user): + person1 = model.Person(full_name="Barney Rubble") + self.session.add(person1) + person2 = model.Person(full_name="Betty Rubble") + self.session.add(person2) + self.session.commit() + + self.assertEqual(self.session.query(txncls).count(), 2) + txn1, txn2 = self.session.query(txncls).order_by(txncls.id).all() + self.assertIsNone(txn1.user) + self.assertIs(txn2.user, user) + + # nb. contrast this to the "non-versioned" test case above + data = view.merge_get_data(user) + self.assertEqual(data["username"], "fred") + self.assertEqual(data["roles"], [admin.name]) + self.assertEqual(data["transaction_count"], 1) + + def test_merge_execute(self): + import sqlalchemy_continuum as continuum + + model = self.app.model + auth = self.app.get_auth_handler() + txncls = continuum.transaction_class(model.User) + + # nb. must reset the User model reference, due to nature of + # test setup/teardown + with patch.multiple( + mod.UserView, + model_class=model.User, + Session=MagicMock(return_value=self.session), + ): + view = self.make_view() + + # make pair of users + admin = auth.get_role_administrator(self.session) + user1 = model.User(username="freddie") + user1.roles.append(admin) + self.session.add(user1) + user2 = model.User(username="fred") + self.session.add(user2) + self.session.commit() + self.assertEqual(self.session.query(model.User).count(), 2) + self.assertEqual(self.session.query(txncls).count(), 1) + + # admin user then creates 2 records w/ 1 txn + # nb. must trick wuttaweb continuum plugin to assign author + with patch.object(self.request, "user", new=user1): + person1 = model.Person(full_name="Barney Rubble") + self.session.add(person1) + person2 = model.Person(full_name="Betty Rubble") + self.session.add(person2) + self.session.commit() + + self.assertEqual(self.session.query(txncls).count(), 2) + txn1, txn2 = self.session.query(txncls).order_by(txncls.id).all() + self.assertIsNone(txn1.user) + self.assertIs(txn2.user, user1) + self.assertEqual(len(user2.roles), 0) + + # merge user1 => user2 (as user2, for 3rd txn) + with patch.object(self.request, "user", new=user2): + view.merge_execute(user1, user2) + self.session.commit() + self.assertEqual(self.session.query(txncls).count(), 3) + txn1, txn2, txn3 = self.session.query(txncls).order_by(txncls.id).all() + self.assertIs(txn3.user, user2) + self.assertEqual(self.session.query(model.User).count(), 1) + user = self.session.query(model.User).one() + self.assertIs(user, user2) + + # user2 is now admin, and author of txn2 + self.assertIn(admin, user2.roles) + self.assertIs(txn2.user, user2) + # TODO: this test seems to work fine on its own, but not in conjunction # with the next class below. will have to sort this out before adding