wuttasync/tests/importing/test_csv.py

437 lines
15 KiB
Python
Raw Normal View History

# -*- coding: utf-8; -*-
import csv
import datetime
import decimal
import uuid as _uuid
from unittest import TestCase
from unittest.mock import patch
import sqlalchemy as sa
from sqlalchemy import orm
from wuttjamaican.testing import DataTestCase
from wuttasync.importing import (
csv as mod,
ImportHandler,
ToSqlalchemyHandler,
ToSqlalchemy,
)
class TestFromCsv(DataTestCase):
def setUp(self):
self.setup_db()
self.handler = ImportHandler(self.config)
self.data_path = self.write_file(
"data.txt",
"""\
2024-12-05 21:19:06 -06:00
name,value
foo,bar
foo2,bar2
""",
)
2024-12-05 21:19:06 -06:00
def make_importer(self, **kwargs):
kwargs.setdefault("handler", self.handler)
return mod.FromCsv(self.config, **kwargs)
def test_get_input_file_name(self):
model = self.app.model
imp = self.make_importer(model_class=model.Setting)
# name can be guessed
self.assertEqual(imp.get_input_file_name(), "Setting.csv")
# name can be explicitly set
imp.input_file_name = "data.txt"
self.assertEqual(imp.get_input_file_name(), "data.txt")
def test_open_input_file(self):
model = self.app.model
imp = self.make_importer(model_class=model.Setting)
2024-12-05 21:19:06 -06:00
# normal operation, input file includes all fields
imp = self.make_importer(
model_class=model.Setting, input_file_path=self.data_path
)
self.assertEqual(imp.fields, ["name", "value"])
imp.open_input_file()
2024-12-05 21:19:06 -06:00
self.assertEqual(imp.input_file.name, self.data_path)
self.assertIsInstance(imp.input_reader, csv.DictReader)
self.assertEqual(imp.fields, ["name", "value"])
imp.input_file.close()
2024-12-05 21:19:06 -06:00
# this file is missing a field, plus we'll pretend more are
# supported - but should wind up with just the one field
missing = self.write_file("missing.txt", "name")
2024-12-05 21:19:06 -06:00
imp = self.make_importer(model_class=model.Setting, input_file_path=missing)
imp.fields.extend(["lots", "more"])
self.assertEqual(imp.fields, ["name", "value", "lots", "more"])
2024-12-05 21:19:06 -06:00
imp.open_input_file()
self.assertEqual(imp.fields, ["name"])
2024-12-05 21:19:06 -06:00
imp.input_file.close()
# and what happens when no known fields are found
bogus = self.write_file("bogus.txt", "blarg")
2024-12-05 21:19:06 -06:00
imp = self.make_importer(model_class=model.Setting, input_file_path=bogus)
self.assertEqual(imp.fields, ["name", "value"])
2024-12-05 21:19:06 -06:00
self.assertRaises(ValueError, imp.open_input_file)
def test_close_input_file(self):
model = self.app.model
imp = self.make_importer(model_class=model.Setting)
2024-12-05 21:19:06 -06:00
imp.input_file_path = self.data_path
imp.open_input_file()
imp.close_input_file()
self.assertIsNone(imp.input_reader)
self.assertIsNone(imp.input_file)
def test_get_source_objects(self):
model = self.app.model
imp = self.make_importer(model_class=model.Setting)
2024-12-05 21:19:06 -06:00
imp.input_file_path = self.data_path
imp.open_input_file()
objects = imp.get_source_objects()
imp.close_input_file()
self.assertEqual(len(objects), 2)
self.assertEqual(objects[0], {"name": "foo", "value": "bar"})
self.assertEqual(objects[1], {"name": "foo2", "value": "bar2"})
class MockMixinImporter(mod.FromCsvToSqlalchemyMixin, mod.FromCsv, ToSqlalchemy):
pass
class TestFromCsvToSqlalchemyMixin(DataTestCase):
def setUp(self):
self.setup_db()
self.handler = ImportHandler(self.config)
def make_importer(self, **kwargs):
kwargs.setdefault("handler", self.handler)
return MockMixinImporter(self.config, **kwargs)
def test_constructor(self):
model = self.app.model
# no coercers
imp = self.make_importer(model_class=model.Setting)
self.assertEqual(imp.coercers, {})
# typical
imp = self.make_importer(
model_class=model.Upgrade, coercers=mod.make_coercers(model.Setting)
)
self.assertEqual(len(imp.coercers), 2)
def test_normalize_source_object(self):
model = self.app.model
# no uuid keys
imp = self.make_importer(model_class=model.Setting)
result = imp.normalize_source_object({"name": "foo", "value": "bar"})
self.assertEqual(result, {"name": "foo", "value": "bar"})
# source has proper UUID
imp = self.make_importer(
model_class=model.Upgrade,
fields=["uuid", "description"],
coercers=mod.make_coercers(model.Upgrade),
)
result = imp.normalize_source_object(
{
"uuid": "06753693-d892-77f0-8000-ce71bf7ebbba",
"description": "testing",
}
)
self.assertEqual(
result,
{
"uuid": _uuid.UUID("06753693-d892-77f0-8000-ce71bf7ebbba"),
"description": "testing",
},
)
# source has string uuid
imp = self.make_importer(
model_class=model.Upgrade,
fields=["uuid", "description"],
coercers=mod.make_coercers(model.Upgrade),
)
result = imp.normalize_source_object(
{"uuid": "06753693d89277f08000ce71bf7ebbba", "description": "testing"}
)
self.assertEqual(
result,
{
"uuid": _uuid.UUID("06753693-d892-77f0-8000-ce71bf7ebbba"),
"description": "testing",
},
)
# source has boolean true/false
imp = self.make_importer(
model_class=model.Upgrade,
fields=["uuid", "executing"],
coercers=mod.make_coercers(model.Upgrade),
)
result = imp.normalize_source_object(
{"uuid": "06753693d89277f08000ce71bf7ebbba", "executing": "True"}
)
self.assertEqual(
result,
{
"uuid": _uuid.UUID("06753693-d892-77f0-8000-ce71bf7ebbba"),
"executing": True,
},
)
result = imp.normalize_source_object(
{"uuid": "06753693d89277f08000ce71bf7ebbba", "executing": "false"}
)
self.assertEqual(
result,
{
"uuid": _uuid.UUID("06753693-d892-77f0-8000-ce71bf7ebbba"),
"executing": False,
},
)
class MockMixinHandler(mod.FromCsvToSqlalchemyHandlerMixin, ToSqlalchemyHandler):
ToImporterBase = ToSqlalchemy
class TestFromCsvToSqlalchemyHandlerMixin(DataTestCase):
def make_handler(self, **kwargs):
return MockMixinHandler(self.config, **kwargs)
def test_get_target_model(self):
with patch.object(
mod.FromCsvToSqlalchemyHandlerMixin, "define_importers", return_value={}
):
handler = self.make_handler()
self.assertRaises(NotImplementedError, handler.get_target_model)
def test_define_importers(self):
model = self.app.model
with patch.object(
mod.FromCsvToSqlalchemyHandlerMixin, "get_target_model", return_value=model
):
handler = self.make_handler()
importers = handler.define_importers()
self.assertIn("Setting", importers)
self.assertTrue(issubclass(importers["Setting"], mod.FromCsv))
self.assertTrue(issubclass(importers["Setting"], ToSqlalchemy))
self.assertIn("User", importers)
self.assertIn("Person", importers)
self.assertIn("Role", importers)
def test_make_importer_factory(self):
model = self.app.model
with patch.object(
mod.FromCsvToSqlalchemyHandlerMixin, "define_importers", return_value={}
):
handler = self.make_handler()
factory = handler.make_importer_factory(model.Setting, "Setting")
self.assertTrue(issubclass(factory, mod.FromCsv))
self.assertTrue(issubclass(factory, ToSqlalchemy))
self.assertTrue(isinstance(factory.coercers, dict))
class TestFromCsvToWutta(DataTestCase):
def make_handler(self, **kwargs):
return mod.FromCsvToWutta(self.config, **kwargs)
def test_get_target_model(self):
handler = self.make_handler()
self.assertIs(handler.get_target_model(), self.app.model)
Base = orm.declarative_base()
class Example(Base):
__tablename__ = "example"
id = sa.Column(sa.Integer(), primary_key=True, nullable=False)
optional_id = sa.Column(sa.Integer(), nullable=True)
name = sa.Column(sa.String(length=100), nullable=False)
optional_name = sa.Column(sa.String(length=100), nullable=True)
flag = sa.Column(sa.Boolean(), nullable=False)
optional_flag = sa.Column(sa.Boolean(), nullable=True)
dt = sa.Column(sa.DateTime(), nullable=False)
optional_dt = sa.Column(sa.DateTime(), nullable=True)
dec = sa.Column(sa.Numeric(scale=8, precision=2), nullable=False)
optional_dec = sa.Column(sa.Numeric(scale=8, precision=2), nullable=True)
flt = sa.Column(sa.Float(), nullable=False)
optional_flt = sa.Column(sa.Float(), nullable=True)
class TestMakeCoercers(TestCase):
def test_basic(self):
coercers = mod.make_coercers(Example)
self.assertEqual(len(coercers), 12)
self.assertIs(coercers["id"], mod.coerce_integer)
self.assertIs(coercers["optional_id"], mod.coerce_integer)
self.assertIs(coercers["name"], mod.coerce_noop)
self.assertIs(coercers["optional_name"], mod.coerce_string_nullable)
self.assertIs(coercers["flag"], mod.coerce_boolean)
self.assertIs(coercers["optional_flag"], mod.coerce_boolean_nullable)
self.assertIs(coercers["dt"], mod.coerce_datetime)
self.assertIs(coercers["optional_dt"], mod.coerce_datetime)
self.assertIs(coercers["dec"], mod.coerce_decimal)
self.assertIs(coercers["optional_dec"], mod.coerce_decimal)
self.assertIs(coercers["flt"], mod.coerce_float)
self.assertIs(coercers["optional_flt"], mod.coerce_float)
class TestMakeCoercer(TestCase):
def test_basic(self):
func = mod.make_coercer(Example.id)
self.assertIs(func, mod.coerce_integer)
func = mod.make_coercer(Example.optional_id)
self.assertIs(func, mod.coerce_integer)
func = mod.make_coercer(Example.name)
self.assertIs(func, mod.coerce_noop)
func = mod.make_coercer(Example.optional_name)
self.assertIs(func, mod.coerce_string_nullable)
func = mod.make_coercer(Example.flag)
self.assertIs(func, mod.coerce_boolean)
func = mod.make_coercer(Example.optional_flag)
self.assertIs(func, mod.coerce_boolean_nullable)
func = mod.make_coercer(Example.dt)
self.assertIs(func, mod.coerce_datetime)
func = mod.make_coercer(Example.optional_dt)
self.assertIs(func, mod.coerce_datetime)
func = mod.make_coercer(Example.dec)
self.assertIs(func, mod.coerce_decimal)
func = mod.make_coercer(Example.optional_dec)
self.assertIs(func, mod.coerce_decimal)
func = mod.make_coercer(Example.flt)
self.assertIs(func, mod.coerce_float)
func = mod.make_coercer(Example.optional_flt)
self.assertIs(func, mod.coerce_float)
class TestCoercers(TestCase):
def test_coerce_boolean(self):
self.assertTrue(mod.coerce_boolean("true"))
self.assertTrue(mod.coerce_boolean("1"))
self.assertTrue(mod.coerce_boolean("yes"))
self.assertFalse(mod.coerce_boolean("false"))
self.assertFalse(mod.coerce_boolean("0"))
self.assertFalse(mod.coerce_boolean("no"))
self.assertFalse(mod.coerce_boolean(""))
def test_coerce_boolean_nullable(self):
self.assertTrue(mod.coerce_boolean_nullable("true"))
self.assertTrue(mod.coerce_boolean_nullable("1"))
self.assertTrue(mod.coerce_boolean_nullable("yes"))
self.assertFalse(mod.coerce_boolean_nullable("false"))
self.assertFalse(mod.coerce_boolean_nullable("0"))
self.assertFalse(mod.coerce_boolean_nullable("no"))
self.assertIsNone(mod.coerce_boolean_nullable(""))
def test_coerce_datetime(self):
self.assertIsNone(mod.coerce_datetime(""))
value = mod.coerce_datetime("2025-10-19 20:56:00")
self.assertIsInstance(value, datetime.datetime)
self.assertEqual(value, datetime.datetime(2025, 10, 19, 20, 56))
value = mod.coerce_datetime("2025-10-19 20:56:00.1234")
self.assertIsInstance(value, datetime.datetime)
self.assertEqual(value, datetime.datetime(2025, 10, 19, 20, 56, 0, 123400))
self.assertRaises(ValueError, mod.coerce_datetime, "XXX")
def test_coerce_decimal(self):
self.assertIsNone(mod.coerce_decimal(""))
value = mod.coerce_decimal("42")
self.assertIsInstance(value, decimal.Decimal)
self.assertEqual(value, decimal.Decimal("42.0"))
self.assertEqual(value, 42)
value = mod.coerce_decimal("42.0")
self.assertIsInstance(value, decimal.Decimal)
self.assertEqual(value, decimal.Decimal("42.0"))
self.assertEqual(value, 42)
self.assertRaises(decimal.InvalidOperation, mod.coerce_decimal, "XXX")
def test_coerce_float(self):
self.assertEqual(mod.coerce_float("42"), 42.0)
self.assertEqual(mod.coerce_float("42.0"), 42.0)
self.assertIsNone(mod.coerce_float(""))
self.assertRaises(ValueError, mod.coerce_float, "XXX")
def test_coerce_integer(self):
self.assertEqual(mod.coerce_integer("42"), 42)
self.assertRaises(ValueError, mod.coerce_integer, "42.0")
self.assertIsNone(mod.coerce_integer(""))
self.assertRaises(ValueError, mod.coerce_integer, "XXX")
def test_coerce_noop(self):
self.assertEqual(mod.coerce_noop(""), "")
self.assertEqual(mod.coerce_noop("42"), "42")
self.assertEqual(mod.coerce_noop("XXX"), "XXX")
def test_coerce_string_nullable(self):
self.assertIsNone(mod.coerce_string_nullable(""))
self.assertEqual(mod.coerce_string_nullable("42"), "42")
self.assertEqual(mod.coerce_string_nullable("XXX"), "XXX")
def test_coerce_uuid(self):
self.assertIsNone(mod.coerce_uuid(""))
uuid = mod.coerce_uuid("06753693d89277f08000ce71bf7ebbba")
self.assertIsInstance(uuid, _uuid.UUID)
self.assertEqual(uuid, _uuid.UUID("06753693d89277f08000ce71bf7ebbba"))
self.assertEqual(uuid.hex, "06753693d89277f08000ce71bf7ebbba")
uuid = mod.coerce_uuid("06753693-d892-77f0-8000-ce71bf7ebbba")
self.assertIsInstance(uuid, _uuid.UUID)
self.assertEqual(uuid, _uuid.UUID("06753693-d892-77f0-8000-ce71bf7ebbba"))
self.assertEqual(str(uuid), "06753693-d892-77f0-8000-ce71bf7ebbba")
self.assertEqual(uuid.hex, "06753693d89277f08000ce71bf7ebbba")