8669ca2283
show scrolling stdout from subprocess nb. this does *not* show stderr, although that is captured
365 lines
14 KiB
Python
365 lines
14 KiB
Python
# -*- coding: utf-8; -*-
|
|
|
|
import datetime
|
|
import os
|
|
import sys
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
from wuttaweb.views import upgrades as mod
|
|
from wuttjamaican.exc import ConfigurationError
|
|
from wuttaweb.progress import get_progress_session
|
|
from tests.util import WebTestCase
|
|
|
|
|
|
class TestUpgradeView(WebTestCase):
|
|
|
|
def make_view(self):
|
|
return mod.UpgradeView(self.request)
|
|
|
|
def test_includeme(self):
|
|
self.pyramid_config.include('wuttaweb.views.upgrades')
|
|
|
|
def test_configure_grid(self):
|
|
model = self.app.model
|
|
view = self.make_view()
|
|
|
|
# sanity / coverage check
|
|
grid = view.make_grid(model_class=model.Upgrade)
|
|
view.configure_grid(grid)
|
|
|
|
def test_grid_row_class(self):
|
|
model = self.app.model
|
|
enum = self.app.enum
|
|
upgrade = model.Upgrade(description="test", status=enum.UpgradeStatus.PENDING)
|
|
data = dict(upgrade)
|
|
view = self.make_view()
|
|
|
|
self.assertIsNone(view.grid_row_class(upgrade, data, 1))
|
|
|
|
upgrade.status = enum.UpgradeStatus.EXECUTING
|
|
self.assertEqual(view.grid_row_class(upgrade, data, 1), 'has-background-warning')
|
|
|
|
upgrade.status = enum.UpgradeStatus.SUCCESS
|
|
self.assertIsNone(view.grid_row_class(upgrade, data, 1))
|
|
|
|
upgrade.status = enum.UpgradeStatus.FAILURE
|
|
self.assertEqual(view.grid_row_class(upgrade, data, 1), 'has-background-warning')
|
|
|
|
def test_configure_form(self):
|
|
self.pyramid_config.add_route('upgrades.download', '/upgrades/{uuid}/download')
|
|
model = self.app.model
|
|
enum = self.app.enum
|
|
user = model.User(username='barney')
|
|
self.session.add(user)
|
|
upgrade = model.Upgrade(description='test', created_by=user,
|
|
status=enum.UpgradeStatus.PENDING)
|
|
self.session.add(upgrade)
|
|
self.session.commit()
|
|
view = self.make_view()
|
|
|
|
# some fields exist when viewing
|
|
with patch.object(view, 'viewing', new=True):
|
|
form = view.make_form(model_class=model.Upgrade, model_instance=upgrade)
|
|
self.assertIn('created', form)
|
|
view.configure_form(form)
|
|
self.assertIn('created', form)
|
|
|
|
# but then are removed when creating
|
|
with patch.object(view, 'creating', new=True):
|
|
form = view.make_form(model_class=model.Upgrade)
|
|
self.assertIn('created', form)
|
|
view.configure_form(form)
|
|
self.assertNotIn('created', form)
|
|
|
|
# test executed, stdout/stderr when viewing
|
|
with patch.object(view, 'viewing', new=True):
|
|
|
|
# executed is *not* shown by default
|
|
form = view.make_form(model_class=model.Upgrade, model_instance=upgrade)
|
|
self.assertIn('executed', form)
|
|
view.configure_form(form)
|
|
self.assertNotIn('executed', form)
|
|
self.assertNotIn('stdout_file', form)
|
|
self.assertNotIn('stderr_file', form)
|
|
|
|
# but it *is* shown if upgrade is executed
|
|
upgrade.executed = datetime.datetime.now()
|
|
upgrade.status = enum.UpgradeStatus.SUCCESS
|
|
form = view.make_form(model_class=model.Upgrade, model_instance=upgrade)
|
|
self.assertIn('executed', form)
|
|
view.configure_form(form)
|
|
self.assertIn('executed', form)
|
|
self.assertIn('stdout_file', form)
|
|
self.assertIn('stderr_file', form)
|
|
|
|
def test_objectify(self):
|
|
model = self.app.model
|
|
enum = self.app.enum
|
|
user = model.User(username='barney')
|
|
self.session.add(user)
|
|
self.session.commit()
|
|
view = self.make_view()
|
|
|
|
# user and status are auto-set when creating
|
|
self.request.user = user
|
|
self.request.method = 'POST'
|
|
self.request.POST = {'description': "new one"}
|
|
with patch.object(view, 'creating', new=True):
|
|
form = view.make_model_form()
|
|
self.assertTrue(form.validate())
|
|
upgrade = view.objectify(form)
|
|
self.assertEqual(upgrade.description, "new one")
|
|
self.assertIs(upgrade.created_by, user)
|
|
self.assertEqual(upgrade.status, enum.UpgradeStatus.PENDING)
|
|
|
|
def test_download_path(self):
|
|
model = self.app.model
|
|
enum = self.app.enum
|
|
|
|
appdir = self.mkdir('app')
|
|
self.config.setdefault('wutta.appdir', appdir)
|
|
self.assertEqual(self.app.get_appdir(), appdir)
|
|
|
|
user = model.User(username='barney')
|
|
upgrade = model.Upgrade(description='test', created_by=user,
|
|
status=enum.UpgradeStatus.PENDING)
|
|
self.session.add(upgrade)
|
|
self.session.commit()
|
|
|
|
view = self.make_view()
|
|
uuid = upgrade.uuid
|
|
|
|
# no filename
|
|
path = view.download_path(upgrade, None)
|
|
self.assertIsNone(path)
|
|
|
|
# with filename
|
|
path = view.download_path(upgrade, 'foo.txt')
|
|
self.assertEqual(path, os.path.join(appdir, 'data', 'upgrades',
|
|
uuid[:2], uuid[2:], 'foo.txt'))
|
|
|
|
def test_get_upgrade_filepath(self):
|
|
model = self.app.model
|
|
enum = self.app.enum
|
|
|
|
appdir = self.mkdir('app')
|
|
self.config.setdefault('wutta.appdir', appdir)
|
|
self.assertEqual(self.app.get_appdir(), appdir)
|
|
|
|
user = model.User(username='barney')
|
|
upgrade = model.Upgrade(description='test', created_by=user,
|
|
status=enum.UpgradeStatus.PENDING)
|
|
self.session.add(upgrade)
|
|
self.session.commit()
|
|
|
|
view = self.make_view()
|
|
uuid = upgrade.uuid
|
|
|
|
# no filename
|
|
path = view.get_upgrade_filepath(upgrade)
|
|
self.assertEqual(path, os.path.join(appdir, 'data', 'upgrades',
|
|
uuid[:2], uuid[2:]))
|
|
|
|
# with filename
|
|
path = view.get_upgrade_filepath(upgrade, 'foo.txt')
|
|
self.assertEqual(path, os.path.join(appdir, 'data', 'upgrades',
|
|
uuid[:2], uuid[2:], 'foo.txt'))
|
|
|
|
def test_delete_instance(self):
|
|
model = self.app.model
|
|
enum = self.app.enum
|
|
|
|
appdir = self.mkdir('app')
|
|
self.config.setdefault('wutta.appdir', appdir)
|
|
self.assertEqual(self.app.get_appdir(), appdir)
|
|
|
|
user = model.User(username='barney')
|
|
upgrade = model.Upgrade(description='test', created_by=user,
|
|
status=enum.UpgradeStatus.PENDING)
|
|
self.session.add(upgrade)
|
|
self.session.commit()
|
|
|
|
view = self.make_view()
|
|
|
|
# mock stdout/stderr files
|
|
upgrade_dir = view.get_upgrade_filepath(upgrade)
|
|
stdout = view.get_upgrade_filepath(upgrade, 'stdout.log')
|
|
with open(stdout, 'w') as f:
|
|
f.write('stdout')
|
|
stderr = view.get_upgrade_filepath(upgrade, 'stderr.log')
|
|
with open(stderr, 'w') as f:
|
|
f.write('stderr')
|
|
|
|
# both upgrade and files are deleted
|
|
self.assertTrue(os.path.exists(upgrade_dir))
|
|
self.assertTrue(os.path.exists(stdout))
|
|
self.assertTrue(os.path.exists(stderr))
|
|
self.assertEqual(self.session.query(model.Upgrade).count(), 1)
|
|
with patch.object(view, 'Session', return_value=self.session):
|
|
view.delete_instance(upgrade)
|
|
self.assertFalse(os.path.exists(upgrade_dir))
|
|
self.assertFalse(os.path.exists(stdout))
|
|
self.assertFalse(os.path.exists(stderr))
|
|
self.assertEqual(self.session.query(model.Upgrade).count(), 0)
|
|
|
|
def test_execute_instance(self):
|
|
model = self.app.model
|
|
enum = self.app.enum
|
|
|
|
appdir = self.mkdir('app')
|
|
self.config.setdefault('wutta.appdir', appdir)
|
|
self.assertEqual(self.app.get_appdir(), appdir)
|
|
|
|
user = model.User(username='barney')
|
|
upgrade = model.Upgrade(description='test', created_by=user,
|
|
status=enum.UpgradeStatus.PENDING)
|
|
self.session.add(upgrade)
|
|
self.session.commit()
|
|
|
|
view = self.make_view()
|
|
self.request.user = user
|
|
python = sys.executable
|
|
|
|
# script not yet confiugred
|
|
self.assertRaises(ConfigurationError, view.execute_instance, upgrade, user)
|
|
|
|
# script w/ success
|
|
goodpy = self.write_file('good.py', """
|
|
import sys
|
|
sys.stdout.write('hello from good.py')
|
|
sys.exit(0)
|
|
""")
|
|
self.app.save_setting(self.session, 'wutta.upgrades.command', f'{python} {goodpy}')
|
|
self.assertIsNone(upgrade.executed)
|
|
self.assertIsNone(upgrade.executed_by)
|
|
self.assertEqual(upgrade.status, enum.UpgradeStatus.PENDING)
|
|
with patch.object(view, 'Session', return_value=self.session):
|
|
with patch.object(self.config, 'usedb', new=True):
|
|
view.execute_instance(upgrade, user)
|
|
self.assertIsNotNone(upgrade.executed)
|
|
self.assertIs(upgrade.executed_by, user)
|
|
self.assertEqual(upgrade.status, enum.UpgradeStatus.SUCCESS)
|
|
with open(view.get_upgrade_filepath(upgrade, 'stdout.log')) as f:
|
|
self.assertEqual(f.read(), 'hello from good.py')
|
|
with open(view.get_upgrade_filepath(upgrade, 'stderr.log')) as f:
|
|
self.assertEqual(f.read(), '')
|
|
|
|
# need a new record for next test
|
|
upgrade = model.Upgrade(description='test', created_by=user,
|
|
status=enum.UpgradeStatus.PENDING)
|
|
self.session.add(upgrade)
|
|
self.session.commit()
|
|
|
|
# script w/ failure
|
|
badpy = self.write_file('bad.py', """
|
|
import sys
|
|
sys.stderr.write('hello from bad.py')
|
|
sys.exit(42)
|
|
""")
|
|
self.app.save_setting(self.session, 'wutta.upgrades.command', f'{python} {badpy}')
|
|
self.assertIsNone(upgrade.executed)
|
|
self.assertIsNone(upgrade.executed_by)
|
|
self.assertEqual(upgrade.status, enum.UpgradeStatus.PENDING)
|
|
with patch.object(view, 'Session', return_value=self.session):
|
|
with patch.object(self.config, 'usedb', new=True):
|
|
view.execute_instance(upgrade, user)
|
|
self.assertIsNotNone(upgrade.executed)
|
|
self.assertIs(upgrade.executed_by, user)
|
|
self.assertEqual(upgrade.status, enum.UpgradeStatus.FAILURE)
|
|
with open(view.get_upgrade_filepath(upgrade, 'stdout.log')) as f:
|
|
self.assertEqual(f.read(), '')
|
|
with open(view.get_upgrade_filepath(upgrade, 'stderr.log')) as f:
|
|
self.assertEqual(f.read(), 'hello from bad.py')
|
|
|
|
def test_execute_progress(self):
|
|
model = self.app.model
|
|
enum = self.app.enum
|
|
view = self.make_view()
|
|
|
|
user = model.User(username='barney')
|
|
self.session.add(user)
|
|
upgrade = model.Upgrade(description='test', created_by=user,
|
|
status=enum.UpgradeStatus.PENDING)
|
|
self.session.add(upgrade)
|
|
self.session.commit()
|
|
|
|
stdout = self.write_file('stdout.log', 'hello 001\n')
|
|
|
|
self.request.matchdict = {'uuid': upgrade.uuid}
|
|
with patch.multiple(mod.UpgradeView,
|
|
Session=MagicMock(return_value=self.session),
|
|
get_upgrade_filepath=MagicMock(return_value=stdout)):
|
|
|
|
# nb. this is used to identify progress tracker
|
|
self.request.session.id = 'mockid#1'
|
|
|
|
# first call should get the full contents
|
|
context = view.execute_progress()
|
|
self.assertFalse(context.get('complete'))
|
|
self.assertFalse(context.get('error'))
|
|
# nb. newline is converted to <br>
|
|
self.assertEqual(context['stdout'], 'hello 001<br />')
|
|
|
|
# next call should get any new contents
|
|
with open(stdout, 'a') as f:
|
|
f.write('hello 002\n')
|
|
context = view.execute_progress()
|
|
self.assertFalse(context.get('complete'))
|
|
self.assertFalse(context.get('error'))
|
|
self.assertEqual(context['stdout'], 'hello 002<br />')
|
|
|
|
# nb. switch to a different progress tracker
|
|
self.request.session.id = 'mockid#2'
|
|
|
|
# first call should get the full contents
|
|
context = view.execute_progress()
|
|
self.assertFalse(context.get('complete'))
|
|
self.assertFalse(context.get('error'))
|
|
self.assertEqual(context['stdout'], 'hello 001<br />hello 002<br />')
|
|
|
|
# mark progress complete
|
|
session = get_progress_session(self.request, 'upgrades.execute')
|
|
session.load()
|
|
session['complete'] = True
|
|
session['success_msg'] = 'yay!'
|
|
session.save()
|
|
|
|
# next call should reflect that
|
|
self.assertEqual(self.request.session.pop_flash(), [])
|
|
context = view.execute_progress()
|
|
self.assertTrue(context.get('complete'))
|
|
self.assertFalse(context.get('error'))
|
|
# nb. this is missing b/c we already got all contents
|
|
self.assertNotIn('stdout', context)
|
|
self.assertEqual(self.request.session.pop_flash(), ['yay!'])
|
|
|
|
# nb. switch to a different progress tracker
|
|
self.request.session.id = 'mockid#3'
|
|
|
|
# first call should get the full contents
|
|
context = view.execute_progress()
|
|
self.assertFalse(context.get('complete'))
|
|
self.assertFalse(context.get('error'))
|
|
self.assertEqual(context['stdout'], 'hello 001<br />hello 002<br />')
|
|
|
|
# mark progress error
|
|
session = get_progress_session(self.request, 'upgrades.execute')
|
|
session.load()
|
|
session['error'] = True
|
|
session['error_msg'] = 'omg!'
|
|
session.save()
|
|
|
|
# next call should reflect that
|
|
self.assertEqual(self.request.session.pop_flash('error'), [])
|
|
context = view.execute_progress()
|
|
self.assertFalse(context.get('complete'))
|
|
self.assertTrue(context.get('error'))
|
|
# nb. this is missing b/c we already got all contents
|
|
self.assertNotIn('stdout', context)
|
|
self.assertEqual(self.request.session.pop_flash('error'), ['omg!'])
|
|
|
|
def test_configure_get_simple_settings(self):
|
|
# sanity/coverage check
|
|
view = self.make_view()
|
|
simple = view.configure_get_simple_settings()
|