diff --git a/docs/conf.py b/docs/conf.py
index 3d568ef..0f73d82 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -31,6 +31,7 @@ intersphinx_mapping = {
'deform': ('https://docs.pylonsproject.org/projects/deform/en/latest/', None),
'pyramid': ('https://docs.pylonsproject.org/projects/pyramid/en/latest/', None),
'python': ('https://docs.python.org/3/', None),
+ 'rattail-manual': ('https://rattailproject.org/docs/rattail-manual/', None),
'webhelpers2': ('https://webhelpers2.readthedocs.io/en/latest/', None),
'wuttjamaican': ('https://rattailproject.org/docs/wuttjamaican/', None),
}
diff --git a/pyproject.toml b/pyproject.toml
index 41fda6b..a671cb5 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -31,6 +31,7 @@ classifiers = [
requires-python = ">= 3.8"
dependencies = [
"ColanderAlchemy",
+ "humanize",
"paginate",
"paginate_sqlalchemy",
"pyramid>=2",
diff --git a/src/wuttaweb/forms/schema.py b/src/wuttaweb/forms/schema.py
index 8538eef..74839a7 100644
--- a/src/wuttaweb/forms/schema.py
+++ b/src/wuttaweb/forms/schema.py
@@ -456,3 +456,35 @@ class Permissions(WuttaSet):
kwargs['values'] = values
return widgets.PermissionsWidget(self.request, **kwargs)
+
+
+class FileDownload(colander.String):
+ """
+ Custom schema type for a file download field.
+
+ This field is only meant for readonly use, it does not handle file
+ uploads.
+
+ It expects the incoming ``appstruct`` to be the path to a file on
+ disk (or null).
+
+ Uses the :class:`~wuttaweb.forms.widgets.FileDownloadWidget` by
+ default.
+
+ :param request: Current :term:`request` object.
+
+ :param url: Optional URL for hyperlink. If not specified, file
+ name/size is shown with no hyperlink.
+ """
+
+ def __init__(self, request, *args, **kwargs):
+ self.url = kwargs.pop('url', None)
+ super().__init__(*args, **kwargs)
+ self.request = request
+ self.config = self.request.wutta_config
+ self.app = self.config.get_app()
+
+ def widget_maker(self, **kwargs):
+ """ """
+ kwargs.setdefault('url', self.url)
+ return widgets.FileDownloadWidget(self.request, **kwargs)
diff --git a/src/wuttaweb/forms/widgets.py b/src/wuttaweb/forms/widgets.py
index 837b6f1..4db861a 100644
--- a/src/wuttaweb/forms/widgets.py
+++ b/src/wuttaweb/forms/widgets.py
@@ -39,7 +39,10 @@ in the namespace:
* :class:`deform:deform.widget.MoneyInputWidget`
"""
+import os
+
import colander
+import humanize
from deform.widget import (Widget, TextInputWidget, TextAreaWidget,
PasswordWidget, CheckedPasswordWidget,
CheckboxWidget, SelectWidget, CheckboxChoiceWidget,
@@ -147,6 +150,63 @@ class WuttaCheckboxChoiceWidget(CheckboxChoiceWidget):
self.session = session or Session()
+class FileDownloadWidget(Widget):
+ """
+ Widget for use with :class:`~wuttaweb.forms.schema.FileDownload`
+ fields.
+
+ This only supports readonly, and shows a hyperlink to download the
+ file. Link text is the filename plus file size.
+
+ This is a subclass of :class:`deform:deform.widget.Widget` and
+ uses these Deform templates:
+
+ * ``readonly/filedownload``
+
+ :param request: Current :term:`request` object.
+
+ :param url: Optional URL for hyperlink. If not specified, file
+ name/size is shown with no hyperlink.
+ """
+ readonly_template = 'readonly/filedownload'
+
+ def __init__(self, request, *args, **kwargs):
+ self.url = kwargs.pop('url', None)
+ super().__init__(*args, **kwargs)
+ self.request = request
+ self.config = self.request.wutta_config
+ self.app = self.config.get_app()
+
+ def serialize(self, field, cstruct, **kw):
+ """ """
+ # nb. readonly is the only way this rolls
+ kw['readonly'] = True
+ template = self.readonly_template
+
+ path = cstruct or None
+ if path:
+ kw.setdefault('filename', os.path.basename(path))
+ kw.setdefault('filesize', self.readable_size(path))
+ if self.url:
+ kw.setdefault('url', self.url)
+
+ else:
+ kw.setdefault('filename', None)
+ kw.setdefault('filesize', None)
+
+ kw.setdefault('url', None)
+ values = self.get_template_values(field, cstruct, kw)
+ return field.renderer(template, **values)
+
+ def readable_size(self, path):
+ """ """
+ try:
+ size = os.path.getsize(path)
+ except os.error:
+ size = 0
+ return humanize.naturalsize(size)
+
+
class RoleRefsWidget(WuttaCheckboxChoiceWidget):
"""
Widget for use with User
@@ -184,7 +244,7 @@ class RoleRefsWidget(WuttaCheckboxChoiceWidget):
roles = []
if cstruct:
for uuid in cstruct:
- role = self.session.query(model.Role).get(uuid)
+ role = self.session.get(model.Role, uuid)
if role:
roles.append(role)
kw['roles'] = roles
@@ -228,6 +288,10 @@ class UserRefsWidget(WuttaCheckboxChoiceWidget):
users.append(dict([(key, getattr(user, key))
for key in columns + ['uuid']]))
+ # do not render if no data
+ if not users:
+ return HTML.tag('span')
+
# grid
grid = Grid(self.request, key='roles.view.users',
columns=columns, data=users)
diff --git a/src/wuttaweb/templates/deform/readonly/filedownload.pt b/src/wuttaweb/templates/deform/readonly/filedownload.pt
new file mode 100644
index 0000000..31a789b
--- /dev/null
+++ b/src/wuttaweb/templates/deform/readonly/filedownload.pt
@@ -0,0 +1,14 @@
+
+
+ ${filename}
+
+ (${filesize})
+
+
+
+ ${filename}
+
+ (${filesize})
+
+
+
diff --git a/src/wuttaweb/templates/upgrades/configure.mako b/src/wuttaweb/templates/upgrades/configure.mako
new file mode 100644
index 0000000..2e4eae1
--- /dev/null
+++ b/src/wuttaweb/templates/upgrades/configure.mako
@@ -0,0 +1,20 @@
+## -*- coding: utf-8; -*-
+<%inherit file="/master/configure.mako" />
+
+<%def name="form_content()">
+
+
Basics
+
+
+
+
+
+
+
+%def>
diff --git a/src/wuttaweb/templates/upgrades/view.mako b/src/wuttaweb/templates/upgrades/view.mako
new file mode 100644
index 0000000..4794641
--- /dev/null
+++ b/src/wuttaweb/templates/upgrades/view.mako
@@ -0,0 +1,37 @@
+## -*- coding: utf-8; -*-
+<%inherit file="/master/view.mako" />
+
+<%def name="page_content()">
+ ${parent.page_content()}
+ % if instance.status == app.enum.UpgradeStatus.PENDING and master.has_perm('execute'):
+
+
+ ${h.form(master.get_action_url('execute', instance), **{'@submit': 'executeFormSubmit'})}
+ ${h.csrf_token(request)}
+
+ {{ executeFormSubmitting ? "Working, please wait..." : "Execute this upgrade" }}
+
+ ${h.end_form()}
+
+ % endif
+%def>
+
+<%def name="modify_vue_vars()">
+ ${parent.modify_vue_vars()}
+ % if instance.status == app.enum.UpgradeStatus.PENDING and master.has_perm('execute'):
+
+ % endif
+%def>
diff --git a/src/wuttaweb/views/base.py b/src/wuttaweb/views/base.py
index bc1e76c..5121f3c 100644
--- a/src/wuttaweb/views/base.py
+++ b/src/wuttaweb/views/base.py
@@ -24,8 +24,11 @@
Base Logic for Views
"""
+import os
+
from pyramid import httpexceptions
from pyramid.renderers import render_to_response
+from pyramid.response import FileResponse
from wuttaweb import forms, grids
@@ -119,9 +122,46 @@ class View:
"""
return httpexceptions.HTTPFound(location=url, **kwargs)
+ def file_response(self, path, attachment=True, filename=None):
+ """
+ Returns a generic file response for the given path.
+
+ :param path: Path to a file on local disk; must be accessible
+ by the web app.
+
+ :param attachment: Whether the file should come down as an
+ "attachment" instead of main payload.
+
+ The attachment behavior is the default here, and will cause
+ the user to be prompted for where to save the file.
+
+ Set ``attachment=False`` in order to cause the browser to
+ render the file as if it were the page being navigated to.
+
+ :param filename: Optional filename to use for attachment
+ behavior. This will be the "suggested filename" when user
+ is prompted to save the download. If not specified, the
+ filename is derived from ``path``.
+
+ :returns: A :class:`~pyramid:pyramid.response.FileResponse`
+ object with file content.
+ """
+ if not os.path.exists(path):
+ return self.notfound()
+
+ response = FileResponse(path, request=self.request)
+ response.content_length = os.path.getsize(path)
+
+ if attachment:
+ if not filename:
+ filename = os.path.basename(path)
+ response.content_disposition = f'attachment; filename="{filename}"'
+
+ return response
+
def json_response(self, context):
"""
- Convenience method to return a JSON response.
+ Returns a JSON response with the given context data.
:param context: Context data to be rendered as JSON.
diff --git a/src/wuttaweb/views/common.py b/src/wuttaweb/views/common.py
index 8c6dc25..279c61a 100644
--- a/src/wuttaweb/views/common.py
+++ b/src/wuttaweb/views/common.py
@@ -160,6 +160,9 @@ class CommonView(View):
'upgrades.view',
'upgrades.edit',
'upgrades.delete',
+ 'upgrades.execute',
+ 'upgrades.download',
+ 'upgrades.configure',
'users.list',
'users.create',
'users.view',
diff --git a/src/wuttaweb/views/master.py b/src/wuttaweb/views/master.py
index 7477ec2..fe333b1 100644
--- a/src/wuttaweb/views/master.py
+++ b/src/wuttaweb/views/master.py
@@ -25,6 +25,7 @@ Base Logic for Master Views
"""
import logging
+import os
import threading
import sqlalchemy as sa
@@ -322,6 +323,18 @@ class MasterView(View):
"autocomplete" - i.e. it should have an :meth:`autocomplete()`
view. Default is ``False``.
+ .. attribute:: downloadable
+
+ Boolean indicating whether the view model supports
+ "downloading" - i.e. it should have a :meth:`download()` view.
+ Default is ``False``.
+
+ .. attribute:: executable
+
+ Boolean indicating whether the view model supports "executing"
+ - i.e. it should have an :meth:`execute()` view. Default is
+ ``False``.
+
.. attribute:: configurable
Boolean indicating whether the master view supports
@@ -350,6 +363,8 @@ class MasterView(View):
deletable_bulk = False
deletable_bulk_quick = False
has_autocomplete = False
+ downloadable = False
+ executable = False
configurable = False
# current action
@@ -842,6 +857,126 @@ class MasterView(View):
'label': str(obj),
}
+ ##############################
+ # download methods
+ ##############################
+
+ def download(self):
+ """
+ View to download a file associated with a model record.
+
+ This usually corresponds to a URL like
+ ``/widgets/XXX/download`` where ``XXX`` represents the key/ID
+ for the record.
+
+ By default, this view is included only if :attr:`downloadable`
+ is true.
+
+ This method will (try to) locate the file on disk, and return
+ it as a file download response to the client.
+
+ The GET request for this view may contain a ``filename`` query
+ string parameter, which can be used to locate one of various
+ files associated with the model record. This filename is
+ passed to :meth:`download_path()` for locating the file.
+
+ For instance: ``/widgets/XXX/download?filename=widget-specs.txt``
+
+ Subclass normally should not override this method, but rather
+ one of the related methods which are called (in)directly by
+ this one:
+
+ * :meth:`download_path()`
+ """
+ obj = self.get_instance()
+ filename = self.request.GET.get('filename', None)
+
+ path = self.download_path(obj, filename)
+ if not path or not os.path.exists(path):
+ return self.notfound()
+
+ return self.file_response(path)
+
+ def download_path(self, obj, filename):
+ """
+ Should return absolute path on disk, for the given object and
+ filename. Result will be used to return a file response to
+ client. This is called by :meth:`download()`.
+
+ Default logic always returns ``None``; subclass must override.
+
+ :param obj: Refefence to the model instance.
+
+ :param filename: Name of file for which to retrieve the path.
+
+ :returns: Path to file, or ``None`` if not found.
+
+ Note that ``filename`` may be ``None`` in which case the "default"
+ file path should be returned, if applicable.
+
+ If this method returns ``None`` (as it does by default) then
+ the :meth:`download()` view will return a 404 not found
+ response.
+ """
+
+ ##############################
+ # execute methods
+ ##############################
+
+ def execute(self):
+ """
+ View to "execute" a model record. Requires a POST request.
+
+ This usually corresponds to a URL like
+ ``/widgets/XXX/execute`` where ``XXX`` represents the key/ID
+ for the record.
+
+ By default, this view is included only if :attr:`executable` is
+ true.
+
+ Probably this is a "rare" view to implement for a model. But
+ there are two notable use cases so far, namely:
+
+ * upgrades (cf. :class:`~wuttaweb.views.upgrades.UpgradeView`)
+ * batches (not yet implemented;
+ cf. :doc:`rattail-manual:data/batch/index` in Rattail
+ Manual)
+
+ The general idea is to take some "irrevocable" action
+ associated with the model record. In the case of upgrades, it
+ is to run the upgrade script. For batches it is to "push
+ live" the data held within the batch.
+
+ Subclass normally should not override this method, but rather
+ one of the related methods which are called (in)directly by
+ this one:
+
+ * :meth:`execute_instance()`
+ """
+ model_title = self.get_model_title()
+ obj = self.get_instance()
+
+ try:
+ self.execute_instance(obj)
+ except Exception as error:
+ log.exception("failed to execute %s: %s", model_title, obj)
+ error = str(error) or error.__class__.__name__
+ self.request.session.flash(error, 'error')
+ else:
+ self.request.session.flash(f"{model_title} was executed.")
+
+ return self.redirect(self.get_action_url('view', obj))
+
+ def execute_instance(self, obj):
+ """
+ Perform the actual "execution" logic for a model record.
+ Called by :meth:`execute()`.
+
+ This method does nothing by default; subclass must override.
+
+ :param obj: Reference to the model instance.
+ """
+
##############################
# configure methods
##############################
@@ -2370,6 +2505,29 @@ class MasterView(View):
renderer='json',
permission=f'{route_prefix}.list')
+ # download
+ if cls.downloadable:
+ config.add_route(f'{route_prefix}.download',
+ f'{instance_url_prefix}/download')
+ config.add_view(cls, attr='download',
+ route_name=f'{route_prefix}.download',
+ permission=f'{permission_prefix}.download')
+ config.add_wutta_permission(permission_prefix,
+ f'{permission_prefix}.download',
+ f"Download file(s) for {model_title}")
+
+ # execute
+ if cls.executable:
+ config.add_route(f'{route_prefix}.execute',
+ f'{instance_url_prefix}/execute',
+ request_method='POST')
+ config.add_view(cls, attr='execute',
+ route_name=f'{route_prefix}.execute',
+ permission=f'{permission_prefix}.execute')
+ config.add_wutta_permission(permission_prefix,
+ f'{permission_prefix}.execute',
+ f"Execute {model_title}")
+
# configure
if cls.configurable:
config.add_route(f'{route_prefix}.configure',
diff --git a/src/wuttaweb/views/upgrades.py b/src/wuttaweb/views/upgrades.py
index 8d7c48c..ee4a2bd 100644
--- a/src/wuttaweb/views/upgrades.py
+++ b/src/wuttaweb/views/upgrades.py
@@ -24,12 +24,21 @@
Upgrade Views
"""
+import datetime
+import logging
+import os
+import shutil
+import subprocess
+
from sqlalchemy import orm
from wuttjamaican.db.model import Upgrade
from wuttaweb.views import MasterView
from wuttaweb.forms import widgets
-from wuttaweb.forms.schema import UserRef, WuttaEnum
+from wuttaweb.forms.schema import UserRef, WuttaEnum, FileDownload
+
+
+log = logging.getLogger(__name__)
class UpgradeView(MasterView):
@@ -47,6 +56,9 @@ class UpgradeView(MasterView):
* ``/upgrades/XXX/delete``
"""
model_class = Upgrade
+ executable = True
+ downloadable = True
+ configurable = True
grid_columns = [
'created',
@@ -81,6 +93,9 @@ class UpgradeView(MasterView):
# status
g.set_renderer('status', self.grid_render_enum, enum=enum.UpgradeStatus)
+ # executed
+ g.set_renderer('executed', self.grid_render_datetime)
+
# executed_by
g.set_link('executed_by')
Executor = orm.aliased(model.User)
@@ -138,10 +153,6 @@ class UpgradeView(MasterView):
else:
f.set_node('status', WuttaEnum(self.request, enum.UpgradeStatus))
- # exit_code
- if self.creating or not upgrade.executed:
- f.remove('exit_code')
-
# executed
if self.creating or self.editing or not upgrade.executed:
f.remove('executed')
@@ -152,6 +163,39 @@ class UpgradeView(MasterView):
else:
f.set_node('executed_by', UserRef(self.request))
+ # exit_code
+ if self.creating or self.editing or not upgrade.executed:
+ f.remove('exit_code')
+
+ # stdout / stderr
+ if not (self.creating or self.editing) and upgrade.status in (
+ enum.UpgradeStatus.SUCCESS, enum.UpgradeStatus.FAILURE):
+
+ # stdout_file
+ f.append('stdout_file')
+ f.set_label('stdout_file', "STDOUT")
+ url = self.get_action_url('download', upgrade, _query={'filename': 'stdout.log'})
+ f.set_node('stdout_file', FileDownload(self.request, url=url))
+ f.set_default('stdout_file', self.get_upgrade_filepath(upgrade, 'stdout.log'))
+
+ # stderr_file
+ f.append('stderr_file')
+ f.set_label('stderr_file', "STDERR")
+ url = self.get_action_url('download', upgrade, _query={'filename': 'stderr.log'})
+ f.set_node('stderr_file', FileDownload(self.request, url=url))
+ f.set_default('stderr_file', self.get_upgrade_filepath(upgrade, 'stderr.log'))
+
+ def delete_instance(self, upgrade):
+ """
+ We override this method to delete any files associated with
+ the upgrade, in addition to deleting the upgrade proper.
+ """
+ path = self.get_upgrade_filepath(upgrade, create=False)
+ if os.path.exists(path):
+ shutil.rmtree(path)
+
+ super().delete_instance(upgrade)
+
def objectify(self, form):
""" """
upgrade = super().objectify(form)
@@ -164,6 +208,71 @@ class UpgradeView(MasterView):
return upgrade
+ def download_path(self, upgrade, filename):
+ """ """
+ if filename:
+ return self.get_upgrade_filepath(upgrade, filename)
+
+ def get_upgrade_filepath(self, upgrade, filename=None, create=True):
+ """ """
+ uuid = upgrade.uuid
+ path = self.app.get_appdir('data', 'upgrades', uuid[:2], uuid[2:],
+ create=create)
+ if filename:
+ path = os.path.join(path, filename)
+ return path
+
+ def execute_instance(self, upgrade):
+ """
+ This method runs the actual upgrade.
+
+ Default logic will get the script command from config, and run
+ it via shell in a subprocess.
+
+ The ``stdout`` and ``stderr`` streams are captured to separate
+ log files which are then available to download.
+
+ The upgrade itself is marked as "executed" with status of
+ either ``SUCCESS`` or ``FAILURE``.
+ """
+ enum = self.app.enum
+ script = self.config.require(f'{self.app.appname}.upgrades.command',
+ session=self.Session())
+ stdout_path = self.get_upgrade_filepath(upgrade, 'stdout.log')
+ stderr_path = self.get_upgrade_filepath(upgrade, 'stderr.log')
+
+ # run the command
+ log.debug("running upgrade command: %s", script)
+ with open(stdout_path, 'wb') as stdout:
+ with open(stderr_path, 'wb') as stderr:
+ upgrade.exit_code = subprocess.call(script, shell=True,
+ stdout=stdout, stderr=stderr)
+ logger = log.warning if upgrade.exit_code != 0 else log.debug
+ logger("upgrade command had non-zero exit code: %s", upgrade.exit_code)
+
+ # declare it complete
+ upgrade.executed = datetime.datetime.now()
+ upgrade.executed_by = self.request.user
+ if upgrade.exit_code == 0:
+ upgrade.status = enum.UpgradeStatus.SUCCESS
+ else:
+ upgrade.status = enum.UpgradeStatus.FAILURE
+
+ def configure_get_simple_settings(self):
+ """ """
+
+ script = self.config.get(f'{self.app.appname}.upgrades.command')
+ if not script:
+ pass
+
+ return [
+
+ # basics
+ {'name': f'{self.app.appname}.upgrades.command',
+ 'default': script},
+
+ ]
+
@classmethod
def defaults(cls, config):
""" """
diff --git a/tests/forms/test_schema.py b/tests/forms/test_schema.py
index 9397e03..1c7680a 100644
--- a/tests/forms/test_schema.py
+++ b/tests/forms/test_schema.py
@@ -316,3 +316,18 @@ class TestPermissions(DataTestCase):
widget = typ.widget_maker()
self.assertEqual(len(widget.values), 1)
self.assertEqual(widget.values[0], ('widgets.polish', "Polish the widgets"))
+
+
+class TestFileDownload(DataTestCase):
+
+ def setUp(self):
+ self.setup_db()
+ self.request = testing.DummyRequest(wutta_config=self.config)
+
+ def test_widget_maker(self):
+
+ # sanity / coverage check
+ typ = mod.FileDownload(self.request, url='/foo')
+ widget = typ.widget_maker()
+ self.assertIsInstance(widget, widgets.FileDownloadWidget)
+ self.assertEqual(widget.url, '/foo')
diff --git a/tests/forms/test_widgets.py b/tests/forms/test_widgets.py
index 62d9f0b..cfa4530 100644
--- a/tests/forms/test_widgets.py
+++ b/tests/forms/test_widgets.py
@@ -7,7 +7,7 @@ import deform
from pyramid import testing
from wuttaweb.forms import widgets as mod
-from wuttaweb.forms.schema import PersonRef, RoleRefs, UserRefs, Permissions
+from wuttaweb.forms.schema import FileDownload, PersonRef, RoleRefs, UserRefs, Permissions
from tests.util import WebTestCase
@@ -52,6 +52,55 @@ class TestObjectRefWidget(WebTestCase):
self.assertIn('href="/foo"', html)
+class TestFileDownloadWidget(WebTestCase):
+
+ def make_field(self, node, **kwargs):
+ # TODO: not sure why default renderer is in use even though
+ # pyramid_deform was included in setup? but this works..
+ kwargs.setdefault('renderer', deform.Form.default_renderer)
+ return deform.Field(node, **kwargs)
+
+ def test_serialize(self):
+
+ # nb. we let the field construct the widget via our type
+ # (nb. at first we do not provide a url)
+ node = colander.SchemaNode(FileDownload(self.request))
+ field = self.make_field(node)
+ widget = field.widget
+
+ # null value
+ html = widget.serialize(field, None, readonly=True)
+ self.assertNotIn('', html)
+
+ # path to nonexistent file
+ html = widget.serialize(field, '/this/path/does/not/exist', readonly=True)
+ self.assertNotIn('', html)
+
+ # path to actual file
+ datfile = self.write_file('data.txt', "hello\n" * 1000)
+ html = widget.serialize(field, datfile, readonly=True)
+ self.assertNotIn('', html)
+ self.assertIn('data.txt', html)
+ self.assertIn('kB)', html)
+
+ # path to file, w/ url
+ node = colander.SchemaNode(FileDownload(self.request, url='/download/blarg'))
+ field = self.make_field(node)
+ widget = field.widget
+ html = widget.serialize(field, datfile, readonly=True)
+ self.assertNotIn('', html)
+ self.assertIn('', html)
+ self.assertIn('data.txt', html)
+ self.assertIn('kB)', html)
+
+ # nb. same readonly output even if we ask for editable
+ html2 = widget.serialize(field, datfile, readonly=False)
+ self.assertEqual(html2, html)
+
+
class TestRoleRefsWidget(WebTestCase):
def make_field(self, node, **kwargs):
@@ -113,7 +162,7 @@ class TestUserRefsWidget(WebTestCase):
# empty
html = widget.serialize(field, set(), readonly=True)
- self.assertIn('')
# with data, no actions
user = model.User(username='barney')
diff --git a/tests/util.py b/tests/util.py
index ab31dd4..51a5768 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -6,11 +6,12 @@ from unittest.mock import MagicMock
from pyramid import testing
from wuttjamaican.conf import WuttaConfig
+from wuttjamaican.testing import FileConfigTestCase
from wuttaweb import subscribers
from wuttaweb.menus import MenuHandler
-class DataTestCase(TestCase):
+class DataTestCase(FileConfigTestCase):
"""
Base class for test suites requiring a full (typical) database.
"""
@@ -19,6 +20,7 @@ class DataTestCase(TestCase):
self.setup_db()
def setup_db(self):
+ self.setup_files()
self.config = WuttaConfig(defaults={
'wutta.db.default.url': 'sqlite://',
})
@@ -33,7 +35,7 @@ class DataTestCase(TestCase):
self.teardown_db()
def teardown_db(self):
- pass
+ self.teardown_files()
class WebTestCase(DataTestCase):
diff --git a/tests/views/test_base.py b/tests/views/test_base.py
index f86fc8f..9601212 100644
--- a/tests/views/test_base.py
+++ b/tests/views/test_base.py
@@ -50,6 +50,26 @@ class TestView(WebTestCase):
self.assertIsInstance(error, HTTPFound)
self.assertEqual(error.location, '/')
+ def test_file_response(self):
+ view = self.make_view()
+
+ # default uses attachment behavior
+ datfile = self.write_file('dat.txt', 'hello')
+ response = view.file_response(datfile)
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.content_disposition, 'attachment; filename="dat.txt"')
+
+ # but can disable attachment behavior
+ datfile = self.write_file('dat.txt', 'hello')
+ response = view.file_response(datfile, attachment=False)
+ self.assertEqual(response.status_code, 200)
+ self.assertIsNone(response.content_disposition)
+
+ # path not found
+ crapfile = '/does/not/exist'
+ response = view.file_response(crapfile)
+ self.assertEqual(response.status_code, 404)
+
def test_json_response(self):
view = self.make_view()
response = view.json_response({'foo': 'bar'})
diff --git a/tests/views/test_master.py b/tests/views/test_master.py
index 6fdb55d..f979479 100644
--- a/tests/views/test_master.py
+++ b/tests/views/test_master.py
@@ -30,6 +30,8 @@ class TestMasterView(WebTestCase):
model_key='uuid',
deletable_bulk=True,
has_autocomplete=True,
+ downloadable=True,
+ executable=True,
configurable=True):
mod.MasterView.defaults(self.pyramid_config)
@@ -1310,6 +1312,59 @@ class TestMasterView(WebTestCase):
self.assertEqual(normal, {'value': 'bogus',
'label': "Betty Boop"})
+ def test_download(self):
+ model = self.app.model
+ self.app.save_setting(self.session, 'foo', 'bar')
+ self.session.commit()
+
+ with patch.multiple(mod.MasterView, create=True,
+ model_class=model.Setting,
+ model_key='name',
+ Session=MagicMock(return_value=self.session)):
+ view = self.make_view()
+ self.request.matchdict = {'name': 'foo'}
+
+ # 404 if no filename
+ response = view.download()
+ self.assertEqual(response.status_code, 404)
+
+ # 404 if bad filename
+ self.request.GET = {'filename': 'doesnotexist'}
+ response = view.download()
+ self.assertEqual(response.status_code, 404)
+
+ # 200 if good filename
+ foofile = self.write_file('foo.txt', 'foo')
+ with patch.object(view, 'download_path', return_value=foofile):
+ response = view.download()
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.content_disposition, 'attachment; filename="foo.txt"')
+
+ def test_execute(self):
+ self.pyramid_config.add_route('settings.view', '/settings/{name}')
+ model = self.app.model
+ self.app.save_setting(self.session, 'foo', 'bar')
+ self.session.commit()
+
+ with patch.multiple(mod.MasterView, create=True,
+ model_class=model.Setting,
+ model_key='name',
+ Session=MagicMock(return_value=self.session)):
+ view = self.make_view()
+ self.request.matchdict = {'name': 'foo'}
+
+ # basic usage, redirects to view obj url
+ response = view.execute()
+ self.assertEqual(response.status_code, 302)
+ self.assertEqual(self.request.session.pop_flash(), ["Setting was executed."])
+
+ # execution error
+ with patch.object(view, 'execute_instance', side_effect=RuntimeError):
+ response = view.execute()
+ self.assertEqual(response.status_code, 302)
+ self.assertEqual(self.request.session.pop_flash(), [])
+ self.assertEqual(self.request.session.pop_flash('error'), ["RuntimeError"])
+
def test_configure(self):
self.pyramid_config.include('wuttaweb.views.common')
self.pyramid_config.include('wuttaweb.views.auth')
diff --git a/tests/views/test_upgrades.py b/tests/views/test_upgrades.py
index 4641029..5d6db33 100644
--- a/tests/views/test_upgrades.py
+++ b/tests/views/test_upgrades.py
@@ -1,9 +1,12 @@
# -*- coding: utf-8; -*-
import datetime
+import os
+import sys
from unittest.mock import patch, MagicMock
from wuttaweb.views import upgrades as mod
+from wuttjamaican.exc import ConfigurationError
from tests.util import WebTestCase
@@ -42,6 +45,7 @@ class TestUpgradeView(WebTestCase):
self.assertEqual(view.grid_row_class(upgrade, data, 1), 'has-background-warning')
def test_configure_form(self):
+ self.pyramid_config.add_route('upgrades.download', '/upgrades/{uuid}/download')
model = self.app.model
enum = self.app.enum
user = model.User(username='barney')
@@ -66,7 +70,7 @@ class TestUpgradeView(WebTestCase):
view.configure_form(form)
self.assertNotIn('created', form)
- # test executed field when viewing
+ # test executed, stdout/stderr when viewing
with patch.object(view, 'viewing', new=True):
# executed is *not* shown by default
@@ -74,13 +78,18 @@ class TestUpgradeView(WebTestCase):
self.assertIn('executed', form)
view.configure_form(form)
self.assertNotIn('executed', form)
+ self.assertNotIn('stdout_file', form)
+ self.assertNotIn('stderr_file', form)
# but it *is* shown if upgrade is executed
upgrade.executed = datetime.datetime.now()
+ upgrade.status = enum.UpgradeStatus.SUCCESS
form = view.make_form(model_class=model.Upgrade, model_instance=upgrade)
self.assertIn('executed', form)
view.configure_form(form)
self.assertIn('executed', form)
+ self.assertIn('stdout_file', form)
+ self.assertIn('stderr_file', form)
def test_objectify(self):
model = self.app.model
@@ -101,3 +110,167 @@ class TestUpgradeView(WebTestCase):
self.assertEqual(upgrade.description, "new one")
self.assertIs(upgrade.created_by, user)
self.assertEqual(upgrade.status, enum.UpgradeStatus.PENDING)
+
+ def test_download_path(self):
+ model = self.app.model
+ enum = self.app.enum
+
+ appdir = self.mkdir('app')
+ self.config.setdefault('wutta.appdir', appdir)
+ self.assertEqual(self.app.get_appdir(), appdir)
+
+ user = model.User(username='barney')
+ upgrade = model.Upgrade(description='test', created_by=user,
+ status=enum.UpgradeStatus.PENDING)
+ self.session.add(upgrade)
+ self.session.commit()
+
+ view = self.make_view()
+ uuid = upgrade.uuid
+
+ # no filename
+ path = view.download_path(upgrade, None)
+ self.assertIsNone(path)
+
+ # with filename
+ path = view.download_path(upgrade, 'foo.txt')
+ self.assertEqual(path, os.path.join(appdir, 'data', 'upgrades',
+ uuid[:2], uuid[2:], 'foo.txt'))
+
+ def test_get_upgrade_filepath(self):
+ model = self.app.model
+ enum = self.app.enum
+
+ appdir = self.mkdir('app')
+ self.config.setdefault('wutta.appdir', appdir)
+ self.assertEqual(self.app.get_appdir(), appdir)
+
+ user = model.User(username='barney')
+ upgrade = model.Upgrade(description='test', created_by=user,
+ status=enum.UpgradeStatus.PENDING)
+ self.session.add(upgrade)
+ self.session.commit()
+
+ view = self.make_view()
+ uuid = upgrade.uuid
+
+ # no filename
+ path = view.get_upgrade_filepath(upgrade)
+ self.assertEqual(path, os.path.join(appdir, 'data', 'upgrades',
+ uuid[:2], uuid[2:]))
+
+ # with filename
+ path = view.get_upgrade_filepath(upgrade, 'foo.txt')
+ self.assertEqual(path, os.path.join(appdir, 'data', 'upgrades',
+ uuid[:2], uuid[2:], 'foo.txt'))
+
+ def test_delete_instance(self):
+ model = self.app.model
+ enum = self.app.enum
+
+ appdir = self.mkdir('app')
+ self.config.setdefault('wutta.appdir', appdir)
+ self.assertEqual(self.app.get_appdir(), appdir)
+
+ user = model.User(username='barney')
+ upgrade = model.Upgrade(description='test', created_by=user,
+ status=enum.UpgradeStatus.PENDING)
+ self.session.add(upgrade)
+ self.session.commit()
+
+ view = self.make_view()
+
+ # mock stdout/stderr files
+ upgrade_dir = view.get_upgrade_filepath(upgrade)
+ stdout = view.get_upgrade_filepath(upgrade, 'stdout.log')
+ with open(stdout, 'w') as f:
+ f.write('stdout')
+ stderr = view.get_upgrade_filepath(upgrade, 'stderr.log')
+ with open(stderr, 'w') as f:
+ f.write('stderr')
+
+ # both upgrade and files are deleted
+ self.assertTrue(os.path.exists(upgrade_dir))
+ self.assertTrue(os.path.exists(stdout))
+ self.assertTrue(os.path.exists(stderr))
+ self.assertEqual(self.session.query(model.Upgrade).count(), 1)
+ with patch.object(view, 'Session', return_value=self.session):
+ view.delete_instance(upgrade)
+ self.assertFalse(os.path.exists(upgrade_dir))
+ self.assertFalse(os.path.exists(stdout))
+ self.assertFalse(os.path.exists(stderr))
+ self.assertEqual(self.session.query(model.Upgrade).count(), 0)
+
+ def test_execute_instance(self):
+ model = self.app.model
+ enum = self.app.enum
+
+ appdir = self.mkdir('app')
+ self.config.setdefault('wutta.appdir', appdir)
+ self.assertEqual(self.app.get_appdir(), appdir)
+
+ user = model.User(username='barney')
+ upgrade = model.Upgrade(description='test', created_by=user,
+ status=enum.UpgradeStatus.PENDING)
+ self.session.add(upgrade)
+ self.session.commit()
+
+ view = self.make_view()
+ self.request.user = user
+ python = sys.executable
+
+ # script not yet confiugred
+ self.assertRaises(ConfigurationError, view.execute_instance, upgrade)
+
+ # script w/ success
+ goodpy = self.write_file('good.py', """
+import sys
+sys.stdout.write('hello from good.py')
+sys.exit(0)
+""")
+ self.app.save_setting(self.session, 'wutta.upgrades.command', f'{python} {goodpy}')
+ self.assertIsNone(upgrade.executed)
+ self.assertIsNone(upgrade.executed_by)
+ self.assertEqual(upgrade.status, enum.UpgradeStatus.PENDING)
+ with patch.object(view, 'Session', return_value=self.session):
+ with patch.object(self.config, 'usedb', new=True):
+ view.execute_instance(upgrade)
+ self.assertIsNotNone(upgrade.executed)
+ self.assertIs(upgrade.executed_by, user)
+ self.assertEqual(upgrade.status, enum.UpgradeStatus.SUCCESS)
+ with open(view.get_upgrade_filepath(upgrade, 'stdout.log')) as f:
+ self.assertEqual(f.read(), 'hello from good.py')
+ with open(view.get_upgrade_filepath(upgrade, 'stderr.log')) as f:
+ self.assertEqual(f.read(), '')
+
+ # need a new record for next test
+ upgrade = model.Upgrade(description='test', created_by=user,
+ status=enum.UpgradeStatus.PENDING)
+ self.session.add(upgrade)
+ self.session.commit()
+
+ # script w/ failure
+ badpy = self.write_file('bad.py', """
+import sys
+sys.stderr.write('hello from bad.py')
+sys.exit(42)
+""")
+ self.app.save_setting(self.session, 'wutta.upgrades.command', f'{python} {badpy}')
+ self.assertIsNone(upgrade.executed)
+ self.assertIsNone(upgrade.executed_by)
+ self.assertEqual(upgrade.status, enum.UpgradeStatus.PENDING)
+ with patch.object(view, 'Session', return_value=self.session):
+ with patch.object(self.config, 'usedb', new=True):
+ view.execute_instance(upgrade)
+ self.assertIsNotNone(upgrade.executed)
+ self.assertIs(upgrade.executed_by, user)
+ self.assertEqual(upgrade.status, enum.UpgradeStatus.FAILURE)
+ with open(view.get_upgrade_filepath(upgrade, 'stdout.log')) as f:
+ self.assertEqual(f.read(), '')
+ with open(view.get_upgrade_filepath(upgrade, 'stderr.log')) as f:
+ self.assertEqual(f.read(), 'hello from bad.py')
+
+ def test_configure_get_simple_settings(self):
+ # sanity/coverage check
+ view = self.make_view()
+ simple = view.configure_get_simple_settings()