
Hoping this does not break things terribly, but it needs to be done regardless, so we will just have to pick up the pieces if it does.
451 lines
20 KiB
Python
451 lines
20 KiB
Python
# -*- coding: utf-8; -*-
|
|
|
|
import os
|
|
import sys
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
import pytest
|
|
from mako.lookup import TemplateLookup
|
|
|
|
from wuttjamaican import install as mod
|
|
from wuttjamaican.testing import ConfigTestCase
|
|
|
|
|
|
class TestInstallHandler(ConfigTestCase):
    """
    Tests for ``InstallHandler`` from the ``wuttjamaican.install`` module.

    Where a test exercises interactive prompts, ``sys.exit()`` or
    subprocess calls, those are replaced with mocks so the outcome can
    be asserted instead of actually prompting, exiting or shelling out.
    """

    def make_handler(self, **kwargs):
        """
        Return an ``InstallHandler`` built from the test config.

        Any ``kwargs`` are passed through to the handler constructor.
        """
        return mod.InstallHandler(self.config, **kwargs)

    @staticmethod
    def _fake_prompt_toolkit_import(mock_prompt):
        """
        Build a replacement for ``builtins.__import__``.

        The returned callable answers ``from prompt_toolkit import
        prompt`` with a stand-in module whose ``prompt`` attribute is
        ``mock_prompt``; every other import is delegated to the real
        import function.
        """
        # nb. must capture the real function now, before it gets patched
        real_import = __import__

        def fake_import(name, globals=None, locals=None, fromlist=(), level=0):
            if name == 'prompt_toolkit' and fromlist == ('prompt',):
                return MagicMock(prompt=mock_prompt)
            return real_import(name, globals, locals, fromlist, level)

        return fake_import

    def test_constructor(self):
        """A handler made with no kwargs reports 'poser' for every name."""
        handler = self.make_handler()
        for attr in ('pkg_name', 'app_title', 'pypi_name', 'egg_name'):
            self.assertEqual(getattr(handler, attr), 'poser')

    def test_run(self):
        """``run()`` calls each of its three stages once, with no args."""
        handler = self.make_handler()
        with patch.object(handler, 'show_welcome') as show_welcome, \
             patch.object(handler, 'sanity_check') as sanity_check, \
             patch.object(handler, 'do_install_steps') as do_install_steps:
            handler.run()
            show_welcome.assert_called_once_with()
            sanity_check.assert_called_once_with()
            do_install_steps.assert_called_once_with()

    def test_show_welcome(self):
        """``show_welcome()`` exits with code 1 only when user declines."""
        handler = self.make_handler()
        with patch.object(mod, 'sys') as mock_sys, \
             patch.object(handler, 'rprint'), \
             patch.object(handler, 'prompt_bool') as prompt_bool:

            # user continues
            prompt_bool.return_value = True
            handler.show_welcome()
            self.assertFalse(mock_sys.exit.called)

            # user aborts
            prompt_bool.return_value = False
            handler.show_welcome()
            mock_sys.exit.assert_called_once_with(1)

    def test_sanity_check(self):
        """``sanity_check()`` exits with code 2 when the path exists."""
        handler = self.make_handler()
        with patch.object(mod, 'sys') as mock_sys, \
             patch.object(mod, 'os') as mock_os, \
             patch.object(handler, 'rprint'):

            # pretend appdir does not exist
            mock_os.path.exists.return_value = False
            handler.sanity_check()
            self.assertFalse(mock_sys.exit.called)

            # pretend appdir does exist
            mock_os.path.exists.return_value = True
            handler.sanity_check()
            mock_sys.exit.assert_called_once_with(2)

    def test_do_install_steps(self):
        """
        ``do_install_steps()`` makes the appdir, installs the schema using
        the URL from ``get_dbinfo()`` and records ``schema_installed``.
        """
        handler = self.make_handler()
        handler.templates = TemplateLookup(directories=[
            self.app.resource_path('wuttjamaican:templates/install'),
        ])
        dbinfo = {
            'dburl': f'sqlite:///{self.tempdir}/poser.sqlite',
        }

        with patch.object(handler, 'get_dbinfo', return_value=dbinfo), \
             patch.object(handler, 'make_appdir') as make_appdir, \
             patch.object(handler, 'install_db_schema') as install_db_schema:

            # nb. just for sanity/coverage
            install_db_schema.return_value = True
            self.assertFalse(hasattr(handler, 'schema_installed'))
            handler.do_install_steps()
            self.assertTrue(make_appdir.called)
            self.assertTrue(handler.schema_installed)
            install_db_schema.assert_called_once_with(dbinfo['dburl'])

    def test_get_dbinfo(self):
        """
        ``get_dbinfo()`` exits with code 1 when the connection test
        reports an error, else returns a dict with the assembled URL.
        """
        pytest.importorskip('sqlalchemy',
                            reason="test is not relevant without sqlalchemy")

        from wuttjamaican.db.util import SA2

        handler = self.make_handler()

        def fake_prompt(info, default=None, is_password=False):
            # answer 'poser' for db name/user, 'seekrit' for the
            # password, and accept the offered default otherwise
            if info in ('db name', 'db user'):
                return 'poser'
            if is_password:
                return 'seekrit'
            return default

        # nb. presumably SA2 means SQLAlchemy 2.x, where str(url)
        # masks the password - TODO confirm
        seekrit = '***' if SA2 else 'seekrit'

        with patch.object(mod, 'sys') as mock_sys, \
             patch.object(handler, 'prompt_generic', side_effect=fake_prompt), \
             patch.object(handler, 'test_db_connection') as test_db_connection, \
             patch.object(handler, 'rprint') as rprint:

            # bad dbinfo
            test_db_connection.return_value = "bad dbinfo"
            # nb. make the mocked exit raise, so the handler stops there
            mock_sys.exit.side_effect = RuntimeError
            self.assertRaises(RuntimeError, handler.get_dbinfo)
            mock_sys.exit.assert_called_once_with(1)

            # good dbinfo
            mock_sys.exit.reset_mock()
            test_db_connection.return_value = None
            dbinfo = handler.get_dbinfo()
            self.assertFalse(mock_sys.exit.called)
            rprint.assert_called_with("[bold green]good[/bold green]")
            self.assertEqual(str(dbinfo['dburl']),
                             f'postgresql+psycopg2://poser:{seekrit}@localhost:5432/poser')

    def test_make_db_url(self):
        """``make_db_url()`` picks the expected driver per database type."""
        pytest.importorskip('sqlalchemy',
                            reason="test is not relevant without sqlalchemy")

        from wuttjamaican.db.util import SA2

        handler = self.make_handler()

        # nb. presumably SA2 means SQLAlchemy 2.x, where str(url)
        # masks the password - TODO confirm
        seekrit = '***' if SA2 else 'seekrit'

        url = handler.make_db_url('postgresql', 'localhost', '5432', 'poser', 'poser', 'seekrit')
        self.assertEqual(str(url), f'postgresql+psycopg2://poser:{seekrit}@localhost:5432/poser')

        url = handler.make_db_url('mysql', 'localhost', '3306', 'poser', 'poser', 'seekrit')
        self.assertEqual(str(url), f'mysql+mysqlconnector://poser:{seekrit}@localhost:3306/poser')

    def test_test_db_connection(self):
        """
        ``test_db_connection()`` returns error text for an unusable URL
        and ``None`` for a database which can be opened.
        """
        sa = pytest.importorskip('sqlalchemy',
                                 reason="test is not relevant without sqlalchemy")

        handler = self.make_handler()

        # db does not exist
        result = handler.test_db_connection('sqlite:///bad/url/should/not/exist')
        self.assertIn('unable to open database file', result)

        # db is setup
        url = f'sqlite:///{self.tempdir}/db.sqlite'
        engine = sa.create_engine(url)
        try:
            with engine.begin() as cxn:
                cxn.execute(sa.text("create table whatever (id int primary key);"))
        finally:
            # nb. release pooled connection, so no handle is left open
            # on a file which lives under the temp dir
            engine.dispose()
        self.assertIsNone(handler.test_db_connection(url))

    def test_make_template_context(self):
        """``make_template_context()`` exposes env, naming and DB values."""
        handler = self.make_handler()
        dbinfo = {'dburl': 'sqlite:///poser.sqlite'}
        context = handler.make_template_context(dbinfo)
        self.assertEqual(context['envdir'], sys.prefix)
        for key in ('pkg_name', 'app_title', 'pypi_name', 'egg_name'):
            self.assertEqual(context[key], 'poser')
        self.assertEqual(context['appdir'], os.path.join(sys.prefix, 'app'))
        self.assertEqual(context['db_url'], 'sqlite:///poser.sqlite')

    def test_make_appdir(self):
        """``make_appdir()`` writes a ``wutta.conf`` containing the DB URL."""
        handler = self.make_handler()
        handler.templates = TemplateLookup(directories=[
            self.app.resource_path('wuttjamaican:templates/install'),
        ])
        dbinfo = {'dburl': 'sqlite:///poser.sqlite'}
        context = handler.make_template_context(dbinfo)
        handler.make_appdir(context, appdir=self.tempdir)
        wutta_conf = os.path.join(self.tempdir, 'wutta.conf')
        with open(wutta_conf, 'rt') as f:
            self.assertIn('default.url = sqlite:///poser.sqlite', f.read())

    def test_install_db_schema(self):
        """
        ``install_db_schema()`` returns false when the user declines,
        and runs ``alembic upgrade heads`` when the user agrees.
        """
        sa = pytest.importorskip('sqlalchemy',
                                 reason="test is not relevant without sqlalchemy")

        handler = self.make_handler()
        db_url = f'sqlite:///{self.tempdir}/poser.sqlite'

        wutta_conf = self.write_file('wutta.conf', f"""
[wutta.db]
default.url = {db_url}
""")

        # convert to proper URL object
        db_url = sa.create_engine(db_url).url

        with patch.object(mod, 'subprocess') as mock_subprocess:

            # user declines offer to install schema
            with patch.object(handler, 'prompt_bool', return_value=False):
                self.assertFalse(handler.install_db_schema(db_url, appdir=self.tempdir))

            # user agrees to install schema
            with patch.object(handler, 'prompt_bool', return_value=True):
                self.assertTrue(handler.install_db_schema(db_url, appdir=self.tempdir))

            # nb. "once" also confirms nothing ran when user declined
            mock_subprocess.check_call.assert_called_once_with([
                os.path.join(sys.prefix, 'bin', 'alembic'),
                '-c', wutta_conf, 'upgrade', 'heads'])

    def test_show_goodbye(self):
        """``show_goodbye()`` prints the completion and webapp messages."""
        handler = self.make_handler()
        with patch.object(handler, 'rprint') as rprint:
            handler.schema_installed = True
            handler.show_goodbye()
            rprint.assert_any_call("\n\t[bold green]initial setup is complete![/bold green]")
            rprint.assert_any_call("\t[blue]bin/wutta -c app/web.conf webapp -r[/blue]")

    def test_require_prompt_toolkit_installed(self):
        """No pip call is made when ``prompt_toolkit`` can be imported."""
        # nb. this assumes we *do* have prompt_toolkit installed
        handler = self.make_handler()
        with patch.object(mod, 'subprocess') as mock_subprocess:
            handler.require_prompt_toolkit(answer='Y')
            self.assertFalse(mock_subprocess.check_call.called)

    def test_require_prompt_toolkit_missing(self):
        """Declining the install offer aborts with exit code 1."""
        handler = self.make_handler()
        orig_import = __import__

        def mock_import(name, globals=None, locals=None, fromlist=(), level=0):
            if name == 'prompt_toolkit':
                # nb. pretend this is not installed
                raise ImportError
            return orig_import(name, globals, locals, fromlist, level)

        # prompt_toolkit not installed, and user declines offer to install
        with patch('builtins.__import__', side_effect=mock_import), \
             patch.object(mod, 'subprocess') as mock_subprocess, \
             patch.object(mod, 'sys') as mock_sys:
            # nb. make the mocked exit raise, so the handler stops there
            mock_sys.exit.side_effect = RuntimeError
            self.assertRaises(RuntimeError, handler.require_prompt_toolkit, answer='N')
            self.assertFalse(mock_subprocess.check_call.called)
            mock_sys.stderr.write.assert_called_once_with("prompt_toolkit is required; aborting\n")
            mock_sys.exit.assert_called_once_with(1)

    def test_require_prompt_toolkit_missing_then_installed(self):
        """Accepting the install offer runs pip, then retries the import."""
        handler = self.make_handler()
        orig_import = __import__
        tally = {'attempts': 0}

        def mock_import(name, globals=None, locals=None, fromlist=(), level=0):
            if name == 'prompt_toolkit':
                tally['attempts'] += 1
                if tally['attempts'] == 1:
                    # nb. pretend this is not installed
                    raise ImportError
                return orig_import('prompt_toolkit')
            return orig_import(name, globals, locals, fromlist, level)

        # prompt_toolkit not installed, but user accepts offer to install
        with patch('builtins.__import__', side_effect=mock_import), \
             patch.object(mod, 'subprocess') as mock_subprocess, \
             patch.object(mod, 'sys') as mock_sys:
            mock_sys.executable = 'python'
            handler.require_prompt_toolkit(answer='Y')
            mock_subprocess.check_call.assert_called_once_with(['python', '-m', 'pip',
                                                                'install', 'prompt_toolkit'])
            self.assertFalse(mock_sys.exit.called)
            self.assertEqual(tally['attempts'], 2)

    def test_prompt_generic(self):
        """
        ``prompt_generic()`` handles text and boolean answers, defaults,
        re-prompting after bad or missing input, and Ctrl+C / Ctrl+D.
        """
        handler = self.make_handler()
        # nb. fetch the real style before the import hook is patched
        style = handler.get_prompt_style()
        mock_prompt = MagicMock()
        mock_import = self._fake_prompt_toolkit_import(mock_prompt)

        def assert_prompted(suffix):
            # nb. every single-prompt scenario below uses the same
            # message layout; only the trailing fragment differs
            mock_prompt.assert_called_once_with(
                [('', '\n'), ('class:bold', 'foo'), ('', suffix)],
                style=style, is_password=False)

        with patch('builtins.__import__', side_effect=mock_import), \
             patch.object(handler, 'get_prompt_style', return_value=style), \
             patch.object(handler, 'rprint'):

            # no input or default value
            mock_prompt.return_value = ''
            result = handler.prompt_generic('foo')
            self.assertIsNone(result)
            assert_prompted(': ')

            # fallback to default value
            mock_prompt.reset_mock()
            mock_prompt.return_value = ''
            result = handler.prompt_generic('foo', default='baz')
            self.assertEqual(result, 'baz')
            assert_prompted(' [baz]: ')

            # text input value
            mock_prompt.reset_mock()
            mock_prompt.return_value = 'bar'
            result = handler.prompt_generic('foo')
            self.assertEqual(result, 'bar')
            assert_prompted(': ')

            # bool value (no default; true input)
            mock_prompt.reset_mock()
            mock_prompt.return_value = 'Y'
            result = handler.prompt_generic('foo', is_bool=True)
            self.assertTrue(result)
            assert_prompted(': ')

            # bool value (no default; false input)
            mock_prompt.reset_mock()
            mock_prompt.return_value = 'N'
            result = handler.prompt_generic('foo', is_bool=True)
            self.assertFalse(result)
            assert_prompted(': ')

            # bool value (default; no input)
            mock_prompt.reset_mock()
            mock_prompt.return_value = ''
            result = handler.prompt_generic('foo', is_bool=True, default=True)
            self.assertTrue(result)
            assert_prompted(' [Y]: ')

            # bool value (bad input)
            mock_prompt.reset_mock()
            # nb. bad input first time we ask, but good input after that
            mock_prompt.side_effect = ['doesnotmakesense', 'N']
            result = handler.prompt_generic('foo', is_bool=True)
            self.assertFalse(result)
            # nb. user was prompted twice
            self.assertEqual(mock_prompt.call_count, 2)

            # Ctrl+C
            mock_prompt.reset_mock()
            mock_prompt.side_effect = KeyboardInterrupt
            with patch.object(mod, 'sys') as mock_sys:
                mock_sys.exit.side_effect = RuntimeError
                self.assertRaises(RuntimeError, handler.prompt_generic, 'foo')
                mock_sys.exit.assert_called_once_with(1)

            # Ctrl+D
            mock_prompt.reset_mock()
            mock_prompt.side_effect = EOFError
            with patch.object(mod, 'sys') as mock_sys:
                mock_sys.exit.side_effect = RuntimeError
                self.assertRaises(RuntimeError, handler.prompt_generic, 'foo')
                mock_sys.exit.assert_called_once_with(1)

            # missing required value
            mock_prompt.reset_mock()
            # nb. no input first time we ask, but good input after that
            mock_prompt.side_effect = ['', 'bar']
            result = handler.prompt_generic('foo', required=True)
            self.assertEqual(result, 'bar')
            # nb. user was prompted twice
            self.assertEqual(mock_prompt.call_count, 2)

    def test_prompt_bool(self):
        """
        ``prompt_bool()`` maps Y/N answers to booleans, honors the
        default on empty input, and asks again after bad input.
        """
        handler = self.make_handler()
        mock_prompt = MagicMock()
        mock_import = self._fake_prompt_toolkit_import(mock_prompt)

        with patch('builtins.__import__', side_effect=mock_import), \
             patch.object(handler, 'rprint'):

            # no default; true input
            mock_prompt.reset_mock()
            mock_prompt.return_value = 'Y'
            result = handler.prompt_bool('foo')
            self.assertTrue(result)
            mock_prompt.assert_called_once()

            # no default; false input
            mock_prompt.reset_mock()
            mock_prompt.return_value = 'N'
            result = handler.prompt_bool('foo')
            self.assertFalse(result)
            mock_prompt.assert_called_once()

            # default; no input
            mock_prompt.reset_mock()
            mock_prompt.return_value = ''
            result = handler.prompt_bool('foo', default=True)
            self.assertTrue(result)
            mock_prompt.assert_called_once()

            # bad input
            mock_prompt.reset_mock()
            # nb. bad input first time we ask, but good input after that
            mock_prompt.side_effect = ['doesnotmakesense', 'N']
            result = handler.prompt_bool('foo')
            self.assertFalse(result)
            # nb. user was prompted twice
            self.assertEqual(mock_prompt.call_count, 2)