fix: format all code with black

and from now on should not deviate from that...
This commit is contained in:
Lance Edgar 2025-08-31 12:52:36 -05:00
parent 2bd094b10b
commit 147d2fd871
17 changed files with 298 additions and 216 deletions

View file

@ -8,32 +8,32 @@
from importlib.metadata import version as get_version
project = 'WuttaMess'
copyright = '2024, Lance Edgar'
author = 'Lance Edgar'
release = get_version('WuttaMess')
project = "WuttaMess"
copyright = "2024, Lance Edgar"
author = "Lance Edgar"
release = get_version("WuttaMess")
# -- General configuration ---------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration
extensions = [
'sphinx.ext.autodoc',
'sphinx.ext.intersphinx',
'sphinx.ext.viewcode',
'sphinx.ext.todo',
"sphinx.ext.autodoc",
"sphinx.ext.intersphinx",
"sphinx.ext.viewcode",
"sphinx.ext.todo",
]
templates_path = ['_templates']
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
templates_path = ["_templates"]
exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"]
intersphinx_mapping = {
'fabsync': ('https://fabsync.ignorare.dev/', None),
'invoke': ('https://docs.pyinvoke.org/en/stable/', None),
"fabsync": ("https://fabsync.ignorare.dev/", None),
"invoke": ("https://docs.pyinvoke.org/en/stable/", None),
}
# -- Options for HTML output -------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output
html_theme = 'furo'
html_static_path = ['_static']
html_theme = "furo"
html_static_path = ["_static"]

View file

@ -3,4 +3,4 @@
from importlib.metadata import version
__version__ = version('WuttaMess')
__version__ = version("WuttaMess")

View file

@ -25,7 +25,7 @@ APT package management
"""
def dist_upgrade(c, frontend='noninteractive'):
def dist_upgrade(c, frontend="noninteractive"):
"""
Run a full dist-upgrade for APT. Essentially this runs:
@ -46,9 +46,9 @@ def install(c, *packages, **kwargs):
apt install PKG [PKG ...]
"""
frontend = kwargs.pop('frontend', 'noninteractive')
packages = ' '.join(packages)
return c.run(f'DEBIAN_FRONTEND={frontend} apt-get --assume-yes install {packages}')
frontend = kwargs.pop("frontend", "noninteractive")
packages = " ".join(packages)
return c.run(f"DEBIAN_FRONTEND={frontend} apt-get --assume-yes install {packages}")
def is_installed(c, package):
@ -61,7 +61,7 @@ def is_installed(c, package):
:returns: ``True`` if package is installed, else ``False``.
"""
return c.run(f'dpkg-query -s {package}', warn=True).ok
return c.run(f"dpkg-query -s {package}", warn=True).ok
def update(c):
@ -72,10 +72,10 @@ def update(c):
apt update
"""
c.run('apt-get update')
c.run("apt-get update")
def upgrade(c, dist_upgrade=False, frontend='noninteractive'):
def upgrade(c, dist_upgrade=False, frontend="noninteractive"):
"""
Upgrade packages via APT. Essentially this runs:
@ -87,8 +87,8 @@ def upgrade(c, dist_upgrade=False, frontend='noninteractive'):
apt dist-upgrade
"""
options = ''
if frontend == 'noninteractive':
options = ""
if frontend == "noninteractive":
options = '--option Dpkg::Options::="--force-confdef" --option Dpkg::Options::="--force-confold"'
upgrade = 'dist-upgrade' if dist_upgrade else 'upgrade'
c.run(f'DEBIAN_FRONTEND={frontend} apt-get --assume-yes {options} {upgrade}')
upgrade = "dist-upgrade" if dist_upgrade else "upgrade"
c.run(f"DEBIAN_FRONTEND={frontend} apt-get --assume-yes {options} {upgrade}")

View file

@ -36,32 +36,32 @@ def set_myhostname(c, hostname):
"""
Configure the ``myhostname`` setting with the given string.
"""
set_config(c, 'myhostname', hostname)
set_config(c, "myhostname", hostname)
def set_myorigin(c, origin):
"""
Configure the ``myorigin`` setting with the given string.
"""
set_config(c, 'myorigin', origin)
set_config(c, "myorigin", origin)
def set_mydestination(c, *destinations):
"""
Configure the ``mydestinations`` setting with the given strings.
"""
set_config(c, 'mydestination', ', '.join(destinations))
set_config(c, "mydestination", ", ".join(destinations))
def set_mynetworks(c, *networks):
"""
Configure the ``mynetworks`` setting with the given strings.
"""
set_config(c, 'mynetworks', ' '.join(networks))
set_config(c, "mynetworks", " ".join(networks))
def set_relayhost(c, relayhost):
"""
Configure the ``relayhost`` setting with the given string
"""
set_config(c, 'relayhost', relayhost)
set_config(c, "relayhost", relayhost)

View file

@ -25,7 +25,7 @@ PostgreSQL DB utilities
"""
def sql(c, sql, database='', port=None, **kwargs):
def sql(c, sql, database="", port=None, **kwargs):
"""
Execute some SQL as the ``postgres`` user.
@ -38,9 +38,12 @@ def sql(c, sql, database='', port=None, **kwargs):
:param port: Optional port for PostgreSQL; default is 5432.
"""
port = f' --port={port}' if port else ''
return c.sudo(f'psql{port} --tuples-only --no-align --command="{sql}" {database}',
user='postgres', **kwargs)
port = f" --port={port}" if port else ""
return c.sudo(
f'psql{port} --tuples-only --no-align --command="{sql}" {database}',
user="postgres",
**kwargs,
)
def user_exists(c, name, port=None):
@ -55,7 +58,9 @@ def user_exists(c, name, port=None):
:returns: ``True`` if user exists, else ``False``.
"""
user = sql(c, f"SELECT rolname FROM pg_roles WHERE rolname = '{name}'", port=port).stdout.strip()
user = sql(
c, f"SELECT rolname FROM pg_roles WHERE rolname = '{name}'", port=port
).stdout.strip()
return bool(user)
@ -77,9 +82,11 @@ def create_user(c, name, password=None, port=None, checkfirst=True):
try to create user with no check.
"""
if not checkfirst or not user_exists(c, name, port=port):
portarg = f' --port={port}' if port else ''
c.sudo(f'createuser{portarg} --no-createrole --no-superuser {name}',
user='postgres')
portarg = f" --port={port}" if port else ""
c.sudo(
f"createuser{portarg} --no-createrole --no-superuser {name}",
user="postgres",
)
if password:
set_user_password(c, name, password, port=port)
@ -96,7 +103,13 @@ def set_user_password(c, name, password, port=None):
:param port: Optional port for PostgreSQL; default is 5432.
"""
sql(c, f"ALTER USER \\\"{name}\\\" PASSWORD '{password}';", port=port, hide=True, echo=False)
sql(
c,
f"ALTER USER \\\"{name}\\\" PASSWORD '{password}';",
port=port,
hide=True,
echo=False,
)
def db_exists(c, name, port=None):
@ -111,7 +124,9 @@ def db_exists(c, name, port=None):
:returns: ``True`` if database exists, else ``False``.
"""
db = sql(c, f"SELECT datname FROM pg_database WHERE datname = '{name}'", port=port).stdout.strip()
db = sql(
c, f"SELECT datname FROM pg_database WHERE datname = '{name}'", port=port
).stdout.strip()
return db == name
@ -132,10 +147,9 @@ def create_db(c, name, owner=None, port=None, checkfirst=True):
create DB with no check.
"""
if not checkfirst or not db_exists(c, name, port=port):
port = f' --port={port}' if port else ''
owner = f' --owner={owner}' if owner else ''
c.sudo(f'createdb{port}{owner} {name}',
user='postgres')
port = f" --port={port}" if port else ""
owner = f" --owner={owner}" if owner else ""
c.sudo(f"createdb{port}{owner} {name}", user="postgres")
def drop_db(c, name, checkfirst=True):
@ -151,7 +165,7 @@ def drop_db(c, name, checkfirst=True):
DB with no check.
"""
if not checkfirst or db_exists(c, name):
c.sudo(f'dropdb {name}', user='postgres')
c.sudo(f"dropdb {name}", user="postgres")
def dump_db(c, name):
@ -170,15 +184,15 @@ def dump_db(c, name):
filename and not the path, since the file is expected to exist
in the connected user's home folder.
"""
sql_name = f'{name}.sql'
gz_name = f'{sql_name}.gz'
tmp_name = f'/tmp/{gz_name}'
sql_name = f"{name}.sql"
gz_name = f"{sql_name}.gz"
tmp_name = f"/tmp/{gz_name}"
# TODO: when pg_dump fails the command still succeeds! (would this work?)
# cmd = f'set -e && pg_dump {name} | gzip -c > {tmp_name}'
cmd = f'pg_dump {name} | gzip -c > {tmp_name}'
cmd = f"pg_dump {name} | gzip -c > {tmp_name}"
c.sudo(cmd, user='postgres')
c.sudo(cmd, user="postgres")
c.run(f"cp {tmp_name} {gz_name}")
c.run(f"rm {tmp_name}")

View file

@ -46,11 +46,11 @@ def cache_host_key(c, host, port=None, user=None):
:param user: User on the fabric target whose SSH key cache should
be updated to include the given ``host``.
"""
port = f'-p {port} ' if port else ''
port = f"-p {port} " if port else ""
# first try to run a basic command over ssh
cmd = f'ssh {port}{host} whoami'
if user and user != 'root':
cmd = f"ssh {port}{host} whoami"
if user and user != "root":
result = c.sudo(cmd, user=user, warn=True)
else:
result = c.run(cmd, warn=True)
@ -68,8 +68,8 @@ def cache_host_key(c, host, port=None, user=None):
# okay then we now think that the ssh connection itself
# was not made, which presumably means we *do* need to
# cache the host key, so try that now
cmd = f'ssh -o StrictHostKeyChecking=no {port}{host} whoami'
if user and user != 'root':
cmd = f"ssh -o StrictHostKeyChecking=no {port}{host} whoami"
if user and user != "root":
c.sudo(cmd, user=user, warn=True)
else:
c.run(cmd, warn=True)

View file

@ -29,7 +29,7 @@ See :doc:`/narr/usage` for a basic example.
import fabsync
def make_root(path, dest='/'):
def make_root(path, dest="/"):
"""
Make and return a "root" object for use with future sync calls.
@ -91,9 +91,9 @@ def isync(c, root, selector=None, tags=None, echo=True, **kwargs):
if not isinstance(selector, fabsync.ItemSelector):
kw = {}
if tags:
kw['tags'] = tags
kw["tags"] = tags
selector = make_selector(selector, **kw)
kwargs['selector'] = selector
kwargs["selector"] = selector
for result in fabsync.isync(c, root, **kwargs):
if echo:
@ -111,5 +111,4 @@ def check_isync(c, root, selector=None, **kwargs):
:returns: ``True`` if any sync result indicates a file was
modified; otherwise ``False``.
"""
return any([result.modified
for result in isync(c, root, selector, **kwargs)])
return any([result.modified for result in isync(c, root, selector, **kwargs)])

View file

@ -34,7 +34,7 @@ def exists(c, path):
"""
Returns ``True`` if given path exists on the host, otherwise ``False``.
"""
return not c.run(f'test -e {path}', warn=True).failed
return not c.run(f"test -e {path}", warn=True).failed
def get_home_path(c, user=None):
@ -49,8 +49,8 @@ def get_home_path(c, user=None):
:returns: Home folder path as string.
"""
user = user or c.user
home = c.run(f'getent passwd {user} | cut -d: -f6').stdout.strip()
home = home.rstrip('/')
home = c.run(f"getent passwd {user} | cut -d: -f6").stdout.strip()
home = home.rstrip("/")
return home
@ -96,6 +96,7 @@ def mako_renderer(c, env={}):
sync.check_isync(c, root, 'etc/postfix', renderers=renderers)
"""
def render(path: Path, vars: Mapping[str, Any], **kwargs) -> bytes:
return Template(filename=str(path)).render(**env)
@ -113,7 +114,7 @@ def set_timezone(c, timezone):
"""
c.run(f"bash -c 'echo {timezone} > /etc/timezone'")
if is_symlink(c, '/etc/localtime'):
c.run(f'ln -sf /usr/share/zoneinfo/{timezone} /etc/localtime')
if is_symlink(c, "/etc/localtime"):
c.run(f"ln -sf /usr/share/zoneinfo/{timezone} /etc/localtime")
else:
c.run(f'cp /usr/share/zoneinfo/{timezone} /etc/localtime')
c.run(f"cp /usr/share/zoneinfo/{timezone} /etc/localtime")

View file

@ -27,7 +27,7 @@ Utilities for Wutta Framework
from wuttamess import postgres
def purge_email_settings(c, dbname, appname='wutta'):
def purge_email_settings(c, dbname, appname="wutta"):
"""
Purge production email settings for a database.
@ -47,11 +47,23 @@ def purge_email_settings(c, dbname, appname='wutta'):
:param appname: The ``appname`` used to determine setting names.
"""
postgres.sql(c, f"delete from setting where name like '{appname}.email.%.sender';",
database=dbname)
postgres.sql(c, f"delete from setting where name like '{appname}.email.%.to';",
database=dbname)
postgres.sql(c, f"delete from setting where name like '{appname}.email.%.cc';",
database=dbname)
postgres.sql(c, f"delete from setting where name like '{appname}.email.%.bcc';",
database=dbname)
postgres.sql(
c,
f"delete from setting where name like '{appname}.email.%.sender';",
database=dbname,
)
postgres.sql(
c,
f"delete from setting where name like '{appname}.email.%.to';",
database=dbname,
)
postgres.sql(
c,
f"delete from setting where name like '{appname}.email.%.cc';",
database=dbname,
)
postgres.sql(
c,
f"delete from setting where name like '{appname}.email.%.bcc';",
database=dbname,
)

View file

@ -15,10 +15,10 @@ def release(c, skip_tests=False):
Release a new version of WuttaMess
"""
if not skip_tests:
c.run('pytest')
c.run("pytest")
if os.path.exists('dist'):
shutil.rmtree('dist')
if os.path.exists("dist"):
shutil.rmtree("dist")
c.run('python -m build --sdist')
c.run('twine upload dist/*')
c.run("python -m build --sdist")
c.run("twine upload dist/*")

View file

@ -10,19 +10,23 @@ class TestDistUpgrade(TestCase):
def test_basic(self):
c = MagicMock()
with patch.object(mod, 'update') as update:
with patch.object(mod, 'upgrade') as upgrade:
mod.dist_upgrade(c, frontend='whatever')
with patch.object(mod, "update") as update:
with patch.object(mod, "upgrade") as upgrade:
mod.dist_upgrade(c, frontend="whatever")
update.assert_called_once_with(c)
upgrade.assert_called_once_with(c, dist_upgrade=True, frontend='whatever')
upgrade.assert_called_once_with(
c, dist_upgrade=True, frontend="whatever"
)
class TestInstall(TestCase):
def test_basic(self):
c = MagicMock()
mod.install(c, 'postfix')
c.run.assert_called_once_with('DEBIAN_FRONTEND=noninteractive apt-get --assume-yes install postfix')
mod.install(c, "postfix")
c.run.assert_called_once_with(
"DEBIAN_FRONTEND=noninteractive apt-get --assume-yes install postfix"
)
class TestIsInstalled(TestCase):
@ -30,14 +34,14 @@ class TestIsInstalled(TestCase):
def test_already_installed(self):
c = MagicMock()
c.run.return_value.ok = True
self.assertTrue(mod.is_installed(c, 'postfix'))
c.run.assert_called_once_with('dpkg-query -s postfix', warn=True)
self.assertTrue(mod.is_installed(c, "postfix"))
c.run.assert_called_once_with("dpkg-query -s postfix", warn=True)
def test_not_installed(self):
c = MagicMock()
c.run.return_value.ok = False
self.assertFalse(mod.is_installed(c, 'postfix'))
c.run.assert_called_once_with('dpkg-query -s postfix', warn=True)
self.assertFalse(mod.is_installed(c, "postfix"))
c.run.assert_called_once_with("dpkg-query -s postfix", warn=True)
class TestUpdate(TestCase):
@ -45,7 +49,7 @@ class TestUpdate(TestCase):
def test_basic(self):
c = MagicMock()
mod.update(c)
c.run.assert_called_once_with('apt-get update')
c.run.assert_called_once_with("apt-get update")
class TestUpgrade(TestCase):
@ -53,4 +57,6 @@ class TestUpgrade(TestCase):
def test_basic(self):
c = MagicMock()
mod.upgrade(c)
c.run.assert_called_once_with('DEBIAN_FRONTEND=noninteractive apt-get --assume-yes --option Dpkg::Options::="--force-confdef" --option Dpkg::Options::="--force-confold" upgrade')
c.run.assert_called_once_with(
'DEBIAN_FRONTEND=noninteractive apt-get --assume-yes --option Dpkg::Options::="--force-confdef" --option Dpkg::Options::="--force-confold" upgrade'
)

View file

@ -10,7 +10,7 @@ class TestSetConfig(TestCase):
def test_basic(self):
c = MagicMock()
mod.set_config(c, 'foo', 'bar')
mod.set_config(c, "foo", "bar")
c.run.assert_called_once_with("postconf -e 'foo=bar'")
@ -18,7 +18,7 @@ class TestSetMyhostname(TestCase):
def test_basic(self):
c = MagicMock()
mod.set_myhostname(c, 'test.example.com')
mod.set_myhostname(c, "test.example.com")
c.run.assert_called_once_with("postconf -e 'myhostname=test.example.com'")
@ -26,7 +26,7 @@ class TestSetMyorigin(TestCase):
def test_basic(self):
c = MagicMock()
mod.set_myorigin(c, 'example.com')
mod.set_myorigin(c, "example.com")
c.run.assert_called_once_with("postconf -e 'myorigin=example.com'")
@ -34,15 +34,17 @@ class TestSetMydestination(TestCase):
def test_basic(self):
c = MagicMock()
mod.set_mydestination(c, 'example.com', 'test.example.com', 'localhost')
c.run.assert_called_once_with("postconf -e 'mydestination=example.com, test.example.com, localhost'")
mod.set_mydestination(c, "example.com", "test.example.com", "localhost")
c.run.assert_called_once_with(
"postconf -e 'mydestination=example.com, test.example.com, localhost'"
)
class TestSetMynetworks(TestCase):
def test_basic(self):
c = MagicMock()
mod.set_mynetworks(c, '127.0.0.0/8', '[::1]/128')
mod.set_mynetworks(c, "127.0.0.0/8", "[::1]/128")
c.run.assert_called_once_with("postconf -e 'mynetworks=127.0.0.0/8 [::1]/128'")
@ -50,5 +52,5 @@ class TestSetRelayhost(TestCase):
def test_basic(self):
c = MagicMock()
mod.set_relayhost(c, 'mail.example.com')
mod.set_relayhost(c, "mail.example.com")
c.run.assert_called_once_with("postconf -e 'relayhost=mail.example.com'")

View file

@ -11,98 +11,115 @@ class TestSql(TestCase):
def test_basic(self):
c = MagicMock()
mod.sql(c, "select @@version")
c.sudo.assert_called_once_with('psql --tuples-only --no-align --command="select @@version" ',
user='postgres')
c.sudo.assert_called_once_with(
'psql --tuples-only --no-align --command="select @@version" ',
user="postgres",
)
class TestUserExists(TestCase):
def test_user_exists(self):
c = MagicMock()
with patch.object(mod, 'sql') as sql:
sql.return_value.stdout = 'foo'
self.assertTrue(mod.user_exists(c, 'foo'))
sql.assert_called_once_with(c, "SELECT rolname FROM pg_roles WHERE rolname = 'foo'", port=None)
with patch.object(mod, "sql") as sql:
sql.return_value.stdout = "foo"
self.assertTrue(mod.user_exists(c, "foo"))
sql.assert_called_once_with(
c, "SELECT rolname FROM pg_roles WHERE rolname = 'foo'", port=None
)
def test_user_does_not_exist(self):
c = MagicMock()
with patch.object(mod, 'sql') as sql:
sql.return_value.stdout = ''
self.assertFalse(mod.user_exists(c, 'foo'))
sql.assert_called_once_with(c, "SELECT rolname FROM pg_roles WHERE rolname = 'foo'", port=None)
with patch.object(mod, "sql") as sql:
sql.return_value.stdout = ""
self.assertFalse(mod.user_exists(c, "foo"))
sql.assert_called_once_with(
c, "SELECT rolname FROM pg_roles WHERE rolname = 'foo'", port=None
)
class TestCreateUser(TestCase):
def test_basic(self):
c = MagicMock()
with patch.object(mod, 'set_user_password') as set_user_password:
mod.create_user(c, 'foo', checkfirst=False)
c.sudo.assert_called_once_with('createuser --no-createrole --no-superuser foo',
user='postgres')
with patch.object(mod, "set_user_password") as set_user_password:
mod.create_user(c, "foo", checkfirst=False)
c.sudo.assert_called_once_with(
"createuser --no-createrole --no-superuser foo", user="postgres"
)
set_user_password.assert_not_called()
def test_user_exists(self):
c = MagicMock()
with patch.object(mod, 'user_exists') as user_exists:
with patch.object(mod, "user_exists") as user_exists:
user_exists.return_value = True
mod.create_user(c, 'foo')
user_exists.assert_called_once_with(c, 'foo', port=None)
mod.create_user(c, "foo")
user_exists.assert_called_once_with(c, "foo", port=None)
c.sudo.assert_not_called()
def test_with_password(self):
c = MagicMock()
with patch.object(mod, 'set_user_password') as set_user_password:
mod.create_user(c, 'foo', 'foopass', checkfirst=False)
c.sudo.assert_called_once_with('createuser --no-createrole --no-superuser foo',
user='postgres')
set_user_password.assert_called_once_with(c, 'foo', 'foopass', port=None)
with patch.object(mod, "set_user_password") as set_user_password:
mod.create_user(c, "foo", "foopass", checkfirst=False)
c.sudo.assert_called_once_with(
"createuser --no-createrole --no-superuser foo", user="postgres"
)
set_user_password.assert_called_once_with(c, "foo", "foopass", port=None)
class TestSetUserPassword(TestCase):
def test_basic(self):
c = MagicMock()
with patch.object(mod, 'sql') as sql:
mod.set_user_password(c, 'foo', 'foopass')
sql.assert_called_once_with(c, "ALTER USER \\\"foo\\\" PASSWORD 'foopass';",
port=None, hide=True, echo=False)
with patch.object(mod, "sql") as sql:
mod.set_user_password(c, "foo", "foopass")
sql.assert_called_once_with(
c,
"ALTER USER \\\"foo\\\" PASSWORD 'foopass';",
port=None,
hide=True,
echo=False,
)
class TestDbExists(TestCase):
def test_db_exists(self):
c = MagicMock()
with patch.object(mod, 'sql') as sql:
sql.return_value.stdout = 'foo'
self.assertTrue(mod.db_exists(c, 'foo'))
sql.assert_called_once_with(c, "SELECT datname FROM pg_database WHERE datname = 'foo'", port=None)
with patch.object(mod, "sql") as sql:
sql.return_value.stdout = "foo"
self.assertTrue(mod.db_exists(c, "foo"))
sql.assert_called_once_with(
c, "SELECT datname FROM pg_database WHERE datname = 'foo'", port=None
)
def test_db_does_not_exist(self):
c = MagicMock()
with patch.object(mod, 'sql') as sql:
sql.return_value.stdout = ''
self.assertFalse(mod.db_exists(c, 'foo'))
sql.assert_called_once_with(c, "SELECT datname FROM pg_database WHERE datname = 'foo'", port=None)
with patch.object(mod, "sql") as sql:
sql.return_value.stdout = ""
self.assertFalse(mod.db_exists(c, "foo"))
sql.assert_called_once_with(
c, "SELECT datname FROM pg_database WHERE datname = 'foo'", port=None
)
class TestCreateDb(TestCase):
def test_basic(self):
c = MagicMock()
mod.create_db(c, 'foo', checkfirst=False)
c.sudo.assert_called_once_with('createdb foo', user='postgres')
mod.create_db(c, "foo", checkfirst=False)
c.sudo.assert_called_once_with("createdb foo", user="postgres")
def test_db_exists(self):
c = MagicMock()
with patch.object(mod, 'db_exists') as db_exists:
with patch.object(mod, "db_exists") as db_exists:
db_exists.return_value = True
mod.create_db(c, 'foo')
db_exists.assert_called_once_with(c, 'foo', port=None)
mod.create_db(c, "foo")
db_exists.assert_called_once_with(c, "foo", port=None)
c.sudo.assert_not_called()
@ -110,17 +127,17 @@ class TestDropDb(TestCase):
def test_basic(self):
c = MagicMock()
mod.drop_db(c, 'foo', checkfirst=False)
c.sudo.assert_called_once_with('dropdb foo', user='postgres')
mod.drop_db(c, "foo", checkfirst=False)
c.sudo.assert_called_once_with("dropdb foo", user="postgres")
def test_db_does_not_exist(self):
c = MagicMock()
with patch.object(mod, 'db_exists') as db_exists:
with patch.object(mod, "db_exists") as db_exists:
db_exists.return_value = False
mod.drop_db(c, 'foo')
db_exists.assert_called_once_with(c, 'foo')
mod.drop_db(c, "foo")
db_exists.assert_called_once_with(c, "foo")
c.sudo.assert_not_called()
@ -128,7 +145,9 @@ class TestDumpDb(TestCase):
def test_basic(self):
c = MagicMock()
result = mod.dump_db(c, 'foo')
self.assertEqual(result, 'foo.sql.gz')
c.sudo.assert_called_once_with('pg_dump foo | gzip -c > /tmp/foo.sql.gz', user='postgres')
c.run.assert_called_with('rm /tmp/foo.sql.gz')
result = mod.dump_db(c, "foo")
self.assertEqual(result, "foo.sql.gz")
c.sudo.assert_called_once_with(
"pg_dump foo | gzip -c > /tmp/foo.sql.gz", user="postgres"
)
c.run.assert_called_with("rm /tmp/foo.sql.gz")

View file

@ -13,8 +13,8 @@ class TestCacheHostKey(TestCase):
# assume the first command runs okay
c.run.return_value.failed = False
mod.cache_host_key(c, 'example.com')
c.run.assert_called_once_with('ssh example.com whoami', warn=True)
mod.cache_host_key(c, "example.com")
c.run.assert_called_once_with("ssh example.com whoami", warn=True)
def test_root_commands_not_allowed(self):
c = MagicMock()
@ -22,25 +22,27 @@ class TestCacheHostKey(TestCase):
# assume the first command fails b/c "disallowed"
c.run.return_value.failed = True
c.run.return_value.stderr = "Disallowed command"
mod.cache_host_key(c, 'example.com')
c.run.assert_called_once_with('ssh example.com whoami', warn=True)
mod.cache_host_key(c, "example.com")
c.run.assert_called_once_with("ssh example.com whoami", warn=True)
def test_root_cache_key(self):
c = MagicMock()
# first command fails; second command caches host key
c.run.return_value.failed = True
mod.cache_host_key(c, 'example.com')
c.run.assert_has_calls([call('ssh example.com whoami', warn=True)])
c.run.assert_called_with('ssh -o StrictHostKeyChecking=no example.com whoami', warn=True)
mod.cache_host_key(c, "example.com")
c.run.assert_has_calls([call("ssh example.com whoami", warn=True)])
c.run.assert_called_with(
"ssh -o StrictHostKeyChecking=no example.com whoami", warn=True
)
def test_user_already_cached(self):
c = MagicMock()
# assume the first command runs okay
c.sudo.return_value.failed = False
mod.cache_host_key(c, 'example.com', user='foo')
c.sudo.assert_called_once_with('ssh example.com whoami', user='foo', warn=True)
mod.cache_host_key(c, "example.com", user="foo")
c.sudo.assert_called_once_with("ssh example.com whoami", user="foo", warn=True)
def test_user_commands_not_allowed(self):
c = MagicMock()
@ -48,15 +50,16 @@ class TestCacheHostKey(TestCase):
# assume the first command fails b/c "disallowed"
c.sudo.return_value.failed = True
c.sudo.return_value.stderr = "Disallowed command"
mod.cache_host_key(c, 'example.com', user='foo')
c.sudo.assert_called_once_with('ssh example.com whoami', user='foo', warn=True)
mod.cache_host_key(c, "example.com", user="foo")
c.sudo.assert_called_once_with("ssh example.com whoami", user="foo", warn=True)
def test_user_cache_key(self):
c = MagicMock()
# first command fails; second command caches host key
c.sudo.return_value.failed = True
mod.cache_host_key(c, 'example.com', user='foo')
c.sudo.assert_has_calls([call('ssh example.com whoami', user='foo', warn=True)])
c.sudo.assert_called_with('ssh -o StrictHostKeyChecking=no example.com whoami',
user='foo', warn=True)
mod.cache_host_key(c, "example.com", user="foo")
c.sudo.assert_has_calls([call("ssh example.com whoami", user="foo", warn=True)])
c.sudo.assert_called_with(
"ssh -o StrictHostKeyChecking=no example.com whoami", user="foo", warn=True
)

View file

@ -12,26 +12,26 @@ from wuttamess import sync as mod
class TestMakeRoot(TestCase):
def test_basic(self):
root = mod.make_root('files')
root = mod.make_root("files")
self.assertIsInstance(root, SyncedRoot)
self.assertEqual(root.src, Path('files'))
self.assertEqual(root.dest, Path('/'))
self.assertEqual(root.src, Path("files"))
self.assertEqual(root.dest, Path("/"))
class TestMakeSelector(TestCase):
def test_basic(self):
selector = mod.make_selector('etc/postfix')
selector = mod.make_selector("etc/postfix")
self.assertIsInstance(selector, ItemSelector)
self.assertEqual(selector.subpath, Path('etc/postfix'))
self.assertEqual(selector.subpath, Path("etc/postfix"))
class TestIsync(TestCase):
def test_basic(self):
c = MagicMock()
root = mod.make_root('files')
with patch.object(mod, 'fabsync') as fabsync:
root = mod.make_root("files")
with patch.object(mod, "fabsync") as fabsync:
fabsync.ItemSelector = ItemSelector
# nothing to sync
@ -42,7 +42,7 @@ class TestIsync(TestCase):
# sync one file
fabsync.isync.reset_mock()
result = MagicMock(path='/foo', modified=True)
result = MagicMock(path="/foo", modified=True)
fabsync.isync.return_value = [result]
results = list(mod.isync(c, root))
self.assertEqual(results, [result])
@ -50,34 +50,38 @@ class TestIsync(TestCase):
# sync with selector (subpath)
fabsync.isync.reset_mock()
result = MagicMock(path='/foo', modified=True)
result = MagicMock(path="/foo", modified=True)
fabsync.isync.return_value = [result]
results = list(mod.isync(c, root, 'foo'))
results = list(mod.isync(c, root, "foo"))
self.assertEqual(results, [result])
fabsync.isync.assert_called_once_with(c, root, selector=fabsync.ItemSelector.new('foo'))
fabsync.isync.assert_called_once_with(
c, root, selector=fabsync.ItemSelector.new("foo")
)
# sync with selector (subpath + tags)
fabsync.isync.reset_mock()
result = MagicMock(path='/foo', modified=True)
result = MagicMock(path="/foo", modified=True)
fabsync.isync.return_value = [result]
results = list(mod.isync(c, root, 'foo', tags={'bar'}))
results = list(mod.isync(c, root, "foo", tags={"bar"}))
self.assertEqual(results, [result])
fabsync.isync.assert_called_once_with(c, root, selector=fabsync.ItemSelector.new('foo', tags={'bar'}))
fabsync.isync.assert_called_once_with(
c, root, selector=fabsync.ItemSelector.new("foo", tags={"bar"})
)
class TestCheckIsync(TestCase):
def test_basic(self):
c = MagicMock()
root = mod.make_root('files')
with patch.object(mod, 'isync') as isync:
root = mod.make_root("files")
with patch.object(mod, "isync") as isync:
# file(s) modified
result = MagicMock(path='/foo', modified=True)
result = MagicMock(path="/foo", modified=True)
isync.return_value = [result]
self.assertTrue(mod.check_isync(c, root))
# not modified
result = MagicMock(path='/foo', modified=False)
result = MagicMock(path="/foo", modified=False)
isync.return_value = [result]
self.assertFalse(mod.check_isync(c, root))

View file

@ -11,17 +11,17 @@ class TestExists(TestCase):
def test_basic(self):
c = MagicMock()
mod.exists(c, '/foo')
c.run.assert_called_once_with('test -e /foo', warn=True)
mod.exists(c, "/foo")
c.run.assert_called_once_with("test -e /foo", warn=True)
class TestHomePath(TestCase):
def test_basic(self):
c = MagicMock()
c.run.return_value.stdout = '/home/foo'
path = mod.get_home_path(c, user='foo')
self.assertEqual(path, '/home/foo')
c.run.return_value.stdout = "/home/foo"
path = mod.get_home_path(c, user="foo")
self.assertEqual(path, "/home/foo")
class TestIsSymlink(TestCase):
@ -29,13 +29,13 @@ class TestIsSymlink(TestCase):
def test_yes(self):
c = MagicMock()
c.run.return_value.failed = False
self.assertTrue(mod.is_symlink(c, '/foo'))
self.assertTrue(mod.is_symlink(c, "/foo"))
c.run.assert_called_once_with('test -L "$(echo /foo)"', warn=True)
def test_no(self):
c = MagicMock()
c.run.return_value.failed = True
self.assertFalse(mod.is_symlink(c, '/foo'))
self.assertFalse(mod.is_symlink(c, "/foo"))
c.run.assert_called_once_with('test -L "$(echo /foo)"', warn=True)
@ -43,31 +43,39 @@ class TestMakoRenderer(TestCase):
def test_basic(self):
c = MagicMock()
renderer = mod.mako_renderer(c, env={'machine_is_live': True})
renderer = mod.mako_renderer(c, env={"machine_is_live": True})
here = os.path.dirname(__file__)
path = os.path.join(here, 'files', 'bar', 'baz')
path = os.path.join(here, "files", "bar", "baz")
rendered = renderer(path, vars={})
self.assertEqual(rendered, 'machine_is_live = True')
self.assertEqual(rendered, "machine_is_live = True")
class TestSetTimezone(TestCase):
def test_symlink(self):
c = MagicMock()
with patch.object(mod, 'is_symlink') as is_symlink:
with patch.object(mod, "is_symlink") as is_symlink:
is_symlink.return_value = True
mod.set_timezone(c, 'America/Chicago')
c.run.assert_has_calls([
mod.set_timezone(c, "America/Chicago")
c.run.assert_has_calls(
[
call("bash -c 'echo America/Chicago > /etc/timezone'"),
])
c.run.assert_called_with('ln -sf /usr/share/zoneinfo/America/Chicago /etc/localtime')
]
)
c.run.assert_called_with(
"ln -sf /usr/share/zoneinfo/America/Chicago /etc/localtime"
)
def test_not_symlink(self):
c = MagicMock()
with patch.object(mod, 'is_symlink') as is_symlink:
with patch.object(mod, "is_symlink") as is_symlink:
is_symlink.return_value = False
mod.set_timezone(c, 'America/Chicago')
c.run.assert_has_calls([
mod.set_timezone(c, "America/Chicago")
c.run.assert_has_calls(
[
call("bash -c 'echo America/Chicago > /etc/timezone'"),
])
c.run.assert_called_with('cp /usr/share/zoneinfo/America/Chicago /etc/localtime')
]
)
c.run.assert_called_with(
"cp /usr/share/zoneinfo/America/Chicago /etc/localtime"
)

View file

@ -12,15 +12,29 @@ class TestPurgeEmailSettings(TestCase):
c = MagicMock()
sql = MagicMock()
postgres = MagicMock(sql=sql)
with patch.object(mod, 'postgres', new=postgres):
mod.purge_email_settings(c, 'testy', appname='wuttatest')
sql.assert_has_calls([
call(c, "delete from setting where name like 'wuttatest.email.%.sender';",
database='testy'),
call(c, "delete from setting where name like 'wuttatest.email.%.to';",
database='testy'),
call(c, "delete from setting where name like 'wuttatest.email.%.cc';",
database='testy'),
call(c, "delete from setting where name like 'wuttatest.email.%.bcc';",
database='testy'),
])
with patch.object(mod, "postgres", new=postgres):
mod.purge_email_settings(c, "testy", appname="wuttatest")
sql.assert_has_calls(
[
call(
c,
"delete from setting where name like 'wuttatest.email.%.sender';",
database="testy",
),
call(
c,
"delete from setting where name like 'wuttatest.email.%.to';",
database="testy",
),
call(
c,
"delete from setting where name like 'wuttatest.email.%.cc';",
database="testy",
),
call(
c,
"delete from setting where name like 'wuttatest.email.%.bcc';",
database="testy",
),
]
)