3
0
Fork 0

feat: add "wizard" for creating new table/model/revision

This commit is contained in:
Lance Edgar 2025-12-28 14:20:52 -06:00
parent b1ebe1095e
commit e12c523176
10 changed files with 1687 additions and 25 deletions

View file

@ -42,6 +42,7 @@ dependencies = [
"pyramid_fanstatic", "pyramid_fanstatic",
"pyramid_mako", "pyramid_mako",
"pyramid_tm", "pyramid_tm",
"SQLAlchemy-Utils",
"waitress", "waitress",
"WebHelpers2", "WebHelpers2",
"WuttJamaican[db]>=0.27.0", "WuttJamaican[db]>=0.27.0",

View file

@ -0,0 +1,68 @@
## -*- coding: utf-8; mode: python; -*-
# -*- coding: utf-8; -*-
"""
Model definition for ${model_title_plural}
"""
import sqlalchemy as sa
from sqlalchemy import orm
from wuttjamaican.db import model
class ${model_name}(model.Base):
"""
${description}
"""
__tablename__ = "${table_name}"
% if any([c["data_type"]["type"] == "_fk_uuid_" for c in columns]):
__table_args__ = (
% for column in columns:
% if column["data_type"]["type"] == "_fk_uuid_":
sa.ForeignKeyConstraint(["${column['name']}"], ["${column['data_type']['reference']}.uuid"],
name="${table_name}_fk_${column['data_type']['reference']}"),
% endif
% endfor
)
% endif
% if versioned:
% if all([c["versioned"] for c in columns]):
__versioned__ = {}
% else:
__versioned__ = {
"exclude": [
% for column in columns:
% if not column["versioned"]:
"${column['name']}",
% endif
% endfor
],
}
% endif
% endif
__wutta_hint__ = {
"model_title": "${model_title}",
"model_title_plural": "${model_title_plural}",
}
% for column in columns:
% if column["name"] == "uuid":
uuid = model.uuid_column()
% else:
${column["name"]} = sa.Column(${column["formatted_data_type"]}, nullable=${column["nullable"]}, doc="""
${column["description"] or ""}
""")
% if column["data_type"]["type"] == "_fk_uuid_" and column["relationship"]:
${column["relationship"]["name"]} = orm.relationship(
"${column['relationship']['reference_model']}",
doc="""
${column["description"] or ""}
""")
% endif
% endif
% endfor
# TODO: you usually should define the __str__() method
# def __str__(self):
# return self.name or ""

View file

@ -27,11 +27,18 @@
<div> <div>
<div class="buttons"> <div class="buttons">
% if request.has_perm("tables.list"): % if request.has_perm("app_tables.list"):
<wutta-button type="is-primary" <wutta-button type="is-primary"
tag="a" href="${url('tables')}" tag="a" href="${url('app_tables')}"
icon-left="table" icon-left="table"
label="Database Tables" label="App Tables"
once />
% endif
% if request.has_perm("app_tables.create"):
<wutta-button type="is-primary"
tag="a" href="${url('app_tables.create')}"
icon-left="plus"
label="New Table"
once /> once />
% endif % endif
</div> </div>

View file

@ -12,11 +12,11 @@
once /> once />
% endif % endif
% if request.has_perm("tables.list"): % if request.has_perm("app_tables.list"):
<wutta-button type="is-primary" <wutta-button type="is-primary"
tag="a" href="${url('tables')}" tag="a" href="${url('app_tables')}"
icon-left="table" icon-left="table"
label="Database Tables" label="App Tables"
once /> once />
% endif % endif

View file

@ -34,11 +34,11 @@
<div class="buttons"> <div class="buttons">
% if request.has_perm("tables.list"): % if request.has_perm("app_tables.list"):
<wutta-button type="is-primary" <wutta-button type="is-primary"
tag="a" href="${url('tables')}" tag="a" href="${url('app_tables')}"
icon-left="table" icon-left="table"
label="Database Tables" label="App Tables"
once /> once />
% endif % endif

File diff suppressed because it is too large Load diff

View file

@ -443,14 +443,18 @@ class AlembicMigrationView(MasterView): # pylint: disable=abstract-method
return form return form
def validate_revise_branch(self, node, value): def validate_revise_branch( # pylint: disable=missing-function-docstring
self, node, value
):
if value["branching_option"] == "revise": if value["branching_option"] == "revise":
if not value["revise_branch"]: if not value["revise_branch"]:
node["revise_branch"].raise_invalid( node["revise_branch"].raise_invalid(
"Must specify which branch to revise." "Must specify which branch to revise."
) )
def validate_new_branch(self, node, value): def validate_new_branch( # pylint: disable=missing-function-docstring
self, node, value
):
if value["branching_option"] == "new": if value["branching_option"] == "new":
if not value["new_branch"]: if not value["new_branch"]:
@ -464,7 +468,6 @@ class AlembicMigrationView(MasterView): # pylint: disable=abstract-method
def save_create_form(self, form): # pylint: disable=empty-docstring def save_create_form(self, form): # pylint: disable=empty-docstring
""" """ """ """
alembic = make_alembic_config(self.config) alembic = make_alembic_config(self.config)
script = get_alembic_scriptdir(self.config, alembic)
data = form.validated data = form.validated
# kwargs for `alembic revision` command # kwargs for `alembic revision` command
@ -486,7 +489,8 @@ class AlembicMigrationView(MasterView): # pylint: disable=abstract-method
intro = HTML.tag( intro = HTML.tag(
"p", "p",
class_="block", class_="block",
c="New migration script has been created. Please review and modify the file contents as needed:", c="New migration script has been created. "
"Please review and modify the file contents as needed:",
) )
path = HTML.tag( path = HTML.tag(
@ -516,7 +520,10 @@ class AlembicMigrationView(MasterView): # pylint: disable=abstract-method
rev = self.get_instance() rev = self.get_instance()
os.remove(rev["path"]) os.remove(rev["path"])
def get_revise_branch_options(self, script): # TODO: this is effectivey duplicated in AppTableView.get_migration_branch_options()
def get_revise_branch_options( # pylint: disable=missing-function-docstring
self, script
):
branches = set() branches = set()
for rev in script.get_revisions(script.get_heads()): for rev in script.get_revisions(script.get_heads()):
branches.update(rev.branch_labels) branches.update(rev.branch_labels)

View file

@ -24,12 +24,23 @@
Table Views Table Views
""" """
import sqlalchemy as sa import os
from alembic import command as alembic_command
from sqlalchemy_utils import get_mapper
from mako.lookup import TemplateLookup
from webhelpers2.html import HTML
from wuttjamaican.db.conf import (
check_alembic_current,
make_alembic_config,
get_alembic_scriptdir,
)
from wuttaweb.views import MasterView from wuttaweb.views import MasterView
class TableView(MasterView): class AppTableView(MasterView): # pylint: disable=abstract-method
""" """
Master view showing all tables in the :term:`app database`. Master view showing all tables in the :term:`app database`.
@ -40,17 +51,20 @@ class TableView(MasterView):
* ``/tables/`` * ``/tables/``
""" """
model_name = "table" # pylint: disable=duplicate-code
model_title = "Database Table" model_name = "app_table"
model_title = "App Table"
model_key = "name" model_key = "name"
url_prefix = "/tables/app"
filterable = False filterable = False
sortable = True sortable = True
sort_on_backend = False sort_on_backend = False
paginated = True paginated = True
paginate_on_backend = False paginate_on_backend = False
creatable = False creatable = True
editable = False editable = False
deletable = False deletable = False
# pylint: enable=duplicate-code
labels = { labels = {
"name": "Table Name", "name": "Table Name",
@ -70,6 +84,23 @@ class TableView(MasterView):
# "row_count", # "row_count",
] ]
has_rows = True
rows_title = "Columns"
rows_filterable = False
rows_sort_defaults = "sequence"
rows_sort_on_backend = False
rows_paginated = True
rows_paginate_on_backend = False
rows_viewable = False
row_grid_columns = [
"sequence",
"column_name",
"data_type",
"nullable",
"description",
]
def get_grid_data( # pylint: disable=empty-docstring def get_grid_data( # pylint: disable=empty-docstring
self, columns=None, session=None self, columns=None, session=None
): ):
@ -93,6 +124,9 @@ class TableView(MasterView):
g = grid g = grid
super().configure_grid(g) super().configure_grid(g)
# nb. show more tables by default
g.pagesize = 50
# schema # schema
g.set_searchable("schema") g.set_searchable("schema")
@ -100,7 +134,9 @@ class TableView(MasterView):
g.set_searchable("name") g.set_searchable("name")
g.set_link("name") g.set_link("name")
def get_instance(self): # pylint: disable=empty-docstring def get_instance( # pylint: disable=empty-docstring,arguments-differ,unused-argument
self, **kwargs
):
""" """ """ """
if "_cached_instance" not in self.__dict__: if "_cached_instance" not in self.__dict__:
model = self.app.model model = self.app.model
@ -111,6 +147,7 @@ class TableView(MasterView):
"name": table.name, "name": table.name,
"schema": table.schema or "", "schema": table.schema or "",
# "row_count": 42, # "row_count": 42,
"table": table,
} }
self.__dict__["_cached_instance"] = data self.__dict__["_cached_instance"] = data
@ -121,14 +158,248 @@ class TableView(MasterView):
""" """ """ """
return instance["name"] return instance["name"]
def get_row_grid_data(self, obj): # pylint: disable=empty-docstring
""" """
table = obj
data = []
for i, column in enumerate(table["table"].columns, 1):
data.append(
{
"column": column,
"sequence": i,
"column_name": column.name,
"data_type": str(repr(column.type)),
"nullable": column.nullable,
"description": (column.doc or "").strip(),
}
)
return data
def configure_row_grid(self, grid): # pylint: disable=empty-docstring
""" """
g = grid
super().configure_row_grid(g)
# nb. try not to hide any columns by default
g.pagesize = 100
# sequence
g.set_label("sequence", "Seq.")
# column_name
g.set_searchable("column_name")
# data_type
g.set_searchable("data_type")
# nullable
g.set_renderer("nullable", "boolean")
# description
g.set_searchable("description")
g.set_renderer("description", self.render_column_description)
def render_column_description( # pylint: disable=missing-function-docstring,unused-argument
self, column, field, value
):
if not value:
return ""
max_length = 100
if len(value) <= max_length:
return value
return HTML.tag("span", title=value, c=f"{value[:max_length]} ...")
def get_template_context(self, context): # pylint: disable=empty-docstring
""" """
if self.creating:
model = self.app.model
# alembic current
context["alembic_is_current"] = check_alembic_current(self.config)
# existing tables
# TODO: any reason this should check grid data instead of metadata?
unwanted = ["transaction", "transaction_meta"]
context["existing_tables"] = [
{"name": table}
for table in sorted(model.Base.metadata.tables)
if table not in unwanted and not table.endswith("_version")
]
# model dir
context["model_dir"] = os.path.dirname(model.__file__)
# migration branch
script = get_alembic_scriptdir(self.config)
branch_options = self.get_migration_branch_options(script)
context["migration_branch_options"] = branch_options
branch = self.config.get(
f"{self.config.appname}.alembic.default_revise_branch"
)
if not branch and len(branch_options) == 1:
branch = branch_options[0]
context["migration_branch"] = branch
return context
# TODO: this is effectivey duplicated in AlembicMigrationView.get_revise_branch_options()
def get_migration_branch_options( # pylint: disable=missing-function-docstring
self, script
):
branches = set()
for rev in script.get_revisions(script.get_heads()):
branches.update(rev.branch_labels)
return sorted(branches)
def wizard_action(self): # pylint: disable=too-many-return-statements
"""
AJAX view to handle various actions for the "new table" wizard.
"""
data = self.request.json_body
action = data.get("action", "").strip()
try:
# nb. cannot use match/case statement until python 3.10, but this
# project technically still supports python 3.8
if action == "write_model_file":
return self.write_model_file(data)
if action == "check_model":
return self.check_model(data)
if action == "write_revision_script":
return self.write_revision_script(data)
if action == "migrate_db":
return self.migrate_db(data)
if action == "check_table":
return self.check_table(data)
if action == "":
return {"error": "Must specify the action to perform."}
return {"error": f"Unknown action requested: {action}"}
except Exception as err: # pylint: disable=broad-exception-caught
return {"error": f"Unexpected error occurred: {err}"}
def write_model_file(self, data): # pylint: disable=missing-function-docstring
model = self.app.model
path = data["module_file"]
if os.path.exists(path):
if data["overwrite"]:
os.remove(path)
else:
return {"error": "File already exists"}
for column in data["columns"]:
if column["data_type"]["type"] == "_fk_uuid_" and column["relationship"]:
name = column["relationship"]
table = model.Base.metadata.tables[column["data_type"]["reference"]]
mapper = get_mapper(table)
reference_model = mapper.class_.__name__
column["relationship"] = {
"name": name,
"reference_model": reference_model,
}
# TODO: make templates dir configurable?
templates = [self.app.resource_path("wuttaweb:code-templates")]
table_templates = TemplateLookup(directories=templates)
template = table_templates.get_template("/new-table.mako")
content = template.render(**data)
with open(path, "wt", encoding="utf_8") as f:
f.write(content)
return {}
def check_model(self, data): # pylint: disable=missing-function-docstring
model = self.app.model
model_name = data["model_name"]
if not hasattr(model, model_name):
return {
"problem": "class not found in app model",
"model": model.__name__,
}
return {}
def write_revision_script(self, data): # pylint: disable=missing-function-docstring
alembic_config = make_alembic_config(self.config)
script = alembic_command.revision(
alembic_config,
autogenerate=True,
head=f"{data['branch']}@head",
message=data["message"],
)
return {"script": script.path}
def migrate_db( # pylint: disable=missing-function-docstring,unused-argument
self, data
):
alembic_config = make_alembic_config(self.config)
alembic_command.upgrade(alembic_config, "heads")
return {}
def check_table(self, data): # pylint: disable=missing-function-docstring
model = self.app.model
name = data["name"]
table = model.Base.metadata.tables.get(name)
if table is None:
return {"problem": "table does not exist in app model"}
session = self.Session()
count = session.query(table).count()
route_prefix = self.get_route_prefix()
url = self.request.route_url(f"{route_prefix}.view", name=name)
return {"url": url, "count": count}
@classmethod
def defaults(cls, config): # pylint: disable=empty-docstring
""" """
cls._apptable_defaults(config)
cls._defaults(config)
@classmethod
def _apptable_defaults(cls, config):
route_prefix = cls.get_route_prefix()
permission_prefix = cls.get_permission_prefix()
model_title_plural = cls.get_model_title_plural()
url_prefix = cls.get_url_prefix()
# fix permission group
config.add_wutta_permission_group(
permission_prefix, model_title_plural, overwrite=False
)
# wizard actions
config.add_route(
f"{route_prefix}.wizard_action",
f"{url_prefix}/new/wizard-action",
request_method="POST",
)
config.add_view(
cls,
attr="wizard_action",
route_name=f"{route_prefix}.wizard_action",
renderer="json",
permission=f"{permission_prefix}.create",
)
def defaults(config, **kwargs): # pylint: disable=missing-function-docstring def defaults(config, **kwargs): # pylint: disable=missing-function-docstring
base = globals() base = globals()
TableView = kwargs.get( # pylint: disable=invalid-name,redefined-outer-name AppTableView = kwargs.get( # pylint: disable=invalid-name,redefined-outer-name
"TableView", base["TableView"] "AppTableView", base["AppTableView"]
) )
TableView.defaults(config) AppTableView.defaults(config)
def includeme(config): # pylint: disable=missing-function-docstring def includeme(config): # pylint: disable=missing-function-docstring

View file

@ -1,15 +1,38 @@
# -*- coding: utf-8; -*- # -*- coding: utf-8; -*-
import os
from unittest.mock import patch from unittest.mock import patch
from alembic import command as alembic_command
from wuttjamaican.db.conf import check_alembic_current, make_alembic_config
from wuttaweb.testing import WebTestCase from wuttaweb.testing import WebTestCase
from wuttaweb.views import tables as mod from wuttaweb.views import tables as mod
class TestUpgradeView(WebTestCase): class TestAppTableView(WebTestCase):
def make_config(self, **kwargs):
sqlite_path = self.write_file("test.sqlite", "")
self.sqlite_engine_url = f"sqlite:///{sqlite_path}"
config_path = self.write_file(
"test.ini",
f"""
[wutta.db]
default.url = {self.sqlite_engine_url}
[alembic]
script_location = wuttjamaican.db:alembic
version_locations = wuttjamaican.db:alembic/versions
""",
)
return super().make_config([config_path], **kwargs)
def make_view(self): def make_view(self):
return mod.TableView(self.request) return mod.AppTableView(self.request)
def test_includeme(self): def test_includeme(self):
self.pyramid_config.include("wuttaweb.views.tables") self.pyramid_config.include("wuttaweb.views.tables")
@ -51,3 +74,253 @@ class TestUpgradeView(WebTestCase):
table = {"name": "poser_foo"} table = {"name": "poser_foo"}
self.assertEqual(view.get_instance_title(table), "poser_foo") self.assertEqual(view.get_instance_title(table), "poser_foo")
def test_get_row_grid_data(self):
model = self.app.model
view = self.make_view()
table = model.Base.metadata.tables["person"]
table_dict = {"name": "person", "table": table}
data = view.get_row_grid_data(table_dict)
self.assertIsInstance(data, list)
self.assertGreater(len(data), 4)
columns = [c["column_name"] for c in data]
self.assertIn("full_name", columns)
self.assertIn("first_name", columns)
self.assertIn("last_name", columns)
def test_configure_row_grid(self):
view = self.make_view()
# sanity / coverage check
grid = view.make_grid(columns=["column_name", "data_type"])
view.configure_row_grid(grid)
def test_render_column_description(self):
view = self.make_view()
# nb. first 2 params are igored
text = view.render_column_description(None, None, "hello world")
self.assertEqual(text, "hello world")
text = view.render_column_description(None, None, "")
self.assertEqual(text, "")
text = view.render_column_description(None, None, None)
self.assertEqual(text, "")
msg = (
"This is a very long and rambling sentence. "
"There is no point to it except that it is long. "
"Far too long to be reasonable."
"I mean I am serious when I say this is simply too long."
)
text = view.render_column_description(None, None, msg)
self.assertNotEqual(text, msg)
self.assertIn("<span title=", text)
def test_get_template_context(self):
view = self.make_view()
# normal view gets no extra context
context = view.get_template_context({})
self.assertIsInstance(context, dict)
self.assertNotIn("alembic_is_current", context)
self.assertNotIn("existing_tables", context)
self.assertNotIn("model_dir", context)
self.assertNotIn("migration_branch_options", context)
self.assertNotIn("migration_branch", context)
# but 'create' view gets extra context
with patch.object(view, "creating", new=True):
context = view.get_template_context({})
self.assertIsInstance(context, dict)
self.assertIn("alembic_is_current", context)
self.assertIn("existing_tables", context)
self.assertIn("model_dir", context)
self.assertIn("migration_branch_options", context)
self.assertIn("migration_branch", context)
def test_write_model_file(self):
view = self.make_view()
module_path = self.write_file("widget.py", "")
self.assertEqual(os.path.getsize(module_path), 0)
sample = {
"action": "write_model_file",
"module_file": module_path,
"overwrite": False,
"table_name": "poser_widget",
"model_name": "PoserWidget",
"model_title": "Poser Widget",
"model_title_plural": "Poser Widgets",
"description": "A widget for Poser",
"versioned": True,
"columns": [
{
"name": "uuid",
"data_type": {
"type": "UUID",
},
"formatted_data_type": "sa.UUID()",
"nullable": False,
"description": "primary key",
"versioned": "n/a",
"relationship": None,
},
{
"name": "name",
"data_type": {
"type": "String",
},
"formatted_data_type": "sa.String(length=100)",
"nullable": False,
"description": "name of widget",
"versioned": True,
"relationship": None,
},
{
"name": "owner_uuid",
"data_type": {
"type": "_fk_uuid_",
"reference": "user",
},
"formatted_data_type": "sa.UUID()",
"nullable": False,
"description": "owner of widget",
"versioned": True,
"relationship": {
"name": "owner",
"reference_model": "User",
},
},
],
}
with patch.object(self.request, "json_body", new=sample, create=True):
# does not overwrite by default
result = view.wizard_action()
self.assertIn("error", result)
self.assertEqual(result["error"], "File already exists")
# but it can overwrite if requested
with patch.dict(sample, {"overwrite": True}):
result = view.wizard_action()
self.assertNotIn("error", result)
self.assertGreater(os.path.getsize(module_path), 1000)
def test_check_model(self):
view = self.make_view()
sample = {
"action": "check_model",
"model_name": "Person",
}
with patch.object(self.request, "json_body", new=sample, create=True):
# empty result means the model exists
result = view.wizard_action()
self.assertEqual(result, {})
# problem is specified if not
with patch.dict(sample, {"model_name": "gobbledygook"}):
result = view.wizard_action()
self.assertIn("problem", result)
self.assertEqual(result["problem"], "class not found in app model")
def test_write_revision_script(self):
view = self.make_view()
sample = {
"action": "write_revision_script",
"branch": "wutta",
"message": "just a test",
}
# tell alembic our db is already current
alembic = make_alembic_config(self.config)
alembic_command.stamp(alembic, "heads")
self.assertTrue(check_alembic_current(self.config, alembic))
with patch.object(self.request, "json_body", new=sample, create=True):
# nb. this writes a real script in the wuttjamaican project
result = view.wizard_action()
self.assertIn("script", result)
outdir = os.path.dirname(result["script"])
self.assertEqual(
outdir, self.app.resource_path("wuttjamaican.db:alembic/versions")
)
# alembic now thinks we need to upgrade
self.assertFalse(check_alembic_current(self.config, alembic))
# must be sure to delete that script
os.remove(result["script"])
def test_migrate_db(self):
view = self.make_view()
sample = {"action": "migrate_db"}
# tell alembic our db is already current
alembic = make_alembic_config(self.config)
alembic_command.stamp(alembic, "heads")
self.assertTrue(check_alembic_current(self.config, alembic))
# force downgrade to wutta@-1
alembic_command.downgrade(alembic, "wutta@-1")
# alembic now thinks we need to upgrade
self.assertFalse(check_alembic_current(self.config, alembic))
with patch.object(self.request, "json_body", new=sample, create=True):
# now test our view method; alembic should then know we are current
view.wizard_action()
self.assertTrue(check_alembic_current(self.config, alembic))
def test_check_table(self):
self.pyramid_config.add_route("app_tables.view", "/tables/app/{name}")
view = self.make_view()
sample = {"action": "check_table", "name": "person"}
with patch.object(view, "Session", return_value=self.session):
with patch.object(self.request, "json_body", new=sample, create=True):
# result with URL means the table exists
result = view.wizard_action()
self.assertIn("url", result)
self.assertNotIn("problem", result)
# problem is specified if not
with patch.dict(sample, {"name": "gobbledygook"}):
result = view.wizard_action()
self.assertIn("problem", result)
self.assertEqual(result["problem"], "table does not exist in app model")
def test_wizard_action(self):
view = self.make_view()
# missing action
with patch.object(self.request, "json_body", create=True, new={}):
result = view.wizard_action()
self.assertIn("error", result)
self.assertEqual(result["error"], "Must specify the action to perform.")
# unknown action
with patch.object(
self.request, "json_body", create=True, new={"action": "nothing"}
):
result = view.wizard_action()
self.assertIn("error", result)
self.assertEqual(result["error"], "Unknown action requested: nothing")
# error invoking action
with patch.object(
self.request, "json_body", create=True, new={"action": "check_table"}
):
with patch.object(view, "check_table", side_effect=RuntimeError("whoa")):
result = view.wizard_action()
self.assertIn("error", result)
self.assertEqual(result["error"], "Unexpected error occurred: whoa")