Compare commits
No commits in common. "master" and "v0.1.0" have entirely different histories.
31 changed files with 63 additions and 1192 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -1,5 +1,4 @@
|
|||
*~
|
||||
*.pyc
|
||||
.coverage
|
||||
dist/
|
||||
docs/_build/
|
|
@ -1,4 +0,0 @@
|
|||
# -*- mode: conf; -*-
|
||||
|
||||
[MESSAGES CONTROL]
|
||||
disable=fixme
|
26
CHANGELOG.md
26
CHANGELOG.md
|
@ -1,29 +1,3 @@
|
|||
|
||||
# Changelog
|
||||
All notable changes to WuttaMess will be documented in this file.
|
||||
|
||||
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
|
||||
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## v0.2.0 (2025-01-14)
|
||||
|
||||
### Feat
|
||||
|
||||
- add `util.get_home_path()` function
|
||||
- add `is_symlink()` and `set_timezone()` util functions
|
||||
- add `apt.is_installed()` function
|
||||
- add basic `postgres` module for db setup
|
||||
- add `ssh` module with `cache_host_key()` function
|
||||
- add `util.mako_renderer()` function
|
||||
- add `util` module with `exists()` function
|
||||
- add basic postfix config helpers
|
||||
|
||||
### Fix
|
||||
|
||||
- add `wutta.purge_email_settings()` for cloning prod DB to test
|
||||
- add `postgres.dump_db()` function
|
||||
- add `sync.make_selector()` convenience function
|
||||
|
||||
## v0.1.0 (2024-09-10)
|
||||
|
||||
### Feat
|
||||
|
|
|
@ -1,6 +0,0 @@
|
|||
|
||||
``wuttamess.postfix``
|
||||
=====================
|
||||
|
||||
.. automodule:: wuttamess.postfix
|
||||
:members:
|
|
@ -1,6 +0,0 @@
|
|||
|
||||
``wuttamess.postgres``
|
||||
======================
|
||||
|
||||
.. automodule:: wuttamess.postgres
|
||||
:members:
|
|
@ -1,6 +0,0 @@
|
|||
|
||||
``wuttamess.ssh``
|
||||
=================
|
||||
|
||||
.. automodule:: wuttamess.ssh
|
||||
:members:
|
|
@ -1,6 +0,0 @@
|
|||
|
||||
``wuttamess.util``
|
||||
==================
|
||||
|
||||
.. automodule:: wuttamess.util
|
||||
:members:
|
|
@ -1,6 +0,0 @@
|
|||
|
||||
``wuttamess.wutta``
|
||||
===================
|
||||
|
||||
.. automodule:: wuttamess.wutta
|
||||
:members:
|
28
docs/conf.py
28
docs/conf.py
|
@ -8,32 +8,32 @@
|
|||
|
||||
from importlib.metadata import version as get_version
|
||||
|
||||
project = "WuttaMess"
|
||||
copyright = "2024, Lance Edgar"
|
||||
author = "Lance Edgar"
|
||||
release = get_version("WuttaMess")
|
||||
project = 'WuttaMess'
|
||||
copyright = '2024, Lance Edgar'
|
||||
author = 'Lance Edgar'
|
||||
release = get_version('WuttaMess')
|
||||
|
||||
# -- General configuration ---------------------------------------------------
|
||||
# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration
|
||||
|
||||
extensions = [
|
||||
"sphinx.ext.autodoc",
|
||||
"sphinx.ext.intersphinx",
|
||||
"sphinx.ext.viewcode",
|
||||
"sphinx.ext.todo",
|
||||
'sphinx.ext.autodoc',
|
||||
'sphinx.ext.intersphinx',
|
||||
'sphinx.ext.viewcode',
|
||||
'sphinx.ext.todo',
|
||||
]
|
||||
|
||||
templates_path = ["_templates"]
|
||||
exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"]
|
||||
templates_path = ['_templates']
|
||||
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
|
||||
|
||||
intersphinx_mapping = {
|
||||
"fabsync": ("https://fabsync.ignorare.dev/", None),
|
||||
"invoke": ("https://docs.pyinvoke.org/en/stable/", None),
|
||||
'fabsync': ('https://fabsync.ignorare.dev/', None),
|
||||
'invoke': ('https://docs.pyinvoke.org/en/stable/', None),
|
||||
}
|
||||
|
||||
|
||||
# -- Options for HTML output -------------------------------------------------
|
||||
# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output
|
||||
|
||||
html_theme = "furo"
|
||||
html_static_path = ["_static"]
|
||||
html_theme = 'furo'
|
||||
html_static_path = ['_static']
|
||||
|
|
|
@ -17,12 +17,6 @@ project.
|
|||
|
||||
.. _test coverage: https://buildbot.rattailproject.org/coverage/wuttamess/
|
||||
|
||||
.. image:: https://img.shields.io/badge/linting-pylint-yellowgreen
|
||||
:target: https://github.com/pylint-dev/pylint
|
||||
|
||||
.. image:: https://img.shields.io/badge/code%20style-black-000000.svg
|
||||
:target: https://github.com/psf/black
|
||||
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 2
|
||||
|
@ -37,9 +31,4 @@ project.
|
|||
|
||||
api/wuttamess
|
||||
api/wuttamess.apt
|
||||
api/wuttamess.postfix
|
||||
api/wuttamess.postgres
|
||||
api/wuttamess.ssh
|
||||
api/wuttamess.sync
|
||||
api/wuttamess.util
|
||||
api/wuttamess.wutta
|
||||
|
|
|
@ -52,15 +52,12 @@ merely a personal convention. You can define tasks however you need::
|
|||
"""
|
||||
|
||||
from fabric import task
|
||||
from wuttamess import apt, sync, util
|
||||
from wuttamess import apt, sync
|
||||
|
||||
|
||||
# nb. this is used below, for file sync
|
||||
root = sync.make_root('files')
|
||||
|
||||
# nb. this is for global mako template context etc.
|
||||
env = {'machine_is_live': False}
|
||||
|
||||
|
||||
@task
|
||||
def bootstrap_all(c):
|
||||
|
@ -77,13 +74,11 @@ merely a personal convention. You can define tasks however you need::
|
|||
"""
|
||||
Bootstrap the base system
|
||||
"""
|
||||
renderers = {'mako': util.mako_renderer(c, env)}
|
||||
|
||||
apt.dist_upgrade(c)
|
||||
|
||||
# postfix
|
||||
apt.install(c, 'postfix')
|
||||
if sync.check_isync(c, root, 'etc/postfix', renderers=renderers):
|
||||
if sync.check_isync(c, root, 'etc/postfix'):
|
||||
c.run('systemctl restart postfix')
|
||||
|
||||
|
||||
|
|
|
@ -6,7 +6,7 @@ build-backend = "hatchling.build"
|
|||
|
||||
[project]
|
||||
name = "WuttaMess"
|
||||
version = "0.2.0"
|
||||
version = "0.1.0"
|
||||
description = "Fabric Automation Helpers"
|
||||
readme = "README.md"
|
||||
authors = [{name = "Lance Edgar", email = "lance@wuttaproject.org"}]
|
||||
|
@ -31,20 +31,17 @@ requires-python = ">= 3.8"
|
|||
dependencies = [
|
||||
"fabric",
|
||||
"fabsync",
|
||||
"mako",
|
||||
"typing_extensions",
|
||||
]
|
||||
|
||||
|
||||
[project.optional-dependencies]
|
||||
docs = ["Sphinx", "furo"]
|
||||
tests = ["pylint", "pytest", "pytest-cov", "tox"]
|
||||
tests = ["pytest-cov", "tox"]
|
||||
|
||||
|
||||
[project.urls]
|
||||
Homepage = "https://wuttaproject.org/"
|
||||
Repository = "https://forgejo.wuttaproject.org/wutta/wuttamess"
|
||||
Issues = "https://forgejo.wuttaproject.org/wutta/wuttamess/issues"
|
||||
Changelog = "https://forgejo.wuttaproject.org/wutta/wuttamess/src/branch/master/CHANGELOG.md"
|
||||
|
||||
|
||||
|
|
|
@ -1,9 +1,6 @@
|
|||
# -*- coding: utf-8; -*-
|
||||
"""
|
||||
Package Version
|
||||
"""
|
||||
|
||||
from importlib.metadata import version
|
||||
|
||||
|
||||
__version__ = version("WuttaMess")
|
||||
__version__ = version('WuttaMess')
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
################################################################################
|
||||
#
|
||||
# WuttaMess -- Fabric Automation Helpers
|
||||
# Copyright © 2024-2025 Lance Edgar
|
||||
# Copyright © 2024 Lance Edgar
|
||||
#
|
||||
# This file is part of Wutta Framework.
|
||||
#
|
||||
|
@ -25,7 +25,7 @@ APT package management
|
|||
"""
|
||||
|
||||
|
||||
def dist_upgrade(c, frontend="noninteractive"):
|
||||
def dist_upgrade(c, frontend='noninteractive'):
|
||||
"""
|
||||
Run a full dist-upgrade for APT. Essentially this runs:
|
||||
|
||||
|
@ -46,22 +46,9 @@ def install(c, *packages, **kwargs):
|
|||
|
||||
apt install PKG [PKG ...]
|
||||
"""
|
||||
frontend = kwargs.pop("frontend", "noninteractive")
|
||||
packages = " ".join(packages)
|
||||
return c.run(f"DEBIAN_FRONTEND={frontend} apt-get --assume-yes install {packages}")
|
||||
|
||||
|
||||
def is_installed(c, package):
|
||||
"""
|
||||
Check if the given APT package is installed.
|
||||
|
||||
:param c: Fabric connection.
|
||||
|
||||
:param package: Name of package to be checked.
|
||||
|
||||
:returns: ``True`` if package is installed, else ``False``.
|
||||
"""
|
||||
return c.run(f"dpkg-query -s {package}", warn=True).ok
|
||||
frontend = kwargs.pop('frontend', 'noninteractive')
|
||||
packages = ' '.join(packages)
|
||||
return c.run(f'DEBIAN_FRONTEND={frontend} apt-get --assume-yes install {packages}')
|
||||
|
||||
|
||||
def update(c):
|
||||
|
@ -72,12 +59,10 @@ def update(c):
|
|||
|
||||
apt update
|
||||
"""
|
||||
c.run("apt-get update")
|
||||
c.run('apt-get update')
|
||||
|
||||
|
||||
def upgrade( # pylint: disable=redefined-outer-name
|
||||
c, dist_upgrade=False, frontend="noninteractive"
|
||||
):
|
||||
def upgrade(c, dist_upgrade=False, frontend='noninteractive'):
|
||||
"""
|
||||
Upgrade packages via APT. Essentially this runs:
|
||||
|
||||
|
@ -89,11 +74,8 @@ def upgrade( # pylint: disable=redefined-outer-name
|
|||
|
||||
apt dist-upgrade
|
||||
"""
|
||||
options = ""
|
||||
if frontend == "noninteractive":
|
||||
options = (
|
||||
'--option Dpkg::Options::="--force-confdef" '
|
||||
'--option Dpkg::Options::="--force-confold"'
|
||||
)
|
||||
upgrade = "dist-upgrade" if dist_upgrade else "upgrade"
|
||||
c.run(f"DEBIAN_FRONTEND={frontend} apt-get --assume-yes {options} {upgrade}")
|
||||
options = ''
|
||||
if frontend == 'noninteractive':
|
||||
options = '--option Dpkg::Options::="--force-confdef" --option Dpkg::Options::="--force-confold"'
|
||||
upgrade = 'dist-upgrade' if dist_upgrade else 'upgrade'
|
||||
c.run(f'DEBIAN_FRONTEND={frontend} apt-get --assume-yes {options} {upgrade}')
|
||||
|
|
|
@ -1,67 +0,0 @@
|
|||
# -*- coding: utf-8; -*-
|
||||
################################################################################
|
||||
#
|
||||
# WuttaMess -- Fabric Automation Helpers
|
||||
# Copyright © 2024 Lance Edgar
|
||||
#
|
||||
# This file is part of Wutta Framework.
|
||||
#
|
||||
# Wutta Framework is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU General Public License as published by the Free
|
||||
# Software Foundation, either version 3 of the License, or (at your option) any
|
||||
# later version.
|
||||
#
|
||||
# Wutta Framework is distributed in the hope that it will be useful, but
|
||||
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
|
||||
# more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License along with
|
||||
# Wutta Framework. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
################################################################################
|
||||
"""
|
||||
Postfix mail service
|
||||
"""
|
||||
|
||||
|
||||
def set_config(c, setting, value):
|
||||
"""
|
||||
Configure the given setting with the given value.
|
||||
"""
|
||||
c.run(f"postconf -e '{setting}={value}'")
|
||||
|
||||
|
||||
def set_myhostname(c, hostname):
|
||||
"""
|
||||
Configure the ``myhostname`` setting with the given string.
|
||||
"""
|
||||
set_config(c, "myhostname", hostname)
|
||||
|
||||
|
||||
def set_myorigin(c, origin):
|
||||
"""
|
||||
Configure the ``myorigin`` setting with the given string.
|
||||
"""
|
||||
set_config(c, "myorigin", origin)
|
||||
|
||||
|
||||
def set_mydestination(c, *destinations):
|
||||
"""
|
||||
Configure the ``mydestinations`` setting with the given strings.
|
||||
"""
|
||||
set_config(c, "mydestination", ", ".join(destinations))
|
||||
|
||||
|
||||
def set_mynetworks(c, *networks):
|
||||
"""
|
||||
Configure the ``mynetworks`` setting with the given strings.
|
||||
"""
|
||||
set_config(c, "mynetworks", " ".join(networks))
|
||||
|
||||
|
||||
def set_relayhost(c, relayhost):
|
||||
"""
|
||||
Configure the ``relayhost`` setting with the given string
|
||||
"""
|
||||
set_config(c, "relayhost", relayhost)
|
|
@ -1,199 +0,0 @@
|
|||
# -*- coding: utf-8; -*-
|
||||
################################################################################
|
||||
#
|
||||
# WuttaMess -- Fabric Automation Helpers
|
||||
# Copyright © 2024-2025 Lance Edgar
|
||||
#
|
||||
# This file is part of Wutta Framework.
|
||||
#
|
||||
# Wutta Framework is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU General Public License as published by the Free
|
||||
# Software Foundation, either version 3 of the License, or (at your option) any
|
||||
# later version.
|
||||
#
|
||||
# Wutta Framework is distributed in the hope that it will be useful, but
|
||||
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
|
||||
# more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License along with
|
||||
# Wutta Framework. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
################################################################################
|
||||
"""
|
||||
PostgreSQL DB utilities
|
||||
"""
|
||||
|
||||
|
||||
def sql(c, sql_, database="", port=None, **kwargs):
|
||||
"""
|
||||
Execute some SQL as the ``postgres`` user.
|
||||
|
||||
:param c: Fabric connection.
|
||||
|
||||
:param sql_: SQL string to execute.
|
||||
|
||||
:param database: Name of the database on which to execute the SQL.
|
||||
If not specified, default ``postgres`` is assumed.
|
||||
|
||||
:param port: Optional port for PostgreSQL; default is 5432.
|
||||
"""
|
||||
port = f" --port={port}" if port else ""
|
||||
return c.sudo(
|
||||
f'psql{port} --tuples-only --no-align --command="{sql_}" {database}',
|
||||
user="postgres",
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
|
||||
def user_exists(c, name, port=None):
|
||||
"""
|
||||
Determine if a given PostgreSQL user exists.
|
||||
|
||||
:param c: Fabric connection.
|
||||
|
||||
:param name: Username to check for.
|
||||
|
||||
:param port: Optional port for PostgreSQL; default is 5432.
|
||||
|
||||
:returns: ``True`` if user exists, else ``False``.
|
||||
"""
|
||||
user = sql(
|
||||
c, f"SELECT rolname FROM pg_roles WHERE rolname = '{name}'", port=port
|
||||
).stdout.strip()
|
||||
return bool(user)
|
||||
|
||||
|
||||
def create_user(c, name, password=None, port=None, checkfirst=True):
|
||||
"""
|
||||
Create a PostgreSQL user account.
|
||||
|
||||
:param c: Fabric connection.
|
||||
|
||||
:param name: Username to create.
|
||||
|
||||
:param password: Optional password for the new user. If set, will
|
||||
call :func:`set_user_password()`.
|
||||
|
||||
:param port: Optional port for PostgreSQL; default is 5432.
|
||||
|
||||
:param checkfirst: If true (the default), first check if user
|
||||
exists and skip creating if already present. If false, then
|
||||
try to create user with no check.
|
||||
"""
|
||||
if not checkfirst or not user_exists(c, name, port=port):
|
||||
portarg = f" --port={port}" if port else ""
|
||||
c.sudo(
|
||||
f"createuser{portarg} --no-createrole --no-superuser {name}",
|
||||
user="postgres",
|
||||
)
|
||||
if password:
|
||||
set_user_password(c, name, password, port=port)
|
||||
|
||||
|
||||
def set_user_password(c, name, password, port=None):
|
||||
"""
|
||||
Set the password for a PostgreSQL user account.
|
||||
|
||||
:param c: Fabric connection.
|
||||
|
||||
:param name: Username whose password is to be set.
|
||||
|
||||
:param password: Password for the new user.
|
||||
|
||||
:param port: Optional port for PostgreSQL; default is 5432.
|
||||
"""
|
||||
sql(
|
||||
c,
|
||||
f"ALTER USER \\\"{name}\\\" PASSWORD '{password}';",
|
||||
port=port,
|
||||
hide=True,
|
||||
echo=False,
|
||||
)
|
||||
|
||||
|
||||
def db_exists(c, name, port=None):
|
||||
"""
|
||||
Determine if a given PostgreSQL database exists.
|
||||
|
||||
:param c: Fabric connection.
|
||||
|
||||
:param name: Name of the database to check for.
|
||||
|
||||
:param port: Optional port for PostgreSQL; default is 5432.
|
||||
|
||||
:returns: ``True`` if database exists, else ``False``.
|
||||
"""
|
||||
db = sql(
|
||||
c, f"SELECT datname FROM pg_database WHERE datname = '{name}'", port=port
|
||||
).stdout.strip()
|
||||
return db == name
|
||||
|
||||
|
||||
def create_db(c, name, owner=None, port=None, checkfirst=True):
|
||||
"""
|
||||
Create a PostgreSQL database.
|
||||
|
||||
:param c: Fabric connection.
|
||||
|
||||
:param name: Name of the database to create.
|
||||
|
||||
:param owner: Optional role name to set as owner for the database.
|
||||
|
||||
:param port: Optional port for PostgreSQL; default is 5432.
|
||||
|
||||
:param checkfirst: If true (the default), first check if DB exists
|
||||
and skip creating if already present. If false, then try to
|
||||
create DB with no check.
|
||||
"""
|
||||
if not checkfirst or not db_exists(c, name, port=port):
|
||||
port = f" --port={port}" if port else ""
|
||||
owner = f" --owner={owner}" if owner else ""
|
||||
c.sudo(f"createdb{port}{owner} {name}", user="postgres")
|
||||
|
||||
|
||||
def drop_db(c, name, checkfirst=True):
|
||||
"""
|
||||
Drop a PostgreSQL database.
|
||||
|
||||
:param c: Fabric connection.
|
||||
|
||||
:param name: Name of the database to drop.
|
||||
|
||||
:param checkfirst: If true (the default), first check if DB exists
|
||||
and skip dropping if not present. If false, then try to drop
|
||||
DB with no check.
|
||||
"""
|
||||
if not checkfirst or db_exists(c, name):
|
||||
c.sudo(f"dropdb {name}", user="postgres")
|
||||
|
||||
|
||||
def dump_db(c, name):
|
||||
"""
|
||||
Dump a PostgreSQL database to file.
|
||||
|
||||
This uses the ``pg_dump`` and ``gzip`` commands to produce a
|
||||
compressed SQL dump. The filename returned is based on the
|
||||
``name`` provided, e.g. ``mydbname.sql.gz``.
|
||||
|
||||
:param c: Fabric connection.
|
||||
|
||||
:param name: Name of the database to dump.
|
||||
|
||||
:returns: Base name of the output file. We only return the
|
||||
filename and not the path, since the file is expected to exist
|
||||
in the connected user's home folder.
|
||||
"""
|
||||
sql_name = f"{name}.sql"
|
||||
gz_name = f"{sql_name}.gz"
|
||||
tmp_name = f"/tmp/{gz_name}"
|
||||
|
||||
# TODO: when pg_dump fails the command still succeeds! (would this work?)
|
||||
# cmd = f'set -e && pg_dump {name} | gzip -c > {tmp_name}'
|
||||
cmd = f"pg_dump {name} | gzip -c > {tmp_name}"
|
||||
|
||||
c.sudo(cmd, user="postgres")
|
||||
c.run(f"cp {tmp_name} {gz_name}")
|
||||
c.run(f"rm {tmp_name}")
|
||||
|
||||
return gz_name
|
|
@ -1,75 +0,0 @@
|
|||
# -*- coding: utf-8; -*-
|
||||
################################################################################
|
||||
#
|
||||
# WuttaMess -- Fabric Automation Helpers
|
||||
# Copyright © 2024 Lance Edgar
|
||||
#
|
||||
# This file is part of Wutta Framework.
|
||||
#
|
||||
# Wutta Framework is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU General Public License as published by the Free
|
||||
# Software Foundation, either version 3 of the License, or (at your option) any
|
||||
# later version.
|
||||
#
|
||||
# Wutta Framework is distributed in the hope that it will be useful, but
|
||||
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
|
||||
# more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License along with
|
||||
# Wutta Framework. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
################################################################################
|
||||
"""
|
||||
SSH Utilities
|
||||
"""
|
||||
|
||||
|
||||
def cache_host_key(c, host, port=None, user=None):
|
||||
"""
|
||||
Cache the SSH host key for the given host, for the given user.
|
||||
|
||||
:param c: Fabric connection.
|
||||
|
||||
:param host: Name or IP of the host whose key should be cached.
|
||||
|
||||
Note that you can specify a username along with the hostname if
|
||||
needed, e.g. any of these works:
|
||||
|
||||
* ``1.2.3.4``
|
||||
* ``foo@1.2.3.4``
|
||||
* ``example.com``
|
||||
* ``foo@example.com``
|
||||
|
||||
:param port: Optional SSH port for the ``host``; default is 22.
|
||||
|
||||
:param user: User on the fabric target whose SSH key cache should
|
||||
be updated to include the given ``host``.
|
||||
"""
|
||||
port = f"-p {port} " if port else ""
|
||||
|
||||
# first try to run a basic command over ssh
|
||||
cmd = f"ssh {port}{host} whoami"
|
||||
if user and user != "root":
|
||||
result = c.sudo(cmd, user=user, warn=True)
|
||||
else:
|
||||
result = c.run(cmd, warn=True)
|
||||
|
||||
# no need to update cache if command worked okay
|
||||
if not result.failed:
|
||||
return
|
||||
|
||||
# basic command failed, but in some cases that is simply b/c
|
||||
# normal commands are not allowed, although the ssh connection
|
||||
# itself was established okay. so here we check for that.
|
||||
if "Disallowed command" in result.stderr:
|
||||
return
|
||||
|
||||
# okay then we now think that the ssh connection itself
|
||||
# was not made, which presumably means we *do* need to
|
||||
# cache the host key, so try that now
|
||||
cmd = f"ssh -o StrictHostKeyChecking=no {port}{host} whoami"
|
||||
if user and user != "root":
|
||||
c.sudo(cmd, user=user, warn=True)
|
||||
else:
|
||||
c.run(cmd, warn=True)
|
|
@ -2,7 +2,7 @@
|
|||
################################################################################
|
||||
#
|
||||
# WuttaMess -- Fabric Automation Helpers
|
||||
# Copyright © 2024-2025 Lance Edgar
|
||||
# Copyright © 2024 Lance Edgar
|
||||
#
|
||||
# This file is part of Wutta Framework.
|
||||
#
|
||||
|
@ -29,7 +29,7 @@ See :doc:`/narr/usage` for a basic example.
|
|||
import fabsync
|
||||
|
||||
|
||||
def make_root(path, dest="/"):
|
||||
def make_root(path, dest='/'):
|
||||
"""
|
||||
Make and return a "root" object for use with future sync calls.
|
||||
|
||||
|
@ -44,56 +44,32 @@ def make_root(path, dest="/"):
|
|||
return fabsync.load(path, dest)
|
||||
|
||||
|
||||
def make_selector(subpath=None, **kwargs):
|
||||
"""
|
||||
Make and return an "item selector" for use with a sync call.
|
||||
|
||||
This is a convenience wrapper around
|
||||
:meth:`fabsync:fabsync.ItemSelector.new()`.
|
||||
|
||||
:param subpath: (Optional) Relative subpath of the file tree to
|
||||
sync, e.g. ``'etc/postfix'``.
|
||||
|
||||
:param tags: Optional iterable of tags to include; excluding any
|
||||
files which are not so tagged. E.g. ``{'foo'}``
|
||||
"""
|
||||
return fabsync.ItemSelector.new(subpath, **kwargs)
|
||||
|
||||
|
||||
def isync(c, root, selector=None, tags=None, echo=True, **kwargs):
|
||||
def isync(c, root, selector=None, echo=True, **kwargs):
|
||||
"""
|
||||
Sync files, yielding the result for each as it goes.
|
||||
|
||||
This is a convenience wrapper around
|
||||
:func:`fabsync:fabsync.isync()`.
|
||||
|
||||
:param c: Fabric connection.
|
||||
:param c: Connection object.
|
||||
|
||||
:param root: File tree "root" object as obtained from
|
||||
:func:`make_root()`.
|
||||
|
||||
:param selector: This can be a simple "subpath" string, indicating
|
||||
a section of the file tree (e.g. ``'etc/postfix'``). Or can be
|
||||
a :class:`fabsync.ItemSelector` instance.
|
||||
|
||||
:param tags: Optional iterable of tags to select. If ``selector``
|
||||
is a subpath string, and you specify ``tags`` then they will be
|
||||
included when creating the actual selector.
|
||||
a section of the file tree. For instance: ``'etc/postfix'``
|
||||
|
||||
:param echo: Flag indicating whether the path for each file synced
|
||||
should be echoed to stdout. Generally thought to be useful but
|
||||
may be disabled.
|
||||
|
||||
:param \\**kwargs: Any remaining kwargs are passed as-is to
|
||||
:param \**kwargs: Any remaining kwargs are passed as-is to
|
||||
:func:`fabsync:fabsync.isync()`.
|
||||
"""
|
||||
if selector:
|
||||
if not isinstance(selector, fabsync.ItemSelector):
|
||||
kw = {}
|
||||
if tags:
|
||||
kw["tags"] = tags
|
||||
selector = make_selector(selector, **kw)
|
||||
kwargs["selector"] = selector
|
||||
selector = fabsync.ItemSelector.new(selector)
|
||||
kwargs['selector'] = selector
|
||||
|
||||
for result in fabsync.isync(c, root, **kwargs):
|
||||
if echo:
|
||||
|
@ -111,6 +87,5 @@ def check_isync(c, root, selector=None, **kwargs):
|
|||
:returns: ``True`` if any sync result indicates a file was
|
||||
modified; otherwise ``False``.
|
||||
"""
|
||||
return any( # pylint: disable=use-a-generator
|
||||
[result.modified for result in isync(c, root, selector, **kwargs)]
|
||||
)
|
||||
return any([result.modified
|
||||
for result in isync(c, root, selector, **kwargs)])
|
||||
|
|
|
@ -1,124 +0,0 @@
|
|||
# -*- coding: utf-8; -*-
|
||||
################################################################################
|
||||
#
|
||||
# WuttaMess -- Fabric Automation Helpers
|
||||
# Copyright © 2024-2025 Lance Edgar
|
||||
#
|
||||
# This file is part of Wutta Framework.
|
||||
#
|
||||
# Wutta Framework is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU General Public License as published by the Free
|
||||
# Software Foundation, either version 3 of the License, or (at your option) any
|
||||
# later version.
|
||||
#
|
||||
# Wutta Framework is distributed in the hope that it will be useful, but
|
||||
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
|
||||
# more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License along with
|
||||
# Wutta Framework. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
################################################################################
|
||||
"""
|
||||
Misc. Utilities
|
||||
"""
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Mapping
|
||||
from typing_extensions import Any
|
||||
|
||||
from mako.template import Template
|
||||
|
||||
|
||||
def exists(c, path):
|
||||
"""
|
||||
Returns ``True`` if given path exists on the host, otherwise ``False``.
|
||||
"""
|
||||
return not c.run(f"test -e {path}", warn=True).failed
|
||||
|
||||
|
||||
def get_home_path(c, user=None):
|
||||
"""
|
||||
Get the path to user's home folder on target machine.
|
||||
|
||||
:param c: Fabric connection.
|
||||
|
||||
:param user: Username whose home folder you want. If not
|
||||
specified, the username for the current connection is assumed.
|
||||
|
||||
:returns: Home folder path as string.
|
||||
"""
|
||||
user = user or c.user
|
||||
home = c.run(f"getent passwd {user} | cut -d: -f6").stdout.strip()
|
||||
home = home.rstrip("/")
|
||||
return home
|
||||
|
||||
|
||||
def is_symlink(c, path):
|
||||
"""
|
||||
Check if the given path is a symlink.
|
||||
|
||||
:param c: Fabric connection.
|
||||
|
||||
:param path: Path to check, on target machine.
|
||||
|
||||
:returns: ``True`` if path is a symlink, else ``False``.
|
||||
"""
|
||||
# nb. this function is derived from one copied from fabric v1
|
||||
cmd = f'test -L "$(echo {path})"'
|
||||
result = c.run(cmd, warn=True)
|
||||
return not result.failed
|
||||
|
||||
|
||||
def mako_renderer(c, env=None): # pylint: disable=unused-argument
|
||||
"""
|
||||
This returns a *function* suitable for use as a ``fabsync`` file
|
||||
renderer. The function assumes the file is a Mako template.
|
||||
|
||||
:param c: Fabric connection.
|
||||
|
||||
:param env: Environment dictionary to be used as Mako template
|
||||
context.
|
||||
|
||||
Typical usage is something like::
|
||||
|
||||
from fabric import task
|
||||
from wuttamess import sync, util
|
||||
|
||||
root = sync.make_root('files')
|
||||
env = {}
|
||||
|
||||
@task
|
||||
def foo(c):
|
||||
|
||||
# define possible renderers for fabsync
|
||||
renderers = {'mako': util.mako_renderer(c, env)}
|
||||
|
||||
sync.check_isync(c, root, 'etc/postfix', renderers=renderers)
|
||||
"""
|
||||
env = env or {}
|
||||
|
||||
def render( # pylint: disable=redefined-builtin,unused-argument
|
||||
path: Path, vars: Mapping[str, Any], **kwargs
|
||||
) -> bytes:
|
||||
return Template(filename=str(path)).render(**env)
|
||||
|
||||
return render
|
||||
|
||||
|
||||
def set_timezone(c, timezone):
|
||||
"""
|
||||
Set the system timezone.
|
||||
|
||||
:param c: Fabric connection.
|
||||
|
||||
:param timezone: Standard timezone name,
|
||||
e.g. ``'America/Chicago'``.
|
||||
"""
|
||||
c.run(f"bash -c 'echo {timezone} > /etc/timezone'")
|
||||
|
||||
if is_symlink(c, "/etc/localtime"):
|
||||
c.run(f"ln -sf /usr/share/zoneinfo/{timezone} /etc/localtime")
|
||||
else:
|
||||
c.run(f"cp /usr/share/zoneinfo/{timezone} /etc/localtime")
|
|
@ -1,69 +0,0 @@
|
|||
# -*- coding: utf-8; -*-
|
||||
################################################################################
|
||||
#
|
||||
# WuttaMess -- Fabric Automation Helpers
|
||||
# Copyright © 2024-2025 Lance Edgar
|
||||
#
|
||||
# This file is part of Wutta Framework.
|
||||
#
|
||||
# Wutta Framework is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU General Public License as published by the Free
|
||||
# Software Foundation, either version 3 of the License, or (at your option) any
|
||||
# later version.
|
||||
#
|
||||
# Wutta Framework is distributed in the hope that it will be useful, but
|
||||
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
|
||||
# more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License along with
|
||||
# Wutta Framework. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
################################################################################
|
||||
"""
|
||||
Utilities for Wutta Framework
|
||||
"""
|
||||
|
||||
from wuttamess import postgres
|
||||
|
||||
|
||||
def purge_email_settings(c, dbname, appname="wutta"):
|
||||
"""
|
||||
Purge production email settings for a database.
|
||||
|
||||
This can be used when cloning a production app DB to a test
|
||||
server. The general pattern is:
|
||||
|
||||
* setup test app on test server
|
||||
* config file should specify test email settings
|
||||
* clone the production DB to test server
|
||||
* call this function to purge email settings from test DB
|
||||
|
||||
So the end result should be, the test server app can run and send
|
||||
emails safely using only what is specified in config file(s),
|
||||
since none of the production email settings remain in the test DB.
|
||||
|
||||
:param dbname: Name of the database to be updated.
|
||||
|
||||
:param appname: The ``appname`` used to determine setting names.
|
||||
"""
|
||||
postgres.sql(
|
||||
c,
|
||||
f"delete from setting where name like '{appname}.email.%.sender';",
|
||||
database=dbname,
|
||||
)
|
||||
postgres.sql(
|
||||
c,
|
||||
f"delete from setting where name like '{appname}.email.%.to';",
|
||||
database=dbname,
|
||||
)
|
||||
postgres.sql(
|
||||
c,
|
||||
f"delete from setting where name like '{appname}.email.%.cc';",
|
||||
database=dbname,
|
||||
)
|
||||
postgres.sql(
|
||||
c,
|
||||
f"delete from setting where name like '{appname}.email.%.bcc';",
|
||||
database=dbname,
|
||||
)
|
24
tasks.py
24
tasks.py
|
@ -1,24 +0,0 @@
|
|||
# -*- coding: utf-8; -*-
|
||||
"""
|
||||
Tasks for WuttaMess
|
||||
"""
|
||||
|
||||
import os
|
||||
import shutil
|
||||
|
||||
from invoke import task
|
||||
|
||||
|
||||
@task
|
||||
def release(c, skip_tests=False):
|
||||
"""
|
||||
Release a new version of WuttaMess
|
||||
"""
|
||||
if not skip_tests:
|
||||
c.run("pytest")
|
||||
|
||||
if os.path.exists("dist"):
|
||||
shutil.rmtree("dist")
|
||||
|
||||
c.run("python -m build --sdist")
|
||||
c.run("twine upload dist/*")
|
|
@ -1,4 +0,0 @@
|
|||
|
||||
[files."baz"]
|
||||
renderer = 'mako'
|
||||
tags = ['baz']
|
|
@ -1 +0,0 @@
|
|||
machine_is_live = ${machine_is_live}
|
|
@ -10,38 +10,19 @@ class TestDistUpgrade(TestCase):
|
|||
|
||||
def test_basic(self):
|
||||
c = MagicMock()
|
||||
with patch.object(mod, "update") as update:
|
||||
with patch.object(mod, "upgrade") as upgrade:
|
||||
mod.dist_upgrade(c, frontend="whatever")
|
||||
with patch.object(mod, 'update') as update:
|
||||
with patch.object(mod, 'upgrade') as upgrade:
|
||||
mod.dist_upgrade(c, frontend='whatever')
|
||||
update.assert_called_once_with(c)
|
||||
upgrade.assert_called_once_with(
|
||||
c, dist_upgrade=True, frontend="whatever"
|
||||
)
|
||||
upgrade.assert_called_once_with(c, dist_upgrade=True, frontend='whatever')
|
||||
|
||||
|
||||
class TestInstall(TestCase):
|
||||
|
||||
def test_basic(self):
|
||||
c = MagicMock()
|
||||
mod.install(c, "postfix")
|
||||
c.run.assert_called_once_with(
|
||||
"DEBIAN_FRONTEND=noninteractive apt-get --assume-yes install postfix"
|
||||
)
|
||||
|
||||
|
||||
class TestIsInstalled(TestCase):
|
||||
|
||||
def test_already_installed(self):
|
||||
c = MagicMock()
|
||||
c.run.return_value.ok = True
|
||||
self.assertTrue(mod.is_installed(c, "postfix"))
|
||||
c.run.assert_called_once_with("dpkg-query -s postfix", warn=True)
|
||||
|
||||
def test_not_installed(self):
|
||||
c = MagicMock()
|
||||
c.run.return_value.ok = False
|
||||
self.assertFalse(mod.is_installed(c, "postfix"))
|
||||
c.run.assert_called_once_with("dpkg-query -s postfix", warn=True)
|
||||
mod.install(c, 'postfix')
|
||||
c.run.assert_called_once_with('DEBIAN_FRONTEND=noninteractive apt-get --assume-yes install postfix')
|
||||
|
||||
|
||||
class TestUpdate(TestCase):
|
||||
|
@ -49,7 +30,7 @@ class TestUpdate(TestCase):
|
|||
def test_basic(self):
|
||||
c = MagicMock()
|
||||
mod.update(c)
|
||||
c.run.assert_called_once_with("apt-get update")
|
||||
c.run.assert_called_once_with('apt-get update')
|
||||
|
||||
|
||||
class TestUpgrade(TestCase):
|
||||
|
@ -57,6 +38,4 @@ class TestUpgrade(TestCase):
|
|||
def test_basic(self):
|
||||
c = MagicMock()
|
||||
mod.upgrade(c)
|
||||
c.run.assert_called_once_with(
|
||||
'DEBIAN_FRONTEND=noninteractive apt-get --assume-yes --option Dpkg::Options::="--force-confdef" --option Dpkg::Options::="--force-confold" upgrade'
|
||||
)
|
||||
c.run.assert_called_once_with('DEBIAN_FRONTEND=noninteractive apt-get --assume-yes --option Dpkg::Options::="--force-confdef" --option Dpkg::Options::="--force-confold" upgrade')
|
||||
|
|
|
@ -1,56 +0,0 @@
|
|||
# -*- coding: utf-8; -*-
|
||||
|
||||
from unittest import TestCase
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from wuttamess import postfix as mod
|
||||
|
||||
|
||||
class TestSetConfig(TestCase):
|
||||
|
||||
def test_basic(self):
|
||||
c = MagicMock()
|
||||
mod.set_config(c, "foo", "bar")
|
||||
c.run.assert_called_once_with("postconf -e 'foo=bar'")
|
||||
|
||||
|
||||
class TestSetMyhostname(TestCase):
|
||||
|
||||
def test_basic(self):
|
||||
c = MagicMock()
|
||||
mod.set_myhostname(c, "test.example.com")
|
||||
c.run.assert_called_once_with("postconf -e 'myhostname=test.example.com'")
|
||||
|
||||
|
||||
class TestSetMyorigin(TestCase):
|
||||
|
||||
def test_basic(self):
|
||||
c = MagicMock()
|
||||
mod.set_myorigin(c, "example.com")
|
||||
c.run.assert_called_once_with("postconf -e 'myorigin=example.com'")
|
||||
|
||||
|
||||
class TestSetMydestination(TestCase):
|
||||
|
||||
def test_basic(self):
|
||||
c = MagicMock()
|
||||
mod.set_mydestination(c, "example.com", "test.example.com", "localhost")
|
||||
c.run.assert_called_once_with(
|
||||
"postconf -e 'mydestination=example.com, test.example.com, localhost'"
|
||||
)
|
||||
|
||||
|
||||
class TestSetMynetworks(TestCase):
|
||||
|
||||
def test_basic(self):
|
||||
c = MagicMock()
|
||||
mod.set_mynetworks(c, "127.0.0.0/8", "[::1]/128")
|
||||
c.run.assert_called_once_with("postconf -e 'mynetworks=127.0.0.0/8 [::1]/128'")
|
||||
|
||||
|
||||
class TestSetRelayhost(TestCase):
|
||||
|
||||
def test_basic(self):
|
||||
c = MagicMock()
|
||||
mod.set_relayhost(c, "mail.example.com")
|
||||
c.run.assert_called_once_with("postconf -e 'relayhost=mail.example.com'")
|
|
@ -1,153 +0,0 @@
|
|||
# -*- coding: utf-8; -*-
|
||||
|
||||
from unittest import TestCase
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from wuttamess import postgres as mod
|
||||
|
||||
|
||||
class TestSql(TestCase):
|
||||
|
||||
def test_basic(self):
|
||||
c = MagicMock()
|
||||
mod.sql(c, "select @@version")
|
||||
c.sudo.assert_called_once_with(
|
||||
'psql --tuples-only --no-align --command="select @@version" ',
|
||||
user="postgres",
|
||||
)
|
||||
|
||||
|
||||
class TestUserExists(TestCase):
|
||||
|
||||
def test_user_exists(self):
|
||||
c = MagicMock()
|
||||
with patch.object(mod, "sql") as sql:
|
||||
sql.return_value.stdout = "foo"
|
||||
self.assertTrue(mod.user_exists(c, "foo"))
|
||||
sql.assert_called_once_with(
|
||||
c, "SELECT rolname FROM pg_roles WHERE rolname = 'foo'", port=None
|
||||
)
|
||||
|
||||
def test_user_does_not_exist(self):
|
||||
c = MagicMock()
|
||||
with patch.object(mod, "sql") as sql:
|
||||
sql.return_value.stdout = ""
|
||||
self.assertFalse(mod.user_exists(c, "foo"))
|
||||
sql.assert_called_once_with(
|
||||
c, "SELECT rolname FROM pg_roles WHERE rolname = 'foo'", port=None
|
||||
)
|
||||
|
||||
|
||||
class TestCreateUser(TestCase):
|
||||
|
||||
def test_basic(self):
|
||||
c = MagicMock()
|
||||
with patch.object(mod, "set_user_password") as set_user_password:
|
||||
mod.create_user(c, "foo", checkfirst=False)
|
||||
c.sudo.assert_called_once_with(
|
||||
"createuser --no-createrole --no-superuser foo", user="postgres"
|
||||
)
|
||||
set_user_password.assert_not_called()
|
||||
|
||||
def test_user_exists(self):
|
||||
c = MagicMock()
|
||||
|
||||
with patch.object(mod, "user_exists") as user_exists:
|
||||
user_exists.return_value = True
|
||||
|
||||
mod.create_user(c, "foo")
|
||||
user_exists.assert_called_once_with(c, "foo", port=None)
|
||||
c.sudo.assert_not_called()
|
||||
|
||||
def test_with_password(self):
|
||||
c = MagicMock()
|
||||
with patch.object(mod, "set_user_password") as set_user_password:
|
||||
mod.create_user(c, "foo", "foopass", checkfirst=False)
|
||||
c.sudo.assert_called_once_with(
|
||||
"createuser --no-createrole --no-superuser foo", user="postgres"
|
||||
)
|
||||
set_user_password.assert_called_once_with(c, "foo", "foopass", port=None)
|
||||
|
||||
|
||||
class TestSetUserPassword(TestCase):
|
||||
|
||||
def test_basic(self):
|
||||
c = MagicMock()
|
||||
with patch.object(mod, "sql") as sql:
|
||||
mod.set_user_password(c, "foo", "foopass")
|
||||
sql.assert_called_once_with(
|
||||
c,
|
||||
"ALTER USER \\\"foo\\\" PASSWORD 'foopass';",
|
||||
port=None,
|
||||
hide=True,
|
||||
echo=False,
|
||||
)
|
||||
|
||||
|
||||
class TestDbExists(TestCase):
|
||||
|
||||
def test_db_exists(self):
|
||||
c = MagicMock()
|
||||
with patch.object(mod, "sql") as sql:
|
||||
sql.return_value.stdout = "foo"
|
||||
self.assertTrue(mod.db_exists(c, "foo"))
|
||||
sql.assert_called_once_with(
|
||||
c, "SELECT datname FROM pg_database WHERE datname = 'foo'", port=None
|
||||
)
|
||||
|
||||
def test_db_does_not_exist(self):
|
||||
c = MagicMock()
|
||||
with patch.object(mod, "sql") as sql:
|
||||
sql.return_value.stdout = ""
|
||||
self.assertFalse(mod.db_exists(c, "foo"))
|
||||
sql.assert_called_once_with(
|
||||
c, "SELECT datname FROM pg_database WHERE datname = 'foo'", port=None
|
||||
)
|
||||
|
||||
|
||||
class TestCreateDb(TestCase):
|
||||
|
||||
def test_basic(self):
|
||||
c = MagicMock()
|
||||
mod.create_db(c, "foo", checkfirst=False)
|
||||
c.sudo.assert_called_once_with("createdb foo", user="postgres")
|
||||
|
||||
def test_db_exists(self):
|
||||
c = MagicMock()
|
||||
|
||||
with patch.object(mod, "db_exists") as db_exists:
|
||||
db_exists.return_value = True
|
||||
|
||||
mod.create_db(c, "foo")
|
||||
db_exists.assert_called_once_with(c, "foo", port=None)
|
||||
c.sudo.assert_not_called()
|
||||
|
||||
|
||||
class TestDropDb(TestCase):
|
||||
|
||||
def test_basic(self):
|
||||
c = MagicMock()
|
||||
mod.drop_db(c, "foo", checkfirst=False)
|
||||
c.sudo.assert_called_once_with("dropdb foo", user="postgres")
|
||||
|
||||
def test_db_does_not_exist(self):
|
||||
c = MagicMock()
|
||||
|
||||
with patch.object(mod, "db_exists") as db_exists:
|
||||
db_exists.return_value = False
|
||||
|
||||
mod.drop_db(c, "foo")
|
||||
db_exists.assert_called_once_with(c, "foo")
|
||||
c.sudo.assert_not_called()
|
||||
|
||||
|
||||
class TestDumpDb(TestCase):
|
||||
|
||||
def test_basic(self):
|
||||
c = MagicMock()
|
||||
result = mod.dump_db(c, "foo")
|
||||
self.assertEqual(result, "foo.sql.gz")
|
||||
c.sudo.assert_called_once_with(
|
||||
"pg_dump foo | gzip -c > /tmp/foo.sql.gz", user="postgres"
|
||||
)
|
||||
c.run.assert_called_with("rm /tmp/foo.sql.gz")
|
|
@ -1,65 +0,0 @@
|
|||
# -*- coding: utf-8; -*-
|
||||
|
||||
from unittest import TestCase
|
||||
from unittest.mock import MagicMock, call
|
||||
|
||||
from wuttamess import ssh as mod
|
||||
|
||||
|
||||
class TestCacheHostKey(TestCase):
|
||||
|
||||
def test_root_already_cached(self):
|
||||
c = MagicMock()
|
||||
|
||||
# assume the first command runs okay
|
||||
c.run.return_value.failed = False
|
||||
mod.cache_host_key(c, "example.com")
|
||||
c.run.assert_called_once_with("ssh example.com whoami", warn=True)
|
||||
|
||||
def test_root_commands_not_allowed(self):
|
||||
c = MagicMock()
|
||||
|
||||
# assume the first command fails b/c "disallowed"
|
||||
c.run.return_value.failed = True
|
||||
c.run.return_value.stderr = "Disallowed command"
|
||||
mod.cache_host_key(c, "example.com")
|
||||
c.run.assert_called_once_with("ssh example.com whoami", warn=True)
|
||||
|
||||
def test_root_cache_key(self):
|
||||
c = MagicMock()
|
||||
|
||||
# first command fails; second command caches host key
|
||||
c.run.return_value.failed = True
|
||||
mod.cache_host_key(c, "example.com")
|
||||
c.run.assert_has_calls([call("ssh example.com whoami", warn=True)])
|
||||
c.run.assert_called_with(
|
||||
"ssh -o StrictHostKeyChecking=no example.com whoami", warn=True
|
||||
)
|
||||
|
||||
def test_user_already_cached(self):
|
||||
c = MagicMock()
|
||||
|
||||
# assume the first command runs okay
|
||||
c.sudo.return_value.failed = False
|
||||
mod.cache_host_key(c, "example.com", user="foo")
|
||||
c.sudo.assert_called_once_with("ssh example.com whoami", user="foo", warn=True)
|
||||
|
||||
def test_user_commands_not_allowed(self):
|
||||
c = MagicMock()
|
||||
|
||||
# assume the first command fails b/c "disallowed"
|
||||
c.sudo.return_value.failed = True
|
||||
c.sudo.return_value.stderr = "Disallowed command"
|
||||
mod.cache_host_key(c, "example.com", user="foo")
|
||||
c.sudo.assert_called_once_with("ssh example.com whoami", user="foo", warn=True)
|
||||
|
||||
def test_user_cache_key(self):
|
||||
c = MagicMock()
|
||||
|
||||
# first command fails; second command caches host key
|
||||
c.sudo.return_value.failed = True
|
||||
mod.cache_host_key(c, "example.com", user="foo")
|
||||
c.sudo.assert_has_calls([call("ssh example.com whoami", user="foo", warn=True)])
|
||||
c.sudo.assert_called_with(
|
||||
"ssh -o StrictHostKeyChecking=no example.com whoami", user="foo", warn=True
|
||||
)
|
|
@ -12,26 +12,18 @@ from wuttamess import sync as mod
|
|||
class TestMakeRoot(TestCase):
|
||||
|
||||
def test_basic(self):
|
||||
root = mod.make_root("files")
|
||||
root = mod.make_root('files')
|
||||
self.assertIsInstance(root, SyncedRoot)
|
||||
self.assertEqual(root.src, Path("files"))
|
||||
self.assertEqual(root.dest, Path("/"))
|
||||
|
||||
|
||||
class TestMakeSelector(TestCase):
|
||||
|
||||
def test_basic(self):
|
||||
selector = mod.make_selector("etc/postfix")
|
||||
self.assertIsInstance(selector, ItemSelector)
|
||||
self.assertEqual(selector.subpath, Path("etc/postfix"))
|
||||
self.assertEqual(root.src, Path('files'))
|
||||
self.assertEqual(root.dest, Path('/'))
|
||||
|
||||
|
||||
class TestIsync(TestCase):
|
||||
|
||||
def test_basic(self):
|
||||
c = MagicMock()
|
||||
root = mod.make_root("files")
|
||||
with patch.object(mod, "fabsync") as fabsync:
|
||||
root = mod.make_root('files')
|
||||
with patch.object(mod, 'fabsync') as fabsync:
|
||||
fabsync.ItemSelector = ItemSelector
|
||||
|
||||
# nothing to sync
|
||||
|
@ -42,46 +34,34 @@ class TestIsync(TestCase):
|
|||
|
||||
# sync one file
|
||||
fabsync.isync.reset_mock()
|
||||
result = MagicMock(path="/foo", modified=True)
|
||||
result = MagicMock(path='/foo', modified=True)
|
||||
fabsync.isync.return_value = [result]
|
||||
results = list(mod.isync(c, root))
|
||||
self.assertEqual(results, [result])
|
||||
fabsync.isync.assert_called_once_with(c, root)
|
||||
|
||||
# sync with selector (subpath)
|
||||
# sync with selector
|
||||
fabsync.isync.reset_mock()
|
||||
result = MagicMock(path="/foo", modified=True)
|
||||
result = MagicMock(path='/foo', modified=True)
|
||||
fabsync.isync.return_value = [result]
|
||||
results = list(mod.isync(c, root, "foo"))
|
||||
results = list(mod.isync(c, root, 'foo'))
|
||||
self.assertEqual(results, [result])
|
||||
fabsync.isync.assert_called_once_with(
|
||||
c, root, selector=fabsync.ItemSelector.new("foo")
|
||||
)
|
||||
|
||||
# sync with selector (subpath + tags)
|
||||
fabsync.isync.reset_mock()
|
||||
result = MagicMock(path="/foo", modified=True)
|
||||
fabsync.isync.return_value = [result]
|
||||
results = list(mod.isync(c, root, "foo", tags={"bar"}))
|
||||
self.assertEqual(results, [result])
|
||||
fabsync.isync.assert_called_once_with(
|
||||
c, root, selector=fabsync.ItemSelector.new("foo", tags={"bar"})
|
||||
)
|
||||
fabsync.isync.assert_called_once_with(c, root, selector=fabsync.ItemSelector.new('foo'))
|
||||
|
||||
|
||||
class TestCheckIsync(TestCase):
|
||||
|
||||
def test_basic(self):
|
||||
c = MagicMock()
|
||||
root = mod.make_root("files")
|
||||
with patch.object(mod, "isync") as isync:
|
||||
root = mod.make_root('files')
|
||||
with patch.object(mod, 'isync') as isync:
|
||||
|
||||
# file(s) modified
|
||||
result = MagicMock(path="/foo", modified=True)
|
||||
result = MagicMock(path='/foo', modified=True)
|
||||
isync.return_value = [result]
|
||||
self.assertTrue(mod.check_isync(c, root))
|
||||
|
||||
# not modified
|
||||
result = MagicMock(path="/foo", modified=False)
|
||||
result = MagicMock(path='/foo', modified=False)
|
||||
isync.return_value = [result]
|
||||
self.assertFalse(mod.check_isync(c, root))
|
||||
|
|
|
@ -1,81 +0,0 @@
|
|||
# -*- coding: utf-8; -*-
|
||||
|
||||
import os
|
||||
from unittest import TestCase
|
||||
from unittest.mock import MagicMock, patch, call
|
||||
|
||||
from wuttamess import util as mod
|
||||
|
||||
|
||||
class TestExists(TestCase):
|
||||
|
||||
def test_basic(self):
|
||||
c = MagicMock()
|
||||
mod.exists(c, "/foo")
|
||||
c.run.assert_called_once_with("test -e /foo", warn=True)
|
||||
|
||||
|
||||
class TestHomePath(TestCase):
|
||||
|
||||
def test_basic(self):
|
||||
c = MagicMock()
|
||||
c.run.return_value.stdout = "/home/foo"
|
||||
path = mod.get_home_path(c, user="foo")
|
||||
self.assertEqual(path, "/home/foo")
|
||||
|
||||
|
||||
class TestIsSymlink(TestCase):
|
||||
|
||||
def test_yes(self):
|
||||
c = MagicMock()
|
||||
c.run.return_value.failed = False
|
||||
self.assertTrue(mod.is_symlink(c, "/foo"))
|
||||
c.run.assert_called_once_with('test -L "$(echo /foo)"', warn=True)
|
||||
|
||||
def test_no(self):
|
||||
c = MagicMock()
|
||||
c.run.return_value.failed = True
|
||||
self.assertFalse(mod.is_symlink(c, "/foo"))
|
||||
c.run.assert_called_once_with('test -L "$(echo /foo)"', warn=True)
|
||||
|
||||
|
||||
class TestMakoRenderer(TestCase):
|
||||
|
||||
def test_basic(self):
|
||||
c = MagicMock()
|
||||
renderer = mod.mako_renderer(c, env={"machine_is_live": True})
|
||||
here = os.path.dirname(__file__)
|
||||
path = os.path.join(here, "files", "bar", "baz")
|
||||
rendered = renderer(path, vars={})
|
||||
self.assertEqual(rendered, "machine_is_live = True")
|
||||
|
||||
|
||||
class TestSetTimezone(TestCase):
|
||||
|
||||
def test_symlink(self):
|
||||
c = MagicMock()
|
||||
with patch.object(mod, "is_symlink") as is_symlink:
|
||||
is_symlink.return_value = True
|
||||
mod.set_timezone(c, "America/Chicago")
|
||||
c.run.assert_has_calls(
|
||||
[
|
||||
call("bash -c 'echo America/Chicago > /etc/timezone'"),
|
||||
]
|
||||
)
|
||||
c.run.assert_called_with(
|
||||
"ln -sf /usr/share/zoneinfo/America/Chicago /etc/localtime"
|
||||
)
|
||||
|
||||
def test_not_symlink(self):
|
||||
c = MagicMock()
|
||||
with patch.object(mod, "is_symlink") as is_symlink:
|
||||
is_symlink.return_value = False
|
||||
mod.set_timezone(c, "America/Chicago")
|
||||
c.run.assert_has_calls(
|
||||
[
|
||||
call("bash -c 'echo America/Chicago > /etc/timezone'"),
|
||||
]
|
||||
)
|
||||
c.run.assert_called_with(
|
||||
"cp /usr/share/zoneinfo/America/Chicago /etc/localtime"
|
||||
)
|
|
@ -1,40 +0,0 @@
|
|||
# -*- coding: utf-8; -*-
|
||||
|
||||
from unittest import TestCase
|
||||
from unittest.mock import MagicMock, patch, call
|
||||
|
||||
from wuttamess import wutta as mod
|
||||
|
||||
|
||||
class TestPurgeEmailSettings(TestCase):
|
||||
|
||||
def test_basic(self):
|
||||
c = MagicMock()
|
||||
sql = MagicMock()
|
||||
postgres = MagicMock(sql=sql)
|
||||
with patch.object(mod, "postgres", new=postgres):
|
||||
mod.purge_email_settings(c, "testy", appname="wuttatest")
|
||||
sql.assert_has_calls(
|
||||
[
|
||||
call(
|
||||
c,
|
||||
"delete from setting where name like 'wuttatest.email.%.sender';",
|
||||
database="testy",
|
||||
),
|
||||
call(
|
||||
c,
|
||||
"delete from setting where name like 'wuttatest.email.%.to';",
|
||||
database="testy",
|
||||
),
|
||||
call(
|
||||
c,
|
||||
"delete from setting where name like 'wuttatest.email.%.cc';",
|
||||
database="testy",
|
||||
),
|
||||
call(
|
||||
c,
|
||||
"delete from setting where name like 'wuttatest.email.%.bcc';",
|
||||
database="testy",
|
||||
),
|
||||
]
|
||||
)
|
4
tox.ini
4
tox.ini
|
@ -6,10 +6,6 @@ envlist = py38, py39, py310, py311
|
|||
extras = tests
|
||||
commands = pytest {posargs}
|
||||
|
||||
[testenv:pylint]
|
||||
basepython = python3.11
|
||||
commands = pylint wuttamess
|
||||
|
||||
[testenv:coverage]
|
||||
basepython = python3.11
|
||||
commands = pytest --cov=wuttamess --cov-report=html --cov-fail-under=100
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue