3
0
Fork 0

feat: add basic support for execute upgrades, download stdout/stderr

upgrade progress is still not being shown yet
This commit is contained in:
Lance Edgar 2024-08-25 12:20:28 -05:00
parent 1a8900c9f4
commit e5e31a7d32
17 changed files with 805 additions and 12 deletions

View file

@ -31,6 +31,7 @@ intersphinx_mapping = {
'deform': ('https://docs.pylonsproject.org/projects/deform/en/latest/', None),
'pyramid': ('https://docs.pylonsproject.org/projects/pyramid/en/latest/', None),
'python': ('https://docs.python.org/3/', None),
'rattail-manual': ('https://rattailproject.org/docs/rattail-manual/', None),
'webhelpers2': ('https://webhelpers2.readthedocs.io/en/latest/', None),
'wuttjamaican': ('https://rattailproject.org/docs/wuttjamaican/', None),
}

View file

@ -31,6 +31,7 @@ classifiers = [
requires-python = ">= 3.8"
dependencies = [
"ColanderAlchemy",
"humanize",
"paginate",
"paginate_sqlalchemy",
"pyramid>=2",

View file

@ -456,3 +456,35 @@ class Permissions(WuttaSet):
kwargs['values'] = values
return widgets.PermissionsWidget(self.request, **kwargs)
class FileDownload(colander.String):
"""
Custom schema type for a file download field.
This field is only meant for readonly use, it does not handle file
uploads.
It expects the incoming ``appstruct`` to be the path to a file on
disk (or null).
Uses the :class:`~wuttaweb.forms.widgets.FileDownloadWidget` by
default.
:param request: Current :term:`request` object.
:param url: Optional URL for hyperlink. If not specified, file
name/size is shown with no hyperlink.
"""
def __init__(self, request, *args, **kwargs):
self.url = kwargs.pop('url', None)
super().__init__(*args, **kwargs)
self.request = request
self.config = self.request.wutta_config
self.app = self.config.get_app()
def widget_maker(self, **kwargs):
""" """
kwargs.setdefault('url', self.url)
return widgets.FileDownloadWidget(self.request, **kwargs)

View file

@ -39,7 +39,10 @@ in the namespace:
* :class:`deform:deform.widget.MoneyInputWidget`
"""
import os
import colander
import humanize
from deform.widget import (Widget, TextInputWidget, TextAreaWidget,
PasswordWidget, CheckedPasswordWidget,
CheckboxWidget, SelectWidget, CheckboxChoiceWidget,
@ -147,6 +150,63 @@ class WuttaCheckboxChoiceWidget(CheckboxChoiceWidget):
self.session = session or Session()
class FileDownloadWidget(Widget):
"""
Widget for use with :class:`~wuttaweb.forms.schema.FileDownload`
fields.
This only supports readonly, and shows a hyperlink to download the
file. Link text is the filename plus file size.
This is a subclass of :class:`deform:deform.widget.Widget` and
uses these Deform templates:
* ``readonly/filedownload``
:param request: Current :term:`request` object.
:param url: Optional URL for hyperlink. If not specified, file
name/size is shown with no hyperlink.
"""
readonly_template = 'readonly/filedownload'
def __init__(self, request, *args, **kwargs):
self.url = kwargs.pop('url', None)
super().__init__(*args, **kwargs)
self.request = request
self.config = self.request.wutta_config
self.app = self.config.get_app()
def serialize(self, field, cstruct, **kw):
""" """
# nb. readonly is the only way this rolls
kw['readonly'] = True
template = self.readonly_template
path = cstruct or None
if path:
kw.setdefault('filename', os.path.basename(path))
kw.setdefault('filesize', self.readable_size(path))
if self.url:
kw.setdefault('url', self.url)
else:
kw.setdefault('filename', None)
kw.setdefault('filesize', None)
kw.setdefault('url', None)
values = self.get_template_values(field, cstruct, kw)
return field.renderer(template, **values)
def readable_size(self, path):
""" """
try:
size = os.path.getsize(path)
except os.error:
size = 0
return humanize.naturalsize(size)
class RoleRefsWidget(WuttaCheckboxChoiceWidget):
"""
Widget for use with User
@ -184,7 +244,7 @@ class RoleRefsWidget(WuttaCheckboxChoiceWidget):
roles = []
if cstruct:
for uuid in cstruct:
role = self.session.query(model.Role).get(uuid)
role = self.session.get(model.Role, uuid)
if role:
roles.append(role)
kw['roles'] = roles
@ -228,6 +288,10 @@ class UserRefsWidget(WuttaCheckboxChoiceWidget):
users.append(dict([(key, getattr(user, key))
for key in columns + ['uuid']]))
# do not render if no data
if not users:
return HTML.tag('span')
# grid
grid = Grid(self.request, key='roles.view.users',
columns=columns, data=users)

View file

@ -0,0 +1,14 @@
<tal:omit>
<a tal:condition="url" href="${url}">
${filename}
<tal:omit tal:condition="filesize">
(${filesize})
</tal:omit>
</a>
<span tal:condition="not url">
${filename}
<tal:omit tal:condition="filesize">
(${filesize})
</tal:omit>
</span>
</tal:omit>

View file

@ -0,0 +1,20 @@
## -*- coding: utf-8; -*-
<%inherit file="/master/configure.mako" />
<%def name="form_content()">
<h3 class="is-size-3">Basics</h3>
<div class="block" style="padding-left: 2rem; width: 50%;">
<b-field label="Upgrade Script (for Execute)"
message="The command + args will be interpreted by the shell.">
<b-input name="${app.appname}.upgrades.command"
v-model="simpleSettings['${app.appname}.upgrades.command']"
@input="settingsNeedSaved = true"
## ref="upgradeSystemCommand"
## expanded
/>
</b-field>
</div>
</%def>

View file

@ -0,0 +1,37 @@
## -*- coding: utf-8; -*-
<%inherit file="/master/view.mako" />
<%def name="page_content()">
${parent.page_content()}
% if instance.status == app.enum.UpgradeStatus.PENDING and master.has_perm('execute'):
<div class="buttons"
style="margin: 2rem 5rem;">
${h.form(master.get_action_url('execute', instance), **{'@submit': 'executeFormSubmit'})}
${h.csrf_token(request)}
<b-button type="is-primary"
native-type="submit"
icon-pack="fas"
icon-left="arrow-circle-right"
:disabled="executeFormSubmitting">
{{ executeFormSubmitting ? "Working, please wait..." : "Execute this upgrade" }}
</b-button>
${h.end_form()}
</div>
% endif
</%def>
<%def name="modify_vue_vars()">
${parent.modify_vue_vars()}
% if instance.status == app.enum.UpgradeStatus.PENDING and master.has_perm('execute'):
<script>
ThisPageData.executeFormSubmitting = false
ThisPage.methods.executeFormSubmit = function() {
this.executeFormSubmitting = true
}
</script>
% endif
</%def>

View file

@ -24,8 +24,11 @@
Base Logic for Views
"""
import os
from pyramid import httpexceptions
from pyramid.renderers import render_to_response
from pyramid.response import FileResponse
from wuttaweb import forms, grids
@ -119,9 +122,46 @@ class View:
"""
return httpexceptions.HTTPFound(location=url, **kwargs)
def file_response(self, path, attachment=True, filename=None):
"""
Returns a generic file response for the given path.
:param path: Path to a file on local disk; must be accessible
by the web app.
:param attachment: Whether the file should come down as an
"attachment" instead of main payload.
The attachment behavior is the default here, and will cause
the user to be prompted for where to save the file.
Set ``attachment=False`` in order to cause the browser to
render the file as if it were the page being navigated to.
:param filename: Optional filename to use for attachment
behavior. This will be the "suggested filename" when user
is prompted to save the download. If not specified, the
filename is derived from ``path``.
:returns: A :class:`~pyramid:pyramid.response.FileResponse`
object with file content.
"""
if not os.path.exists(path):
return self.notfound()
response = FileResponse(path, request=self.request)
response.content_length = os.path.getsize(path)
if attachment:
if not filename:
filename = os.path.basename(path)
response.content_disposition = f'attachment; filename="{filename}"'
return response
def json_response(self, context):
"""
Convenience method to return a JSON response.
Returns a JSON response with the given context data.
:param context: Context data to be rendered as JSON.

View file

@ -160,6 +160,9 @@ class CommonView(View):
'upgrades.view',
'upgrades.edit',
'upgrades.delete',
'upgrades.execute',
'upgrades.download',
'upgrades.configure',
'users.list',
'users.create',
'users.view',

View file

@ -25,6 +25,7 @@ Base Logic for Master Views
"""
import logging
import os
import threading
import sqlalchemy as sa
@ -322,6 +323,18 @@ class MasterView(View):
"autocomplete" - i.e. it should have an :meth:`autocomplete()`
view. Default is ``False``.
.. attribute:: downloadable
Boolean indicating whether the view model supports
"downloading" - i.e. it should have a :meth:`download()` view.
Default is ``False``.
.. attribute:: executable
Boolean indicating whether the view model supports "executing"
- i.e. it should have an :meth:`execute()` view. Default is
``False``.
.. attribute:: configurable
Boolean indicating whether the master view supports
@ -350,6 +363,8 @@ class MasterView(View):
deletable_bulk = False
deletable_bulk_quick = False
has_autocomplete = False
downloadable = False
executable = False
configurable = False
# current action
@ -842,6 +857,126 @@ class MasterView(View):
'label': str(obj),
}
##############################
# download methods
##############################
def download(self):
"""
View to download a file associated with a model record.
This usually corresponds to a URL like
``/widgets/XXX/download`` where ``XXX`` represents the key/ID
for the record.
By default, this view is included only if :attr:`downloadable`
is true.
This method will (try to) locate the file on disk, and return
it as a file download response to the client.
The GET request for this view may contain a ``filename`` query
string parameter, which can be used to locate one of various
files associated with the model record. This filename is
passed to :meth:`download_path()` for locating the file.
For instance: ``/widgets/XXX/download?filename=widget-specs.txt``
Subclass normally should not override this method, but rather
one of the related methods which are called (in)directly by
this one:
* :meth:`download_path()`
"""
obj = self.get_instance()
filename = self.request.GET.get('filename', None)
path = self.download_path(obj, filename)
if not path or not os.path.exists(path):
return self.notfound()
return self.file_response(path)
def download_path(self, obj, filename):
"""
Should return absolute path on disk, for the given object and
filename. Result will be used to return a file response to
client. This is called by :meth:`download()`.
Default logic always returns ``None``; subclass must override.
:param obj: Refefence to the model instance.
:param filename: Name of file for which to retrieve the path.
:returns: Path to file, or ``None`` if not found.
Note that ``filename`` may be ``None`` in which case the "default"
file path should be returned, if applicable.
If this method returns ``None`` (as it does by default) then
the :meth:`download()` view will return a 404 not found
response.
"""
##############################
# execute methods
##############################
def execute(self):
"""
View to "execute" a model record. Requires a POST request.
This usually corresponds to a URL like
``/widgets/XXX/execute`` where ``XXX`` represents the key/ID
for the record.
By default, this view is included only if :attr:`executable` is
true.
Probably this is a "rare" view to implement for a model. But
there are two notable use cases so far, namely:
* upgrades (cf. :class:`~wuttaweb.views.upgrades.UpgradeView`)
* batches (not yet implemented;
cf. :doc:`rattail-manual:data/batch/index` in Rattail
Manual)
The general idea is to take some "irrevocable" action
associated with the model record. In the case of upgrades, it
is to run the upgrade script. For batches it is to "push
live" the data held within the batch.
Subclass normally should not override this method, but rather
one of the related methods which are called (in)directly by
this one:
* :meth:`execute_instance()`
"""
model_title = self.get_model_title()
obj = self.get_instance()
try:
self.execute_instance(obj)
except Exception as error:
log.exception("failed to execute %s: %s", model_title, obj)
error = str(error) or error.__class__.__name__
self.request.session.flash(error, 'error')
else:
self.request.session.flash(f"{model_title} was executed.")
return self.redirect(self.get_action_url('view', obj))
def execute_instance(self, obj):
"""
Perform the actual "execution" logic for a model record.
Called by :meth:`execute()`.
This method does nothing by default; subclass must override.
:param obj: Reference to the model instance.
"""
##############################
# configure methods
##############################
@ -2370,6 +2505,29 @@ class MasterView(View):
renderer='json',
permission=f'{route_prefix}.list')
# download
if cls.downloadable:
config.add_route(f'{route_prefix}.download',
f'{instance_url_prefix}/download')
config.add_view(cls, attr='download',
route_name=f'{route_prefix}.download',
permission=f'{permission_prefix}.download')
config.add_wutta_permission(permission_prefix,
f'{permission_prefix}.download',
f"Download file(s) for {model_title}")
# execute
if cls.executable:
config.add_route(f'{route_prefix}.execute',
f'{instance_url_prefix}/execute',
request_method='POST')
config.add_view(cls, attr='execute',
route_name=f'{route_prefix}.execute',
permission=f'{permission_prefix}.execute')
config.add_wutta_permission(permission_prefix,
f'{permission_prefix}.execute',
f"Execute {model_title}")
# configure
if cls.configurable:
config.add_route(f'{route_prefix}.configure',

View file

@ -24,12 +24,21 @@
Upgrade Views
"""
import datetime
import logging
import os
import shutil
import subprocess
from sqlalchemy import orm
from wuttjamaican.db.model import Upgrade
from wuttaweb.views import MasterView
from wuttaweb.forms import widgets
from wuttaweb.forms.schema import UserRef, WuttaEnum
from wuttaweb.forms.schema import UserRef, WuttaEnum, FileDownload
log = logging.getLogger(__name__)
class UpgradeView(MasterView):
@ -47,6 +56,9 @@ class UpgradeView(MasterView):
* ``/upgrades/XXX/delete``
"""
model_class = Upgrade
executable = True
downloadable = True
configurable = True
grid_columns = [
'created',
@ -81,6 +93,9 @@ class UpgradeView(MasterView):
# status
g.set_renderer('status', self.grid_render_enum, enum=enum.UpgradeStatus)
# executed
g.set_renderer('executed', self.grid_render_datetime)
# executed_by
g.set_link('executed_by')
Executor = orm.aliased(model.User)
@ -138,10 +153,6 @@ class UpgradeView(MasterView):
else:
f.set_node('status', WuttaEnum(self.request, enum.UpgradeStatus))
# exit_code
if self.creating or not upgrade.executed:
f.remove('exit_code')
# executed
if self.creating or self.editing or not upgrade.executed:
f.remove('executed')
@ -152,6 +163,39 @@ class UpgradeView(MasterView):
else:
f.set_node('executed_by', UserRef(self.request))
# exit_code
if self.creating or self.editing or not upgrade.executed:
f.remove('exit_code')
# stdout / stderr
if not (self.creating or self.editing) and upgrade.status in (
enum.UpgradeStatus.SUCCESS, enum.UpgradeStatus.FAILURE):
# stdout_file
f.append('stdout_file')
f.set_label('stdout_file', "STDOUT")
url = self.get_action_url('download', upgrade, _query={'filename': 'stdout.log'})
f.set_node('stdout_file', FileDownload(self.request, url=url))
f.set_default('stdout_file', self.get_upgrade_filepath(upgrade, 'stdout.log'))
# stderr_file
f.append('stderr_file')
f.set_label('stderr_file', "STDERR")
url = self.get_action_url('download', upgrade, _query={'filename': 'stderr.log'})
f.set_node('stderr_file', FileDownload(self.request, url=url))
f.set_default('stderr_file', self.get_upgrade_filepath(upgrade, 'stderr.log'))
def delete_instance(self, upgrade):
"""
We override this method to delete any files associated with
the upgrade, in addition to deleting the upgrade proper.
"""
path = self.get_upgrade_filepath(upgrade, create=False)
if os.path.exists(path):
shutil.rmtree(path)
super().delete_instance(upgrade)
def objectify(self, form):
""" """
upgrade = super().objectify(form)
@ -164,6 +208,71 @@ class UpgradeView(MasterView):
return upgrade
def download_path(self, upgrade, filename):
""" """
if filename:
return self.get_upgrade_filepath(upgrade, filename)
def get_upgrade_filepath(self, upgrade, filename=None, create=True):
""" """
uuid = upgrade.uuid
path = self.app.get_appdir('data', 'upgrades', uuid[:2], uuid[2:],
create=create)
if filename:
path = os.path.join(path, filename)
return path
def execute_instance(self, upgrade):
"""
This method runs the actual upgrade.
Default logic will get the script command from config, and run
it via shell in a subprocess.
The ``stdout`` and ``stderr`` streams are captured to separate
log files which are then available to download.
The upgrade itself is marked as "executed" with status of
either ``SUCCESS`` or ``FAILURE``.
"""
enum = self.app.enum
script = self.config.require(f'{self.app.appname}.upgrades.command',
session=self.Session())
stdout_path = self.get_upgrade_filepath(upgrade, 'stdout.log')
stderr_path = self.get_upgrade_filepath(upgrade, 'stderr.log')
# run the command
log.debug("running upgrade command: %s", script)
with open(stdout_path, 'wb') as stdout:
with open(stderr_path, 'wb') as stderr:
upgrade.exit_code = subprocess.call(script, shell=True,
stdout=stdout, stderr=stderr)
logger = log.warning if upgrade.exit_code != 0 else log.debug
logger("upgrade command had non-zero exit code: %s", upgrade.exit_code)
# declare it complete
upgrade.executed = datetime.datetime.now()
upgrade.executed_by = self.request.user
if upgrade.exit_code == 0:
upgrade.status = enum.UpgradeStatus.SUCCESS
else:
upgrade.status = enum.UpgradeStatus.FAILURE
def configure_get_simple_settings(self):
""" """
script = self.config.get(f'{self.app.appname}.upgrades.command')
if not script:
pass
return [
# basics
{'name': f'{self.app.appname}.upgrades.command',
'default': script},
]
@classmethod
def defaults(cls, config):
""" """

View file

@ -316,3 +316,18 @@ class TestPermissions(DataTestCase):
widget = typ.widget_maker()
self.assertEqual(len(widget.values), 1)
self.assertEqual(widget.values[0], ('widgets.polish', "Polish the widgets"))
class TestFileDownload(DataTestCase):
def setUp(self):
self.setup_db()
self.request = testing.DummyRequest(wutta_config=self.config)
def test_widget_maker(self):
# sanity / coverage check
typ = mod.FileDownload(self.request, url='/foo')
widget = typ.widget_maker()
self.assertIsInstance(widget, widgets.FileDownloadWidget)
self.assertEqual(widget.url, '/foo')

View file

@ -7,7 +7,7 @@ import deform
from pyramid import testing
from wuttaweb.forms import widgets as mod
from wuttaweb.forms.schema import PersonRef, RoleRefs, UserRefs, Permissions
from wuttaweb.forms.schema import FileDownload, PersonRef, RoleRefs, UserRefs, Permissions
from tests.util import WebTestCase
@ -52,6 +52,55 @@ class TestObjectRefWidget(WebTestCase):
self.assertIn('href="/foo"', html)
class TestFileDownloadWidget(WebTestCase):
def make_field(self, node, **kwargs):
# TODO: not sure why default renderer is in use even though
# pyramid_deform was included in setup? but this works..
kwargs.setdefault('renderer', deform.Form.default_renderer)
return deform.Field(node, **kwargs)
def test_serialize(self):
# nb. we let the field construct the widget via our type
# (nb. at first we do not provide a url)
node = colander.SchemaNode(FileDownload(self.request))
field = self.make_field(node)
widget = field.widget
# null value
html = widget.serialize(field, None, readonly=True)
self.assertNotIn('<a ', html)
self.assertIn('<span>', html)
# path to nonexistent file
html = widget.serialize(field, '/this/path/does/not/exist', readonly=True)
self.assertNotIn('<a ', html)
self.assertIn('<span>', html)
# path to actual file
datfile = self.write_file('data.txt', "hello\n" * 1000)
html = widget.serialize(field, datfile, readonly=True)
self.assertNotIn('<a ', html)
self.assertIn('<span>', html)
self.assertIn('data.txt', html)
self.assertIn('kB)', html)
# path to file, w/ url
node = colander.SchemaNode(FileDownload(self.request, url='/download/blarg'))
field = self.make_field(node)
widget = field.widget
html = widget.serialize(field, datfile, readonly=True)
self.assertNotIn('<span>', html)
self.assertIn('<a href="/download/blarg">', html)
self.assertIn('data.txt', html)
self.assertIn('kB)', html)
# nb. same readonly output even if we ask for editable
html2 = widget.serialize(field, datfile, readonly=False)
self.assertEqual(html2, html)
class TestRoleRefsWidget(WebTestCase):
def make_field(self, node, **kwargs):
@ -113,7 +162,7 @@ class TestUserRefsWidget(WebTestCase):
# empty
html = widget.serialize(field, set(), readonly=True)
self.assertIn('<b-table ', html)
self.assertEqual(html, '<span></span>')
# with data, no actions
user = model.User(username='barney')

View file

@ -6,11 +6,12 @@ from unittest.mock import MagicMock
from pyramid import testing
from wuttjamaican.conf import WuttaConfig
from wuttjamaican.testing import FileConfigTestCase
from wuttaweb import subscribers
from wuttaweb.menus import MenuHandler
class DataTestCase(TestCase):
class DataTestCase(FileConfigTestCase):
"""
Base class for test suites requiring a full (typical) database.
"""
@ -19,6 +20,7 @@ class DataTestCase(TestCase):
self.setup_db()
def setup_db(self):
self.setup_files()
self.config = WuttaConfig(defaults={
'wutta.db.default.url': 'sqlite://',
})
@ -33,7 +35,7 @@ class DataTestCase(TestCase):
self.teardown_db()
def teardown_db(self):
pass
self.teardown_files()
class WebTestCase(DataTestCase):

View file

@ -50,6 +50,26 @@ class TestView(WebTestCase):
self.assertIsInstance(error, HTTPFound)
self.assertEqual(error.location, '/')
def test_file_response(self):
view = self.make_view()
# default uses attachment behavior
datfile = self.write_file('dat.txt', 'hello')
response = view.file_response(datfile)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.content_disposition, 'attachment; filename="dat.txt"')
# but can disable attachment behavior
datfile = self.write_file('dat.txt', 'hello')
response = view.file_response(datfile, attachment=False)
self.assertEqual(response.status_code, 200)
self.assertIsNone(response.content_disposition)
# path not found
crapfile = '/does/not/exist'
response = view.file_response(crapfile)
self.assertEqual(response.status_code, 404)
def test_json_response(self):
view = self.make_view()
response = view.json_response({'foo': 'bar'})

View file

@ -30,6 +30,8 @@ class TestMasterView(WebTestCase):
model_key='uuid',
deletable_bulk=True,
has_autocomplete=True,
downloadable=True,
executable=True,
configurable=True):
mod.MasterView.defaults(self.pyramid_config)
@ -1310,6 +1312,59 @@ class TestMasterView(WebTestCase):
self.assertEqual(normal, {'value': 'bogus',
'label': "Betty Boop"})
def test_download(self):
model = self.app.model
self.app.save_setting(self.session, 'foo', 'bar')
self.session.commit()
with patch.multiple(mod.MasterView, create=True,
model_class=model.Setting,
model_key='name',
Session=MagicMock(return_value=self.session)):
view = self.make_view()
self.request.matchdict = {'name': 'foo'}
# 404 if no filename
response = view.download()
self.assertEqual(response.status_code, 404)
# 404 if bad filename
self.request.GET = {'filename': 'doesnotexist'}
response = view.download()
self.assertEqual(response.status_code, 404)
# 200 if good filename
foofile = self.write_file('foo.txt', 'foo')
with patch.object(view, 'download_path', return_value=foofile):
response = view.download()
self.assertEqual(response.status_code, 200)
self.assertEqual(response.content_disposition, 'attachment; filename="foo.txt"')
def test_execute(self):
self.pyramid_config.add_route('settings.view', '/settings/{name}')
model = self.app.model
self.app.save_setting(self.session, 'foo', 'bar')
self.session.commit()
with patch.multiple(mod.MasterView, create=True,
model_class=model.Setting,
model_key='name',
Session=MagicMock(return_value=self.session)):
view = self.make_view()
self.request.matchdict = {'name': 'foo'}
# basic usage, redirects to view obj url
response = view.execute()
self.assertEqual(response.status_code, 302)
self.assertEqual(self.request.session.pop_flash(), ["Setting was executed."])
# execution error
with patch.object(view, 'execute_instance', side_effect=RuntimeError):
response = view.execute()
self.assertEqual(response.status_code, 302)
self.assertEqual(self.request.session.pop_flash(), [])
self.assertEqual(self.request.session.pop_flash('error'), ["RuntimeError"])
def test_configure(self):
self.pyramid_config.include('wuttaweb.views.common')
self.pyramid_config.include('wuttaweb.views.auth')

View file

@ -1,9 +1,12 @@
# -*- coding: utf-8; -*-
import datetime
import os
import sys
from unittest.mock import patch, MagicMock
from wuttaweb.views import upgrades as mod
from wuttjamaican.exc import ConfigurationError
from tests.util import WebTestCase
@ -42,6 +45,7 @@ class TestUpgradeView(WebTestCase):
self.assertEqual(view.grid_row_class(upgrade, data, 1), 'has-background-warning')
def test_configure_form(self):
self.pyramid_config.add_route('upgrades.download', '/upgrades/{uuid}/download')
model = self.app.model
enum = self.app.enum
user = model.User(username='barney')
@ -66,7 +70,7 @@ class TestUpgradeView(WebTestCase):
view.configure_form(form)
self.assertNotIn('created', form)
# test executed field when viewing
# test executed, stdout/stderr when viewing
with patch.object(view, 'viewing', new=True):
# executed is *not* shown by default
@ -74,13 +78,18 @@ class TestUpgradeView(WebTestCase):
self.assertIn('executed', form)
view.configure_form(form)
self.assertNotIn('executed', form)
self.assertNotIn('stdout_file', form)
self.assertNotIn('stderr_file', form)
# but it *is* shown if upgrade is executed
upgrade.executed = datetime.datetime.now()
upgrade.status = enum.UpgradeStatus.SUCCESS
form = view.make_form(model_class=model.Upgrade, model_instance=upgrade)
self.assertIn('executed', form)
view.configure_form(form)
self.assertIn('executed', form)
self.assertIn('stdout_file', form)
self.assertIn('stderr_file', form)
def test_objectify(self):
model = self.app.model
@ -101,3 +110,167 @@ class TestUpgradeView(WebTestCase):
self.assertEqual(upgrade.description, "new one")
self.assertIs(upgrade.created_by, user)
self.assertEqual(upgrade.status, enum.UpgradeStatus.PENDING)
def test_download_path(self):
model = self.app.model
enum = self.app.enum
appdir = self.mkdir('app')
self.config.setdefault('wutta.appdir', appdir)
self.assertEqual(self.app.get_appdir(), appdir)
user = model.User(username='barney')
upgrade = model.Upgrade(description='test', created_by=user,
status=enum.UpgradeStatus.PENDING)
self.session.add(upgrade)
self.session.commit()
view = self.make_view()
uuid = upgrade.uuid
# no filename
path = view.download_path(upgrade, None)
self.assertIsNone(path)
# with filename
path = view.download_path(upgrade, 'foo.txt')
self.assertEqual(path, os.path.join(appdir, 'data', 'upgrades',
uuid[:2], uuid[2:], 'foo.txt'))
def test_get_upgrade_filepath(self):
model = self.app.model
enum = self.app.enum
appdir = self.mkdir('app')
self.config.setdefault('wutta.appdir', appdir)
self.assertEqual(self.app.get_appdir(), appdir)
user = model.User(username='barney')
upgrade = model.Upgrade(description='test', created_by=user,
status=enum.UpgradeStatus.PENDING)
self.session.add(upgrade)
self.session.commit()
view = self.make_view()
uuid = upgrade.uuid
# no filename
path = view.get_upgrade_filepath(upgrade)
self.assertEqual(path, os.path.join(appdir, 'data', 'upgrades',
uuid[:2], uuid[2:]))
# with filename
path = view.get_upgrade_filepath(upgrade, 'foo.txt')
self.assertEqual(path, os.path.join(appdir, 'data', 'upgrades',
uuid[:2], uuid[2:], 'foo.txt'))
def test_delete_instance(self):
model = self.app.model
enum = self.app.enum
appdir = self.mkdir('app')
self.config.setdefault('wutta.appdir', appdir)
self.assertEqual(self.app.get_appdir(), appdir)
user = model.User(username='barney')
upgrade = model.Upgrade(description='test', created_by=user,
status=enum.UpgradeStatus.PENDING)
self.session.add(upgrade)
self.session.commit()
view = self.make_view()
# mock stdout/stderr files
upgrade_dir = view.get_upgrade_filepath(upgrade)
stdout = view.get_upgrade_filepath(upgrade, 'stdout.log')
with open(stdout, 'w') as f:
f.write('stdout')
stderr = view.get_upgrade_filepath(upgrade, 'stderr.log')
with open(stderr, 'w') as f:
f.write('stderr')
# both upgrade and files are deleted
self.assertTrue(os.path.exists(upgrade_dir))
self.assertTrue(os.path.exists(stdout))
self.assertTrue(os.path.exists(stderr))
self.assertEqual(self.session.query(model.Upgrade).count(), 1)
with patch.object(view, 'Session', return_value=self.session):
view.delete_instance(upgrade)
self.assertFalse(os.path.exists(upgrade_dir))
self.assertFalse(os.path.exists(stdout))
self.assertFalse(os.path.exists(stderr))
self.assertEqual(self.session.query(model.Upgrade).count(), 0)
def test_execute_instance(self):
model = self.app.model
enum = self.app.enum
appdir = self.mkdir('app')
self.config.setdefault('wutta.appdir', appdir)
self.assertEqual(self.app.get_appdir(), appdir)
user = model.User(username='barney')
upgrade = model.Upgrade(description='test', created_by=user,
status=enum.UpgradeStatus.PENDING)
self.session.add(upgrade)
self.session.commit()
view = self.make_view()
self.request.user = user
python = sys.executable
# script not yet confiugred
self.assertRaises(ConfigurationError, view.execute_instance, upgrade)
# script w/ success
goodpy = self.write_file('good.py', """
import sys
sys.stdout.write('hello from good.py')
sys.exit(0)
""")
self.app.save_setting(self.session, 'wutta.upgrades.command', f'{python} {goodpy}')
self.assertIsNone(upgrade.executed)
self.assertIsNone(upgrade.executed_by)
self.assertEqual(upgrade.status, enum.UpgradeStatus.PENDING)
with patch.object(view, 'Session', return_value=self.session):
with patch.object(self.config, 'usedb', new=True):
view.execute_instance(upgrade)
self.assertIsNotNone(upgrade.executed)
self.assertIs(upgrade.executed_by, user)
self.assertEqual(upgrade.status, enum.UpgradeStatus.SUCCESS)
with open(view.get_upgrade_filepath(upgrade, 'stdout.log')) as f:
self.assertEqual(f.read(), 'hello from good.py')
with open(view.get_upgrade_filepath(upgrade, 'stderr.log')) as f:
self.assertEqual(f.read(), '')
# need a new record for next test
upgrade = model.Upgrade(description='test', created_by=user,
status=enum.UpgradeStatus.PENDING)
self.session.add(upgrade)
self.session.commit()
# script w/ failure
badpy = self.write_file('bad.py', """
import sys
sys.stderr.write('hello from bad.py')
sys.exit(42)
""")
self.app.save_setting(self.session, 'wutta.upgrades.command', f'{python} {badpy}')
self.assertIsNone(upgrade.executed)
self.assertIsNone(upgrade.executed_by)
self.assertEqual(upgrade.status, enum.UpgradeStatus.PENDING)
with patch.object(view, 'Session', return_value=self.session):
with patch.object(self.config, 'usedb', new=True):
view.execute_instance(upgrade)
self.assertIsNotNone(upgrade.executed)
self.assertIs(upgrade.executed_by, user)
self.assertEqual(upgrade.status, enum.UpgradeStatus.FAILURE)
with open(view.get_upgrade_filepath(upgrade, 'stdout.log')) as f:
self.assertEqual(f.read(), '')
with open(view.get_upgrade_filepath(upgrade, 'stderr.log')) as f:
self.assertEqual(f.read(), 'hello from bad.py')
def test_configure_get_simple_settings(self):
# sanity/coverage check
view = self.make_view()
simple = view.configure_get_simple_settings()