diff --git a/docs/api/wuttamess.postgres.rst b/docs/api/wuttamess.postgres.rst new file mode 100644 index 0000000..742d239 --- /dev/null +++ b/docs/api/wuttamess.postgres.rst @@ -0,0 +1,6 @@ + +``wuttamess.postgres`` +====================== + +.. automodule:: wuttamess.postgres + :members: diff --git a/docs/index.rst b/docs/index.rst index a719218..6571493 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -32,6 +32,7 @@ project. api/wuttamess api/wuttamess.apt api/wuttamess.postfix + api/wuttamess.postgres api/wuttamess.ssh api/wuttamess.sync api/wuttamess.util diff --git a/src/wuttamess/postgres.py b/src/wuttamess/postgres.py new file mode 100644 index 0000000..bc5fd49 --- /dev/null +++ b/src/wuttamess/postgres.py @@ -0,0 +1,154 @@ +# -*- coding: utf-8; -*- +################################################################################ +# +# WuttaMess -- Fabric Automation Helpers +# Copyright © 2024 Lance Edgar +# +# This file is part of Wutta Framework. +# +# Wutta Framework is free software: you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) any +# later version. +# +# Wutta Framework is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# Wutta Framework. If not, see . +# +################################################################################ +""" +PostgreSQL DB utilities +""" + + +def sql(c, sql, database='', port=None, **kwargs): + """ + Execute some SQL as the ``postgres`` user. + + :param c: Fabric connection. + + :param sql: SQL string to execute. + + :param database: Name of the database on which to execute the SQL. + If not specified, default ``postgres`` is assumed. + + :param port: Optional port for PostgreSQL; default is 5432. + """ + port = f' --port={port}' if port else '' + return c.sudo(f'psql{port} --tuples-only --no-align --command="{sql}" {database}', + user='postgres', **kwargs) + + +def user_exists(c, name, port=None): + """ + Determine if a given PostgreSQL user exists. + + :param c: Fabric connection. + + :param name: Username to check for. + + :param port: Optional port for PostgreSQL; default is 5432. + + :returns: ``True`` if user exists, else ``False``. + """ + user = sql(c, f"SELECT rolname FROM pg_roles WHERE rolname = '{name}'", port=port).stdout.strip() + return bool(user) + + +def create_user(c, name, password=None, port=None, checkfirst=True): + """ + Create a PostgreSQL user account. + + :param c: Fabric connection. + + :param name: Username to create. + + :param password: Optional password for the new user. If set, will + call :func:`set_user_password()`. + + :param port: Optional port for PostgreSQL; default is 5432. + + :param checkfirst: If true (the default), first check if user + exists and skip creating if already present. If false, then + try to create user with no check. + """ + if not checkfirst or not user_exists(c, name, port=port): + portarg = f' --port={port}' if port else '' + c.sudo(f'createuser{portarg} --no-createrole --no-superuser {name}', + user='postgres') + if password: + set_user_password(c, name, password, port=port) + + +def set_user_password(c, name, password, port=None): + """ + Set the password for a PostgreSQL user account. + + :param c: Fabric connection. + + :param name: Username whose password is to be set. + + :param password: Password for the new user. + + :param port: Optional port for PostgreSQL; default is 5432. + """ + sql(c, f"ALTER USER \\\"{name}\\\" PASSWORD '{password}';", port=port, hide=True, echo=False) + + +def db_exists(c, name, port=None): + """ + Determine if a given PostgreSQL database exists. + + :param c: Fabric connection. + + :param name: Name of the database to check for. + + :param port: Optional port for PostgreSQL; default is 5432. + + :returns: ``True`` if database exists, else ``False``. + """ + db = sql(c, f"SELECT datname FROM pg_database WHERE datname = '{name}'", port=port).stdout.strip() + return db == name + + +def create_db(c, name, owner=None, port=None, checkfirst=True): + """ + Create a PostgreSQL database. + + :param c: Fabric connection. + + :param name: Name of the database to create. + + :param owner: Optional role name to set as owner for the database. + + :param port: Optional port for PostgreSQL; default is 5432. + + :param checkfirst: If true (the default), first check if DB exists + and skip creating if already present. If false, then try to + create DB with no check. + """ + if not checkfirst or not db_exists(c, name, port=port): + port = f' --port={port}' if port else '' + owner = f' --owner={owner}' if owner else '' + c.sudo(f'createdb{port}{owner} {name}', + user='postgres') + + +def drop_db(c, name, checkfirst=True): + """ + Drop a PostgreSQL database. + + :param c: Fabric connection. + + :param name: Name of the database to drop. + + :param checkfirst: If true (the default), first check if DB exists + and skip dropping if not present. If false, then try to drop + DB with no check. + """ + if not checkfirst or db_exists(c, name): + c.sudo(f'dropdb {name}', user='postgres') diff --git a/tests/test_postgres.py b/tests/test_postgres.py new file mode 100644 index 0000000..b6d0299 --- /dev/null +++ b/tests/test_postgres.py @@ -0,0 +1,124 @@ +# -*- coding: utf-8; -*- + +from unittest import TestCase +from unittest.mock import MagicMock, patch + +from wuttamess import postgres as mod + + +class TestSql(TestCase): + + def test_basic(self): + c = MagicMock() + mod.sql(c, "select @@version") + c.sudo.assert_called_once_with('psql --tuples-only --no-align --command="select @@version" ', + user='postgres') + + +class TestUserExists(TestCase): + + def test_user_exists(self): + c = MagicMock() + with patch.object(mod, 'sql') as sql: + sql.return_value.stdout = 'foo' + self.assertTrue(mod.user_exists(c, 'foo')) + sql.assert_called_once_with(c, "SELECT rolname FROM pg_roles WHERE rolname = 'foo'", port=None) + + def test_user_does_not_exist(self): + c = MagicMock() + with patch.object(mod, 'sql') as sql: + sql.return_value.stdout = '' + self.assertFalse(mod.user_exists(c, 'foo')) + sql.assert_called_once_with(c, "SELECT rolname FROM pg_roles WHERE rolname = 'foo'", port=None) + + +class TestCreateUser(TestCase): + + def test_basic(self): + c = MagicMock() + with patch.object(mod, 'set_user_password') as set_user_password: + mod.create_user(c, 'foo', checkfirst=False) + c.sudo.assert_called_once_with('createuser --no-createrole --no-superuser foo', + user='postgres') + set_user_password.assert_not_called() + + def test_user_exists(self): + c = MagicMock() + + with patch.object(mod, 'user_exists') as user_exists: + user_exists.return_value = True + + mod.create_user(c, 'foo') + user_exists.assert_called_once_with(c, 'foo', port=None) + c.sudo.assert_not_called() + + def test_with_password(self): + c = MagicMock() + with patch.object(mod, 'set_user_password') as set_user_password: + mod.create_user(c, 'foo', 'foopass', checkfirst=False) + c.sudo.assert_called_once_with('createuser --no-createrole --no-superuser foo', + user='postgres') + set_user_password.assert_called_once_with(c, 'foo', 'foopass', port=None) + + +class TestSetUserPassword(TestCase): + + def test_basic(self): + c = MagicMock() + with patch.object(mod, 'sql') as sql: + mod.set_user_password(c, 'foo', 'foopass') + sql.assert_called_once_with(c, "ALTER USER \\\"foo\\\" PASSWORD 'foopass';", + port=None, hide=True, echo=False) + + +class TestDbExists(TestCase): + + def test_db_exists(self): + c = MagicMock() + with patch.object(mod, 'sql') as sql: + sql.return_value.stdout = 'foo' + self.assertTrue(mod.db_exists(c, 'foo')) + sql.assert_called_once_with(c, "SELECT datname FROM pg_database WHERE datname = 'foo'", port=None) + + def test_db_does_not_exist(self): + c = MagicMock() + with patch.object(mod, 'sql') as sql: + sql.return_value.stdout = '' + self.assertFalse(mod.db_exists(c, 'foo')) + sql.assert_called_once_with(c, "SELECT datname FROM pg_database WHERE datname = 'foo'", port=None) + + +class TestCreateDb(TestCase): + + def test_basic(self): + c = MagicMock() + mod.create_db(c, 'foo', checkfirst=False) + c.sudo.assert_called_once_with('createdb foo', user='postgres') + + def test_db_exists(self): + c = MagicMock() + + with patch.object(mod, 'db_exists') as db_exists: + db_exists.return_value = True + + mod.create_db(c, 'foo') + db_exists.assert_called_once_with(c, 'foo', port=None) + c.sudo.assert_not_called() + + +class TestDropDb(TestCase): + + def test_basic(self): + c = MagicMock() + mod.drop_db(c, 'foo', checkfirst=False) + c.sudo.assert_called_once_with('dropdb foo', user='postgres') + + def test_db_does_not_exist(self): + c = MagicMock() + + with patch.object(mod, 'db_exists') as db_exists: + db_exists.return_value = False + + mod.drop_db(c, 'foo') + db_exists.assert_called_once_with(c, 'foo') + c.sudo.assert_not_called()