# -*- coding: utf-8; -*-

import os
import sys
from unittest.mock import patch, MagicMock

import pytest
from mako.lookup import TemplateLookup

from wuttjamaican import install as mod
from wuttjamaican.testing import ConfigTestCase


def _require_sqlalchemy():
    """
    Return the ``sqlalchemy`` module, or skip the calling test if it
    cannot be imported.
    """
    try:
        import sqlalchemy
    except ImportError:
        pytest.skip("test is not relevant without sqlalchemy")
    return sqlalchemy


def _prompt_toolkit_import_hook(mock_prompt):
    """
    Build a replacement for ``builtins.__import__`` which answers
    ``from prompt_toolkit import prompt`` with a mock module whose
    ``prompt`` attribute is ``mock_prompt``.

    Every other import is delegated to the real ``__import__``.

    nb. this must be called *before* ``builtins.__import__`` is
    patched, since the real importer is captured at call time.
    """
    real_import = __import__

    def fake_import(name, globals=None, locals=None, fromlist=(), level=0):
        if name == 'prompt_toolkit' and fromlist == ('prompt',):
            return MagicMock(prompt=mock_prompt)
        return real_import(name, globals, locals, fromlist, level)

    return fake_import


class TestInstallHandler(ConfigTestCase):
    """
    Tests for ``wuttjamaican.install.InstallHandler``.
    """

    def make_handler(self, **kwargs):
        """
        Return a new ``InstallHandler`` bound to the test config.
        """
        return mod.InstallHandler(self.config, **kwargs)

    def test_constructor(self):
        """Name-related attributes all default to 'poser'."""
        handler = self.make_handler()
        self.assertEqual(handler.pkg_name, 'poser')
        self.assertEqual(handler.app_title, 'poser')
        self.assertEqual(handler.pypi_name, 'poser')
        self.assertEqual(handler.egg_name, 'poser')

    def test_run(self):
        """``run()`` calls each of the three stages once, with no args."""
        handler = self.make_handler()
        with patch.object(handler, 'show_welcome') as show_welcome, \
             patch.object(handler, 'sanity_check') as sanity_check, \
             patch.object(handler, 'do_install_steps') as do_install_steps:
            handler.run()
            show_welcome.assert_called_once_with()
            sanity_check.assert_called_once_with()
            do_install_steps.assert_called_once_with()

    def test_show_welcome(self):
        """``show_welcome()`` exits with code 1 only if the user aborts."""
        handler = self.make_handler()
        with patch.object(mod, 'sys') as mock_sys, \
             patch.object(handler, 'rprint'), \
             patch.object(handler, 'prompt_bool') as prompt_bool:

            # user continues
            prompt_bool.return_value = True
            handler.show_welcome()
            self.assertFalse(mock_sys.exit.called)

            # user aborts
            prompt_bool.return_value = False
            handler.show_welcome()
            mock_sys.exit.assert_called_once_with(1)

    def test_sanity_check(self):
        """``sanity_check()`` exits with code 2 only if appdir exists."""
        handler = self.make_handler()
        with patch.object(mod, 'sys') as mock_sys, \
             patch.object(mod, 'os') as mock_os, \
             patch.object(handler, 'rprint'):

            # pretend appdir does not exist
            mock_os.path.exists.return_value = False
            handler.sanity_check()
            self.assertFalse(mock_sys.exit.called)

            # pretend appdir does exist
            mock_os.path.exists.return_value = True
            handler.sanity_check()
            mock_sys.exit.assert_called_once_with(2)

    def test_do_install_steps(self):
        """``do_install_steps()`` installs the schema and records it."""
        handler = self.make_handler()
        handler.templates = TemplateLookup(directories=[
            self.app.resource_path('wuttjamaican:templates/install'),
        ])
        dbinfo = {
            'dburl': f'sqlite:///{self.tempdir}/poser.sqlite',
        }

        mock_prompt = MagicMock()
        fake_import = _prompt_toolkit_import_hook(mock_prompt)

        with patch('builtins.__import__', side_effect=fake_import), \
             patch.object(handler, 'get_dbinfo', return_value=dbinfo), \
             patch.object(handler, 'install_db_schema') as install_db_schema:

            # nb. just for sanity/coverage
            install_db_schema.return_value = True
            self.assertFalse(hasattr(handler, 'schema_installed'))
            handler.do_install_steps()
            self.assertTrue(handler.schema_installed)
            install_db_schema.assert_called_once_with(dbinfo['dburl'])

    def test_get_dbinfo(self):
        """``get_dbinfo()`` exits on a bad connection, else returns the URL."""
        _require_sqlalchemy()
        handler = self.make_handler()

        def prompt_generic(info, default=None, is_password=False):
            # nb. canned answers standing in for interactive user input
            if info in ('db name', 'db user'):
                return 'poser'
            if is_password:
                return 'seekrit'
            return default

        with patch.object(mod, 'sys') as mock_sys, \
             patch.object(handler, 'prompt_generic',
                          side_effect=prompt_generic), \
             patch.object(handler, 'test_db_connection') as test_db_connection, \
             patch.object(handler, 'rprint') as rprint:

            # bad dbinfo
            test_db_connection.return_value = "bad dbinfo"
            # nb. make the mocked exit() actually interrupt control flow
            mock_sys.exit.side_effect = RuntimeError
            self.assertRaises(RuntimeError, handler.get_dbinfo)
            mock_sys.exit.assert_called_once_with(1)

            # good dbinfo
            mock_sys.exit.reset_mock()
            test_db_connection.return_value = None
            dbinfo = handler.get_dbinfo()
            self.assertFalse(mock_sys.exit.called)
            rprint.assert_called_with("[bold green]good[/bold green]")
            self.assertEqual(
                str(dbinfo['dburl']),
                'postgresql+psycopg2://poser:seekrit@localhost:5432/poser')

    def test_make_db_url(self):
        """``make_db_url()`` builds the expected postgresql and mysql URLs."""
        _require_sqlalchemy()
        handler = self.make_handler()

        url = handler.make_db_url('postgresql', 'localhost', '5432',
                                  'poser', 'poser', 'seekrit')
        self.assertEqual(
            str(url),
            'postgresql+psycopg2://poser:seekrit@localhost:5432/poser')

        url = handler.make_db_url('mysql', 'localhost', '3306',
                                  'poser', 'poser', 'seekrit')
        self.assertEqual(
            str(url),
            'mysql+mysqlconnector://poser:seekrit@localhost:3306/poser')

    def test_test_db_connection(self):
        """``test_db_connection()`` returns an error string, or None if okay."""
        sa = _require_sqlalchemy()
        handler = self.make_handler()

        # db does not exist
        result = handler.test_db_connection('sqlite:///bad/url/should/not/exist')
        self.assertIn('unable to open database file', result)

        # db is setup
        url = f'sqlite:///{self.tempdir}/db.sqlite'
        engine = sa.create_engine(url)
        with engine.begin() as cxn:
            cxn.execute(sa.text("create table whatever (id int primary key);"))
        self.assertIsNone(handler.test_db_connection(url))

    def test_make_template_context(self):
        """``make_template_context()`` exposes names, paths and the DB URL."""
        handler = self.make_handler()
        dbinfo = {'dburl': 'sqlite:///poser.sqlite'}
        context = handler.make_template_context(dbinfo)
        self.assertEqual(context['envdir'], sys.prefix)
        self.assertEqual(context['pkg_name'], 'poser')
        self.assertEqual(context['app_title'], 'poser')
        self.assertEqual(context['pypi_name'], 'poser')
        self.assertEqual(context['egg_name'], 'poser')
        self.assertEqual(context['appdir'], os.path.join(sys.prefix, 'app'))
        self.assertEqual(context['db_url'], 'sqlite:///poser.sqlite')

    def test_make_appdir(self):
        """``make_appdir()`` writes a wutta.conf containing the DB URL."""
        handler = self.make_handler()
        handler.templates = TemplateLookup(directories=[
            self.app.resource_path('wuttjamaican:templates/install'),
        ])
        dbinfo = {'dburl': 'sqlite:///poser.sqlite'}
        context = handler.make_template_context(dbinfo)
        handler.make_appdir(context, appdir=self.tempdir)
        wutta_conf = os.path.join(self.tempdir, 'wutta.conf')
        with open(wutta_conf, 'rt') as f:
            self.assertIn('default.url = sqlite:///poser.sqlite', f.read())

    def test_install_db_schema(self):
        """``install_db_schema()`` runs alembic only if the user agrees."""
        sa = _require_sqlalchemy()
        handler = self.make_handler()
        db_url = f'sqlite:///{self.tempdir}/poser.sqlite'

        wutta_conf = self.write_file('wutta.conf', f"""
[wutta.db]
default.url = {db_url}
""")

        # convert to proper URL object
        db_url = sa.create_engine(db_url).url

        with patch.object(mod, 'subprocess') as mock_subprocess:

            # user declines offer to install schema
            with patch.object(handler, 'prompt_bool', return_value=False):
                self.assertFalse(
                    handler.install_db_schema(db_url, appdir=self.tempdir))

            # user agrees to install schema
            with patch.object(handler, 'prompt_bool', return_value=True):
                self.assertTrue(
                    handler.install_db_schema(db_url, appdir=self.tempdir))
                mock_subprocess.check_call.assert_called_once_with([
                    os.path.join(sys.prefix, 'bin', 'alembic'),
                    '-c', wutta_conf, 'upgrade', 'heads'])

    def test_show_goodbye(self):
        """``show_goodbye()`` prints the completion and pserve messages."""
        handler = self.make_handler()
        with patch.object(handler, 'rprint') as rprint:
            handler.schema_installed = True
            handler.show_goodbye()
            rprint.assert_any_call(
                "\n\t[bold green]initial setup is complete![/bold green]")
            rprint.assert_any_call(
                "\t[blue]bin/pserve file+ini:app/web.conf[/blue]")

    def test_require_prompt_toolkit_installed(self):
        """No install is attempted when prompt_toolkit can be imported."""
        # nb. this assumes we *do* have prompt_toolkit installed
        handler = self.make_handler()
        with patch.object(mod, 'subprocess') as mock_subprocess:
            handler.require_prompt_toolkit(answer='Y')
            self.assertFalse(mock_subprocess.check_call.called)

    def test_require_prompt_toolkit_missing(self):
        """Declining the install writes to stderr and exits with code 1."""
        handler = self.make_handler()
        orig_import = __import__

        def mock_import(name, globals=None, locals=None, fromlist=(), level=0):
            if name == 'prompt_toolkit':
                # nb. pretend this is not installed
                raise ImportError
            return orig_import(name, globals, locals, fromlist, level)

        # prompt_toolkit not installed, and user declines offer to install
        with patch('builtins.__import__', side_effect=mock_import), \
             patch.object(mod, 'subprocess') as mock_subprocess, \
             patch.object(mod, 'sys') as mock_sys:
            # nb. make the mocked exit() actually interrupt control flow
            mock_sys.exit.side_effect = RuntimeError
            self.assertRaises(RuntimeError,
                              handler.require_prompt_toolkit, answer='N')
            self.assertFalse(mock_subprocess.check_call.called)
            mock_sys.stderr.write.assert_called_once_with(
                "prompt_toolkit is required; aborting\n")
            mock_sys.exit.assert_called_once_with(1)

    def test_require_prompt_toolkit_missing_then_installed(self):
        """Accepting the install runs pip, then the import is retried."""
        handler = self.make_handler()
        orig_import = __import__
        stuff = {'attempts': 0}

        def mock_import(name, globals=None, locals=None, fromlist=(), level=0):
            if name == 'prompt_toolkit':
                stuff['attempts'] += 1
                if stuff['attempts'] == 1:
                    # nb. pretend this is not installed
                    raise ImportError
                # nb. second attempt gets the real module
                return orig_import('prompt_toolkit')
            return orig_import(name, globals, locals, fromlist, level)

        # prompt_toolkit not installed, and user accepts offer to install
        with patch('builtins.__import__', side_effect=mock_import), \
             patch.object(mod, 'subprocess') as mock_subprocess, \
             patch.object(mod, 'sys') as mock_sys:
            mock_sys.executable = 'python'
            handler.require_prompt_toolkit(answer='Y')
            mock_subprocess.check_call.assert_called_once_with(
                ['python', '-m', 'pip', 'install', 'prompt_toolkit'])
            self.assertFalse(mock_sys.exit.called)
            self.assertEqual(stuff['attempts'], 2)

    def test_prompt_generic(self):
        """``prompt_generic()`` handles defaults, bools, retries and aborts."""
        handler = self.make_handler()
        # nb. fetch the real style before the import hook is in place
        style = handler.get_prompt_style()
        mock_prompt = MagicMock()
        fake_import = _prompt_toolkit_import_hook(mock_prompt)

        def assert_prompted(suffix):
            # exactly one prompt was shown, its message ending with suffix
            mock_prompt.assert_called_once_with(
                [('', '\n'), ('class:bold', 'foo'), ('', suffix)],
                style=style, is_password=False)

        with patch('builtins.__import__', side_effect=fake_import), \
             patch.object(handler, 'get_prompt_style', return_value=style), \
             patch.object(handler, 'rprint'):

            # no input or default value
            mock_prompt.return_value = ''
            result = handler.prompt_generic('foo')
            self.assertIsNone(result)
            assert_prompted(': ')

            # fallback to default value
            mock_prompt.reset_mock()
            mock_prompt.return_value = ''
            result = handler.prompt_generic('foo', default='baz')
            self.assertEqual(result, 'baz')
            assert_prompted(' [baz]: ')

            # text input value
            mock_prompt.reset_mock()
            mock_prompt.return_value = 'bar'
            result = handler.prompt_generic('foo')
            self.assertEqual(result, 'bar')
            assert_prompted(': ')

            # bool value (no default; true input)
            mock_prompt.reset_mock()
            mock_prompt.return_value = 'Y'
            result = handler.prompt_generic('foo', is_bool=True)
            self.assertTrue(result)
            assert_prompted(': ')

            # bool value (no default; false input)
            mock_prompt.reset_mock()
            mock_prompt.return_value = 'N'
            result = handler.prompt_generic('foo', is_bool=True)
            self.assertFalse(result)
            assert_prompted(': ')

            # bool value (default; no input)
            mock_prompt.reset_mock()
            mock_prompt.return_value = ''
            result = handler.prompt_generic('foo', is_bool=True, default=True)
            self.assertTrue(result)
            assert_prompted(' [Y]: ')

            # bool value (bad input)
            mock_prompt.reset_mock()
            # nb. bad input first time we ask, but good input after that
            mock_prompt.side_effect = ['doesnotmakesense', 'N']
            result = handler.prompt_generic('foo', is_bool=True)
            self.assertFalse(result)
            # nb. user was prompted twice
            self.assertEqual(mock_prompt.call_count, 2)

            # Ctrl+C
            mock_prompt.reset_mock()
            mock_prompt.side_effect = KeyboardInterrupt
            with patch.object(mod, 'sys') as mock_sys:
                mock_sys.exit.side_effect = RuntimeError
                self.assertRaises(RuntimeError, handler.prompt_generic, 'foo')
                mock_sys.exit.assert_called_once_with(1)

            # Ctrl+D
            mock_prompt.reset_mock()
            mock_prompt.side_effect = EOFError
            with patch.object(mod, 'sys') as mock_sys:
                mock_sys.exit.side_effect = RuntimeError
                self.assertRaises(RuntimeError, handler.prompt_generic, 'foo')
                mock_sys.exit.assert_called_once_with(1)

            # missing required value
            mock_prompt.reset_mock()
            # nb. no input first time we ask, but good input after that
            mock_prompt.side_effect = ['', 'bar']
            result = handler.prompt_generic('foo', required=True)
            self.assertEqual(result, 'bar')
            # nb. user was prompted twice
            self.assertEqual(mock_prompt.call_count, 2)

    def test_prompt_bool(self):
        """``prompt_bool()`` parses Y/N, honors default, retries bad input."""
        handler = self.make_handler()
        mock_prompt = MagicMock()
        fake_import = _prompt_toolkit_import_hook(mock_prompt)

        with patch('builtins.__import__', side_effect=fake_import), \
             patch.object(handler, 'rprint'):

            # no default; true input
            mock_prompt.reset_mock()
            mock_prompt.return_value = 'Y'
            result = handler.prompt_bool('foo')
            self.assertTrue(result)
            mock_prompt.assert_called_once()

            # no default; false input
            mock_prompt.reset_mock()
            mock_prompt.return_value = 'N'
            result = handler.prompt_bool('foo')
            self.assertFalse(result)
            mock_prompt.assert_called_once()

            # default; no input
            mock_prompt.reset_mock()
            mock_prompt.return_value = ''
            result = handler.prompt_bool('foo', default=True)
            self.assertTrue(result)
            mock_prompt.assert_called_once()

            # bad input
            mock_prompt.reset_mock()
            # nb. bad input first time we ask, but good input after that
            mock_prompt.side_effect = ['doesnotmakesense', 'N']
            result = handler.prompt_bool('foo')
            self.assertFalse(result)
            # nb. user was prompted twice
            self.assertEqual(mock_prompt.call_count, 2)