diff --git a/docs/conf.py b/docs/conf.py index 3256549..bff92da 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -8,32 +8,32 @@ from importlib.metadata import version as get_version -project = 'WuttaMess' -copyright = '2024, Lance Edgar' -author = 'Lance Edgar' -release = get_version('WuttaMess') +project = "WuttaMess" +copyright = "2024, Lance Edgar" +author = "Lance Edgar" +release = get_version("WuttaMess") # -- General configuration --------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration extensions = [ - 'sphinx.ext.autodoc', - 'sphinx.ext.intersphinx', - 'sphinx.ext.viewcode', - 'sphinx.ext.todo', + "sphinx.ext.autodoc", + "sphinx.ext.intersphinx", + "sphinx.ext.viewcode", + "sphinx.ext.todo", ] -templates_path = ['_templates'] -exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store'] +templates_path = ["_templates"] +exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"] intersphinx_mapping = { - 'fabsync': ('https://fabsync.ignorare.dev/', None), - 'invoke': ('https://docs.pyinvoke.org/en/stable/', None), + "fabsync": ("https://fabsync.ignorare.dev/", None), + "invoke": ("https://docs.pyinvoke.org/en/stable/", None), } # -- Options for HTML output ------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output -html_theme = 'furo' -html_static_path = ['_static'] +html_theme = "furo" +html_static_path = ["_static"] diff --git a/src/wuttamess/_version.py b/src/wuttamess/_version.py index c3d9e4f..96c91a0 100644 --- a/src/wuttamess/_version.py +++ b/src/wuttamess/_version.py @@ -3,4 +3,4 @@ from importlib.metadata import version -__version__ = version('WuttaMess') +__version__ = version("WuttaMess") diff --git a/src/wuttamess/apt.py b/src/wuttamess/apt.py index e11dca2..df584a4 100644 --- a/src/wuttamess/apt.py +++ b/src/wuttamess/apt.py @@ -25,7 +25,7 @@ APT package management """ -def dist_upgrade(c, frontend='noninteractive'): +def dist_upgrade(c, frontend="noninteractive"): """ Run a full dist-upgrade for APT. Essentially this runs: @@ -46,9 +46,9 @@ def install(c, *packages, **kwargs): apt install PKG [PKG ...] """ - frontend = kwargs.pop('frontend', 'noninteractive') - packages = ' '.join(packages) - return c.run(f'DEBIAN_FRONTEND={frontend} apt-get --assume-yes install {packages}') + frontend = kwargs.pop("frontend", "noninteractive") + packages = " ".join(packages) + return c.run(f"DEBIAN_FRONTEND={frontend} apt-get --assume-yes install {packages}") def is_installed(c, package): @@ -61,7 +61,7 @@ def is_installed(c, package): :returns: ``True`` if package is installed, else ``False``. """ - return c.run(f'dpkg-query -s {package}', warn=True).ok + return c.run(f"dpkg-query -s {package}", warn=True).ok def update(c): @@ -72,10 +72,10 @@ def update(c): apt update """ - c.run('apt-get update') + c.run("apt-get update") -def upgrade(c, dist_upgrade=False, frontend='noninteractive'): +def upgrade(c, dist_upgrade=False, frontend="noninteractive"): """ Upgrade packages via APT. Essentially this runs: @@ -87,8 +87,8 @@ def upgrade(c, dist_upgrade=False, frontend='noninteractive'): apt dist-upgrade """ - options = '' - if frontend == 'noninteractive': + options = "" + if frontend == "noninteractive": options = '--option Dpkg::Options::="--force-confdef" --option Dpkg::Options::="--force-confold"' - upgrade = 'dist-upgrade' if dist_upgrade else 'upgrade' - c.run(f'DEBIAN_FRONTEND={frontend} apt-get --assume-yes {options} {upgrade}') + upgrade = "dist-upgrade" if dist_upgrade else "upgrade" + c.run(f"DEBIAN_FRONTEND={frontend} apt-get --assume-yes {options} {upgrade}") diff --git a/src/wuttamess/postfix.py b/src/wuttamess/postfix.py index e5e9730..d6ef4b9 100644 --- a/src/wuttamess/postfix.py +++ b/src/wuttamess/postfix.py @@ -36,32 +36,32 @@ def set_myhostname(c, hostname): """ Configure the ``myhostname`` setting with the given string. """ - set_config(c, 'myhostname', hostname) + set_config(c, "myhostname", hostname) def set_myorigin(c, origin): """ Configure the ``myorigin`` setting with the given string. """ - set_config(c, 'myorigin', origin) + set_config(c, "myorigin", origin) def set_mydestination(c, *destinations): """ Configure the ``mydestinations`` setting with the given strings. """ - set_config(c, 'mydestination', ', '.join(destinations)) + set_config(c, "mydestination", ", ".join(destinations)) def set_mynetworks(c, *networks): """ Configure the ``mynetworks`` setting with the given strings. """ - set_config(c, 'mynetworks', ' '.join(networks)) + set_config(c, "mynetworks", " ".join(networks)) def set_relayhost(c, relayhost): """ Configure the ``relayhost`` setting with the given string """ - set_config(c, 'relayhost', relayhost) + set_config(c, "relayhost", relayhost) diff --git a/src/wuttamess/postgres.py b/src/wuttamess/postgres.py index 1317768..8416a58 100644 --- a/src/wuttamess/postgres.py +++ b/src/wuttamess/postgres.py @@ -25,7 +25,7 @@ PostgreSQL DB utilities """ -def sql(c, sql, database='', port=None, **kwargs): +def sql(c, sql, database="", port=None, **kwargs): """ Execute some SQL as the ``postgres`` user. @@ -38,9 +38,12 @@ def sql(c, sql, database='', port=None, **kwargs): :param port: Optional port for PostgreSQL; default is 5432. """ - port = f' --port={port}' if port else '' - return c.sudo(f'psql{port} --tuples-only --no-align --command="{sql}" {database}', - user='postgres', **kwargs) + port = f" --port={port}" if port else "" + return c.sudo( + f'psql{port} --tuples-only --no-align --command="{sql}" {database}', + user="postgres", + **kwargs, + ) def user_exists(c, name, port=None): @@ -55,7 +58,9 @@ def user_exists(c, name, port=None): :returns: ``True`` if user exists, else ``False``. """ - user = sql(c, f"SELECT rolname FROM pg_roles WHERE rolname = '{name}'", port=port).stdout.strip() + user = sql( + c, f"SELECT rolname FROM pg_roles WHERE rolname = '{name}'", port=port + ).stdout.strip() return bool(user) @@ -77,9 +82,11 @@ def create_user(c, name, password=None, port=None, checkfirst=True): try to create user with no check. """ if not checkfirst or not user_exists(c, name, port=port): - portarg = f' --port={port}' if port else '' - c.sudo(f'createuser{portarg} --no-createrole --no-superuser {name}', - user='postgres') + portarg = f" --port={port}" if port else "" + c.sudo( + f"createuser{portarg} --no-createrole --no-superuser {name}", + user="postgres", + ) if password: set_user_password(c, name, password, port=port) @@ -96,7 +103,13 @@ def set_user_password(c, name, password, port=None): :param port: Optional port for PostgreSQL; default is 5432. """ - sql(c, f"ALTER USER \\\"{name}\\\" PASSWORD '{password}';", port=port, hide=True, echo=False) + sql( + c, + f"ALTER USER \\\"{name}\\\" PASSWORD '{password}';", + port=port, + hide=True, + echo=False, + ) def db_exists(c, name, port=None): @@ -111,7 +124,9 @@ def db_exists(c, name, port=None): :returns: ``True`` if database exists, else ``False``. """ - db = sql(c, f"SELECT datname FROM pg_database WHERE datname = '{name}'", port=port).stdout.strip() + db = sql( + c, f"SELECT datname FROM pg_database WHERE datname = '{name}'", port=port + ).stdout.strip() return db == name @@ -132,10 +147,9 @@ def create_db(c, name, owner=None, port=None, checkfirst=True): create DB with no check. """ if not checkfirst or not db_exists(c, name, port=port): - port = f' --port={port}' if port else '' - owner = f' --owner={owner}' if owner else '' - c.sudo(f'createdb{port}{owner} {name}', - user='postgres') + port = f" --port={port}" if port else "" + owner = f" --owner={owner}" if owner else "" + c.sudo(f"createdb{port}{owner} {name}", user="postgres") def drop_db(c, name, checkfirst=True): @@ -151,7 +165,7 @@ def drop_db(c, name, checkfirst=True): DB with no check. """ if not checkfirst or db_exists(c, name): - c.sudo(f'dropdb {name}', user='postgres') + c.sudo(f"dropdb {name}", user="postgres") def dump_db(c, name): @@ -170,15 +184,15 @@ def dump_db(c, name): filename and not the path, since the file is expected to exist in the connected user's home folder. """ - sql_name = f'{name}.sql' - gz_name = f'{sql_name}.gz' - tmp_name = f'/tmp/{gz_name}' + sql_name = f"{name}.sql" + gz_name = f"{sql_name}.gz" + tmp_name = f"/tmp/{gz_name}" # TODO: when pg_dump fails the command still succeeds! (would this work?) - #cmd = f'set -e && pg_dump {name} | gzip -c > {tmp_name}' - cmd = f'pg_dump {name} | gzip -c > {tmp_name}' + # cmd = f'set -e && pg_dump {name} | gzip -c > {tmp_name}' + cmd = f"pg_dump {name} | gzip -c > {tmp_name}" - c.sudo(cmd, user='postgres') + c.sudo(cmd, user="postgres") c.run(f"cp {tmp_name} {gz_name}") c.run(f"rm {tmp_name}") diff --git a/src/wuttamess/ssh.py b/src/wuttamess/ssh.py index 87a7540..89a2b64 100644 --- a/src/wuttamess/ssh.py +++ b/src/wuttamess/ssh.py @@ -46,11 +46,11 @@ def cache_host_key(c, host, port=None, user=None): :param user: User on the fabric target whose SSH key cache should be updated to include the given ``host``. """ - port = f'-p {port} ' if port else '' + port = f"-p {port} " if port else "" # first try to run a basic command over ssh - cmd = f'ssh {port}{host} whoami' - if user and user != 'root': + cmd = f"ssh {port}{host} whoami" + if user and user != "root": result = c.sudo(cmd, user=user, warn=True) else: result = c.run(cmd, warn=True) @@ -68,8 +68,8 @@ def cache_host_key(c, host, port=None, user=None): # okay then we now think that the ssh connection itself # was not made, which presumably means we *do* need to # cache the host key, so try that now - cmd = f'ssh -o StrictHostKeyChecking=no {port}{host} whoami' - if user and user != 'root': + cmd = f"ssh -o StrictHostKeyChecking=no {port}{host} whoami" + if user and user != "root": c.sudo(cmd, user=user, warn=True) else: c.run(cmd, warn=True) diff --git a/src/wuttamess/sync.py b/src/wuttamess/sync.py index 6147ca7..24b9165 100644 --- a/src/wuttamess/sync.py +++ b/src/wuttamess/sync.py @@ -29,7 +29,7 @@ See :doc:`/narr/usage` for a basic example. import fabsync -def make_root(path, dest='/'): +def make_root(path, dest="/"): """ Make and return a "root" object for use with future sync calls. @@ -91,9 +91,9 @@ def isync(c, root, selector=None, tags=None, echo=True, **kwargs): if not isinstance(selector, fabsync.ItemSelector): kw = {} if tags: - kw['tags'] = tags + kw["tags"] = tags selector = make_selector(selector, **kw) - kwargs['selector'] = selector + kwargs["selector"] = selector for result in fabsync.isync(c, root, **kwargs): if echo: @@ -111,5 +111,4 @@ def check_isync(c, root, selector=None, **kwargs): :returns: ``True`` if any sync result indicates a file was modified; otherwise ``False``. """ - return any([result.modified - for result in isync(c, root, selector, **kwargs)]) + return any([result.modified for result in isync(c, root, selector, **kwargs)]) diff --git a/src/wuttamess/util.py b/src/wuttamess/util.py index 2fde609..e201c3e 100644 --- a/src/wuttamess/util.py +++ b/src/wuttamess/util.py @@ -34,7 +34,7 @@ def exists(c, path): """ Returns ``True`` if given path exists on the host, otherwise ``False``. """ - return not c.run(f'test -e {path}', warn=True).failed + return not c.run(f"test -e {path}", warn=True).failed def get_home_path(c, user=None): @@ -49,8 +49,8 @@ def get_home_path(c, user=None): :returns: Home folder path as string. """ user = user or c.user - home = c.run(f'getent passwd {user} | cut -d: -f6').stdout.strip() - home = home.rstrip('/') + home = c.run(f"getent passwd {user} | cut -d: -f6").stdout.strip() + home = home.rstrip("/") return home @@ -96,6 +96,7 @@ def mako_renderer(c, env={}): sync.check_isync(c, root, 'etc/postfix', renderers=renderers) """ + def render(path: Path, vars: Mapping[str, Any], **kwargs) -> bytes: return Template(filename=str(path)).render(**env) @@ -113,7 +114,7 @@ def set_timezone(c, timezone): """ c.run(f"bash -c 'echo {timezone} > /etc/timezone'") - if is_symlink(c, '/etc/localtime'): - c.run(f'ln -sf /usr/share/zoneinfo/{timezone} /etc/localtime') + if is_symlink(c, "/etc/localtime"): + c.run(f"ln -sf /usr/share/zoneinfo/{timezone} /etc/localtime") else: - c.run(f'cp /usr/share/zoneinfo/{timezone} /etc/localtime') + c.run(f"cp /usr/share/zoneinfo/{timezone} /etc/localtime") diff --git a/src/wuttamess/wutta.py b/src/wuttamess/wutta.py index 0145542..3688e53 100644 --- a/src/wuttamess/wutta.py +++ b/src/wuttamess/wutta.py @@ -27,7 +27,7 @@ Utilities for Wutta Framework from wuttamess import postgres -def purge_email_settings(c, dbname, appname='wutta'): +def purge_email_settings(c, dbname, appname="wutta"): """ Purge production email settings for a database. @@ -47,11 +47,23 @@ def purge_email_settings(c, dbname, appname='wutta'): :param appname: The ``appname`` used to determine setting names. """ - postgres.sql(c, f"delete from setting where name like '{appname}.email.%.sender';", - database=dbname) - postgres.sql(c, f"delete from setting where name like '{appname}.email.%.to';", - database=dbname) - postgres.sql(c, f"delete from setting where name like '{appname}.email.%.cc';", - database=dbname) - postgres.sql(c, f"delete from setting where name like '{appname}.email.%.bcc';", - database=dbname) + postgres.sql( + c, + f"delete from setting where name like '{appname}.email.%.sender';", + database=dbname, + ) + postgres.sql( + c, + f"delete from setting where name like '{appname}.email.%.to';", + database=dbname, + ) + postgres.sql( + c, + f"delete from setting where name like '{appname}.email.%.cc';", + database=dbname, + ) + postgres.sql( + c, + f"delete from setting where name like '{appname}.email.%.bcc';", + database=dbname, + ) diff --git a/tasks.py b/tasks.py index facdf57..49daeb4 100644 --- a/tasks.py +++ b/tasks.py @@ -15,10 +15,10 @@ def release(c, skip_tests=False): Release a new version of WuttaMess """ if not skip_tests: - c.run('pytest') + c.run("pytest") - if os.path.exists('dist'): - shutil.rmtree('dist') + if os.path.exists("dist"): + shutil.rmtree("dist") - c.run('python -m build --sdist') - c.run('twine upload dist/*') + c.run("python -m build --sdist") + c.run("twine upload dist/*") diff --git a/tests/test_apt.py b/tests/test_apt.py index 50c80a9..367ce05 100644 --- a/tests/test_apt.py +++ b/tests/test_apt.py @@ -10,19 +10,23 @@ class TestDistUpgrade(TestCase): def test_basic(self): c = MagicMock() - with patch.object(mod, 'update') as update: - with patch.object(mod, 'upgrade') as upgrade: - mod.dist_upgrade(c, frontend='whatever') + with patch.object(mod, "update") as update: + with patch.object(mod, "upgrade") as upgrade: + mod.dist_upgrade(c, frontend="whatever") update.assert_called_once_with(c) - upgrade.assert_called_once_with(c, dist_upgrade=True, frontend='whatever') + upgrade.assert_called_once_with( + c, dist_upgrade=True, frontend="whatever" + ) class TestInstall(TestCase): def test_basic(self): c = MagicMock() - mod.install(c, 'postfix') - c.run.assert_called_once_with('DEBIAN_FRONTEND=noninteractive apt-get --assume-yes install postfix') + mod.install(c, "postfix") + c.run.assert_called_once_with( + "DEBIAN_FRONTEND=noninteractive apt-get --assume-yes install postfix" + ) class TestIsInstalled(TestCase): @@ -30,14 +34,14 @@ class TestIsInstalled(TestCase): def test_already_installed(self): c = MagicMock() c.run.return_value.ok = True - self.assertTrue(mod.is_installed(c, 'postfix')) - c.run.assert_called_once_with('dpkg-query -s postfix', warn=True) + self.assertTrue(mod.is_installed(c, "postfix")) + c.run.assert_called_once_with("dpkg-query -s postfix", warn=True) def test_not_installed(self): c = MagicMock() c.run.return_value.ok = False - self.assertFalse(mod.is_installed(c, 'postfix')) - c.run.assert_called_once_with('dpkg-query -s postfix', warn=True) + self.assertFalse(mod.is_installed(c, "postfix")) + c.run.assert_called_once_with("dpkg-query -s postfix", warn=True) class TestUpdate(TestCase): @@ -45,7 +49,7 @@ class TestUpdate(TestCase): def test_basic(self): c = MagicMock() mod.update(c) - c.run.assert_called_once_with('apt-get update') + c.run.assert_called_once_with("apt-get update") class TestUpgrade(TestCase): @@ -53,4 +57,6 @@ class TestUpgrade(TestCase): def test_basic(self): c = MagicMock() mod.upgrade(c) - c.run.assert_called_once_with('DEBIAN_FRONTEND=noninteractive apt-get --assume-yes --option Dpkg::Options::="--force-confdef" --option Dpkg::Options::="--force-confold" upgrade') + c.run.assert_called_once_with( + 'DEBIAN_FRONTEND=noninteractive apt-get --assume-yes --option Dpkg::Options::="--force-confdef" --option Dpkg::Options::="--force-confold" upgrade' + ) diff --git a/tests/test_postfix.py b/tests/test_postfix.py index 1f75253..86e01b1 100644 --- a/tests/test_postfix.py +++ b/tests/test_postfix.py @@ -10,7 +10,7 @@ class TestSetConfig(TestCase): def test_basic(self): c = MagicMock() - mod.set_config(c, 'foo', 'bar') + mod.set_config(c, "foo", "bar") c.run.assert_called_once_with("postconf -e 'foo=bar'") @@ -18,7 +18,7 @@ class TestSetMyhostname(TestCase): def test_basic(self): c = MagicMock() - mod.set_myhostname(c, 'test.example.com') + mod.set_myhostname(c, "test.example.com") c.run.assert_called_once_with("postconf -e 'myhostname=test.example.com'") @@ -26,7 +26,7 @@ class TestSetMyorigin(TestCase): def test_basic(self): c = MagicMock() - mod.set_myorigin(c, 'example.com') + mod.set_myorigin(c, "example.com") c.run.assert_called_once_with("postconf -e 'myorigin=example.com'") @@ -34,15 +34,17 @@ class TestSetMydestination(TestCase): def test_basic(self): c = MagicMock() - mod.set_mydestination(c, 'example.com', 'test.example.com', 'localhost') - c.run.assert_called_once_with("postconf -e 'mydestination=example.com, test.example.com, localhost'") + mod.set_mydestination(c, "example.com", "test.example.com", "localhost") + c.run.assert_called_once_with( + "postconf -e 'mydestination=example.com, test.example.com, localhost'" + ) class TestSetMynetworks(TestCase): def test_basic(self): c = MagicMock() - mod.set_mynetworks(c, '127.0.0.0/8', '[::1]/128') + mod.set_mynetworks(c, "127.0.0.0/8", "[::1]/128") c.run.assert_called_once_with("postconf -e 'mynetworks=127.0.0.0/8 [::1]/128'") @@ -50,5 +52,5 @@ class TestSetRelayhost(TestCase): def test_basic(self): c = MagicMock() - mod.set_relayhost(c, 'mail.example.com') + mod.set_relayhost(c, "mail.example.com") c.run.assert_called_once_with("postconf -e 'relayhost=mail.example.com'") diff --git a/tests/test_postgres.py b/tests/test_postgres.py index 95e49b4..bfe59a7 100644 --- a/tests/test_postgres.py +++ b/tests/test_postgres.py @@ -11,98 +11,115 @@ class TestSql(TestCase): def test_basic(self): c = MagicMock() mod.sql(c, "select @@version") - c.sudo.assert_called_once_with('psql --tuples-only --no-align --command="select @@version" ', - user='postgres') + c.sudo.assert_called_once_with( + 'psql --tuples-only --no-align --command="select @@version" ', + user="postgres", + ) class TestUserExists(TestCase): def test_user_exists(self): c = MagicMock() - with patch.object(mod, 'sql') as sql: - sql.return_value.stdout = 'foo' - self.assertTrue(mod.user_exists(c, 'foo')) - sql.assert_called_once_with(c, "SELECT rolname FROM pg_roles WHERE rolname = 'foo'", port=None) + with patch.object(mod, "sql") as sql: + sql.return_value.stdout = "foo" + self.assertTrue(mod.user_exists(c, "foo")) + sql.assert_called_once_with( + c, "SELECT rolname FROM pg_roles WHERE rolname = 'foo'", port=None + ) def test_user_does_not_exist(self): c = MagicMock() - with patch.object(mod, 'sql') as sql: - sql.return_value.stdout = '' - self.assertFalse(mod.user_exists(c, 'foo')) - sql.assert_called_once_with(c, "SELECT rolname FROM pg_roles WHERE rolname = 'foo'", port=None) + with patch.object(mod, "sql") as sql: + sql.return_value.stdout = "" + self.assertFalse(mod.user_exists(c, "foo")) + sql.assert_called_once_with( + c, "SELECT rolname FROM pg_roles WHERE rolname = 'foo'", port=None + ) class TestCreateUser(TestCase): def test_basic(self): c = MagicMock() - with patch.object(mod, 'set_user_password') as set_user_password: - mod.create_user(c, 'foo', checkfirst=False) - c.sudo.assert_called_once_with('createuser --no-createrole --no-superuser foo', - user='postgres') + with patch.object(mod, "set_user_password") as set_user_password: + mod.create_user(c, "foo", checkfirst=False) + c.sudo.assert_called_once_with( + "createuser --no-createrole --no-superuser foo", user="postgres" + ) set_user_password.assert_not_called() def test_user_exists(self): c = MagicMock() - with patch.object(mod, 'user_exists') as user_exists: + with patch.object(mod, "user_exists") as user_exists: user_exists.return_value = True - mod.create_user(c, 'foo') - user_exists.assert_called_once_with(c, 'foo', port=None) + mod.create_user(c, "foo") + user_exists.assert_called_once_with(c, "foo", port=None) c.sudo.assert_not_called() def test_with_password(self): c = MagicMock() - with patch.object(mod, 'set_user_password') as set_user_password: - mod.create_user(c, 'foo', 'foopass', checkfirst=False) - c.sudo.assert_called_once_with('createuser --no-createrole --no-superuser foo', - user='postgres') - set_user_password.assert_called_once_with(c, 'foo', 'foopass', port=None) + with patch.object(mod, "set_user_password") as set_user_password: + mod.create_user(c, "foo", "foopass", checkfirst=False) + c.sudo.assert_called_once_with( + "createuser --no-createrole --no-superuser foo", user="postgres" + ) + set_user_password.assert_called_once_with(c, "foo", "foopass", port=None) class TestSetUserPassword(TestCase): def test_basic(self): c = MagicMock() - with patch.object(mod, 'sql') as sql: - mod.set_user_password(c, 'foo', 'foopass') - sql.assert_called_once_with(c, "ALTER USER \\\"foo\\\" PASSWORD 'foopass';", - port=None, hide=True, echo=False) + with patch.object(mod, "sql") as sql: + mod.set_user_password(c, "foo", "foopass") + sql.assert_called_once_with( + c, + "ALTER USER \\\"foo\\\" PASSWORD 'foopass';", + port=None, + hide=True, + echo=False, + ) class TestDbExists(TestCase): def test_db_exists(self): c = MagicMock() - with patch.object(mod, 'sql') as sql: - sql.return_value.stdout = 'foo' - self.assertTrue(mod.db_exists(c, 'foo')) - sql.assert_called_once_with(c, "SELECT datname FROM pg_database WHERE datname = 'foo'", port=None) + with patch.object(mod, "sql") as sql: + sql.return_value.stdout = "foo" + self.assertTrue(mod.db_exists(c, "foo")) + sql.assert_called_once_with( + c, "SELECT datname FROM pg_database WHERE datname = 'foo'", port=None + ) def test_db_does_not_exist(self): c = MagicMock() - with patch.object(mod, 'sql') as sql: - sql.return_value.stdout = '' - self.assertFalse(mod.db_exists(c, 'foo')) - sql.assert_called_once_with(c, "SELECT datname FROM pg_database WHERE datname = 'foo'", port=None) + with patch.object(mod, "sql") as sql: + sql.return_value.stdout = "" + self.assertFalse(mod.db_exists(c, "foo")) + sql.assert_called_once_with( + c, "SELECT datname FROM pg_database WHERE datname = 'foo'", port=None + ) class TestCreateDb(TestCase): def test_basic(self): c = MagicMock() - mod.create_db(c, 'foo', checkfirst=False) - c.sudo.assert_called_once_with('createdb foo', user='postgres') + mod.create_db(c, "foo", checkfirst=False) + c.sudo.assert_called_once_with("createdb foo", user="postgres") def test_db_exists(self): c = MagicMock() - with patch.object(mod, 'db_exists') as db_exists: + with patch.object(mod, "db_exists") as db_exists: db_exists.return_value = True - mod.create_db(c, 'foo') - db_exists.assert_called_once_with(c, 'foo', port=None) + mod.create_db(c, "foo") + db_exists.assert_called_once_with(c, "foo", port=None) c.sudo.assert_not_called() @@ -110,17 +127,17 @@ class TestDropDb(TestCase): def test_basic(self): c = MagicMock() - mod.drop_db(c, 'foo', checkfirst=False) - c.sudo.assert_called_once_with('dropdb foo', user='postgres') + mod.drop_db(c, "foo", checkfirst=False) + c.sudo.assert_called_once_with("dropdb foo", user="postgres") def test_db_does_not_exist(self): c = MagicMock() - with patch.object(mod, 'db_exists') as db_exists: + with patch.object(mod, "db_exists") as db_exists: db_exists.return_value = False - mod.drop_db(c, 'foo') - db_exists.assert_called_once_with(c, 'foo') + mod.drop_db(c, "foo") + db_exists.assert_called_once_with(c, "foo") c.sudo.assert_not_called() @@ -128,7 +145,9 @@ class TestDumpDb(TestCase): def test_basic(self): c = MagicMock() - result = mod.dump_db(c, 'foo') - self.assertEqual(result, 'foo.sql.gz') - c.sudo.assert_called_once_with('pg_dump foo | gzip -c > /tmp/foo.sql.gz', user='postgres') - c.run.assert_called_with('rm /tmp/foo.sql.gz') + result = mod.dump_db(c, "foo") + self.assertEqual(result, "foo.sql.gz") + c.sudo.assert_called_once_with( + "pg_dump foo | gzip -c > /tmp/foo.sql.gz", user="postgres" + ) + c.run.assert_called_with("rm /tmp/foo.sql.gz") diff --git a/tests/test_ssh.py b/tests/test_ssh.py index 93b1cdc..bee1574 100644 --- a/tests/test_ssh.py +++ b/tests/test_ssh.py @@ -13,8 +13,8 @@ class TestCacheHostKey(TestCase): # assume the first command runs okay c.run.return_value.failed = False - mod.cache_host_key(c, 'example.com') - c.run.assert_called_once_with('ssh example.com whoami', warn=True) + mod.cache_host_key(c, "example.com") + c.run.assert_called_once_with("ssh example.com whoami", warn=True) def test_root_commands_not_allowed(self): c = MagicMock() @@ -22,25 +22,27 @@ class TestCacheHostKey(TestCase): # assume the first command fails b/c "disallowed" c.run.return_value.failed = True c.run.return_value.stderr = "Disallowed command" - mod.cache_host_key(c, 'example.com') - c.run.assert_called_once_with('ssh example.com whoami', warn=True) + mod.cache_host_key(c, "example.com") + c.run.assert_called_once_with("ssh example.com whoami", warn=True) def test_root_cache_key(self): c = MagicMock() # first command fails; second command caches host key c.run.return_value.failed = True - mod.cache_host_key(c, 'example.com') - c.run.assert_has_calls([call('ssh example.com whoami', warn=True)]) - c.run.assert_called_with('ssh -o StrictHostKeyChecking=no example.com whoami', warn=True) + mod.cache_host_key(c, "example.com") + c.run.assert_has_calls([call("ssh example.com whoami", warn=True)]) + c.run.assert_called_with( + "ssh -o StrictHostKeyChecking=no example.com whoami", warn=True + ) def test_user_already_cached(self): c = MagicMock() # assume the first command runs okay c.sudo.return_value.failed = False - mod.cache_host_key(c, 'example.com', user='foo') - c.sudo.assert_called_once_with('ssh example.com whoami', user='foo', warn=True) + mod.cache_host_key(c, "example.com", user="foo") + c.sudo.assert_called_once_with("ssh example.com whoami", user="foo", warn=True) def test_user_commands_not_allowed(self): c = MagicMock() @@ -48,15 +50,16 @@ class TestCacheHostKey(TestCase): # assume the first command fails b/c "disallowed" c.sudo.return_value.failed = True c.sudo.return_value.stderr = "Disallowed command" - mod.cache_host_key(c, 'example.com', user='foo') - c.sudo.assert_called_once_with('ssh example.com whoami', user='foo', warn=True) + mod.cache_host_key(c, "example.com", user="foo") + c.sudo.assert_called_once_with("ssh example.com whoami", user="foo", warn=True) def test_user_cache_key(self): c = MagicMock() # first command fails; second command caches host key c.sudo.return_value.failed = True - mod.cache_host_key(c, 'example.com', user='foo') - c.sudo.assert_has_calls([call('ssh example.com whoami', user='foo', warn=True)]) - c.sudo.assert_called_with('ssh -o StrictHostKeyChecking=no example.com whoami', - user='foo', warn=True) + mod.cache_host_key(c, "example.com", user="foo") + c.sudo.assert_has_calls([call("ssh example.com whoami", user="foo", warn=True)]) + c.sudo.assert_called_with( + "ssh -o StrictHostKeyChecking=no example.com whoami", user="foo", warn=True + ) diff --git a/tests/test_sync.py b/tests/test_sync.py index 0a11084..8d91a05 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -12,26 +12,26 @@ from wuttamess import sync as mod class TestMakeRoot(TestCase): def test_basic(self): - root = mod.make_root('files') + root = mod.make_root("files") self.assertIsInstance(root, SyncedRoot) - self.assertEqual(root.src, Path('files')) - self.assertEqual(root.dest, Path('/')) + self.assertEqual(root.src, Path("files")) + self.assertEqual(root.dest, Path("/")) class TestMakeSelector(TestCase): def test_basic(self): - selector = mod.make_selector('etc/postfix') + selector = mod.make_selector("etc/postfix") self.assertIsInstance(selector, ItemSelector) - self.assertEqual(selector.subpath, Path('etc/postfix')) + self.assertEqual(selector.subpath, Path("etc/postfix")) class TestIsync(TestCase): def test_basic(self): c = MagicMock() - root = mod.make_root('files') - with patch.object(mod, 'fabsync') as fabsync: + root = mod.make_root("files") + with patch.object(mod, "fabsync") as fabsync: fabsync.ItemSelector = ItemSelector # nothing to sync @@ -42,7 +42,7 @@ class TestIsync(TestCase): # sync one file fabsync.isync.reset_mock() - result = MagicMock(path='/foo', modified=True) + result = MagicMock(path="/foo", modified=True) fabsync.isync.return_value = [result] results = list(mod.isync(c, root)) self.assertEqual(results, [result]) @@ -50,34 +50,38 @@ class TestIsync(TestCase): # sync with selector (subpath) fabsync.isync.reset_mock() - result = MagicMock(path='/foo', modified=True) + result = MagicMock(path="/foo", modified=True) fabsync.isync.return_value = [result] - results = list(mod.isync(c, root, 'foo')) + results = list(mod.isync(c, root, "foo")) self.assertEqual(results, [result]) - fabsync.isync.assert_called_once_with(c, root, selector=fabsync.ItemSelector.new('foo')) + fabsync.isync.assert_called_once_with( + c, root, selector=fabsync.ItemSelector.new("foo") + ) # sync with selector (subpath + tags) fabsync.isync.reset_mock() - result = MagicMock(path='/foo', modified=True) + result = MagicMock(path="/foo", modified=True) fabsync.isync.return_value = [result] - results = list(mod.isync(c, root, 'foo', tags={'bar'})) + results = list(mod.isync(c, root, "foo", tags={"bar"})) self.assertEqual(results, [result]) - fabsync.isync.assert_called_once_with(c, root, selector=fabsync.ItemSelector.new('foo', tags={'bar'})) + fabsync.isync.assert_called_once_with( + c, root, selector=fabsync.ItemSelector.new("foo", tags={"bar"}) + ) class TestCheckIsync(TestCase): def test_basic(self): c = MagicMock() - root = mod.make_root('files') - with patch.object(mod, 'isync') as isync: + root = mod.make_root("files") + with patch.object(mod, "isync") as isync: # file(s) modified - result = MagicMock(path='/foo', modified=True) + result = MagicMock(path="/foo", modified=True) isync.return_value = [result] self.assertTrue(mod.check_isync(c, root)) # not modified - result = MagicMock(path='/foo', modified=False) + result = MagicMock(path="/foo", modified=False) isync.return_value = [result] self.assertFalse(mod.check_isync(c, root)) diff --git a/tests/test_util.py b/tests/test_util.py index 3793f93..7859634 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -11,17 +11,17 @@ class TestExists(TestCase): def test_basic(self): c = MagicMock() - mod.exists(c, '/foo') - c.run.assert_called_once_with('test -e /foo', warn=True) + mod.exists(c, "/foo") + c.run.assert_called_once_with("test -e /foo", warn=True) class TestHomePath(TestCase): def test_basic(self): c = MagicMock() - c.run.return_value.stdout = '/home/foo' - path = mod.get_home_path(c, user='foo') - self.assertEqual(path, '/home/foo') + c.run.return_value.stdout = "/home/foo" + path = mod.get_home_path(c, user="foo") + self.assertEqual(path, "/home/foo") class TestIsSymlink(TestCase): @@ -29,13 +29,13 @@ class TestIsSymlink(TestCase): def test_yes(self): c = MagicMock() c.run.return_value.failed = False - self.assertTrue(mod.is_symlink(c, '/foo')) + self.assertTrue(mod.is_symlink(c, "/foo")) c.run.assert_called_once_with('test -L "$(echo /foo)"', warn=True) def test_no(self): c = MagicMock() c.run.return_value.failed = True - self.assertFalse(mod.is_symlink(c, '/foo')) + self.assertFalse(mod.is_symlink(c, "/foo")) c.run.assert_called_once_with('test -L "$(echo /foo)"', warn=True) @@ -43,31 +43,39 @@ class TestMakoRenderer(TestCase): def test_basic(self): c = MagicMock() - renderer = mod.mako_renderer(c, env={'machine_is_live': True}) + renderer = mod.mako_renderer(c, env={"machine_is_live": True}) here = os.path.dirname(__file__) - path = os.path.join(here, 'files', 'bar', 'baz') + path = os.path.join(here, "files", "bar", "baz") rendered = renderer(path, vars={}) - self.assertEqual(rendered, 'machine_is_live = True') + self.assertEqual(rendered, "machine_is_live = True") class TestSetTimezone(TestCase): def test_symlink(self): c = MagicMock() - with patch.object(mod, 'is_symlink') as is_symlink: + with patch.object(mod, "is_symlink") as is_symlink: is_symlink.return_value = True - mod.set_timezone(c, 'America/Chicago') - c.run.assert_has_calls([ - call("bash -c 'echo America/Chicago > /etc/timezone'"), - ]) - c.run.assert_called_with('ln -sf /usr/share/zoneinfo/America/Chicago /etc/localtime') + mod.set_timezone(c, "America/Chicago") + c.run.assert_has_calls( + [ + call("bash -c 'echo America/Chicago > /etc/timezone'"), + ] + ) + c.run.assert_called_with( + "ln -sf /usr/share/zoneinfo/America/Chicago /etc/localtime" + ) def test_not_symlink(self): c = MagicMock() - with patch.object(mod, 'is_symlink') as is_symlink: + with patch.object(mod, "is_symlink") as is_symlink: is_symlink.return_value = False - mod.set_timezone(c, 'America/Chicago') - c.run.assert_has_calls([ - call("bash -c 'echo America/Chicago > /etc/timezone'"), - ]) - c.run.assert_called_with('cp /usr/share/zoneinfo/America/Chicago /etc/localtime') + mod.set_timezone(c, "America/Chicago") + c.run.assert_has_calls( + [ + call("bash -c 'echo America/Chicago > /etc/timezone'"), + ] + ) + c.run.assert_called_with( + "cp /usr/share/zoneinfo/America/Chicago /etc/localtime" + ) diff --git a/tests/test_wutta.py b/tests/test_wutta.py index 1b8436c..ca08919 100644 --- a/tests/test_wutta.py +++ b/tests/test_wutta.py @@ -12,15 +12,29 @@ class TestPurgeEmailSettings(TestCase): c = MagicMock() sql = MagicMock() postgres = MagicMock(sql=sql) - with patch.object(mod, 'postgres', new=postgres): - mod.purge_email_settings(c, 'testy', appname='wuttatest') - sql.assert_has_calls([ - call(c, "delete from setting where name like 'wuttatest.email.%.sender';", - database='testy'), - call(c, "delete from setting where name like 'wuttatest.email.%.to';", - database='testy'), - call(c, "delete from setting where name like 'wuttatest.email.%.cc';", - database='testy'), - call(c, "delete from setting where name like 'wuttatest.email.%.bcc';", - database='testy'), - ]) + with patch.object(mod, "postgres", new=postgres): + mod.purge_email_settings(c, "testy", appname="wuttatest") + sql.assert_has_calls( + [ + call( + c, + "delete from setting where name like 'wuttatest.email.%.sender';", + database="testy", + ), + call( + c, + "delete from setting where name like 'wuttatest.email.%.to';", + database="testy", + ), + call( + c, + "delete from setting where name like 'wuttatest.email.%.cc';", + database="testy", + ), + call( + c, + "delete from setting where name like 'wuttatest.email.%.bcc';", + database="testy", + ), + ] + )