3
0
Fork 0

Compare commits

..

10 commits

Author SHA1 Message Date
19048548f1 bump: version 0.25.1 → 0.26.0 2025-12-28 15:16:36 -06:00
c45f7482a6 test: fix method signature in test suite base class
just making pylint happy
2025-12-28 15:16:27 -06:00
e12c523176 feat: add "wizard" for creating new table/model/revision 2025-12-28 15:16:27 -06:00
b1ebe1095e feat: add support for Create Alembic Migration
just a wrapper around `alembic revision` but it seems useful...
2025-12-28 15:16:27 -06:00
58bf8b4bbb fix: let checkbox widget show static text instead of Yes/No
probably needs further improvement..but this is progress
2025-12-28 15:16:27 -06:00
70950ae9b8 feat: add CopyableTextWidget and <wutta-copyable-text> component 2025-12-28 15:16:27 -06:00
49c001c9ad feat: overhaul how form vue template is rendered
now a page template can add `<%def name="form_vue_fields()">` and the
form should inspect/discover and use that instead of its default
2025-12-28 15:16:27 -06:00
9edf6f298c fix: rename form-saving methods etc. for consistency in MasterView
also adds redirect methods where missing
2025-12-28 15:16:27 -06:00
3e7aa1fa0b feat: add basic views for Alembic Migrations, Dashboard
can see all "known" migration revisions (per alembic config), and
migrate the DB arbitrarily via alembic upgrade/downgrade
2025-12-28 15:16:24 -06:00
6791abe96f feat: add basic Table views
very minimal so far, just laying foundations
2025-12-21 16:06:02 -06:00
38 changed files with 4332 additions and 316 deletions

View file

@ -5,6 +5,25 @@ All notable changes to wuttaweb will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
## v0.26.0 (2025-12-28)
### Feat
- add "wizard" for creating new table/model/revision
- add support for Create Alembic Migration
- add CopyableTextWidget and `<wutta-copyable-text>` component
- overhaul how form vue template is rendered
- add basic views for Alembic Migrations, Dashboard
- add basic Table views
### Fix
- let checkbox widget show static text instead of Yes/No
- rename form-saving methods etc. for consistency in MasterView
- temporarily avoid make_uuid()
- remove password filter option for Users grid
- use smarter default for `grid.sort_multiple` based on model class
## v0.25.1 (2025-12-20)
### Fix

View file

@ -0,0 +1,6 @@
``wuttaweb.views.alembic``
==========================
.. automodule:: wuttaweb.views.alembic
:members:

View file

@ -0,0 +1,6 @@
``wuttaweb.views.tables``
=========================
.. automodule:: wuttaweb.views.tables
:members:

View file

@ -28,6 +28,7 @@ templates_path = ["_templates"]
exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"]
intersphinx_mapping = {
"alembic": ("https://alembic.sqlalchemy.org/en/latest/", None),
"colander": ("https://docs.pylonsproject.org/projects/colander/en/latest/", None),
"deform": ("https://docs.pylonsproject.org/projects/deform/en/latest/", None),
"fanstatic": ("https://www.fanstatic.org/en/latest/", None),

View file

@ -58,6 +58,7 @@ the narrative docs are pretty scant. That will eventually change.
api/wuttaweb.subscribers
api/wuttaweb.util
api/wuttaweb.views
api/wuttaweb.views.alembic
api/wuttaweb.views.auth
api/wuttaweb.views.base
api/wuttaweb.views.batch
@ -70,6 +71,7 @@ the narrative docs are pretty scant. That will eventually change.
api/wuttaweb.views.reports
api/wuttaweb.views.roles
api/wuttaweb.views.settings
api/wuttaweb.views.tables
api/wuttaweb.views.upgrades
api/wuttaweb.views.users

View file

@ -6,7 +6,7 @@ build-backend = "hatchling.build"
[project]
name = "WuttaWeb"
version = "0.25.1"
version = "0.26.0"
description = "Web App for Wutta Framework"
readme = "README.md"
authors = [{name = "Lance Edgar", email = "lance@wuttaproject.org"}]
@ -42,9 +42,10 @@ dependencies = [
"pyramid_fanstatic",
"pyramid_mako",
"pyramid_tm",
"SQLAlchemy-Utils",
"waitress",
"WebHelpers2",
"WuttJamaican[db]>=0.27.0",
"WuttJamaican[db]>=0.28.0",
"zope.sqlalchemy>=1.5",
]

View file

@ -0,0 +1,68 @@
## -*- coding: utf-8; mode: python; -*-
# -*- coding: utf-8; -*-
"""
Model definition for ${model_title_plural}
"""
import sqlalchemy as sa
from sqlalchemy import orm
from wuttjamaican.db import model
class ${model_name}(model.Base):
"""
${description}
"""
__tablename__ = "${table_name}"
% if any([c["data_type"]["type"] == "_fk_uuid_" for c in columns]):
__table_args__ = (
% for column in columns:
% if column["data_type"]["type"] == "_fk_uuid_":
sa.ForeignKeyConstraint(["${column['name']}"], ["${column['data_type']['reference']}.uuid"],
name="${table_name}_fk_${column['data_type']['reference']}"),
% endif
% endfor
)
% endif
% if versioned:
% if all([c["versioned"] for c in columns]):
__versioned__ = {}
% else:
__versioned__ = {
"exclude": [
% for column in columns:
% if not column["versioned"]:
"${column['name']}",
% endif
% endfor
],
}
% endif
% endif
__wutta_hint__ = {
"model_title": "${model_title}",
"model_title_plural": "${model_title_plural}",
}
% for column in columns:
% if column["name"] == "uuid":
uuid = model.uuid_column()
% else:
${column["name"]} = sa.Column(${column["formatted_data_type"]}, nullable=${column["nullable"]}, doc="""
${column["description"] or ""}
""")
% if column["data_type"]["type"] == "_fk_uuid_" and column["relationship"]:
${column["relationship"]["name"]} = orm.relationship(
"${column['relationship']['reference_model']}",
doc="""
${column["description"] or ""}
""")
% endif
% endif
% endfor
# TODO: you usually should define the __str__() method
# def __str__(self):
# return self.name or ""

View file

@ -275,6 +275,10 @@ class Form: # pylint: disable=too-many-instance-attributes,too-many-public-meth
deform_form = None
validated = None
vue_template = "/forms/vue_template.mako"
fields_template = "/forms/vue_fields.mako"
buttons_template = "/forms/vue_buttons.mako"
def __init__( # pylint: disable=too-many-arguments,too-many-positional-arguments,too-many-locals
self,
request,
@ -331,6 +335,7 @@ class Form: # pylint: disable=too-many-instance-attributes,too-many-public-meth
self.show_button_cancel = show_button_cancel
self.button_label_cancel = button_label_cancel
self.auto_disable_cancel = auto_disable_cancel
self.form_attrs = {}
self.config = self.request.wutta_config
self.app = self.config.get_app()
@ -806,7 +811,10 @@ class Form: # pylint: disable=too-many-instance-attributes,too-many-public-meth
# get fields
fields = self.get_fields()
if not fields:
raise NotImplementedError
raise ValueError(
"could not determine fields list; "
"please set model_class or fields explicitly"
)
if self.model_class:
@ -939,7 +947,7 @@ class Form: # pylint: disable=too-many-instance-attributes,too-many-public-meth
"""
return HTML.tag(self.vue_tagname, **kwargs)
def render_vue_template(self, template="/forms/vue_template.mako", **context):
def render_vue_template(self, template=None, **context):
"""
Render the Vue template block for the form.
@ -954,8 +962,8 @@ class Form: # pylint: disable=too-many-instance-attributes,too-many-public-meth
</script>
<script>
WuttaFormData = {}
WuttaForm = {
const WuttaFormData = {}
const WuttaForm = {
template: 'wutta-form-template',
}
</script>
@ -969,36 +977,121 @@ class Form: # pylint: disable=too-many-instance-attributes,too-many-public-meth
Actual output will of course depend on form attributes, i.e.
:attr:`vue_tagname` and :attr:`fields` list etc.
:param template: Path to Mako template which is used to render
the output.
Default logic will also invoke (indirectly):
* :meth:`render_vue_fields()`
* :meth:`render_vue_buttons()`
:param template: Optional template path to override the class
default.
:returns: HTML literal
"""
context = self.get_vue_context(**context)
html = render(template or self.vue_template, context)
return HTML.literal(html)
def get_vue_context(self, **context): # pylint: disable=missing-function-docstring
context["form"] = self
context["dform"] = self.get_deform()
context.setdefault("request", self.request)
context["model_data"] = self.get_vue_model_data()
# set form method, enctype
context.setdefault("form_attrs", {})
context["form_attrs"].setdefault("method", self.action_method)
form_attrs = context.setdefault("form_attrs", dict(self.form_attrs))
form_attrs.setdefault("method", self.action_method)
if self.action_method == "post":
context["form_attrs"].setdefault("enctype", "multipart/form-data")
form_attrs.setdefault("enctype", "multipart/form-data")
# auto disable button on submit
if self.auto_disable_submit:
context["form_attrs"]["@submit"] = "formSubmitting = true"
form_attrs["@submit"] = "formSubmitting = true"
output = render(template, context)
return HTML.literal(output)
# duplicate entire context for sake of fields/buttons template
context["form_context"] = context
return context
def render_vue_fields(self, context, template=None, **kwargs):
"""
Render the fields section within the form template.
This is normally invoked from within the form's
``vue_template`` like this:
.. code-block:: none
${form.render_vue_fields(form_context)}
There is a default ``fields_template`` but that is only the
last resort. Logic will first look for a
``form_vue_fields()`` def within the *main template* being
rendered for the page.
An example will surely help:
.. code-block:: mako
<%inherit file="/master/edit.mako" />
<%def name="form_vue_fields()">
<p>this is my custom fields section:</p>
${form.render_vue_field("myfield")}
</%def>
This keeps the custom fields section within the main page
template as opposed to yet another file. But if your page
template has no ``form_vue_fields()`` def, then the class
default template is used. (Unless the ``template`` param
is specified.)
See also :meth:`render_vue_template()` and
:meth:`render_vue_buttons()`.
:param context: This must be the original context as provided
to the form's ``vue_template``. See example above.
:param template: Optional template path to use instead of the
defaults described above.
:returns: HTML literal
"""
context.update(kwargs)
html = False
if not template:
if main_template := context.get("main_template"):
try:
vue_fields = main_template.get_def("form_vue_fields")
except AttributeError:
pass
else:
html = vue_fields.render(**context)
if html is False:
template = self.fields_template
if html is False:
html = render(template, context)
return HTML.literal(html)
def render_vue_field( # pylint: disable=unused-argument,too-many-locals
self,
fieldname,
readonly=None,
label=True,
horizontal=True,
**kwargs,
):
"""
Render the given field completely, i.e. ``<b-field>`` wrapper
with label and containing a widget.
with label and a widget, with validation errors flagged as
needed.
Actual output will depend on the field attributes etc.
Typical output might look like:
@ -1009,14 +1102,23 @@ class Form: # pylint: disable=too-many-instance-attributes,too-many-public-meth
horizontal
type="is-danger"
message="something went wrong!">
<!-- widget element(s) -->
<b-input name="foo"
v-model="${form.get_field_vmodel('foo')}" />
</b-field>
.. warning::
:param fieldname: Name of field to render.
Any ``**kwargs`` received from caller are ignored by this
method. For now they are allowed, for sake of backwawrd
compatibility. This may change in the future.
:param readonly: Optional override for readonly flag.
:param label: Whether to include/set the field label.
:param horizontal: Boolean value for the ``horizontal`` flag
on the field.
:param \\**kwargs: Remaining kwargs are passed to widget's
``serialize()`` method.
:returns: HTML literal
"""
# readonly comes from: caller, field flag, or form flag
if readonly is None:
@ -1034,10 +1136,9 @@ class Form: # pylint: disable=too-many-instance-attributes,too-many-public-meth
# render proper widget if field is in deform/schema
field = dform[fieldname]
kw = {}
if readonly:
kw["readonly"] = True
html = field.serialize(**kw)
kwargs["readonly"] = True
html = field.serialize(**kwargs)
else:
# render static text if field not in deform/schema
@ -1052,12 +1153,13 @@ class Form: # pylint: disable=too-many-instance-attributes,too-many-public-meth
html = HTML.literal(html or "&nbsp;")
# render field label
label = self.get_label(fieldname)
if label:
label = self.get_label(fieldname)
# b-field attrs
attrs = {
":horizontal": "true",
"label": label,
":horizontal": "true" if horizontal else "false",
"label": label or "",
}
# next we will build array of messages to display..some
@ -1085,6 +1187,36 @@ class Form: # pylint: disable=too-many-instance-attributes,too-many-public-meth
return HTML.tag("b-field", c=[html], **attrs)
def render_vue_buttons(self, context, template=None, **kwargs):
"""
Render the buttons section within the form template.
This is normally invoked from within the form's
``vue_template`` like this:
.. code-block:: none
${form.render_vue_buttons(form_context)}
.. note::
This method does not yet inspect the main page template,
unlike :meth:`render_vue_fields()`.
See also :meth:`render_vue_template()`.
:param context: This must be the original context as provided
to the form's ``vue_template``. See example above.
:param template: Optional template path to override the class
default.
:returns: HTML literal
"""
context.update(kwargs)
html = render(template or self.buttons_template, context)
return HTML.literal(html)
def render_vue_finalize(self):
"""
Render the Vue "finalize" script for the form.
@ -1103,6 +1235,25 @@ class Form: # pylint: disable=too-many-instance-attributes,too-many-public-meth
"""
return render_vue_finalize(self.vue_tagname, self.vue_component)
def get_field_vmodel(self, field):
"""
Convenience to return the ``v-model`` data reference for the
given field. For instance:
.. code-block:: none
<b-input name="myfield"
v-model="${form.get_field_vmodel('myfield')}" />
<div v-show="${form.get_field_vmodel('myfield')} == 'easter'">
easter egg!
</div>
:returns: JS-valid string referencing the field value
"""
dform = self.get_deform()
return f"modelData.{dform[field].oid}"
def get_vue_model_data(self):
"""
Returns a dict with form model data. Values may be nested

View file

@ -60,7 +60,7 @@ from deform.widget import ( # pylint: disable=unused-import
DateTimeInputWidget,
MoneyInputWidget,
)
from webhelpers2.html import HTML
from webhelpers2.html import HTML, tags
from wuttjamaican.conf import parse_list
@ -147,6 +147,24 @@ class NotesWidget(TextAreaWidget):
readonly_template = "readonly/notes"
class CopyableTextWidget(Widget): # pylint: disable=abstract-method
"""
A readonly text widget which adds a "copy" icon/link just after
the text.
"""
def serialize(self, field, cstruct, **kw): # pylint: disable=empty-docstring
""" """
if not cstruct:
return colander.null
return HTML.tag("wutta-copyable-text", **{"text": cstruct})
def deserialize(self, field, pstruct): # pylint: disable=empty-docstring
""" """
raise NotImplementedError
class WuttaCheckboxChoiceWidget(CheckboxChoiceWidget):
"""
Custom widget for :class:`python:set` fields.
@ -537,3 +555,58 @@ class BatchIdWidget(Widget): # pylint: disable=abstract-method
batch_id = int(cstruct)
return f"{batch_id:08d}"
class AlembicRevisionWidget(Widget): # pylint: disable=missing-class-docstring
"""
Widget to show an Alembic revision identifier, with link to view
the revision.
"""
def __init__(self, request, *args, **kwargs):
super().__init__(*args, **kwargs)
self.request = request
def serialize(self, field, cstruct, **kw): # pylint: disable=empty-docstring
""" """
if not cstruct:
return colander.null
return tags.link_to(
cstruct, self.request.route_url("alembic.migrations.view", revision=cstruct)
)
def deserialize(self, field, pstruct): # pylint: disable=empty-docstring
""" """
raise NotImplementedError
class AlembicRevisionsWidget(Widget):
"""
Widget to show list of Alembic revision identifiers, with links to
view each revision.
"""
def __init__(self, request, *args, **kwargs):
super().__init__(*args, **kwargs)
self.request = request
self.config = self.request.wutta_config
def serialize(self, field, cstruct, **kw): # pylint: disable=empty-docstring
""" """
if not cstruct:
return colander.null
revisions = []
for rev in self.config.parse_list(cstruct):
revisions.append(
tags.link_to(
rev, self.request.route_url("alembic.migrations.view", revision=rev)
)
)
return ", ".join(revisions)
def deserialize(self, field, pstruct): # pylint: disable=empty-docstring
""" """
raise NotImplementedError

View file

@ -0,0 +1,234 @@
## -*- coding: utf-8; -*-
<%inherit file="/page.mako" />
<%def name="page_content()">
<div style="display: flex; gap: 5rem; align-items: start;">
<div style="width: 40%;">
<br />
<b-field label="Script Dir" horizontal>
<span>{{ script.dir }}</span>
</b-field>
<b-field label="Env Location" horizontal>
<span>{{ script.env_py_location }}</span>
</b-field>
<b-field label="Version Locations" horizontal>
<ul>
<li v-for="path in script.version_locations">
{{ path }}
</li>
</ul>
</b-field>
</div>
<div>
<div class="buttons">
% if request.has_perm("app_tables.list"):
<wutta-button type="is-primary"
tag="a" href="${url('app_tables')}"
icon-left="table"
label="App Tables"
once />
% endif
% if request.has_perm("app_tables.create"):
<wutta-button type="is-primary"
tag="a" href="${url('app_tables.create')}"
icon-left="plus"
label="New Table"
once />
% endif
</div>
<div class="buttons">
% if request.has_perm("alembic.migrations.list"):
<wutta-button type="is-primary"
tag="a" href="${url('alembic.migrations')}"
icon-left="forward"
label="Alembic Migrations"
once />
% endif
% if request.has_perm("alembic.migrations.create"):
<wutta-button type="is-primary"
tag="a" href="${url('alembic.migrations.create')}"
icon-left="plus"
label="New Migration"
once />
% endif
</div>
% if request.has_perm("alembic.migrate"):
<div class="buttons">
<b-button type="is-warning"
icon-pack="fas"
icon-left="forward"
label="Migrate Database"
@click="migrateInit()">
Migrate Database
</b-button>
</div>
% endif
</div>
</div>
<br />
<h4 class="block is-size-4">Script Heads</h4>
<b-table :data="scriptHeads"
:row-class="getScriptHeadRowClass">
<b-table-column field="branch_labels" label="Branch" v-slot="props">
<span>{{ props.row.branch_labels }}</span>
</b-table-column>
<b-table-column field="doc" label="Description" v-slot="props">
<span>{{ props.row.doc }}</span>
</b-table-column>
<b-table-column field="is_current" label="Current in DB" v-slot="props">
<span :class="{'has-text-weight-bold': true, 'has-text-success': props.row.is_current}">{{ props.row.is_current ? "Yes" : "No" }}</span>
</b-table-column>
<b-table-column field="revision" label="Revision" v-slot="props">
<span v-html="props.row.revision"></span>
</b-table-column>
<b-table-column field="down_revision" label="Down Revision" v-slot="props">
<span v-html="props.row.down_revision"></span>
</b-table-column>
<b-table-column field="path" label="Path" v-slot="props">
<span>{{ props.row.path }}</span>
</b-table-column>
</b-table>
<br />
<h4 class="block is-size-4">Database Heads</h4>
<b-table :data="dbHeads">
<b-table-column field="branch_labels" label="Branch" v-slot="props">
<span>{{ props.row.branch_labels }}</span>
</b-table-column>
<b-table-column field="doc" label="Description" v-slot="props">
<span>{{ props.row.doc }}</span>
</b-table-column>
<b-table-column field="revision" label="Revision" v-slot="props">
<span v-html="props.row.revision"></span>
</b-table-column>
<b-table-column field="down_revision" label="Down Revision" v-slot="props">
<span v-html="props.row.down_revision"></span>
</b-table-column>
<b-table-column field="path" label="Path" v-slot="props">
<span>{{ props.row.path }}</span>
</b-table-column>
</b-table>
% if request.has_perm("alembic.migrate"):
<${b}-modal has-modal-card
:active.sync="migrateShowDialog">
<div class="modal-card">
<header class="modal-card-head">
<p class="modal-card-title">Migrate Database</p>
</header>
<section class="modal-card-body">
${h.form(url("alembic.migrate"), method="POST", ref="migrateForm")}
${h.csrf_token(request)}
<p class="block">
You can provide any revspec target. Default will
upgrade to all branch heads.
</p>
<div class="block content is-family-monospace">
<ul>
<li>alembic upgrade heads</li>
<li>alembic upgrade poser@head</li>
<li>alembic downgrade poser@-1</li>
<li>alembic downgrade poser@base</li>
<li>alembic downgrade fc3a3bcaa069</li>
</ul>
</div>
<p class="block has-text-weight-bold">
don&apos;t try that last one ;)
</p>
<b-field grouped>
<b-field label="Direction">
<b-select name="direction" v-model="migrateDirection">
<option value="upgrade">upgrade</option>
<option value="downgrade">downgrade</option>
</b-select>
</b-field>
<b-field label="Target Spec"
:type="migrateTarget ? null : 'is-danger'">
<b-input name="revspec" v-model="migrateTarget" />
</b-field>
</b-field>
${h.end_form()}
</section>
<footer class="modal-card-foot">
<div style="display: flex; gap: 0.5rem;">
<b-button @click="migrateShowDialog = false">
Cancel
</b-button>
<b-button type="is-warning"
icon-pack="fas"
icon-left="forward"
@click="migrateSubmit()"
:disabled="migrateSubmitDisabled">
{{ migrating ? "Working, please wait..." : "Migrate" }}
</b-button>
</div>
</footer>
</div>
</${b}-modal>
% endif
</%def>
<%def name="modify_vue_vars()">
${parent.modify_vue_vars()}
<script>
ThisPageData.script = ${json.dumps(script)|n}
ThisPageData.scriptHeads = ${json.dumps(script_heads)|n}
ThisPageData.dbHeads = ${json.dumps(db_heads)|n}
ThisPage.methods.getScriptHeadRowClass = function(rev) {
if (!rev.is_current) {
return 'has-background-warning'
}
}
% if request.has_perm("alembic.migrate"):
ThisPageData.migrating = false
ThisPageData.migrateShowDialog = false
ThisPageData.migrateTarget = "heads"
ThisPageData.migrateDirection = "upgrade"
ThisPage.methods.migrateInit = function() {
this.migrateShowDialog = true
}
ThisPage.computed.migrateSubmitDisabled = function() {
if (this.migrating) {
return true
}
if (!this.migrateTarget) {
return true
}
}
ThisPage.methods.migrateSubmit = function() {
this.migrating = true
this.$refs.migrateForm.submit()
}
% endif
</script>
</%def>

View file

@ -0,0 +1,31 @@
## -*- coding: utf-8; -*-
<%inherit file="/configure.mako" />
<%def name="form_content()">
<h3 class="block is-size-3">Basics</h3>
<div class="block" style="padding-left: 2rem; width: 50%;">
<b-field label="Default Branch for new Migrations">
<b-select name="${config.appname}.alembic.default_revise_branch"
v-model="simpleSettings['${config.appname}.alembic.default_revise_branch']"
@input="settingsNeedSaved = true">
<option :value="null">(none)</option>
<option v-for="branch in reviseBranchOptions"
:value="branch">
{{ branch }}
</option>
</b-select>
</b-field>
</div>
</%def>
<%def name="modify_vue_vars()">
${parent.modify_vue_vars()}
<script>
ThisPageData.reviseBranchOptions = ${json.dumps(revise_branch_options)|n}
</script>
</%def>

View file

@ -0,0 +1,70 @@
## -*- coding: utf-8; -*-
<%inherit file="/master/create.mako" />
<%def name="page_content()">
<br />
<div class="buttons">
% if request.has_perm("alembic.dashboard"):
<wutta-button type="is-primary"
tag="a" href="${url('alembic.dashboard')}"
icon-left="forward"
label="Alembic Dashboard"
once />
% endif
</div>
${parent.page_content()}
</%def>
<%def name="form_vue_fields()">
${form.render_vue_field("description", horizontal=False)}
${form.render_vue_field("autogenerate", horizontal=False, label=False, static_text="Auto-generate migration logic based on current app model")}
<br />
<b-field label="Branching Options">
<div style="margin: 1rem;">
<div class="field">
<b-radio name="branching_option"
v-model="${form.get_field_vmodel('branching_option')}"
native-value="revise">
Revise existing branch
</b-radio>
</div>
<div v-show="${form.get_field_vmodel('branching_option')} == 'revise'"
style="padding: 1rem 0;">
${form.render_vue_field("revise_branch", horizontal=True)}
</div>
<div class="field">
<b-radio name="branching_option"
v-model="${form.get_field_vmodel('branching_option')}"
native-value="new">
Start new branch
</b-radio>
</div>
<div v-show="${form.get_field_vmodel('branching_option')} == 'new'"
style="padding: 1rem 0;">
${form.render_vue_field("new_branch", horizontal=True)}
${form.render_vue_field("version_location", horizontal=True)}
<p class="block is-italic">
NOTE: New version locations must be added to the
<span class="is-family-monospace">[alembic]</span> section of
your config file (and app restarted) before they will appear as
options here.
</p>
</div>
</div>
</b-field>
</%def>

View file

@ -0,0 +1,25 @@
## -*- coding: utf-8; -*-
<%inherit file="/master/index.mako" />
<%def name="page_content()">
<div class="buttons">
% if request.has_perm("alembic.dashboard"):
<wutta-button type="is-primary"
tag="a" href="${url('alembic.dashboard')}"
icon-left="forward"
label="Alembic Dashboard"
once />
% endif
% if request.has_perm("app_tables.list"):
<wutta-button type="is-primary"
tag="a" href="${url('app_tables')}"
icon-left="table"
label="App Tables"
once />
% endif
</div>
${parent.page_content()}
</%def>

View file

@ -0,0 +1,17 @@
## -*- coding: utf-8; -*-
<%inherit file="/master/view.mako" />
<%def name="page_content()">
<br />
<div class="buttons">
% if request.has_perm("alembic.dashboard"):
<wutta-button type="is-primary"
tag="a" href="${url('alembic.dashboard')}"
icon-left="forward"
label="Alembic Dashboard"
once />
% endif
</div>
${parent.page_content()}
</%def>

View file

@ -32,6 +32,26 @@
</div>
</nav>
<div class="buttons">
% if request.has_perm("app_tables.list"):
<wutta-button type="is-primary"
tag="a" href="${url('app_tables')}"
icon-left="table"
label="App Tables"
once />
% endif
% if request.has_perm("alembic.dashboard"):
<wutta-button type="is-primary"
tag="a" href="${url('alembic.dashboard')}"
icon-left="forward"
label="Alembic Dashboard"
once />
% endif
</div>
<nav class="panel">
<p class="panel-heading">Configuration Files</p>
<div class="panel-block">

View file

@ -1,11 +1,13 @@
<div tal:omit-tag=""
tal:define="name name|field.name;
oid oid|field.oid;
vmodel vmodel|'modelData.'+oid;">
vmodel vmodel|'modelData.'+oid;
static_text static_text|None;">
<b-checkbox name="${name}"
v-model="${vmodel}"
native-value="true"
tal:attributes="attributes|field.widget.attributes|{};">
{{ ${vmodel} ? "Yes" : "No" }}
<span tal:omit-tag="" tal:condition="static_text">${static_text}</span>
<span tal:omit-tag="" tal:condition="not static_text">{{ ${vmodel} ? "Yes" : "No" }}</span>
</b-checkbox>
</div>

View file

@ -34,17 +34,19 @@
<%def name="tool_panels()"></%def>
<%def name="render_vue_template_form()">
% if form is not Undefined:
${form.render_vue_template()}
% endif
</%def>
<%def name="render_vue_templates()">
${parent.render_vue_templates()}
${self.render_vue_template_form()}
</%def>
<%def name="render_vue_template_form()">
% if form is not Undefined:
## nb. must provide main template to form, so it can
## do 'def' lookup as needed for fields template etc.
${form.render_vue_template(main_template=self.template)}
% endif
</%def>
<%def name="make_vue_components()">
${parent.make_vue_components()}
% if form is not Undefined:

View file

@ -0,0 +1,43 @@
## -*- coding: utf-8; -*-
% if not form.readonly:
<br />
<div class="buttons"
% if form.align_buttons_right:
style="justify-content: right;"
% endif
>
% if form.show_button_cancel:
<wutta-button ${'once' if form.auto_disable_cancel else ''}
tag="a" href="${form.get_cancel_url()}"
label="${form.button_label_cancel}" />
% endif
% if form.show_button_reset:
<b-button
% if form.reset_url:
tag="a" href="${form.reset_url}"
% else:
native-type="reset"
% endif
>
Reset
</b-button>
% endif
<b-button type="${form.button_type_submit}"
native-type="submit"
% if form.auto_disable_submit:
:disabled="formSubmitting"
% endif
icon-pack="fas"
icon-left="${form.button_icon_submit}">
% if form.auto_disable_submit:
{{ formSubmitting ? "Working, please wait..." : "${form.button_label_submit}" }}
% else:
${form.button_label_submit}
% endif
</b-button>
</div>
% endif

View file

@ -0,0 +1,4 @@
## -*- coding: utf-8; -*-
% for fieldname in form:
${form.render_vue_field(fieldname, horizontal=True)}
% endfor

View file

@ -14,66 +14,21 @@
% endfor
% endif
<section>
% for fieldname in form:
${form.render_vue_field(fieldname)}
% endfor
</section>
${form.render_vue_fields(form_context)}
% if not form.readonly:
<br />
<div class="buttons"
% if form.align_buttons_right:
style="justify-content: right;"
% endif
>
% if form.show_button_cancel:
<wutta-button ${'once' if form.auto_disable_cancel else ''}
tag="a" href="${form.get_cancel_url()}"
label="${form.button_label_cancel}" />
% endif
% if form.show_button_reset:
<b-button
% if form.reset_url:
tag="a" href="${form.reset_url}"
% else:
native-type="reset"
% endif
>
Reset
</b-button>
% endif
<b-button type="${form.button_type_submit}"
native-type="submit"
% if form.auto_disable_submit:
:disabled="formSubmitting"
% endif
icon-pack="fas"
icon-left="${form.button_icon_submit}">
% if form.auto_disable_submit:
{{ formSubmitting ? "Working, please wait..." : "${form.button_label_submit}" }}
% else:
${form.button_label_submit}
% endif
</b-button>
</div>
% endif
${form.render_vue_buttons(form_context)}
${h.end_form()}
</script>
<script>
let ${form.vue_component} = {
const ${form.vue_component} = {
template: '#${form.vue_tagname}-template',
methods: {},
}
let ${form.vue_component}Data = {
const ${form.vue_component}Data = {
% if not form.readonly:

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,25 @@
## -*- coding: utf-8; -*-
<%inherit file="/master/index.mako" />
<%def name="page_content()">
<div class="buttons">
% if request.has_perm("alembic.dashboard"):
<wutta-button type="is-primary"
tag="a" href="${url('alembic.dashboard')}"
icon-left="forward"
label="Alembic Dashboard"
once />
% endif
% if request.has_perm("alembic.migrations.list"):
<wutta-button type="is-primary"
tag="a" href="${url('alembic.migrations')}"
icon-left="forward"
label="Alembic Migrations"
once />
% endif
</div>
${parent.page_content()}
</%def>

View file

@ -0,0 +1,14 @@
## -*- coding: utf-8; -*-
<%inherit file="/master/index.mako" />
<%def name="index_title_controls()">
${parent.index_title_controls()}
% if request.has_perm("alembic.dashboard"):
<wutta-button type="is-primary"
tag="a" href="${url('alembic.dashboard')}"
icon-left="forward"
label="Alembic Dashboard"
once />
% endif
</%def>

View file

@ -4,6 +4,7 @@
${self.make_wutta_autocomplete_component()}
${self.make_wutta_button_component()}
${self.make_wutta_checked_password_component()}
${self.make_wutta_copyable_text_component()}
${self.make_wutta_datepicker_component()}
${self.make_wutta_timepicker_component()}
${self.make_wutta_filter_component()}
@ -349,6 +350,72 @@
</script>
</%def>
<%def name="make_wutta_copyable_text_component()">
<script type="text/x-template" id="wutta-copyable-text-template">
<span>
<span v-if="!iconFirst">{{ text }}</span>
<b-tooltip label="Copied!" :triggers="['click']">
<a v-if="text"
href="#"
@click.prevent="copyText()">
<b-icon icon="copy" pack="fas" />
</a>
</b-tooltip>
<span v-if="iconFirst">{{ text }}</span>
## dummy input field needed to copy text on *insecure* sites
<b-input v-model="legacyText" ref="legacyText" v-show="legacyText" />
</span>
</script>
<script>
const WuttaCopyableText = {
template: '#wutta-copyable-text-template',
props: {
text: {required: true},
iconFirst: Boolean,
},
data() {
return {
## dummy input value needed to copy text on *insecure* sites
legacyText: null,
}
},
methods: {
async copyText() {
if (navigator.clipboard) {
// this is the way forward, but requires HTTPS
navigator.clipboard.writeText(this.text)
} else {
// use deprecated 'copy' command, but this just
// tells the browser to copy currently-selected
// text..which means we first must "add" some text
// to screen, and auto-select that, before copying
// to clipboard
this.legacyText = this.text
this.$nextTick(() => {
let input = this.$refs.legacyText.$el.firstChild
input.select()
document.execCommand('copy')
// re-hide the dummy input
this.legacyText = null
})
}
},
},
}
Vue.component('wutta-copyable-text', WuttaCopyableText)
<% request.register_component('wutta-copyable-text', 'WuttaCopyableText') %>
</script>
</%def>
<%def name="make_wutta_datepicker_component()">
<script type="text/x-template" id="wutta-datepicker-template">
<b-datepicker :name="name"

View file

@ -43,6 +43,8 @@ class WebTestCase(DataTestCase):
Base class for test suites requiring a full (typical) web app.
"""
mako_directories = ["wuttaweb:templates"]
def setUp(self): # pylint: disable=empty-docstring
""" """
self.setup_web()
@ -57,7 +59,7 @@ class WebTestCase(DataTestCase):
request=self.request,
settings={
"wutta_config": self.config,
"mako.directories": ["wuttaweb:templates"],
"mako.directories": self.mako_directories,
"pyramid_deform.template_search_path": "wuttaweb:templates/deform",
},
)
@ -137,7 +139,7 @@ class VersionWebTestCase(WebTestCase):
continuum.versioning_manager.transaction_cls = continuum.TransactionFactory()
self.teardown_web()
def make_config(self, **kwargs):
def make_config(self, files=None, **kwargs):
"""
Make and customize the config object.
@ -147,7 +149,7 @@ class VersionWebTestCase(WebTestCase):
WuttaContinuumConfigExtension,
)
config = super().make_config(**kwargs)
config = super().make_config(files, **kwargs)
config.setdefault("wutta_continuum.enable_versioning", "true")
# nb. must purge model classes from sys.modules, so they will

View file

@ -0,0 +1,565 @@
# -*- coding: utf-8; -*-
################################################################################
#
# wuttaweb -- Web App for Wutta Framework
# Copyright © 2024-2025 Lance Edgar
#
# This file is part of Wutta Framework.
#
# Wutta Framework is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# Wutta Framework is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# Wutta Framework. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
Views for Alembic
"""
import datetime
import logging
import os
import re
from alembic import command as alembic_command
from alembic.migration import MigrationContext
from alembic.util import CommandError
from wuttjamaican.db.conf import (
make_alembic_config,
get_alembic_scriptdir,
check_alembic_current,
)
import colander
from webhelpers2.html import tags, HTML
from wuttaweb.views import View, MasterView
from wuttaweb.forms import widgets
log = logging.getLogger(__name__)
def normalize_revision(config, rev): # pylint: disable=missing-function-docstring
app = config.get_app()
created = None
if match := re.search(r"Create Date: (\d{4}-\d{2}-\d{2}[\d:\. ]+\d)", rev.longdoc):
created = datetime.datetime.fromisoformat(match.group(1))
created = app.localtime(created, from_utc=False)
created = app.render_datetime(created)
return {
"revision": rev.revision,
"branch_labels": ", ".join(rev.branch_labels),
"doc": rev.doc,
"longdoc": rev.longdoc,
"path": rev.path,
"dependencies": rev.dependencies or "",
"down_revision": rev.down_revision or "",
"nextrev": ", ".join(rev.nextrev),
"is_base": app.render_boolean(rev.is_base),
"is_branch_point": app.render_boolean(rev.is_branch_point),
"is_head": rev.is_head,
"created": created,
}
class AlembicDashboardView(View):
"""
Custom views for the Alembic Dashboard.
"""
def dashboard(self):
"""
Main view for the Alembic Dashboard.
Route name is ``alembic.dashboard``; URL is
``/alembic/dashboard``
"""
script = get_alembic_scriptdir(self.config)
with self.config.appdb_engine.begin() as conn:
context = MigrationContext.configure(conn)
current_heads = context.get_current_heads()
def normalize(rev):
normal = normalize_revision(self.config, rev)
normal["is_current"] = rev.revision in current_heads
normal["revision"] = tags.link_to(
normal["revision"],
self.request.route_url(
"alembic.migrations.view", revision=normal["revision"]
),
)
if normal["down_revision"]:
normal["down_revision"] = tags.link_to(
normal["down_revision"],
self.request.route_url(
"alembic.migrations.view", revision=normal["down_revision"]
),
)
return normal
script_heads = []
for head in script.get_heads():
rev = script.get_revision(head)
script_heads.append(normalize(rev))
db_heads = []
for head in current_heads:
rev = script.get_revision(head)
db_heads.append(normalize(rev))
script_heads.sort(key=lambda rev: rev["branch_labels"])
db_heads.sort(key=lambda rev: rev["branch_labels"])
return {
"index_title": "Alembic Dashboard",
"script": {
"dir": script.dir,
"version_locations": sorted(script.version_locations),
"env_py_location": script.env_py_location,
"file_template": script.file_template,
},
"script_heads": script_heads,
"db_heads": db_heads,
}
def migrate(self):
"""
Action view to migrate the database. POST request must be used.
This directly invokes the :func:`alembic upgrade
<alembic:alembic.command.upgrade>` (or :func:`alembic
downgrade <alembic:alembic.command.downgrade>`) command.
It then sets a flash message per the command status, and
redirects user back to the Dashboard (or other referrer).
The request must specify a ``revspec`` param, which we pass
along as-is to the ``alembic`` command. We assume ``alembic
upgrade`` unless the request sets ``direction`` param to
``"downgrade"``.
"""
referrer = self.request.get_referrer(
default=self.request.route_url("alembic.dashboard")
)
if self.request.method != "POST":
return self.redirect(referrer)
revspec = self.request.POST.get("revspec")
if not revspec:
self.request.session.flash("You must provide a target revspec.", "error")
else:
direction = self.request.POST.get("direction")
if direction != "downgrade":
direction = "upgrade"
alembic = make_alembic_config(self.config)
command = (
alembic_command.downgrade
if direction == "downgrade"
else alembic_command.upgrade
)
# invoke alembic upgrade/downgrade
try:
command(alembic, revspec)
except Exception as err: # pylint: disable=broad-exception-caught
log.exception(
"database failed to %s using revspec: %s", direction, revspec
)
self.request.session.flash(f"Migrate failed: {err}", "error")
else:
self.request.session.flash("Database has been migrated.")
return self.redirect(referrer)
@classmethod
def defaults(cls, config): # pylint: disable=empty-docstring
""" """
cls._defaults(config)
@classmethod
def _defaults(cls, config):
# permission group
config.add_wutta_permission_group(
"alembic", "Alembic (General)", overwrite=False
)
# dashboard
config.add_wutta_permission(
"alembic",
"alembic.dashboard",
"Basic (view) access to the Alembic Dashboard",
)
config.add_route("alembic.dashboard", "/alembic/dashboard")
config.add_view(
cls,
attr="dashboard",
route_name="alembic.dashboard",
renderer="/alembic/dashboard.mako",
permission="alembic.dashboard",
)
# migrate
config.add_wutta_permission(
"alembic",
"alembic.migrate",
"Run migration scripts on the database",
)
config.add_route("alembic.migrate", "/alembic/migrate")
config.add_view(
cls,
attr="migrate",
route_name="alembic.migrate",
permission="alembic.migrate",
)
class AlembicMigrationView(MasterView): # pylint: disable=abstract-method
"""
Master view for Alembic Migrations.
Route prefix is ``alembic.migrations``; notable URLs include:
* ``/alembic/migrations/``
* ``/alembic/migrations/new``
* ``/alembic/migrations/XXX``
"""
# pylint: disable=duplicate-code
model_name = "alembic_migration"
model_key = "revision"
model_title = "Alembic Migration"
route_prefix = "alembic.migrations"
url_prefix = "/alembic/migrations"
filterable = False
sortable = True
sort_on_backend = False
paginated = True
paginate_on_backend = False
editable = False
configurable = True
# pylint: enable=duplicate-code
labels = {
"doc": "Description",
"longdoc": "Long Description",
"nextrev": "Next Revision",
}
grid_columns = [
"is_head",
"revision",
"doc",
"branch_labels",
"down_revision",
"created",
]
sort_defaults = ("is_head", "desc")
form_fields = [
"revision",
"doc",
"longdoc",
"branch_labels",
"dependencies",
"down_revision",
"nextrev",
"is_base",
"is_branch_point",
"is_head",
"path",
"created",
]
def get_grid_data( # pylint: disable=empty-docstring
self, columns=None, session=None
):
""" """
data = []
script = get_alembic_scriptdir(self.config)
for rev in script.walk_revisions():
data.append(normalize_revision(self.config, rev))
return data
def configure_grid(self, grid): # pylint: disable=empty-docstring
""" """
g = grid
super().configure_grid(g)
# revision
g.set_link("revision")
g.set_searchable("revision")
# doc
g.set_link("doc")
g.set_searchable("doc")
# branch_labels
g.set_searchable("branch_labels")
# is_head
g.set_label("is_head", "Head")
g.set_renderer("is_head", self.render_is_head)
def render_is_head( # pylint: disable=missing-function-docstring,unused-argument
self, rev, field, value
):
return self.app.render_boolean(value) if value else ""
def get_instance(
self, **kwargs
): # pylint: disable=empty-docstring,arguments-differ,unused-argument
""" """
if "_cached_instance" not in self.__dict__:
revision = self.request.matchdict["revision"]
script = get_alembic_scriptdir(self.config)
try:
rev = script.get_revision(revision)
except CommandError:
rev = None
if not rev:
raise self.notfound()
self.__dict__["_cached_instance"] = normalize_revision(self.config, rev)
return self.__dict__["_cached_instance"]
def get_instance_title(self, instance): # pylint: disable=empty-docstring
""" """
text = f"({instance['branch_labels']}) {instance['doc']}"
if instance.get("is_head"):
text += " [head]"
return text
def configure_form(self, form): # pylint: disable=empty-docstring
""" """
f = form
super().configure_form(f)
# revision
f.set_widget("revision", widgets.CopyableTextWidget())
# longdoc
f.set_widget("longdoc", "notes")
# down_revision
f.set_widget("down_revision", widgets.AlembicRevisionWidget(self.request))
# nextrev
f.set_widget("nextrev", widgets.AlembicRevisionsWidget(self.request))
# is_head
f.set_node("is_head", colander.Boolean())
# path
f.set_widget("path", widgets.CopyableTextWidget())
def make_create_form(self): # pylint: disable=empty-docstring
""" """
alembic = make_alembic_config(self.config)
script = get_alembic_scriptdir(self.config, alembic)
schema = colander.Schema()
schema.add(colander.SchemaNode(colander.String(), name="description"))
schema.add(
colander.SchemaNode(
colander.Boolean(),
name="autogenerate",
default=check_alembic_current(self.config, alembic),
)
)
schema.add(
colander.SchemaNode(
colander.String(), name="branching_option", default="revise"
)
)
branch_options = self.get_revise_branch_options(script)
revise_branch = colander.SchemaNode(
colander.String(),
name="revise_branch",
missing=colander.null,
validator=colander.OneOf(branch_options),
widget=widgets.SelectWidget(values=[(b, b) for b in branch_options]),
)
branch = self.config.get(f"{self.config.appname}.alembic.default_revise_branch")
if not branch and len(branch_options) == 1:
branch = branch_options[0]
if branch:
revise_branch.default = branch
schema.add(revise_branch)
schema.add(
colander.SchemaNode(
colander.String(), name="new_branch", missing=colander.null
)
)
version_locations = sorted(
self.config.parse_list(alembic.get_main_option("version_locations"))
)
schema.add(
colander.SchemaNode(
colander.String(),
name="version_location",
missing=colander.null,
validator=colander.OneOf(version_locations),
widget=widgets.SelectWidget(values=[(v, v) for v in version_locations]),
)
)
schema.validator = colander.All(
self.validate_revise_branch, self.validate_new_branch
)
form = self.make_form(
schema=schema,
cancel_url_fallback=self.get_index_url(),
button_label_submit="Write Script File",
)
form.set_label("revise_branch", "Branch")
return form
def validate_revise_branch( # pylint: disable=missing-function-docstring
self, node, value
):
if value["branching_option"] == "revise":
if not value["revise_branch"]:
node["revise_branch"].raise_invalid(
"Must specify which branch to revise."
)
def validate_new_branch( # pylint: disable=missing-function-docstring
self, node, value
):
if value["branching_option"] == "new":
if not value["new_branch"]:
node["new_branch"].raise_invalid("New branch requires a name.")
if not value["version_location"]:
node["version_location"].raise_invalid(
"New branch requires a version location."
)
def save_create_form(self, form): # pylint: disable=empty-docstring
""" """
alembic = make_alembic_config(self.config)
data = form.validated
# kwargs for `alembic revision` command
kw = {
"message": data["description"],
"autogenerate": data["autogenerate"],
}
if data["branching_option"] == "new":
kw["head"] = "base"
kw["branch_label"] = data["new_branch"]
kw["version_path"] = self.app.resource_path(data["version_location"])
else:
assert data["branching_option"] == "revise"
kw["head"] = f"{data['revise_branch']}@head"
# run `alembic revision`
revision = alembic_command.revision(alembic, **kw)
intro = HTML.tag(
"p",
class_="block",
c="New migration script has been created. "
"Please review and modify the file contents as needed:",
)
path = HTML.tag(
"p",
class_="block has-background-white has-text-black is-family-monospace",
style="padding: 0.5rem;",
c=[HTML.tag("wutta-copyable-text", text=revision.path)],
)
outro = HTML.tag(
"p",
class_="block",
c=[
"When satisfied, proceed to ",
tags.link_to(
"Migrate Database", self.request.route_url("alembic.dashboard")
),
".",
],
)
self.request.session.flash(HTML.tag("div", c=[intro, path, outro]))
return revision
def save_delete_form(self, form): # pylint: disable=empty-docstring
""" """
rev = self.get_instance()
os.remove(rev["path"])
# TODO: this is effectivey duplicated in AppTableView.get_migration_branch_options()
def get_revise_branch_options( # pylint: disable=missing-function-docstring
self, script
):
branches = set()
for rev in script.get_revisions(script.get_heads()):
branches.update(rev.branch_labels)
return sorted(branches)
def configure_get_simple_settings(self): # pylint: disable=empty-docstring
""" """
return [
{"name": f"{self.config.appname}.alembic.default_revise_branch"},
]
def configure_get_context( # pylint: disable=empty-docstring,arguments-differ
self, **kwargs
):
""" """
context = super().configure_get_context(**kwargs)
script = get_alembic_scriptdir(self.config)
context["revise_branch_options"] = self.get_revise_branch_options(script)
return context
def defaults(config, **kwargs): # pylint: disable=missing-function-docstring
base = globals()
AlembicDashboardView = ( # pylint: disable=invalid-name,redefined-outer-name
kwargs.get("AlembicDashboardView", base["AlembicDashboardView"])
)
AlembicDashboardView.defaults(config)
AlembicMigrationView = ( # pylint: disable=invalid-name,redefined-outer-name
kwargs.get("AlembicMigrationView", base["AlembicMigrationView"])
)
AlembicMigrationView.defaults(config)
def includeme(config): # pylint: disable=missing-function-docstring
defaults(config)

View file

@ -269,7 +269,7 @@ class BatchMasterView(MasterView):
# otherwise normal logic is fine
return super().objectify(form)
def redirect_after_create(self, obj):
def redirect_after_create(self, result):
"""
If the new batch requires initial population, we launch a
thread for that and show the "progress" page.
@ -277,7 +277,7 @@ class BatchMasterView(MasterView):
Otherwise this will do the normal thing of redirecting to the
"view" page for the new batch.
"""
batch = obj
batch = result
# just view batch if should not populate
if not self.batch_handler.should_populate(batch):

View file

@ -301,7 +301,7 @@ class CommonView(View):
@classmethod
def _defaults(cls, config):
config.add_wutta_permission_group("common", "(common)", overwrite=False)
config.add_wutta_permission_group("common", "(General)", overwrite=False)
# home page
config.add_route("home", "/")
@ -328,7 +328,7 @@ class CommonView(View):
renderer="json",
)
config.add_wutta_permission(
"common", "common.feedback", "Send user feedback about the app"
"common", "common.feedback", "Send a feedback message"
)
# setup
@ -339,7 +339,7 @@ class CommonView(View):
config.add_route("change_theme", "/change-theme", request_method="POST")
config.add_view(cls, attr="change_theme", route_name="change_theme")
config.add_wutta_permission(
"common", "common.change_theme", "Change global theme"
"common", "common.change_theme", "Change the global app theme"
)

View file

@ -25,7 +25,7 @@ Essential views for convenient includes
Most apps should include this module::
pyramid_config.include('wuttaweb.views.essential')
pyramid_config.include("wuttaweb.views.essential")
That will in turn include the following modules:
@ -38,6 +38,26 @@ That will in turn include the following modules:
* :mod:`wuttaweb.views.roles`
* :mod:`wuttaweb.views.users`
* :mod:`wuttaweb.views.upgrades`
* :mod:`wuttaweb.views.tables`
* :mod:`wuttaweb.views.alembic`
You can also selectively override some modules while keeping most
defaults.
That uses a slightly different approach. First here is how to do it
while keeping all defaults::
from wuttaweb.views import essential
essential.defaults(pyramid_config)
To override, specify the default module with your preferred alias like
so::
essential.defaults(pyramid_config, **{
"wuttaweb.views.users": "poser.web.views.users",
"wuttaweb.views.upgrades": "poser.web.views.upgrades",
})
"""
@ -55,6 +75,8 @@ def defaults(config, **kwargs): # pylint: disable=missing-function-docstring
config.include(mod("wuttaweb.views.roles"))
config.include(mod("wuttaweb.views.users"))
config.include(mod("wuttaweb.views.upgrades"))
config.include(mod("wuttaweb.views.tables"))
config.include(mod("wuttaweb.views.alembic"))
def includeme(config): # pylint: disable=missing-function-docstring

View file

@ -29,6 +29,7 @@ import datetime
import logging
import os
import threading
import warnings
import sqlalchemy as sa
from sqlalchemy import orm
@ -560,66 +561,80 @@ class MasterView(View): # pylint: disable=too-many-public-methods
"""
View to "create" a new model record.
This usually corresponds to a URL like ``/widgets/new``.
This usually corresponds to URL like ``/widgets/new``
By default, this view is included only if :attr:`creatable` is
true.
By default, this route is included only if :attr:`creatable`
is true.
The default "create" view logic will show a form with field
widgets, allowing user to submit new values which are then
persisted to the DB (assuming typical SQLAlchemy model).
Subclass normally should not override this method, but rather
one of the related methods which are called (in)directly by
this one:
* :meth:`make_model_form()`
* :meth:`configure_form()`
* :meth:`create_save_form()`
* :meth:`redirect_after_create()`
The default logic calls :meth:`make_create_form()` and shows
that to the user. When they submit valid data, it calls
:meth:`save_create_form()` and then
:meth:`redirect_after_create()`.
"""
self.creating = True
session = self.Session()
form = self.make_model_form(cancel_url_fallback=self.get_index_url())
form = self.make_create_form()
if form.validate():
obj = self.create_save_form(form)
session.flush()
return self.redirect_after_create(obj)
try:
result = self.save_create_form(form)
except Exception as err: # pylint: disable=broad-exception-caught
log.warning("failed to save 'create' form", exc_info=True)
self.request.session.flash(f"Create failed: {err}", "error")
else:
return self.redirect_after_create(result)
context = {
"form": form,
}
context = {"form": form}
return self.render_to_response("create", context)
def create_save_form(self, form):
def make_create_form(self):
"""
This method is responsible for "converting" the validated form
data to a model instance, and then "saving" the result,
e.g. to DB. It is called by :meth:`create()`.
Make the "create" model form. This is called by
:meth:`create()`.
Subclass may override this, or any of the related methods
called by this one:
Default logic calls :meth:`make_model_form()`.
* :meth:`objectify()`
* :meth:`persist()`
:returns: Should return the resulting model instance, e.g. as
produced by :meth:`objectify()`.
:returns: :class:`~wuttaweb.forms.base.Form` instance
"""
return self.make_model_form(cancel_url_fallback=self.get_index_url())
def save_create_form(self, form):
"""
Save the "create" form. This is called by :meth:`create()`.
Default logic calls :meth:`objectify()` and then
:meth:`persist()`. Subclass is expected to override for
non-standard use cases.
As for return value, by default it will be whatever came back
from the ``objectify()`` call. In practice a subclass can
return whatever it likes. The value is only used as input to
:meth:`redirect_after_create()`.
:returns: Usually the model instance, but can be "anything"
"""
if hasattr(self, "create_save_form"): # pragma: no cover
warnings.warn(
"MasterView.create_save_form() method name is deprecated; "
f"please refactor to save_create_form() instead for {self.__class__.__name__}",
DeprecationWarning,
)
return self.create_save_form(form)
obj = self.objectify(form)
self.persist(obj)
return obj
def redirect_after_create(self, obj):
def redirect_after_create(self, result):
"""
Usually, this returns a redirect to which we send the user,
after a new model record has been created. By default this
sends them to the "view" page for the record.
Must return a redirect, following successful save of the
"create" form. This is called by :meth:`create()`.
It is called automatically by :meth:`create()`.
By default this redirects to the "view" page for the new
record.
:returns: :class:`~pyramid.httpexceptions.HTTPFound` instance
"""
return self.redirect(self.get_action_url("view", obj))
return self.redirect(self.get_action_url("view", result))
##############################
# view methods
@ -627,32 +642,30 @@ class MasterView(View): # pylint: disable=too-many-public-methods
def view(self):
"""
View to "view" details of an existing model record.
View to "view" a model record.
This usually corresponds to a URL like ``/widgets/XXX``
where ``XXX`` represents the key/ID for the record.
This usually corresponds to URL like ``/widgets/XXX``
By default, this view is included only if :attr:`viewable` is
By default, this route is included only if :attr:`viewable` is
true.
The default view logic will show a read-only form with field
values displayed.
The default logic here is as follows:
Subclass normally should not override this method, but rather
one of the related methods which are called (in)directly by
this one:
First, if :attr:`has_rows` is true then
:meth:`make_row_model_grid()` is called.
* :meth:`make_model_form()`
* :meth:`configure_form()`
* :meth:`make_row_model_grid()` - if :attr:`has_rows` is true
If ``has_rows`` is true *and* the request has certain special
params relating to the grid, control may exit early. Mainly
this happens when a "partial" page is requested, which means
we just return grid data and nothing else. (Used for backend
sorting and pagination etc.)
Otherwise :meth:`make_view_form()` is called, and the template
is rendered.
"""
self.viewing = True
obj = self.get_instance()
form = self.make_model_form(obj, readonly=True)
context = {
"instance": obj,
"form": form,
}
context = {"instance": obj}
if self.has_rows:
@ -677,47 +690,51 @@ class MasterView(View): # pylint: disable=too-many-public-methods
context["rows_grid"] = grid
context["form"] = self.make_view_form(obj)
context["xref_buttons"] = self.get_xref_buttons(obj)
return self.render_to_response("view", context)
def make_view_form(self, obj, readonly=True):
"""
Make the "view" model form. This is called by
:meth:`view()`.
Default logic calls :meth:`make_model_form()`.
:returns: :class:`~wuttaweb.forms.base.Form` instance
"""
return self.make_model_form(obj, readonly=readonly)
##############################
# edit methods
##############################
def edit(self):
"""
View to "edit" details of an existing model record.
View to "edit" a model record.
This usually corresponds to a URL like ``/widgets/XXX/edit``
where ``XXX`` represents the key/ID for the record.
This usually corresponds to URL like ``/widgets/XXX/edit``
By default, this view is included only if :attr:`editable` is
By default, this route is included only if :attr:`editable` is
true.
The default "edit" view logic will show a form with field
widgets, allowing user to modify and submit new values which
are then persisted to the DB (assuming typical SQLAlchemy
model).
Subclass normally should not override this method, but rather
one of the related methods which are called (in)directly by
this one:
* :meth:`make_model_form()`
* :meth:`configure_form()`
* :meth:`edit_save_form()`
The default logic calls :meth:`make_edit_form()` and shows
that to the user. When they submit valid data, it calls
:meth:`save_edit_form()` and then
:meth:`redirect_after_edit()`.
"""
self.editing = True
instance = self.get_instance()
form = self.make_model_form(
instance, cancel_url_fallback=self.get_action_url("view", instance)
)
form = self.make_edit_form(instance)
if form.validate():
self.edit_save_form(form)
return self.redirect(self.get_action_url("view", instance))
try:
result = self.save_edit_form(form)
except Exception as err: # pylint: disable=broad-exception-caught
log.warning("failed to save 'edit' form", exc_info=True)
self.request.session.flash(f"Edit failed: {err}", "error")
else:
return self.redirect_after_edit(result)
context = {
"instance": instance,
@ -725,51 +742,74 @@ class MasterView(View): # pylint: disable=too-many-public-methods
}
return self.render_to_response("edit", context)
def edit_save_form(self, form):
def make_edit_form(self, obj):
"""
This method is responsible for "converting" the validated form
data to a model instance, and then "saving" the result,
e.g. to DB. It is called by :meth:`edit()`.
Make the "edit" model form. This is called by
:meth:`edit()`.
Subclass may override this, or any of the related methods
called by this one:
Default logic calls :meth:`make_model_form()`.
* :meth:`objectify()`
* :meth:`persist()`
:returns: Should return the resulting model instance, e.g. as
produced by :meth:`objectify()`.
:returns: :class:`~wuttaweb.forms.base.Form` instance
"""
return self.make_model_form(
obj, cancel_url_fallback=self.get_action_url("view", obj)
)
def save_edit_form(self, form):
"""
Save the "edit" form. This is called by :meth:`edit()`.
Default logic calls :meth:`objectify()` and then
:meth:`persist()`. Subclass is expected to override for
non-standard use cases.
As for return value, by default it will be whatever came back
from the ``objectify()`` call. In practice a subclass can
return whatever it likes. The value is only used as input to
:meth:`redirect_after_edit()`.
:returns: Usually the model instance, but can be "anything"
"""
if hasattr(self, "edit_save_form"): # pragma: no cover
warnings.warn(
"MasterView.edit_save_form() method name is deprecated; "
f"please refactor to save_edit_form() instead for {self.__class__.__name__}",
DeprecationWarning,
)
return self.edit_save_form(form)
obj = self.objectify(form)
self.persist(obj)
return obj
def redirect_after_edit(self, result):
"""
Must return a redirect, following successful save of the
"edit" form. This is called by :meth:`edit()`.
By default this redirects to the "view" page for the record.
:returns: :class:`~pyramid.httpexceptions.HTTPFound` instance
"""
return self.redirect(self.get_action_url("view", result))
##############################
# delete methods
##############################
def delete(self):
"""
View to delete an existing model instance.
View to "delete" a model record.
This usually corresponds to a URL like ``/widgets/XXX/delete``
where ``XXX`` represents the key/ID for the record.
This usually corresponds to URL like ``/widgets/XXX/delete``
By default, this view is included only if :attr:`deletable` is
true.
By default, this route is included only if :attr:`deletable`
is true.
The default "delete" view logic will show a "psuedo-readonly"
form with no fields editable, but with a submit button so user
must confirm, before deletion actually occurs.
Subclass normally should not override this method, but rather
one of the related methods which are called (in)directly by
this one:
* :meth:`make_model_form()`
* :meth:`configure_form()`
* :meth:`delete_save_form()`
* :meth:`delete_instance()`
The default logic calls :meth:`make_delete_form()` and shows
that to the user. When they submit, it calls
:meth:`save_delete_form()` and then
:meth:`redirect_after_delete()`.
"""
self.deleting = True
instance = self.get_instance()
@ -777,21 +817,20 @@ class MasterView(View): # pylint: disable=too-many-public-methods
if not self.is_deletable(instance):
return self.redirect(self.get_action_url("view", instance))
# nb. this form proper is not readonly..
form = self.make_model_form(
instance,
cancel_url_fallback=self.get_action_url("view", instance),
button_label_submit="DELETE Forever",
button_icon_submit="trash",
button_type_submit="is-danger",
)
# ..but *all* fields are readonly
form.readonly_fields = set(form.fields)
form = self.make_delete_form(instance)
# nb. validate() often returns empty dict here
if form.validate() is not False:
self.delete_save_form(form)
return self.redirect(self.get_index_url())
try:
result = self.save_delete_form( # pylint: disable=assignment-from-none
form
)
except Exception as err: # pylint: disable=broad-exception-caught
log.warning("failed to save 'delete' form", exc_info=True)
self.request.session.flash(f"Delete failed: {err}", "error")
else:
return self.redirect_after_delete(result)
context = {
"instance": instance,
@ -799,19 +838,70 @@ class MasterView(View): # pylint: disable=too-many-public-methods
}
return self.render_to_response("delete", context)
def delete_save_form(self, form):
def make_delete_form(self, obj):
"""
Perform the delete operation(s) based on the given form data.
Make the "delete" model form. This is called by
:meth:`delete()`.
Default logic simply calls :meth:`delete_instance()` on the
form's :attr:`~wuttaweb.forms.base.Form.model_instance`.
Default logic calls :meth:`make_model_form()` but with a
twist:
This method is called by :meth:`delete()` after it has
validated the form.
The form proper is *not* readonly; this ensures the form has a
submit button etc. But then all fields in the form are
explicitly marked readonly.
:returns: :class:`~wuttaweb.forms.base.Form` instance
"""
# nb. this form proper is not readonly..
form = self.make_model_form(
obj,
cancel_url_fallback=self.get_action_url("view", obj),
button_label_submit="DELETE Forever",
button_icon_submit="trash",
button_type_submit="is-danger",
)
# ..but *all* fields are readonly
form.readonly_fields = set(form.fields)
return form
def save_delete_form(self, form):
"""
Save the "delete" form. This is called by :meth:`delete()`.
Default logic calls :meth:`delete_instance()`. Normally
subclass would override that for non-standard use cases, but
it could also/instead override this method.
As for return value, by default this returns ``None``. In
practice a subclass can return whatever it likes. The value
is only used as input to :meth:`redirect_after_delete()`.
:returns: Usually ``None``, but can be "anything"
"""
if hasattr(self, "delete_save_form"): # pragma: no cover
warnings.warn(
"MasterView.delete_save_form() method name is deprecated; "
f"please refactor to save_delete_form() instead for {self.__class__.__name__}",
DeprecationWarning,
)
self.delete_save_form(form)
return
obj = form.model_instance
self.delete_instance(obj)
def redirect_after_delete(self, result): # pylint: disable=unused-argument
"""
Must return a redirect, following successful save of the
"delete" form. This is called by :meth:`delete()`.
By default this redirects back to the :meth:`index()` page.
:returns: :class:`~pyramid.httpexceptions.HTTPFound` instance
"""
return self.redirect(self.get_index_url())
def delete_instance(self, obj):
"""
Delete the given model instance.
@ -820,7 +910,7 @@ class MasterView(View): # pylint: disable=too-many-public-methods
raise ``NotImplementedError``. Subclass should override if
needed.
This method is called by :meth:`delete_save_form()`.
This method is called by :meth:`save_delete_form()`.
"""
session = self.app.get_session(obj)
session.delete(obj)
@ -2787,21 +2877,34 @@ class MasterView(View): # pylint: disable=too-many-public-methods
"""
return True
def make_model_form(self, model_instance=None, **kwargs):
def make_model_form(self, model_instance=None, fields=None, **kwargs):
"""
Create and return a :class:`~wuttaweb.forms.base.Form`
for the view model.
Make a form for the "model" represented by this subclass.
Note that this method is called for multiple "CRUD" views,
e.g.:
This method is normally called by all CRUD views:
* :meth:`create()`
* :meth:`view()`
* :meth:`edit()`
* :meth:`delete()`
See also related methods, which are called by this one:
The form need not have a ``model_instance``, as in the case of
:meth:`create()`. And it can be readonly as in the case of
:meth:`view()` and :meth:`delete()`.
* :meth:`get_form_fields()`
* :meth:`configure_form()`
If ``fields`` are not provided, :meth:`get_form_fields()` is
called. Usually a subclass will define :attr:`form_fields`
but it's only required if :attr:`model_class` is not set.
Then :meth:`configure_form()` is called, so subclass can go
crazy with that as needed.
:param model_instance: Model instance/record with which to
initialize the form data. Not needed for "create" forms.
:param fields: Optional fields list for the form.
:returns: :class:`~wuttaweb.forms.base.Form` instance
"""
if "model_class" not in kwargs:
model_class = self.get_model_class()
@ -2810,10 +2913,10 @@ class MasterView(View): # pylint: disable=too-many-public-methods
kwargs["model_instance"] = model_instance
if not kwargs.get("fields"):
if not fields:
fields = self.get_form_fields()
if fields:
kwargs["fields"] = fields
if fields:
kwargs["fields"] = fields
form = self.make_form(**kwargs)
self.configure_form(form)
@ -2864,6 +2967,8 @@ class MasterView(View): # pylint: disable=too-many-public-methods
self.set_labels(form)
# mark key fields as readonly to prevent edit. see also
# related comments in the objectify() method
if self.editing:
for key in self.get_model_key():
form.set_readonly(key)
@ -2881,23 +2986,41 @@ class MasterView(View): # pylint: disable=too-many-public-methods
This is called by various other form-saving methods:
* :meth:`edit_save_form()`
* :meth:`save_create_form()`
* :meth:`save_edit_form()`
* :meth:`create_row_save_form()`
See also :meth:`persist()`.
:param form: Reference to the *already validated*
:class:`~wuttaweb.forms.base.Form` object. See the form's
:attr:`~wuttaweb.forms.base.Form.validated` attribute for
the data.
"""
# use ColanderAlchemy magic if possible
# ColanderAlchemy schema has an objectify() method which will
# return a populated model instance
schema = form.get_schema()
if hasattr(schema, "objectify"):
# this returns a model instance
return schema.objectify(form.validated, context=form.model_instance)
# otherwise return data dict as-is
return form.validated
# at this point we likely have no model class, so have to
# assume we're operating on a simple dict record. we (mostly)
# want to return that as-is, unless subclass overrides.
data = dict(form.validated)
# nb. we have a unique scenario when *editing* for a simple
# dict record (no model class). we mark the key fields as
# readonly in configure_form(), so they aren't part of the
# data here, but we need to add them back for sake of
# e.g. generating the 'view' route kwargs for redirect.
if self.editing:
obj = self.get_instance()
for key in self.get_model_key():
if key not in data:
data[key] = obj[key]
return data
def persist(self, obj, session=None):
"""
@ -2914,7 +3037,8 @@ class MasterView(View): # pylint: disable=too-many-public-methods
:param obj: Model instance object as produced by
:meth:`objectify()`.
See also :meth:`edit_save_form()` which calls this method.
See also :meth:`save_create_form()` and
:meth:`save_edit_form()`, which call this method.
"""
model = self.app.model
model_class = self.get_model_class()
@ -3215,8 +3339,8 @@ class MasterView(View): # pylint: disable=too-many-public-methods
form = self.make_row_model_form(cancel_url_fallback=parent_url)
if form.validate():
row = self.create_row_save_form(form)
return self.redirect_after_create_row(row)
result = self.create_row_save_form(form)
return self.redirect_after_create_row(result)
index_link = tags.link_to(self.get_index_title(), self.get_index_url())
parent_link = tags.link_to(self.get_instance_title(parent), parent_url)

View file

@ -0,0 +1,406 @@
# -*- coding: utf-8; -*-
################################################################################
#
# wuttaweb -- Web App for Wutta Framework
# Copyright © 2024-2025 Lance Edgar
#
# This file is part of Wutta Framework.
#
# Wutta Framework is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# Wutta Framework is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# Wutta Framework. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
Table Views
"""
import os
from alembic import command as alembic_command
from sqlalchemy_utils import get_mapper
from mako.lookup import TemplateLookup
from webhelpers2.html import HTML
from wuttjamaican.db.conf import (
check_alembic_current,
make_alembic_config,
get_alembic_scriptdir,
)
from wuttaweb.views import MasterView
class AppTableView(MasterView): # pylint: disable=abstract-method
"""
Master view showing all tables in the :term:`app database`.
Default route prefix is ``tables``.
Notable URLs provided by this class:
* ``/tables/``
"""
# pylint: disable=duplicate-code
model_name = "app_table"
model_title = "App Table"
model_key = "name"
url_prefix = "/tables/app"
filterable = False
sortable = True
sort_on_backend = False
paginated = True
paginate_on_backend = False
creatable = True
editable = False
deletable = False
# pylint: enable=duplicate-code
labels = {
"name": "Table Name",
}
grid_columns = [
"name",
"schema",
# "row_count",
]
sort_defaults = "name"
form_fields = [
"name",
"schema",
# "row_count",
]
has_rows = True
rows_title = "Columns"
rows_filterable = False
rows_sort_defaults = "sequence"
rows_sort_on_backend = False
rows_paginated = True
rows_paginate_on_backend = False
rows_viewable = False
row_grid_columns = [
"sequence",
"column_name",
"data_type",
"nullable",
"description",
]
def get_grid_data( # pylint: disable=empty-docstring
self, columns=None, session=None
):
""" """
model = self.app.model
data = []
for table in model.Base.metadata.tables.values():
data.append(
{
"name": table.name,
"schema": table.schema or "",
# "row_count": 42,
}
)
return data
def configure_grid(self, grid): # pylint: disable=empty-docstring
""" """
g = grid
super().configure_grid(g)
# nb. show more tables by default
g.pagesize = 50
# schema
g.set_searchable("schema")
# name
g.set_searchable("name")
g.set_link("name")
def get_instance( # pylint: disable=empty-docstring,arguments-differ,unused-argument
self, **kwargs
):
""" """
if "_cached_instance" not in self.__dict__:
model = self.app.model
name = self.request.matchdict["name"]
table = model.Base.metadata.tables[name]
data = {
"name": table.name,
"schema": table.schema or "",
# "row_count": 42,
"table": table,
}
self.__dict__["_cached_instance"] = data
return self.__dict__["_cached_instance"]
def get_instance_title(self, instance): # pylint: disable=empty-docstring
""" """
return instance["name"]
def get_row_grid_data(self, obj): # pylint: disable=empty-docstring
""" """
table = obj
data = []
for i, column in enumerate(table["table"].columns, 1):
data.append(
{
"column": column,
"sequence": i,
"column_name": column.name,
"data_type": str(repr(column.type)),
"nullable": column.nullable,
"description": (column.doc or "").strip(),
}
)
return data
def configure_row_grid(self, grid): # pylint: disable=empty-docstring
""" """
g = grid
super().configure_row_grid(g)
# nb. try not to hide any columns by default
g.pagesize = 100
# sequence
g.set_label("sequence", "Seq.")
# column_name
g.set_searchable("column_name")
# data_type
g.set_searchable("data_type")
# nullable
g.set_renderer("nullable", "boolean")
# description
g.set_searchable("description")
g.set_renderer("description", self.render_column_description)
def render_column_description( # pylint: disable=missing-function-docstring,unused-argument
self, column, field, value
):
if not value:
return ""
max_length = 100
if len(value) <= max_length:
return value
return HTML.tag("span", title=value, c=f"{value[:max_length]} ...")
def get_template_context(self, context): # pylint: disable=empty-docstring
""" """
if self.creating:
model = self.app.model
# alembic current
context["alembic_is_current"] = check_alembic_current(self.config)
# existing tables
# TODO: any reason this should check grid data instead of metadata?
unwanted = ["transaction", "transaction_meta"]
context["existing_tables"] = [
{"name": table}
for table in sorted(model.Base.metadata.tables)
if table not in unwanted and not table.endswith("_version")
]
# model dir
context["model_dir"] = os.path.dirname(model.__file__)
# migration branch
script = get_alembic_scriptdir(self.config)
branch_options = self.get_migration_branch_options(script)
context["migration_branch_options"] = branch_options
branch = self.config.get(
f"{self.config.appname}.alembic.default_revise_branch"
)
if not branch and len(branch_options) == 1:
branch = branch_options[0]
context["migration_branch"] = branch
return context
# TODO: this is effectivey duplicated in AlembicMigrationView.get_revise_branch_options()
def get_migration_branch_options( # pylint: disable=missing-function-docstring
self, script
):
branches = set()
for rev in script.get_revisions(script.get_heads()):
branches.update(rev.branch_labels)
return sorted(branches)
def wizard_action(self): # pylint: disable=too-many-return-statements
"""
AJAX view to handle various actions for the "new table" wizard.
"""
data = self.request.json_body
action = data.get("action", "").strip()
try:
# nb. cannot use match/case statement until python 3.10, but this
# project technically still supports python 3.8
if action == "write_model_file":
return self.write_model_file(data)
if action == "check_model":
return self.check_model(data)
if action == "write_revision_script":
return self.write_revision_script(data)
if action == "migrate_db":
return self.migrate_db(data)
if action == "check_table":
return self.check_table(data)
if action == "":
return {"error": "Must specify the action to perform."}
return {"error": f"Unknown action requested: {action}"}
except Exception as err: # pylint: disable=broad-exception-caught
return {"error": f"Unexpected error occurred: {err}"}
def write_model_file(self, data): # pylint: disable=missing-function-docstring
model = self.app.model
path = data["module_file"]
if os.path.exists(path):
if data["overwrite"]:
os.remove(path)
else:
return {"error": "File already exists"}
for column in data["columns"]:
if column["data_type"]["type"] == "_fk_uuid_" and column["relationship"]:
name = column["relationship"]
table = model.Base.metadata.tables[column["data_type"]["reference"]]
mapper = get_mapper(table)
reference_model = mapper.class_.__name__
column["relationship"] = {
"name": name,
"reference_model": reference_model,
}
# TODO: make templates dir configurable?
templates = [self.app.resource_path("wuttaweb:code-templates")]
table_templates = TemplateLookup(directories=templates)
template = table_templates.get_template("/new-table.mako")
content = template.render(**data)
with open(path, "wt", encoding="utf_8") as f:
f.write(content)
return {}
def check_model(self, data): # pylint: disable=missing-function-docstring
model = self.app.model
model_name = data["model_name"]
if not hasattr(model, model_name):
return {
"problem": "class not found in app model",
"model": model.__name__,
}
return {}
def write_revision_script(self, data): # pylint: disable=missing-function-docstring
alembic_config = make_alembic_config(self.config)
script = alembic_command.revision(
alembic_config,
autogenerate=True,
head=f"{data['branch']}@head",
message=data["message"],
)
return {"script": script.path}
def migrate_db( # pylint: disable=missing-function-docstring,unused-argument
self, data
):
alembic_config = make_alembic_config(self.config)
alembic_command.upgrade(alembic_config, "heads")
return {}
def check_table(self, data): # pylint: disable=missing-function-docstring
model = self.app.model
name = data["name"]
table = model.Base.metadata.tables.get(name)
if table is None:
return {"problem": "table does not exist in app model"}
session = self.Session()
count = session.query(table).count()
route_prefix = self.get_route_prefix()
url = self.request.route_url(f"{route_prefix}.view", name=name)
return {"url": url, "count": count}
@classmethod
def defaults(cls, config): # pylint: disable=empty-docstring
""" """
cls._apptable_defaults(config)
cls._defaults(config)
@classmethod
def _apptable_defaults(cls, config):
route_prefix = cls.get_route_prefix()
permission_prefix = cls.get_permission_prefix()
model_title_plural = cls.get_model_title_plural()
url_prefix = cls.get_url_prefix()
# fix permission group
config.add_wutta_permission_group(
permission_prefix, model_title_plural, overwrite=False
)
# wizard actions
config.add_route(
f"{route_prefix}.wizard_action",
f"{url_prefix}/new/wizard-action",
request_method="POST",
)
config.add_view(
cls,
attr="wizard_action",
route_name=f"{route_prefix}.wizard_action",
renderer="json",
permission=f"{permission_prefix}.create",
)
def defaults(config, **kwargs): # pylint: disable=missing-function-docstring
base = globals()
AppTableView = kwargs.get( # pylint: disable=invalid-name,redefined-outer-name
"AppTableView", base["AppTableView"]
)
AppTableView.defaults(config)
def includeme(config): # pylint: disable=missing-function-docstring
defaults(config)

View file

@ -0,0 +1,9 @@
## -*- coding: utf-8; -*-
<%inherit file="/form.mako" />
<%def name="page_content()">
RANDOM TEXT
${parent.page_content()}
</%def>

View file

@ -0,0 +1,11 @@
## -*- coding: utf-8; -*-
<%inherit file="/form.mako" />
<%def name="form_vue_fields()">
SOMETHING CRAZY
<b-field label="name">
<b-input name="name" v-model="${form.get_field_vmodel('name')}" />
</b-field>
</%def>

View file

@ -1,48 +1,28 @@
# -*- coding: utf-8; -*-
from unittest import TestCase
import os
from unittest.mock import MagicMock, patch
import sqlalchemy as sa
import colander
import deform
from pyramid import testing
from pyramid.renderers import get_renderer
from wuttjamaican.conf import WuttaConfig
from wuttaweb.forms import base, widgets
from wuttaweb import helpers, subscribers
from wuttaweb.forms import base as mod, widgets
from wuttaweb.grids import Grid
from wuttaweb.testing import WebTestCase
class TestForm(TestCase):
here = os.path.dirname(__file__)
def setUp(self):
self.config = WuttaConfig(
defaults={
"wutta.web.menus.handler_spec": "tests.util:NullMenuHandler",
}
)
self.app = self.config.get_app()
self.request = testing.DummyRequest(wutta_config=self.config, use_oruga=False)
self.pyramid_config = testing.setUp(
request=self.request,
settings={
"wutta_config": self.config,
"mako.directories": ["wuttaweb:templates"],
"pyramid_deform.template_search_path": "wuttaweb:templates/deform",
},
)
class TestForm(WebTestCase):
event = MagicMock(request=self.request)
subscribers.new_request(event)
def tearDown(self):
testing.tearDown()
mako_directories = ["wuttaweb:templates", here]
def make_form(self, **kwargs):
return base.Form(self.request, **kwargs)
return mod.Form(self.request, **kwargs)
def make_schema(self):
schema = colander.Schema(
@ -279,7 +259,7 @@ class TestForm(TestCase):
# but auto-generating without fields is not supported
form = self.make_form()
self.assertIsNone(form.schema)
self.assertRaises(NotImplementedError, form.get_schema)
self.assertRaises(ValueError, form.get_schema)
# schema is auto-generated if model_class provided
form = self.make_form(model_class=model.Setting)
@ -541,6 +521,40 @@ class TestForm(TestCase):
self.assertIn("<script>", html)
self.assertIn("Vue.component('wutta-form', WuttaForm)", html)
def test_get_field_vmodel(self):
model = self.app.model
form = self.make_form(model_class=model.Setting)
result = form.get_field_vmodel("name")
self.assertEqual(result, "modelData.deformField1")
def test_render_vue_fields(self):
model = self.app.model
form = self.make_form(model_class=model.Setting)
context = form.get_vue_context()
# standard behavior
html = form.render_vue_fields(context)
self.assertIn("<b-field", html)
self.assertNotIn("SOMETHING CRAZY", html)
self.assertNotIn("RANDOM TEXT", html)
# declare main template, so form will look for the fields def
# (but this template has no def)
template = get_renderer("/main_template.mako").template
with patch.dict(context, {"main_template": template}):
html = form.render_vue_fields(context)
self.assertIn("<b-field", html)
self.assertNotIn("SOMETHING CRAZY", html)
self.assertNotIn("RANDOM TEXT", html)
# now use a main template which has the fields def
template = get_renderer("/main_template_with_fields.mako").template
with patch.dict(context, {"main_template": template}):
html = form.render_vue_fields(context)
self.assertIn("<b-field", html)
self.assertIn("SOMETHING CRAZY", html)
self.assertNotIn("RANDOM TEXT", html)
def test_render_vue_field(self):
self.pyramid_config.include("pyramid_deform")
schema = self.make_schema()

View file

@ -95,6 +95,38 @@ class TestObjectRefWidget(WebTestCase):
self.assertNotIn("url", values)
class TestCopyableTextWidget(WebTestCase):
def make_field(self, node, **kwargs):
# TODO: not sure why default renderer is in use even though
# pyramid_deform was included in setup? but this works..
kwargs.setdefault("renderer", deform.Form.default_renderer)
return deform.Field(node, **kwargs)
def make_widget(self, **kwargs):
return mod.CopyableTextWidget(**kwargs)
def test_serialize(self):
node = colander.SchemaNode(colander.String())
field = self.make_field(node)
widget = self.make_widget()
self.assertIs(widget.serialize(field, colander.null), colander.null)
self.assertIs(widget.serialize(field, None), colander.null)
self.assertIs(widget.serialize(field, ""), colander.null)
result = widget.serialize(field, "hello world")
self.assertEqual(
result, '<wutta-copyable-text text="hello world"></wutta-copyable-text>'
)
def test_deserialize(self):
node = colander.SchemaNode(colander.String())
field = self.make_field(node)
widget = self.make_widget()
self.assertRaises(NotImplementedError, widget.deserialize, field, "hello world")
class TestWuttaDateWidget(WebTestCase):
def make_field(self, node, **kwargs):
@ -432,3 +464,98 @@ class TestBatchIdWidget(WebTestCase):
result = widget.serialize(field, 42)
self.assertEqual(result, "00000042")
class TestAlembicRevisionWidget(WebTestCase):
def setUp(self):
super().setUp()
self.pyramid_config.add_route(
"alembic.migrations.view", "/alembic/migrations/{revision}"
)
def make_field(self, node, **kwargs):
# TODO: not sure why default renderer is in use even though
# pyramid_deform was included in setup? but this works..
kwargs.setdefault("renderer", deform.Form.default_renderer)
return deform.Field(node, **kwargs)
def make_widget(self, **kwargs):
return mod.AlembicRevisionWidget(self.request, **kwargs)
def test_serialize(self):
node = colander.SchemaNode(colander.String())
field = self.make_field(node)
widget = self.make_widget()
html = widget.serialize(field, colander.null)
self.assertIs(html, colander.null)
html = widget.serialize(field, None)
self.assertIs(html, colander.null)
html = widget.serialize(field, "")
self.assertIs(html, colander.null)
html = widget.serialize(field, "fc3a3bcaa069")
self.assertEqual(
html,
'<a href="http://example.com/alembic/migrations/fc3a3bcaa069">fc3a3bcaa069</a>',
)
def test_deserialize(self):
node = colander.SchemaNode(colander.String())
field = self.make_field(node)
widget = self.make_widget()
self.assertRaises(
NotImplementedError, widget.deserialize, field, "fc3a3bcaa069"
)
class TestAlembicRevisionsWidget(WebTestCase):
def setUp(self):
super().setUp()
self.pyramid_config.add_route(
"alembic.migrations.view", "/alembic/migrations/{revision}"
)
def make_field(self, node, **kwargs):
# TODO: not sure why default renderer is in use even though
# pyramid_deform was included in setup? but this works..
kwargs.setdefault("renderer", deform.Form.default_renderer)
return deform.Field(node, **kwargs)
def make_widget(self, **kwargs):
return mod.AlembicRevisionsWidget(self.request, **kwargs)
def test_serialize(self):
node = colander.SchemaNode(colander.String())
field = self.make_field(node)
widget = self.make_widget()
html = widget.serialize(field, colander.null)
self.assertIs(html, colander.null)
html = widget.serialize(field, None)
self.assertIs(html, colander.null)
html = widget.serialize(field, "")
self.assertIs(html, colander.null)
html = widget.serialize(field, "fc3a3bcaa069")
self.assertEqual(
html,
'<a href="http://example.com/alembic/migrations/fc3a3bcaa069">fc3a3bcaa069</a>',
)
html = widget.serialize(field, "fc3a3bcaa069, d686f7abe3e0")
self.assertEqual(
html,
'<a href="http://example.com/alembic/migrations/fc3a3bcaa069">fc3a3bcaa069</a>, '
'<a href="http://example.com/alembic/migrations/d686f7abe3e0">d686f7abe3e0</a>',
)
def test_deserialize(self):
node = colander.SchemaNode(colander.String())
field = self.make_field(node)
widget = self.make_widget()
self.assertRaises(
NotImplementedError, widget.deserialize, field, "fc3a3bcaa069"
)

460
tests/views/test_alembic.py Normal file
View file

@ -0,0 +1,460 @@
# -*- coding: utf-8; -*-
import os
from unittest.mock import patch
import sqlalchemy as sa
from alembic import command as alembic_command
from wuttjamaican.testing import ConfigTestCase
from wuttjamaican.db.conf import (
get_alembic_scriptdir,
check_alembic_current,
make_alembic_config,
)
import colander
from pyramid.httpexceptions import HTTPNotFound, HTTPFound
from wuttaweb.views import alembic as mod
from wuttaweb.forms import Form
from wuttaweb.testing import WebTestCase
from wuttaweb.forms.widgets import AlembicRevisionWidget
class TestNormalizeRevision(ConfigTestCase):
def test_basic(self):
self.config.setdefault("alembic.script_location", "wuttjamaican.db:alembic")
self.config.setdefault(
"alembic.version_locations", "wuttjamaican.db:alembic/versions"
)
script = get_alembic_scriptdir(self.config)
head = script.get_heads()[0]
rev = script.get_revision(head)
result = mod.normalize_revision(self.config, rev)
self.assertIsInstance(result, dict)
self.assertIn("revision", result)
self.assertEqual(result["revision"], rev.revision)
class TestAlembicMigrationView(WebTestCase):
def setUp(self):
super().setUp()
self.config.setdefault("alembic.script_location", "wuttjamaican.db:alembic")
self.config.setdefault(
"alembic.version_locations", "wuttjamaican.db:alembic/versions"
)
def make_view(self):
return mod.AlembicMigrationView(self.request)
def test_includeme(self):
self.pyramid_config.include("wuttaweb.views.alembic")
def test_get_grid_data(self):
view = self.make_view()
data = view.get_grid_data()
self.assertIsInstance(data, list)
self.assertTrue(data) # 1+ items
rev = data[0]
self.assertIn("revision", rev)
self.assertIn("down_revision", rev)
self.assertIn("doc", rev)
def test_configure_grid(self):
view = self.make_view()
grid = view.make_model_grid()
self.assertIn("revision", grid.searchable_columns)
self.assertIn("doc", grid.searchable_columns)
def test_render_is_head(self):
view = self.make_view()
# missing field / empty default
rev = {"revision": "foo"}
self.assertEqual(view.render_is_head(rev, "is_head", None), "")
# boolean true
rev = {"revision": "foo", "is_head": True}
self.assertEqual(view.render_is_head(rev, "is_head", True), "Yes")
# boolean false
rev = {"revision": "foo", "is_head": False}
self.assertEqual(view.render_is_head(rev, "is_head", False), "")
def test_get_instance(self):
view = self.make_view()
with patch.object(self.request, "matchdict", new={"revision": "fc3a3bcaa069"}):
rev1 = view.get_instance()
self.assertIsInstance(rev1, dict)
self.assertIn("revision", rev1)
self.assertEqual(rev1["revision"], "fc3a3bcaa069")
self.assertIn("doc", rev1)
rev2 = view.get_instance()
self.assertIs(rev2, rev1)
self.assertEqual(rev2["revision"], "fc3a3bcaa069")
view = self.make_view()
with patch.object(self.request, "matchdict", new={"revision": "invalid"}):
self.assertRaises(HTTPNotFound, view.get_instance)
def test_get_instance_title(self):
view = self.make_view()
rev = {
"revision": "fc3a3bcaa069",
"doc": "init with settings table",
"branch_labels": "wutta",
}
self.assertEqual(
view.get_instance_title(rev), "(wutta) init with settings table"
)
rev = {
"revision": "fc3a3bcaa069",
"doc": "init with settings table",
"branch_labels": "wutta",
"is_head": True,
}
self.assertEqual(
view.get_instance_title(rev), "(wutta) init with settings table [head]"
)
def test_configure_form(self):
view = self.make_view()
# sanity / coverage
with patch.object(self.request, "matchdict", new={"revision": "fc3a3bcaa069"}):
rev = view.get_instance()
form = view.make_model_form(rev)
self.assertIsInstance(form.widgets["down_revision"], AlembicRevisionWidget)
def test_make_create_form(self):
self.pyramid_config.add_route("alembic.migrations", "/alembic/migrations/")
view = self.make_view()
# sanity / coverage
form = view.make_create_form()
self.assertIsInstance(form, Form)
self.assertIn("branching_option", form)
def test_validate_revise_branch(self):
self.pyramid_config.add_route("alembic.migrations", "/alembic/migrations/")
view = self.make_view()
form = view.make_create_form()
schema = form.get_schema()
# good example
self.assertIsNone(
view.validate_revise_branch(
schema,
{
"branching_option": "revise",
"revise_branch": "wutta",
},
)
)
# branch is required
self.assertRaises(
colander.Invalid,
view.validate_revise_branch,
schema,
{
"branching_option": "revise",
"revise_branch": None,
},
)
def test_validate_new_branch(self):
self.pyramid_config.add_route("alembic.migrations", "/alembic/migrations/")
view = self.make_view()
form = view.make_create_form()
schema = form.get_schema()
# good example
self.assertIsNone(
view.validate_revise_branch(
schema,
{
"branching_option": "new",
"new_branch": "poser",
"version_location": "wuttjamaican.db:alembic/versions",
},
)
)
# name is required
self.assertRaises(
colander.Invalid,
view.validate_new_branch,
schema,
{
"branching_option": "new",
"new_branch": None,
"version_location": "wuttjamaican.db:alembic/versions",
},
)
# version_location is required
self.assertRaises(
colander.Invalid,
view.validate_new_branch,
schema,
{
"branching_option": "new",
"new_branch": "poser",
"version_location": None,
},
)
def test_save_create_form(self):
self.pyramid_config.add_route("alembic.migrations", "/alembic/migrations/")
self.pyramid_config.add_route("alembic.dashboard", "/alembic/dashboard")
view = self.make_view()
form = view.make_create_form()
# revise branch
form.validated = {
"description": "test revision",
"autogenerate": False,
"branching_option": "revise",
"revise_branch": "wutta",
}
revision = view.save_create_form(form)
# file was saved in wutta dir
self.assertTrue(
revision.path.startswith(
self.app.resource_path("wuttjamaican.db:alembic/versions")
)
)
# get rid of that file!
os.remove(revision.path)
# new branch
form.validated = {
"description": "test revision",
"autogenerate": False,
"branching_option": "new",
"new_branch": "wuttatest",
"version_location": "wuttjamaican.db:alembic/versions",
}
revision = view.save_create_form(form)
# file was saved in wutta dir
self.assertTrue(
revision.path.startswith(
self.app.resource_path("wuttjamaican.db:alembic/versions")
)
)
# get rid of that file!
os.remove(revision.path)
def test_save_delete_form(self):
self.pyramid_config.add_route(
"alembic.migrations.view", "/alembic/migrations/{revision}"
)
view = self.make_view()
alembic = make_alembic_config(self.config)
# write new empty migration script
revision = alembic_command.revision(
alembic,
head="base",
branch_label="wuttatest",
version_path=self.app.resource_path("wuttjamaican.db:alembic/versions"),
message="test revision",
)
# script exists
self.assertTrue(os.path.exists(revision.path))
with patch.object(
self.request, "matchdict", new={"revision": revision.revision}
):
form = view.make_delete_form(revision)
view.save_delete_form(form)
# script gone
self.assertFalse(os.path.exists(revision.path))
def test_configure(self):
self.pyramid_config.add_route("home", "/")
self.pyramid_config.add_route("login", "/auth/login")
self.pyramid_config.add_route("alembic.migrations", "/alembic/migrations")
view = self.make_view()
# sanity/coverage
view.configure()
class TestAlembicDashboardView(WebTestCase):
def make_config(self, **kwargs):
sqlite_path = self.write_file("test.sqlite", "")
self.sqlite_engine_url = f"sqlite:///{sqlite_path}"
config_path = self.write_file(
"test.ini",
f"""
[wutta.db]
default.url = {self.sqlite_engine_url}
[alembic]
script_location = wuttjamaican.db:alembic
version_locations = wuttjamaican.db:alembic/versions
""",
)
return super().make_config([config_path], **kwargs)
def make_view(self):
return mod.AlembicDashboardView(self.request)
def test_dashboard(self):
self.pyramid_config.add_route(
"alembic.migrations.view", "/alembic/migrations/{revision}"
)
view = self.make_view()
alembic = make_alembic_config(self.config)
# tests use MetaData.create_all() instead of migrations for
# setup, so alembic will assume db is not current
self.assertFalse(check_alembic_current(self.config, alembic))
# and to further prove the point, alembic_version table is missing
self.assertEqual(
self.session.execute(sa.text("select count(*) from person")).scalar(),
0,
)
self.assertRaises(
sa.exc.OperationalError,
self.session.execute,
sa.text("select count(*) from alembic_version"),
)
# therefore dashboard shows db with no heads at first
context = view.dashboard()
self.assertIsInstance(context, dict)
self.assertIn("script_heads", context)
self.assertEqual(len(context["script_heads"]), 1)
self.assertIn("db_heads", context)
self.assertEqual(len(context["db_heads"]), 0)
# but we can 'stamp' the db as current
alembic_command.stamp(alembic, "heads")
# now the alembic_version table exists
self.assertEqual(
self.session.execute(
sa.text("select count(*) from alembic_version")
).scalar(),
1,
)
self.assertTrue(check_alembic_current(self.config, alembic))
# and the dashboard knows about db heads
context = view.dashboard()
self.assertEqual(len(context["script_heads"]), 1)
self.assertEqual(len(context["db_heads"]), 1)
def test_migrate(self):
self.pyramid_config.add_route("alembic.dashboard", "/alembic/dashboard")
view = self.make_view()
# tell alembic our db is already current
alembic = make_alembic_config(self.config)
alembic_command.stamp(alembic, "heads")
self.assertTrue(check_alembic_current(self.config, alembic))
# GET request redirects to dashboard w/ no flash
result = view.migrate()
self.assertIsInstance(result, HTTPFound)
self.assertEqual(result.location, "http://example.com/alembic/dashboard")
self.assertFalse(self.request.session.peek_flash())
self.assertFalse(self.request.session.peek_flash("error"))
# POST with no revspec also redirects but w/ flash
with patch.object(self.request, "method", new="POST"):
result = view.migrate()
self.assertIsInstance(result, HTTPFound)
self.assertEqual(result.location, "http://example.com/alembic/dashboard")
self.assertFalse(self.request.session.peek_flash())
self.assertTrue(self.request.session.peek_flash("error"))
self.assertEqual(
self.request.session.pop_flash("error"),
["You must provide a target revspec."],
)
# force downgrade to wutta@-1
with patch.object(self.request, "method", new="POST"):
with patch.object(
self.request,
"POST",
new={"direction": "downgrade", "revspec": "wutta@-1"},
):
# nb. this still redirects but w/ different flash
result = view.migrate()
self.assertIsInstance(result, HTTPFound)
self.assertEqual(
result.location, "http://example.com/alembic/dashboard"
)
self.assertTrue(self.request.session.peek_flash())
self.assertFalse(self.request.session.peek_flash("error"))
self.assertEqual(
self.request.session.pop_flash(),
["Database has been migrated."],
)
# alembic should know our db is no longer current
self.assertFalse(check_alembic_current(self.config, alembic))
# force upgrade to heads
with patch.object(self.request, "method", new="POST"):
with patch.object(
self.request,
"POST",
new={"revspec": "heads"},
):
# nb. this still redirects but w/ different flash
result = view.migrate()
self.assertIsInstance(result, HTTPFound)
self.assertEqual(
result.location, "http://example.com/alembic/dashboard"
)
self.assertTrue(self.request.session.peek_flash())
self.assertFalse(self.request.session.peek_flash("error"))
self.assertEqual(
self.request.session.pop_flash(),
["Database has been migrated."],
)
# alembic should know our db is current again
self.assertTrue(check_alembic_current(self.config, alembic))
# upgrade to invalid spec (force an error)
with patch.object(self.request, "method", new="POST"):
with patch.object(
self.request,
"POST",
new={"revspec": "bad-spec"},
):
# nb. this still redirects but w/ different flash
result = view.migrate()
self.assertIsInstance(result, HTTPFound)
self.assertEqual(
result.location, "http://example.com/alembic/dashboard"
)
self.assertFalse(self.request.session.peek_flash())
self.assertTrue(self.request.session.peek_flash("error"))
[msg] = self.request.session.pop_flash("error")
self.assertTrue(msg.startswith("Migrate failed: "))

View file

@ -9,7 +9,7 @@ from unittest.mock import MagicMock, patch
from sqlalchemy import orm
from pyramid import testing
from pyramid.response import Response
from pyramid.httpexceptions import HTTPNotFound
from pyramid.httpexceptions import HTTPNotFound, HTTPFound
from wuttjamaican.conf import WuttaConfig
from wuttaweb.views import master as mod
@ -977,7 +977,7 @@ class TestMasterView(WebTestCase):
form = view.make_model_form(fields=["name", "description"])
form.validated = {"name": "first"}
obj = view.objectify(form)
self.assertIs(obj, form.validated)
self.assertEqual(obj, form.validated)
# explicit model class (editing)
with patch.multiple(
@ -1115,6 +1115,40 @@ class TestMasterView(WebTestCase):
# setting did not change in DB
self.assertEqual(self.app.get_setting(self.session, "foo.bar"), "fraggle")
# post again to save setting
with patch.multiple(
self.request,
method="POST",
POST={
"name": "foo.bar",
"value": "friggle",
},
):
with patch.object(view, "persist", new=persist):
response = view.create()
self.assertIsInstance(response, HTTPFound)
self.assertFalse(self.request.session.peek_flash("error"))
self.assertEqual(
self.app.get_setting(self.session, "foo.bar"), "friggle"
)
# and this time force an error on save
with patch.multiple(
self.request,
method="POST",
POST={"name": "foo.bar", "value": "froooooggle"},
):
with patch.object(
view, "save_create_form", side_effect=RuntimeError("testing")
):
response = view.create()
self.assertEqual(response.status_code, 200)
# nb. flash error is already gone, b/c template is rendered
self.assertFalse(self.request.session.peek_flash("error"))
self.assertEqual(
self.app.get_setting(self.session, "foo.bar"), "friggle"
)
def test_view(self):
self.pyramid_config.include("wuttaweb.views.common")
self.pyramid_config.include("wuttaweb.views.auth")
@ -1210,6 +1244,7 @@ class TestMasterView(WebTestCase):
with patch.multiple(
mod.MasterView,
create=True,
# nb. not actually using the model_class here
model_name="Setting",
model_key="name",
get_index_url=MagicMock(return_value="/settings/"),
@ -1231,32 +1266,54 @@ class TestMasterView(WebTestCase):
self.session.commit()
# post request to save settings
self.request.method = "POST"
self.request.POST = {
"name": "foo.bar",
"value": "froogle",
}
with patch.object(view, "persist", new=persist):
response = view.edit()
# nb. should get redirect back to view page
self.assertEqual(response.status_code, 302)
# setting should be updated in DB
self.assertEqual(
self.app.get_setting(self.session, "foo.bar"), "froogle"
)
with patch.multiple(
self.request,
method="POST",
POST={"name": "foo.bar", "value": "froogle"},
):
with patch.object(view, "persist", new=persist):
response = view.edit()
self.assertIsInstance(response, HTTPFound)
self.assertEqual(
response.location, "http://example.com/settings/foo.bar"
)
# setting is saved in DB
self.assertEqual(
self.app.get_setting(self.session, "foo.bar"), "froogle"
)
# try another post with invalid data (value is required)
self.request.method = "POST"
self.request.POST = {}
with patch.object(view, "persist", new=persist):
response = view.edit()
# nb. should get a form with errors
self.assertEqual(response.status_code, 200)
self.assertIn("Required", response.text)
# setting did not change in DB
self.assertEqual(
self.app.get_setting(self.session, "foo.bar"), "froogle"
)
with patch.multiple(self.request, method="POST", POST={}):
with patch.object(view, "persist", new=persist):
response = view.edit()
# nb. should get a form with errors
self.assertEqual(response.status_code, 200)
self.assertIn("Required", response.text)
# setting did not change in DB
self.assertEqual(
self.app.get_setting(self.session, "foo.bar"), "froogle"
)
# once more with forced error
with patch.multiple(
self.request,
method="POST",
POST={
"name": "foo.bar",
"value": "froooooggle",
},
):
with patch.object(
view, "save_edit_form", side_effect=RuntimeError("testing")
):
response = view.edit()
self.assertEqual(response.status_code, 200)
# nb. flash error is already gone, b/c template is rendered
self.assertFalse(self.request.session.peek_flash("error"))
# setting did not change in DB
self.assertEqual(
self.app.get_setting(self.session, "foo.bar"), "froogle"
)
def test_delete(self):
self.pyramid_config.include("wuttaweb.views.common")
@ -1266,18 +1323,21 @@ class TestMasterView(WebTestCase):
self.pyramid_config.add_route("settings.edit", "/settings/{name}/edit")
model = self.app.model
self.app.save_setting(self.session, "foo.bar", "frazzle")
self.app.save_setting(self.session, "another", "fun-value")
self.session.commit()
self.assertEqual(self.session.query(model.Setting).count(), 1)
self.assertEqual(self.session.query(model.Setting).count(), 2)
def get_instance():
setting = self.session.get(model.Setting, "foo.bar")
name = self.request.matchdict["name"]
setting = self.session.get(model.Setting, name)
if not setting:
raise view.notfound()
return {
"name": setting.name,
"value": setting.value,
}
# sanity/coverage check using /settings/XXX/delete
self.request.matchdict = {"name": "foo.bar"}
with patch.multiple(
mod.MasterView,
create=True,
@ -1290,32 +1350,51 @@ class TestMasterView(WebTestCase):
with patch.object(view, "get_instance", new=get_instance):
# get the form page
response = view.delete()
self.assertIsInstance(response, Response)
self.assertEqual(response.status_code, 200)
self.assertIn("frazzle", response.text)
with patch.object(self.request, "matchdict", new={"name": "foo.bar"}):
response = view.delete()
self.assertIsInstance(response, Response)
self.assertEqual(response.status_code, 200)
self.assertIn("frazzle", response.text)
def delete_instance(setting):
self.app.delete_setting(self.session, setting["name"])
self.request.method = "POST"
self.request.POST = {}
with patch.object(view, "delete_instance", new=delete_instance):
with patch.multiple(
self.request, matchdict={"name": "foo.bar"}, method="POST", POST={}
):
with patch.object(view, "delete_instance", new=delete_instance):
# enforces "instance not deletable" rules
with patch.object(view, "is_deletable", return_value=False):
# enforces "instance not deletable" rules
with patch.object(view, "is_deletable", return_value=False):
response = view.delete()
# nb. should get redirect back to view page
self.assertEqual(response.status_code, 302)
# setting remains in DB
self.assertEqual(self.session.query(model.Setting).count(), 2)
# post request to delete setting
response = view.delete()
# nb. should get redirect back to view page
self.assertEqual(response.status_code, 302)
# setting remains in DB
self.assertEqual(self.session.query(model.Setting).count(), 1)
# nb. should get redirect back to view page
self.assertEqual(response.status_code, 302)
# setting should be gone from DB
self.assertEqual(self.session.query(model.Setting).count(), 1)
# post request to delete setting
response = view.delete()
# nb. should get redirect back to view page
self.assertEqual(response.status_code, 302)
# setting should be gone from DB
self.assertEqual(self.session.query(model.Setting).count(), 0)
# try to delete 2nd setting, but force an error
with patch.multiple(
self.request, matchdict={"name": "another"}, method="POST", POST={}
):
with patch.object(
view, "save_delete_form", side_effect=RuntimeError("testing")
):
response = view.delete()
self.assertEqual(response.status_code, 200)
# nb. flash error is already gone, b/c template is rendered
self.assertFalse(self.request.session.peek_flash("error"))
# setting is still in DB
self.assertEqual(self.session.query(model.Setting).count(), 1)
self.assertEqual(
self.app.get_setting(self.session, "another"), "fun-value"
)
def test_delete_instance(self):
model = self.app.model

326
tests/views/test_tables.py Normal file
View file

@ -0,0 +1,326 @@
# -*- coding: utf-8; -*-
import os
from unittest.mock import patch
from alembic import command as alembic_command
from wuttjamaican.db.conf import check_alembic_current, make_alembic_config
from wuttaweb.testing import WebTestCase
from wuttaweb.views import tables as mod
class TestAppTableView(WebTestCase):
def make_config(self, **kwargs):
sqlite_path = self.write_file("test.sqlite", "")
self.sqlite_engine_url = f"sqlite:///{sqlite_path}"
config_path = self.write_file(
"test.ini",
f"""
[wutta.db]
default.url = {self.sqlite_engine_url}
[alembic]
script_location = wuttjamaican.db:alembic
version_locations = wuttjamaican.db:alembic/versions
""",
)
return super().make_config([config_path], **kwargs)
def make_view(self):
return mod.AppTableView(self.request)
def test_includeme(self):
self.pyramid_config.include("wuttaweb.views.tables")
def test_get_grid_data(self):
view = self.make_view()
data = view.get_grid_data()
self.assertIsInstance(data, list)
self.assertGreater(len(data), 0)
table = data[0]
self.assertIsInstance(table, dict)
self.assertIn("name", table)
self.assertIn("schema", table)
def test_configure_grid(self):
view = self.make_view()
# sanity / coverage check
grid = view.make_grid(columns=["name", "schema"])
view.configure_grid(grid)
def test_get_instance(self):
view = self.make_view()
with patch.object(self.request, "matchdict", new={"name": "person"}):
table1 = view.get_instance()
self.assertIsInstance(table1, dict)
self.assertIn("name", table1)
self.assertEqual(table1["name"], "person")
self.assertIn("schema", table1)
table2 = view.get_instance()
self.assertIs(table2, table1)
self.assertEqual(table2["name"], "person")
def test_get_instance_title(self):
view = self.make_view()
table = {"name": "poser_foo"}
self.assertEqual(view.get_instance_title(table), "poser_foo")
def test_get_row_grid_data(self):
model = self.app.model
view = self.make_view()
table = model.Base.metadata.tables["person"]
table_dict = {"name": "person", "table": table}
data = view.get_row_grid_data(table_dict)
self.assertIsInstance(data, list)
self.assertGreater(len(data), 4)
columns = [c["column_name"] for c in data]
self.assertIn("full_name", columns)
self.assertIn("first_name", columns)
self.assertIn("last_name", columns)
def test_configure_row_grid(self):
view = self.make_view()
# sanity / coverage check
grid = view.make_grid(columns=["column_name", "data_type"])
view.configure_row_grid(grid)
def test_render_column_description(self):
view = self.make_view()
# nb. first 2 params are igored
text = view.render_column_description(None, None, "hello world")
self.assertEqual(text, "hello world")
text = view.render_column_description(None, None, "")
self.assertEqual(text, "")
text = view.render_column_description(None, None, None)
self.assertEqual(text, "")
msg = (
"This is a very long and rambling sentence. "
"There is no point to it except that it is long. "
"Far too long to be reasonable."
"I mean I am serious when I say this is simply too long."
)
text = view.render_column_description(None, None, msg)
self.assertNotEqual(text, msg)
self.assertIn("<span title=", text)
def test_get_template_context(self):
view = self.make_view()
# normal view gets no extra context
context = view.get_template_context({})
self.assertIsInstance(context, dict)
self.assertNotIn("alembic_is_current", context)
self.assertNotIn("existing_tables", context)
self.assertNotIn("model_dir", context)
self.assertNotIn("migration_branch_options", context)
self.assertNotIn("migration_branch", context)
# but 'create' view gets extra context
with patch.object(view, "creating", new=True):
context = view.get_template_context({})
self.assertIsInstance(context, dict)
self.assertIn("alembic_is_current", context)
self.assertIn("existing_tables", context)
self.assertIn("model_dir", context)
self.assertIn("migration_branch_options", context)
self.assertIn("migration_branch", context)
def test_write_model_file(self):
view = self.make_view()
module_path = self.write_file("widget.py", "")
self.assertEqual(os.path.getsize(module_path), 0)
sample = {
"action": "write_model_file",
"module_file": module_path,
"overwrite": False,
"table_name": "poser_widget",
"model_name": "PoserWidget",
"model_title": "Poser Widget",
"model_title_plural": "Poser Widgets",
"description": "A widget for Poser",
"versioned": True,
"columns": [
{
"name": "uuid",
"data_type": {
"type": "UUID",
},
"formatted_data_type": "sa.UUID()",
"nullable": False,
"description": "primary key",
"versioned": "n/a",
"relationship": None,
},
{
"name": "name",
"data_type": {
"type": "String",
},
"formatted_data_type": "sa.String(length=100)",
"nullable": False,
"description": "name of widget",
"versioned": True,
"relationship": None,
},
{
"name": "owner_uuid",
"data_type": {
"type": "_fk_uuid_",
"reference": "user",
},
"formatted_data_type": "sa.UUID()",
"nullable": False,
"description": "owner of widget",
"versioned": True,
"relationship": {
"name": "owner",
"reference_model": "User",
},
},
],
}
with patch.object(self.request, "json_body", new=sample, create=True):
# does not overwrite by default
result = view.wizard_action()
self.assertIn("error", result)
self.assertEqual(result["error"], "File already exists")
# but it can overwrite if requested
with patch.dict(sample, {"overwrite": True}):
result = view.wizard_action()
self.assertNotIn("error", result)
self.assertGreater(os.path.getsize(module_path), 1000)
def test_check_model(self):
view = self.make_view()
sample = {
"action": "check_model",
"model_name": "Person",
}
with patch.object(self.request, "json_body", new=sample, create=True):
# empty result means the model exists
result = view.wizard_action()
self.assertEqual(result, {})
# problem is specified if not
with patch.dict(sample, {"model_name": "gobbledygook"}):
result = view.wizard_action()
self.assertIn("problem", result)
self.assertEqual(result["problem"], "class not found in app model")
def test_write_revision_script(self):
view = self.make_view()
sample = {
"action": "write_revision_script",
"branch": "wutta",
"message": "just a test",
}
# tell alembic our db is already current
alembic = make_alembic_config(self.config)
alembic_command.stamp(alembic, "heads")
self.assertTrue(check_alembic_current(self.config, alembic))
with patch.object(self.request, "json_body", new=sample, create=True):
# nb. this writes a real script in the wuttjamaican project
result = view.wizard_action()
self.assertIn("script", result)
outdir = os.path.dirname(result["script"])
self.assertEqual(
outdir, self.app.resource_path("wuttjamaican.db:alembic/versions")
)
# alembic now thinks we need to upgrade
self.assertFalse(check_alembic_current(self.config, alembic))
# must be sure to delete that script
os.remove(result["script"])
def test_migrate_db(self):
view = self.make_view()
sample = {"action": "migrate_db"}
# tell alembic our db is already current
alembic = make_alembic_config(self.config)
alembic_command.stamp(alembic, "heads")
self.assertTrue(check_alembic_current(self.config, alembic))
# force downgrade to wutta@-1
alembic_command.downgrade(alembic, "wutta@-1")
# alembic now thinks we need to upgrade
self.assertFalse(check_alembic_current(self.config, alembic))
with patch.object(self.request, "json_body", new=sample, create=True):
# now test our view method; alembic should then know we are current
view.wizard_action()
self.assertTrue(check_alembic_current(self.config, alembic))
def test_check_table(self):
self.pyramid_config.add_route("app_tables.view", "/tables/app/{name}")
view = self.make_view()
sample = {"action": "check_table", "name": "person"}
with patch.object(view, "Session", return_value=self.session):
with patch.object(self.request, "json_body", new=sample, create=True):
# result with URL means the table exists
result = view.wizard_action()
self.assertIn("url", result)
self.assertNotIn("problem", result)
# problem is specified if not
with patch.dict(sample, {"name": "gobbledygook"}):
result = view.wizard_action()
self.assertIn("problem", result)
self.assertEqual(result["problem"], "table does not exist in app model")
def test_wizard_action(self):
view = self.make_view()
# missing action
with patch.object(self.request, "json_body", create=True, new={}):
result = view.wizard_action()
self.assertIn("error", result)
self.assertEqual(result["error"], "Must specify the action to perform.")
# unknown action
with patch.object(
self.request, "json_body", create=True, new={"action": "nothing"}
):
result = view.wizard_action()
self.assertIn("error", result)
self.assertEqual(result["error"], "Unknown action requested: nothing")
# error invoking action
with patch.object(
self.request, "json_body", create=True, new={"action": "check_table"}
):
with patch.object(view, "check_table", side_effect=RuntimeError("whoa")):
result = view.wizard_action()
self.assertIn("error", result)
self.assertEqual(result["error"], "Unexpected error occurred: whoa")