feat: add the import-versions command, handler logic

only works if wutta-continuum is already installed and enabled.

this also rearranges some existing classes, for better consistency
This commit is contained in:
Lance Edgar 2025-12-18 20:03:47 -06:00
parent c38cd2c179
commit fc250a433c
19 changed files with 1345 additions and 76 deletions

View file

@ -2,6 +2,8 @@
from unittest.mock import patch
from sqlalchemy import orm
from wuttjamaican.testing import DataTestCase
from wuttasync.importing import base as mod, ImportHandler, Orientation
@ -78,13 +80,31 @@ class TestImporter(DataTestCase):
def test_get_keys(self):
model = self.app.model
# nb. get_keys() will cache the return value, so must
# re-create importer for each test
# keys inspected from model by default
imp = self.make_importer(model_class=model.Setting)
self.assertEqual(imp.get_keys(), ["name"])
with patch.multiple(imp, create=True, key="value"):
self.assertEqual(imp.get_keys(), ["value"])
with patch.multiple(imp, create=True, keys=["foo", "bar"]):
imp = self.make_importer(model_class=model.User)
self.assertEqual(imp.get_keys(), ["uuid"])
# class may define 'keys'
imp = self.make_importer(model_class=model.User)
with patch.object(imp, "keys", new=["foo", "bar"], create=True):
self.assertEqual(imp.get_keys(), ["foo", "bar"])
# class may define 'key'
imp = self.make_importer(model_class=model.User)
with patch.object(imp, "key", new="whatever", create=True):
self.assertEqual(imp.get_keys(), ["whatever"])
# class may define 'default_keys'
imp = self.make_importer(model_class=model.User)
with patch.object(imp, "default_keys", new=["baz", "foo"]):
self.assertEqual(imp.get_keys(), ["baz", "foo"])
def test_process_data(self):
model = self.app.model
imp = self.make_importer(
@ -651,6 +671,105 @@ class TestFromFile(DataTestCase):
close.assert_called_once_with()
class TestQueryWrapper(DataTestCase):
def test_basic(self):
model = self.app.model
p1 = model.Person(full_name="John Doe")
self.session.add(p1)
p2 = model.Person(full_name="Jane Doe")
self.session.add(p2)
self.session.commit()
# cannot get count via len(query), must use query.count()
query = self.session.query(model.Person)
self.assertEqual(query.count(), 2)
self.assertRaises(TypeError, len, query)
# but can use len(wrapper)
wrapper = mod.QueryWrapper(query)
self.assertEqual(len(wrapper), 2)
# iter(wrapper) should work too
people = [p for p in wrapper]
self.assertEqual(people, [p1, p2])
people = [p for p in iter(wrapper)]
self.assertEqual(people, [p1, p2])
people = [p for p in list(wrapper)]
self.assertEqual(people, [p1, p2])
class TestFromSqlalchemy(DataTestCase):
def setUp(self):
self.setup_db()
self.handler = ImportHandler(self.config)
def make_importer(self, **kwargs):
kwargs.setdefault("handler", self.handler)
return mod.FromSqlalchemy(self.config, **kwargs)
def test_get_source_query(self):
model = self.app.model
imp = self.make_importer(
source_model_class=model.Upgrade, source_session=self.session
)
query = imp.get_source_query()
self.assertIsInstance(query, orm.Query)
self.assertEqual(len(query.selectable.froms), 1)
table = query.selectable.froms[0]
self.assertEqual(table.name, "upgrade")
def test_get_source_objects(self):
model = self.app.model
user1 = model.User(username="fred")
self.session.add(user1)
user2 = model.User(username="bettie")
self.session.add(user2)
self.session.commit()
imp = self.make_importer(
source_model_class=model.User, source_session=self.session
)
result = imp.get_source_objects()
self.assertIsInstance(result, mod.QueryWrapper)
self.assertEqual(len(result), 2)
self.assertEqual(list(result), [user1, user2])
class TestFromSqlalchemyMirror(DataTestCase):
def setUp(self):
self.setup_db()
self.handler = ImportHandler(self.config)
def make_importer(self, **kwargs):
kwargs.setdefault("handler", self.handler)
return mod.FromSqlalchemyMirror(self.config, **kwargs)
def test_source_model_class(self):
model = self.app.model
# source_model_class will mirror model_class
imp = self.make_importer(model_class=model.Upgrade)
self.assertIs(imp.model_class, model.Upgrade)
self.assertIs(imp.source_model_class, model.Upgrade)
def test_normalize_source_object(self):
model = self.app.model
imp = self.make_importer(model_class=model.Upgrade)
upgrade = model.Upgrade()
# normalize_source_object() should invoke normalize_target_object()
with patch.object(imp, "normalize_target_object") as normalize_target_object:
normalize_target_object.return_value = 42
result = imp.normalize_source_object(upgrade)
self.assertEqual(result, 42)
normalize_target_object.assert_called_once_with(upgrade)
class TestToSqlalchemy(DataTestCase):
def setUp(self):

View file

@ -213,6 +213,97 @@ class TestFromFileHandler(DataTestCase):
process_data.assert_called_once_with(input_file_dir=self.tempdir)
class TestFromSqlalchemyHandler(DataTestCase):
def make_handler(self, **kwargs):
return mod.FromSqlalchemyHandler(self.config, **kwargs)
def test_make_source_session(self):
handler = self.make_handler()
self.assertRaises(NotImplementedError, handler.make_source_session)
def test_begin_source_transaction(self):
handler = self.make_handler()
self.assertIsNone(handler.source_session)
with patch.object(handler, "make_source_session", return_value=self.session):
handler.begin_source_transaction()
self.assertIs(handler.source_session, self.session)
def test_commit_source_transaction(self):
model = self.app.model
handler = self.make_handler()
handler.source_session = self.session
self.assertEqual(self.session.query(model.User).count(), 0)
# nb. do not commit this yet
user = model.User(username="fred")
self.session.add(user)
self.assertTrue(self.session.in_transaction())
self.assertIn(user, self.session)
handler.commit_source_transaction()
self.assertIsNone(handler.source_session)
self.assertFalse(self.session.in_transaction())
self.assertNotIn(user, self.session) # hm, surprising?
self.assertEqual(self.session.query(model.User).count(), 1)
def test_rollback_source_transaction(self):
model = self.app.model
handler = self.make_handler()
handler.source_session = self.session
self.assertEqual(self.session.query(model.User).count(), 0)
# nb. do not commit this yet
user = model.User(username="fred")
self.session.add(user)
self.assertTrue(self.session.in_transaction())
self.assertIn(user, self.session)
handler.rollback_source_transaction()
self.assertIsNone(handler.source_session)
self.assertFalse(self.session.in_transaction())
self.assertNotIn(user, self.session)
self.assertEqual(self.session.query(model.User).count(), 0)
def test_get_importer_kwargs(self):
handler = self.make_handler()
handler.source_session = self.session
kw = handler.get_importer_kwargs("User")
self.assertIn("source_session", kw)
self.assertIs(kw["source_session"], self.session)
class TestFromWuttaHandler(DataTestCase):
def make_handler(self, **kwargs):
return mod.FromWuttaHandler(self.config, **kwargs)
def test_get_source_title(self):
handler = self.make_handler()
# uses app title by default
self.config.setdefault("wutta.app_title", "What About This")
self.assertEqual(handler.get_source_title(), "What About This")
# or generic default if present
handler.generic_source_title = "WHATABOUTTHIS"
self.assertEqual(handler.get_source_title(), "WHATABOUTTHIS")
# but prefer specific title if present
handler.source_title = "what_about_this"
self.assertEqual(handler.get_source_title(), "what_about_this")
def test_make_source_session(self):
handler = self.make_handler()
# makes "new" (mocked in our case) app session
with patch.object(self.app, "make_session") as make_session:
make_session.return_value = self.session
session = handler.make_source_session()
make_session.assert_called_once_with()
self.assertIs(session, self.session)
class TestToSqlalchemyHandler(DataTestCase):
def make_handler(self, **kwargs):
@ -256,3 +347,34 @@ class TestToSqlalchemyHandler(DataTestCase):
kw = handler.get_importer_kwargs("Setting")
self.assertIn("target_session", kw)
self.assertIs(kw["target_session"], self.session)
class TestToWuttaHandler(DataTestCase):
def make_handler(self, **kwargs):
return mod.ToWuttaHandler(self.config, **kwargs)
def test_get_target_title(self):
handler = self.make_handler()
# uses app title by default
self.config.setdefault("wutta.app_title", "What About This")
self.assertEqual(handler.get_target_title(), "What About This")
# or generic default if present
handler.generic_target_title = "WHATABOUTTHIS"
self.assertEqual(handler.get_target_title(), "WHATABOUTTHIS")
# but prefer specific title if present
handler.target_title = "what_about_this"
self.assertEqual(handler.get_target_title(), "what_about_this")
def test_make_target_session(self):
handler = self.make_handler()
# makes "new" (mocked in our case) app session
with patch.object(self.app, "make_session") as make_session:
make_session.return_value = self.session
session = handler.make_target_session()
make_session.assert_called_once_with()
self.assertIs(session, self.session)

View file

@ -0,0 +1,247 @@
# -*- coding: utf-8; -*-
from sqlalchemy import orm
import sqlalchemy_continuum as continuum
from wuttjamaican.util import make_true_uuid
from wutta_continuum.testing import VersionTestCase
from wuttasync.importing import versions as mod, Importer
class TestFromWuttaToVersions(VersionTestCase):
def make_handler(self, **kwargs):
return mod.FromWuttaToVersions(self.config, **kwargs)
def test_begin_target_transaction(self):
model = self.app.model
txncls = continuum.transaction_class(model.User)
handler = self.make_handler()
self.assertIsNone(handler.continuum_uow)
self.assertIsNone(handler.continuum_txn)
handler.begin_target_transaction()
self.assertIsInstance(handler.continuum_uow, continuum.UnitOfWork)
self.assertIsInstance(handler.continuum_txn, txncls)
def test_get_importer_kwargs(self):
handler = self.make_handler()
handler.begin_target_transaction()
kw = handler.get_importer_kwargs("User")
self.assertIn("continuum_txn", kw)
self.assertIs(kw["continuum_txn"], handler.continuum_txn)
def test_make_importer_factory(self):
model = self.app.model
handler = self.make_handler()
# versioned class
factory = handler.make_importer_factory(model.User, "User")
self.assertTrue(issubclass(factory, mod.FromWuttaToVersionBase))
self.assertIs(factory.source_model_class, model.User)
self.assertIs(factory.model_class, continuum.version_class(model.User))
# non-versioned
factory = handler.make_importer_factory(model.Upgrade, "Upgrade")
self.assertIsNone(factory)
def test_define_importers(self):
handler = self.make_handler()
importers = handler.define_importers()
self.assertIn("User", importers)
self.assertIn("Person", importers)
self.assertNotIn("Upgrade", importers)
class UserImporter(mod.FromWuttaToVersionBase):
@property
def model_class(self):
model = self.app.model
return model.User
class TestFromWuttaToVersionBase(VersionTestCase):
def make_importer(self, model_class=None, **kwargs):
imp = mod.FromWuttaToVersionBase(self.config, **kwargs)
if model_class:
imp.model_class = model_class
return imp
def test_get_simple_fields(self):
model = self.app.model
vercls = continuum.version_class(model.User)
# first confirm what a "normal" importer would do
imp = Importer(self.config, model_class=vercls)
fields = imp.get_simple_fields()
self.assertIn("username", fields)
self.assertIn("person_uuid", fields)
self.assertIn("transaction_id", fields)
self.assertIn("operation_type", fields)
self.assertIn("end_transaction_id", fields)
# now test what the "version" importer does
imp = self.make_importer(model_class=vercls)
fields = imp.get_simple_fields()
self.assertIn("username", fields)
self.assertIn("person_uuid", fields)
self.assertNotIn("transaction_id", fields)
self.assertNotIn("operation_type", fields)
self.assertNotIn("end_transaction_id", fields)
def test_get_target_query(self):
model = self.app.model
vercls = continuum.version_class(model.User)
imp = self.make_importer(model_class=vercls, target_session=self.session)
# TODO: not sure what else to test here..
query = imp.get_target_query()
self.assertIsInstance(query, orm.Query)
def test_normalize_target_object(self):
model = self.app.model
vercls = continuum.version_class(model.User)
imp = self.make_importer(model_class=vercls)
user = model.User(username="fred")
self.session.add(user)
self.session.commit()
version = user.versions[0]
# version object should be embedded in data dict
data = imp.normalize_target_object(version)
self.assertIsInstance(data, dict)
self.assertIn("_version", data)
self.assertIs(data["_version"], version)
# but normal object is not embedded
data = imp.normalize_target_object(user)
self.assertIsInstance(data, dict)
self.assertNotIn("_version", data)
def test_make_version(self):
model = self.app.model
vercls = continuum.version_class(model.User)
user = model.User(username="fred")
self.session.add(user)
self.session.commit()
handler = mod.FromWuttaToVersions(self.config)
handler.begin_target_transaction()
handler.target_session.close()
handler.target_session = self.session
imp = self.make_importer(
model_class=vercls,
fields=["uuid", "username"],
keys=("uuid",),
target_session=self.session,
continuum_txn=handler.continuum_txn,
)
data = {"uuid": user.uuid, "username": "freddie"}
version = imp.make_version(data, continuum.Operation.UPDATE)
self.assertIsInstance(version, vercls)
self.assertEqual(version.uuid, user.uuid)
self.assertEqual(version.username, "freddie")
self.assertIn(version, self.session)
self.assertIs(version.transaction, imp.continuum_txn)
self.assertEqual(version.operation_type, continuum.Operation.UPDATE)
def test_create_target_object(self):
model = self.app.model
vercls = continuum.version_class(model.User)
handler = mod.FromWuttaToVersions(self.config)
handler.begin_target_transaction()
handler.target_session.close()
handler.target_session = self.session
imp = self.make_importer(
model_class=vercls,
fields=["uuid", "username"],
keys=("uuid",),
target_session=self.session,
continuum_txn=handler.continuum_txn,
)
source_data = {"uuid": make_true_uuid(), "username": "bettie"}
self.assertEqual(self.session.query(vercls).count(), 0)
version = imp.create_target_object((source_data["uuid"], 1), source_data)
self.assertEqual(self.session.query(vercls).count(), 1)
self.assertEqual(version.transaction_id, imp.continuum_txn.id)
self.assertEqual(version.operation_type, continuum.Operation.INSERT)
self.assertIsNone(version.end_transaction_id)
def test_update_target_object(self):
model = self.app.model
vercls = continuum.version_class(model.User)
user = model.User(username="fred")
self.session.add(user)
self.session.commit()
version1 = user.versions[0]
handler = mod.FromWuttaToVersions(self.config)
handler.begin_target_transaction()
handler.target_session.close()
handler.target_session = self.session
imp = self.make_importer(
model_class=vercls,
fields=["uuid", "username"],
keys=("uuid",),
target_session=self.session,
continuum_txn=handler.continuum_txn,
)
source_data = {"uuid": user.uuid, "username": "freddie"}
target_data = imp.normalize_target_object(version1)
self.assertEqual(self.session.query(vercls).count(), 1)
self.assertIsNone(version1.end_transaction_id)
version2 = imp.update_target_object(
version1, source_data, target_data=target_data
)
self.assertEqual(self.session.query(vercls).count(), 2)
self.assertEqual(version1.end_transaction_id, imp.continuum_txn.id)
self.assertEqual(version2.transaction_id, imp.continuum_txn.id)
self.assertEqual(version2.operation_type, continuum.Operation.UPDATE)
self.assertIsNone(version2.end_transaction_id)
def test_delete_target_object(self):
model = self.app.model
vercls = continuum.version_class(model.User)
user = model.User(username="fred")
self.session.add(user)
self.session.commit()
version1 = user.versions[0]
handler = mod.FromWuttaToVersions(self.config)
handler.begin_target_transaction()
handler.target_session.close()
handler.target_session = self.session
imp = self.make_importer(
model_class=vercls,
fields=["uuid", "username"],
keys=("uuid",),
target_session=self.session,
continuum_txn=handler.continuum_txn,
)
self.assertEqual(self.session.query(vercls).count(), 1)
self.assertIsNone(version1.end_transaction_id)
version2 = imp.delete_target_object(version1)
self.assertEqual(self.session.query(vercls).count(), 2)
self.assertEqual(version1.end_transaction_id, imp.continuum_txn.id)
self.assertEqual(version2.transaction_id, imp.continuum_txn.id)
self.assertEqual(version2.operation_type, continuum.Operation.DELETE)
self.assertIsNone(version2.end_transaction_id)

View file

@ -1,38 +1,3 @@
# -*- coding: utf-8; -*-
from unittest.mock import patch
from wuttjamaican.testing import DataTestCase
from wuttasync.importing import wutta as mod
class TestToWuttaHandler(DataTestCase):
def make_handler(self, **kwargs):
return mod.ToWuttaHandler(self.config, **kwargs)
def test_get_target_title(self):
handler = self.make_handler()
# uses app title by default
self.config.setdefault("wutta.app_title", "What About This")
self.assertEqual(handler.get_target_title(), "What About This")
# or generic default if present
handler.generic_target_title = "WHATABOUTTHIS"
self.assertEqual(handler.get_target_title(), "WHATABOUTTHIS")
# but prefer specific title if present
handler.target_title = "what_about_this"
self.assertEqual(handler.get_target_title(), "what_about_this")
def test_make_target_session(self):
handler = self.make_handler()
# makes "new" (mocked in our case) app session
with patch.object(self.app, "make_session") as make_session:
make_session.return_value = self.session
session = handler.make_target_session()
make_session.assert_called_once_with()
self.assertIs(session, self.session)