- You are about to merge - two ${model_title} records, possibly updating and/or deleting - various other records. -
- -- This tool can show you some basics but is not able to give you - the full picture of the implications of this merge. -
- -- You are urged to proceed with - caution! Ideally try the merge on a test site first. -
- -Merge cannot proceed:
' - 'because i said so
' - ), - ], - ) - - # error executing merge - with patch.object(view, "merge_execute", side_effect=RuntimeError): - result = view.merge_validate_and_execute(person1, person2) - self.assertFalse(result) - self.assertEqual(self.session.query(model.Person).count(), 2) - self.assertFalse(self.request.session.peek_flash()) - self.assertFalse(self.request.session.peek_flash("warning")) - self.assertEqual( - self.request.session.pop_flash("error"), - [ - HTML.literal( - 'Merge failed:
' - 'RuntimeError
' - ), - ], - ) - - def test_merge(self): - self.pyramid_config.add_route("home", "/") - self.pyramid_config.add_route("login", "/auth/login") - self.pyramid_config.add_route("people", "/people/") - self.pyramid_config.add_route("people.merge", "/people/merge") - self.pyramid_config.add_route("people.view", "/people/{uuid}") - model = self.app.model - - class MergeRoute: - name = "people.merge" - - person1 = model.Person( - first_name="Freddie", last_name="Flintstone", full_name="Freddie Flintstone" - ) - self.session.add(person1) - person2 = model.Person( - first_name="Fred", last_name="Flintstone", full_name="Fred Flintstone" - ) - self.session.add(person2) - self.session.commit() - self.assertEqual(self.session.query(model.Person).count(), 2) - self.assertIn(person1, self.session) - - with patch.multiple( - mod.MasterView, - Session=MagicMock(return_value=self.session), - model_class=model.Person, - route_prefix="people", - create=True, - ): - view = self.make_view() - - # GET request will redirect to index - result = view.merge() - self.assertIsInstance(result, HTTPFound) - self.assertEqual(result.location, "http://example.com/people/") - self.assertEqual(self.session.query(model.Person).count(), 2) - - # assume POST from now on - with patch.multiple( - self.request, matched_route=MergeRoute, method="POST", create=True - ): - - # POST without 'execute-merge' flag shows user the diff - with patch.object( - self.request, - "POST", - new={"uuids": f"{person1.uuid},{person2.uuid}"}, - ): - response = view.merge() - self.assertIsInstance(response, Response) - self.assertEqual(response.status_code, 200) - self.assertEqual(self.session.query(model.Person).count(), 2) - self.assertFalse(self.request.session.peek_flash()) - self.assertFalse(self.request.session.peek_flash("warning")) - self.assertFalse(self.request.session.peek_flash("error")) - - # default merge logic deletes person1, then redirects - with patch.object( - self.request, - "POST", - new={ - "uuids": f"{person1.uuid},{person2.uuid}", - "execute-merge": "true", - }, - ): - result = view.merge() - self.assertIsInstance(result, HTTPFound) - self.assertEqual( - result.location, f"http://example.com/people/{person2.uuid}" - ) - self.assertEqual(self.session.query(model.Person).count(), 1) - self.assertNotIn(person1, self.session) - self.assertIn(person2, self.session) - person = self.session.query(model.Person).one() - self.assertIs(person, person2) - self.assertEqual(person.first_name, "Fred") - self.assertEqual(person.full_name, "Fred Flintstone") - self.assertFalse(self.request.session.peek_flash("warning")) - self.assertFalse(self.request.session.peek_flash("error")) - self.assertEqual( - self.request.session.pop_flash(), - ["Freddie Flintstone has been merged into Fred Flintstone"], - ) - - # restore Freddie - self.assertNotIn(person1, self.session) - person1 = self.session.merge(person1) - self.session.commit() - self.assertEqual(self.session.query(model.Person).count(), 2) - - # simple redirect if invalid uuids specified - with patch.object( - self.request, - "POST", - new={ - "uuids": "bogus1,bogus2", - "execute-merge": "true", - }, - ): - with self.assertRaises(HTTPFound) as cm: - view.merge() - self.assertEqual( - cm.exception.location, "http://example.com/people/" - ) - self.assertEqual(self.session.query(model.Person).count(), 2) - self.assertFalse(self.request.session.peek_flash()) - self.assertFalse(self.request.session.peek_flash("warning")) - self.assertFalse(self.request.session.peek_flash("error")) - - # simple redirect if unknown uuids specified - fake1 = self.app.make_true_uuid() - fake2 = self.app.make_true_uuid() - with patch.object( - self.request, - "POST", - new={ - "uuids": f"{fake1},{fake2}", - "execute-merge": "true", - }, - ): - with self.assertRaises(HTTPFound) as cm: - view.merge() - self.assertEqual( - cm.exception.location, "http://example.com/people/" - ) - self.assertEqual(self.session.query(model.Person).count(), 2) - self.assertFalse(self.request.session.peek_flash()) - self.assertFalse(self.request.session.peek_flash("warning")) - self.assertFalse(self.request.session.peek_flash("error")) - - # warning redirect if merge does not validate - with patch.object( - self.request, - "POST", - new={ - "uuids": f"{person1.uuid},{person2.uuid}", - "execute-merge": "true", - }, - ): - with patch.object( - view, "merge_why_not", return_value="because i said so" - ): - response = view.merge() - self.assertIsInstance(response, Response) - self.assertEqual(self.session.query(model.Person).count(), 2) - self.assertFalse(self.request.session.peek_flash()) - self.assertFalse(self.request.session.peek_flash("error")) - # TODO: since response is already rendered, the warning flash - # msg has already been popped off the stack..will have to - # avoid render_to_response() to properly test that.. - self.assertFalse(self.request.session.peek_flash("warning")) - # self.assertEqual( - # self.request.session.pop_flash("warning"), - # [ - # HTML.literal( - # 'Merge cannot proceed:
' - # 'because i said so
' - # ), - # ], - # ) - - # error redirect if merge execution fails - with patch.object( - self.request, - "POST", - new={ - "uuids": f"{person1.uuid},{person2.uuid}", - "execute-merge": "true", - }, - ): - with patch.object(view, "merge_execute", side_effect=RuntimeError): - response = view.merge() - self.assertIsInstance(response, Response) - self.assertEqual(self.session.query(model.Person).count(), 2) - self.assertFalse(self.request.session.peek_flash()) - self.assertFalse(self.request.session.peek_flash("warning")) - # TODO: since response is already rendered, the error flash - # msg has already been popped off the stack..will have to - # avoid render_to_response() to properly test that.. - self.assertFalse(self.request.session.peek_flash("error")) - # self.assertEqual( - # self.request.session.pop_flash("error"), - # [ - # HTML.literal( - # 'Merge failed:
' - # 'RuntimeError
' - # ), - # ], - # ) - def test_autocomplete(self): model = self.app.model diff --git a/tests/views/test_people.py b/tests/views/test_people.py index 4344589..29f6694 100644 --- a/tests/views/test_people.py +++ b/tests/views/test_people.py @@ -154,43 +154,3 @@ class TestPersonView(WebTestCase): response = view.make_user() # nb. this always redirects for now self.assertEqual(response.status_code, 302) - - def test_merge_get_data(self): - model = self.app.model - - person = model.Person(full_name="Fred Flintstone") - self.session.add(person) - user = model.User(username="fred", person=person) - self.session.add(user) - self.session.flush() - - view = self.make_view() - data = view.merge_get_data(person) - self.assertIn("usernames", data) - self.assertEqual(data["usernames"], ["fred"]) - - def test_merge_execute(self): - model = self.app.model - - person1 = model.Person(full_name="Freddie Flintstone") - self.session.add(person1) - user1 = model.User(username="freddie", person=person1) - self.session.add(user1) - - person2 = model.Person(full_name="Fred Flintstone") - self.session.add(person2) - user2 = model.User(username="fred", person=person2) - self.session.add(user2) - - self.session.commit() - self.assertEqual(self.session.query(model.Person).count(), 2) - self.assertEqual(self.session.query(model.User).count(), 2) - - view = self.make_view() - with patch.object(view, "Session", return_value=self.session): - view.merge_execute(person1, person2) - self.assertEqual(self.session.query(model.Person).count(), 1) - self.assertEqual(self.session.query(model.User).count(), 2) - person = self.session.query(model.Person).one() - self.assertIs(person, person2) - self.assertEqual(len(person.users), 2) diff --git a/tests/views/test_users.py b/tests/views/test_users.py index b4eecb4..2fe08aa 100644 --- a/tests/views/test_users.py +++ b/tests/views/test_users.py @@ -1,6 +1,6 @@ # -*- coding: utf-8; -*- -from unittest.mock import patch, MagicMock +from unittest.mock import patch from sqlalchemy import orm @@ -8,7 +8,7 @@ import colander from wuttaweb.grids import Grid from wuttaweb.views import users as mod -from wuttaweb.testing import WebTestCase, VersionWebTestCase, FunctionalTestCase +from wuttaweb.testing import WebTestCase, FunctionalTestCase class TestUserView(WebTestCase): @@ -496,220 +496,6 @@ class TestUserView(WebTestCase): result = view.delete_api_token() self.assertEqual(result, {"error": "API token not found"}) - def test_merge_get_simple_fields(self): - view = self.make_view() - - # password field should not be included - fields = view.merge_get_simple_fields() - self.assertNotIn("password", fields) - self.assertIn("username", fields) - self.assertIn("active", fields) - - def test_merge_get_additive_fields(self): - view = self.make_view() - - # nb. this is not a "versioned" test case, so transaction_count - # field will not be included - fields = view.merge_get_additive_fields() - self.assertNotIn("transaction_count", fields) - self.assertIn("roles", fields) - - def test_merge_get_data(self): - model = self.app.model - auth = self.app.get_auth_handler() - view = self.make_view() - - admin = auth.get_role_administrator(self.session) - user = model.User(username="fred") - user.roles.append(admin) - self.session.add(user) - self.session.commit() - - # nb. this is not a "versioned" test case, so transaction_count - # field will not be included - data = view.merge_get_data(user) - self.assertEqual(data["username"], "fred") - self.assertEqual(data["roles"], [admin.name]) - self.assertNotIn("transaction_count", data) - - def test_merge_why_not(self): - model = self.app.model - view = self.make_view() - - user1 = model.User(username="freddie") - self.session.add(user1) - user2 = model.User(username="fred") - self.session.add(user2) - self.session.commit() - - # normally no reason not to merge - self.assertIsNone(view.merge_why_not(user1, user2)) - - # can merge even if current user is involved (being kept) - with patch.object(self.request, "user", new=user2): - self.assertIsNone(view.merge_why_not(user1, user2)) - - # but cannot merge if it means removing current user - with patch.object(self.request, "user", new=user1): - reason = view.merge_why_not(user1, user2) - self.assertEqual(reason, "Cannot remove user who is currently logged in!") - - def test_merge_execute(self): - model = self.app.model - enum = self.app.enum - auth = self.app.get_auth_handler() - view = self.make_view() - - admin = auth.get_role_administrator(self.session) - user1 = model.User(username="freddie") - user1.roles.append(admin) - self.session.add(user1) - user2 = model.User(username="fred") - self.session.add(user2) - - upgrade = model.Upgrade( - description="test", - created_by=user1, - executed_by=user1, - status=enum.UpgradeStatus.SUCCESS, - ) - self.session.add(upgrade) - self.session.commit() - - with patch.object(view, "Session", return_value=self.session): - view.merge_execute(user1, user2) - self.session.commit() - - self.assertEqual(self.session.query(model.User).count(), 1) - self.assertNotIn(user1, self.session) - self.assertIn(user2, self.session) - - self.assertIn(admin, user2.roles) - - self.assertIs(upgrade.created_by, user2) - self.assertIs(upgrade.executed_by, user2) - - -class TestVersionedUserView(VersionWebTestCase): - - def make_view(self): - return mod.UserView(self.request) - - def test_merge_get_additive_fields(self): - view = self.make_view() - - # nb. contrast this to the "non-versioned" test case above - fields = view.merge_get_additive_fields() - self.assertIn("transaction_count", fields) - self.assertIn("roles", fields) - - def test_merge_get_data(self): - import sqlalchemy_continuum as continuum - - model = self.app.model - auth = self.app.get_auth_handler() - txncls = continuum.transaction_class(model.User) - - # nb. must reset the User model reference, due to nature of - # test setup/teardown - with patch.multiple( - mod.UserView, - model_class=model.User, - Session=MagicMock(return_value=self.session), - ): - view = self.make_view() - - # make admin user - admin = auth.get_role_administrator(self.session) - user = model.User(username="fred") - user.roles.append(admin) - self.session.add(user) - self.session.commit() - self.assertEqual(self.session.query(txncls).count(), 1) - - # nb. contrast this to the "non-versioned" test case above - data = view.merge_get_data(user) - self.assertEqual(data["username"], "fred") - self.assertEqual(data["roles"], [admin.name]) - self.assertEqual(data["transaction_count"], 0) - - # admin user then creates 2 records w/ 1 txn - # nb. must trick wuttaweb continuum plugin to assign author - with patch.object(self.request, "user", new=user): - person1 = model.Person(full_name="Barney Rubble") - self.session.add(person1) - person2 = model.Person(full_name="Betty Rubble") - self.session.add(person2) - self.session.commit() - - self.assertEqual(self.session.query(txncls).count(), 2) - txn1, txn2 = self.session.query(txncls).order_by(txncls.id).all() - self.assertIsNone(txn1.user) - self.assertIs(txn2.user, user) - - # nb. contrast this to the "non-versioned" test case above - data = view.merge_get_data(user) - self.assertEqual(data["username"], "fred") - self.assertEqual(data["roles"], [admin.name]) - self.assertEqual(data["transaction_count"], 1) - - def test_merge_execute(self): - import sqlalchemy_continuum as continuum - - model = self.app.model - auth = self.app.get_auth_handler() - txncls = continuum.transaction_class(model.User) - - # nb. must reset the User model reference, due to nature of - # test setup/teardown - with patch.multiple( - mod.UserView, - model_class=model.User, - Session=MagicMock(return_value=self.session), - ): - view = self.make_view() - - # make pair of users - admin = auth.get_role_administrator(self.session) - user1 = model.User(username="freddie") - user1.roles.append(admin) - self.session.add(user1) - user2 = model.User(username="fred") - self.session.add(user2) - self.session.commit() - self.assertEqual(self.session.query(model.User).count(), 2) - self.assertEqual(self.session.query(txncls).count(), 1) - - # admin user then creates 2 records w/ 1 txn - # nb. must trick wuttaweb continuum plugin to assign author - with patch.object(self.request, "user", new=user1): - person1 = model.Person(full_name="Barney Rubble") - self.session.add(person1) - person2 = model.Person(full_name="Betty Rubble") - self.session.add(person2) - self.session.commit() - - self.assertEqual(self.session.query(txncls).count(), 2) - txn1, txn2 = self.session.query(txncls).order_by(txncls.id).all() - self.assertIsNone(txn1.user) - self.assertIs(txn2.user, user1) - self.assertEqual(len(user2.roles), 0) - - # merge user1 => user2 (as user2, for 3rd txn) - with patch.object(self.request, "user", new=user2): - view.merge_execute(user1, user2) - self.session.commit() - self.assertEqual(self.session.query(txncls).count(), 3) - txn1, txn2, txn3 = self.session.query(txncls).order_by(txncls.id).all() - self.assertIs(txn3.user, user2) - self.assertEqual(self.session.query(model.User).count(), 1) - user = self.session.query(model.User).one() - self.assertIs(user, user2) - - # user2 is now admin, and author of txn2 - self.assertIn(admin, user2.roles) - self.assertIs(txn2.user, user2) - # TODO: this test seems to work fine on its own, but not in conjunction # with the next class below. will have to sort this out before adding