3
0
Fork 0

feat: add problem checks + handler feature

the basic idea is to run nightly checks and send email if problems are
found.  it should also support variations on that theme,
e.g. configuring a check to only run on certain weekdays.
This commit is contained in:
Lance Edgar 2025-08-10 11:07:30 -05:00
parent eb6ad9884c
commit 7550a7a860
12 changed files with 900 additions and 1 deletions

View file

@ -0,0 +1,6 @@
``wuttjamaican.cli.problems``
=============================
.. automodule:: wuttjamaican.cli.problems
:members:

View file

@ -0,0 +1,6 @@
``wuttjamaican.problems``
=========================
.. automodule:: wuttjamaican.problems
:members:

View file

@ -243,6 +243,24 @@ Glossary
modules etc. which is installed via ``pip``. See also
:doc:`narr/install/pkg`.
problem check
This refers to a special "report" which runs (usually) on a
nighty basis. Such a report is only looking for "problems"
and if any are found, an email notification is sent.
Apps can define custom problem checks (based on
:class:`~wuttjamaican.problems.ProblemCheck`), which can then be
ran via the :term:`problem handler`.
problem handler
The :term:`handler` responsible for finding and reporting on
"problems" with the data or system. Most typically this runs
nightly :term:`checks <problem check>` and will send email if
problems are found.
Default handler is
:class:`~wuttjamaican.problems.ProblemHandler`.
provider
Python object which "provides" extra functionality to some
portion of the :term:`app`. Similar to a "plugin" concept; see

View file

@ -70,6 +70,7 @@ Contents
api/wuttjamaican.cli.base
api/wuttjamaican.cli.make_appdir
api/wuttjamaican.cli.make_uuid
api/wuttjamaican.cli.problems
api/wuttjamaican.conf
api/wuttjamaican.db
api/wuttjamaican.db.conf
@ -86,6 +87,7 @@ Contents
api/wuttjamaican.exc
api/wuttjamaican.install
api/wuttjamaican.people
api/wuttjamaican.problems
api/wuttjamaican.progress
api/wuttjamaican.reports
api/wuttjamaican.testing

View file

@ -51,3 +51,15 @@ Print a new universally-unique identifier to standard output.
Defined in: :mod:`wuttjamaican.cli.make_uuid`
.. program-output:: wutta make-uuid --help
.. _wutta-problems:
``wutta problems``
------------------
Find and report on problems with the data or system.
Defined in: :mod:`wuttjamaican.cli.problems`
.. program-output:: wutta problems --help

View file

@ -26,6 +26,7 @@ WuttJamaican - app handler
import datetime
import importlib
import logging
import os
import sys
import warnings
@ -37,6 +38,9 @@ from wuttjamaican.util import (load_entry_points, load_object,
progress_loop, resource_path, simple_error)
log = logging.getLogger(__name__)
class AppHandler:
"""
Base class and default implementation for top-level :term:`app
@ -88,6 +92,7 @@ class AppHandler:
default_email_handler_spec = 'wuttjamaican.email:EmailHandler'
default_install_handler_spec = 'wuttjamaican.install:InstallHandler'
default_people_handler_spec = 'wuttjamaican.people:PeopleHandler'
default_problem_handler_spec = 'wuttjamaican.problems:ProblemHandler'
default_report_handler_spec = 'wuttjamaican.reports:ReportHandler'
def __init__(self, config):
@ -989,6 +994,20 @@ class AppHandler:
self.handlers['people'] = factory(self.config, **kwargs)
return self.handlers['people']
def get_problem_handler(self, **kwargs):
"""
Get the configured :term:`problem handler`.
:rtype: :class:`~wuttjamaican.problems.ProblemHandler`
"""
if 'problems' not in self.handlers:
spec = self.config.get(f'{self.appname}.problems.handler',
default=self.default_problem_handler_spec)
log.debug("problem_handler spec is: %s", spec)
factory = self.load_object(spec)
self.handlers['problems'] = factory(self.config, **kwargs)
return self.handlers['problems']
def get_report_handler(self, **kwargs):
"""
Get the configured :term:`report handler`.

View file

@ -2,7 +2,7 @@
################################################################################
#
# WuttJamaican -- Base package for Wutta Framework
# Copyright © 2023-2024 Lance Edgar
# Copyright © 2023-2025 Lance Edgar
#
# This file is part of Wutta Framework.
#
@ -36,6 +36,7 @@ from .base import wutta_typer, make_typer
# nb. must bring in all modules for discovery to work
from . import make_appdir
from . import make_uuid
from . import problems
# discover more commands, installed via other packages
from .base import typer_eager_imports

View file

@ -0,0 +1,88 @@
# -*- coding: utf-8; -*-
################################################################################
#
# WuttJamaican -- Base package for Wutta Framework
# Copyright © 2023-2025 Lance Edgar
#
# This file is part of Wutta Framework.
#
# Wutta Framework is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# Wutta Framework is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# Wutta Framework. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
See also: :ref:`wutta-problems`
"""
import sys
from typing import List
import rich
import typer
from typing_extensions import Annotated
from .base import wutta_typer
@wutta_typer.command()
def problems(
ctx: typer.Context,
systems: Annotated[
List[str],
typer.Option('--system', '-s',
help="System for which to perform checks; can be specified more "
"than once. If not specified, all systems are assumed.")] = None,
problems: Annotated[
List[str],
typer.Option('--problem', '-p',
help="Identify a particular problem check; can be specified "
"more than once. If not specified, all checks are assumed.")] = None,
list_checks: Annotated[
bool,
typer.Option('--list', '-l',
help="List available problem checks; optionally filtered per --system and --problem")] = False,
):
"""
Find and report on problems with the data or system.
"""
config = ctx.parent.wutta_config
app = config.get_app()
handler = app.get_problem_handler()
# try to warn user if unknown system is specified; but otherwise ignore
supported = handler.get_supported_systems()
for key in (systems or []):
if key not in supported:
rich.print(f"\n[bold yellow]No problem reports exist for system: {key}[/bold yellow]")
checks = handler.filter_problem_checks(systems=systems, problems=problems)
if list_checks:
count = 0
organized = handler.organize_problem_checks(checks)
for system in sorted(organized):
rich.print(f"\n[bold]{system}[/bold]")
sys.stdout.write("-------------------------\n")
for problem in sorted(organized[system]):
sys.stdout.write(f"{problem}\n")
count += 1
sys.stdout.write("\n")
sys.stdout.write(f"found {count} problem checks\n")
else:
handler.run_problem_checks(checks)

View file

@ -0,0 +1,418 @@
# -*- coding: utf-8; -*-
################################################################################
#
# WuttJamaican -- Base package for Wutta Framework
# Copyright © 2023-2025 Lance Edgar
#
# This file is part of Wutta Framework.
#
# Wutta Framework is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# Wutta Framework is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# Wutta Framework. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
Problem Checks + Handler
"""
import calendar
import datetime
import importlib
import logging
from wuttjamaican.app import GenericHandler
log = logging.getLogger(__name__)
class ProblemCheck:
"""
Base class for :term:`problem checks <problem check>`.
Each subclass must define logic for discovery of problems,
according to its purpose; see :meth:`find_problems()`.
If the check does find problems, and an email is to be sent, the
check instance is also able to affect that email somewhat, e.g. by
adding an attachment. See :meth:`get_email_context()` and
:meth:`make_email_attachments()`.
:param config: App :term:`config object`.
"""
def __init__(self, config):
self.config = config
self.app = self.config.get_app()
@property
def system_key(self):
"""
Key identifying which "system" the check pertains to.
Many apps may only have one "system" which corresponds to the
app itself. However some apps may integrate with other
systems and have ability/need to check for problems on those
systems as well.
See also :attr:`problem_key` and :attr:`title`.
"""
raise AttributeError(f"system_key not defined for {self.__class__}")
@property
def problem_key(self):
"""
Key identifying this problem check.
This key must be unique within the context of the "system" it
pertains to.
See also :attr:`system_key` and :attr:`title`.
"""
raise AttributeError(f"problem_key not defined for {self.__class__}")
@property
def title(self):
"""
Display title for the problem check.
See also :attr:`system_key` and :attr:`problem_key`.
"""
raise AttributeError(f"title not defined for {self.__class__}")
def find_problems(self):
"""
Find all problems relevant to this check.
This should always return a list, although no constraint is
made on what type of elements it contains.
:returns: List of problems found.
"""
return []
def get_email_context(self, problems, **kwargs):
"""
This can be used to add extra context for a specific check's
report email template.
:param problems: List of problems found.
:returns: Context dict for email template.
"""
return kwargs
def make_email_attachments(self, context):
"""
Optionally generate some attachment(s) for the report email.
:param context: Context dict for the report email. In
particular see ``context['problems']`` for main data.
:returns: List of attachments, if applicable.
"""
class ProblemHandler(GenericHandler):
"""
Base class and default implementation for the :term:`problem
handler`.
There is normally no need to instantiate this yourself; instead
call :meth:`~wuttjamaican.app.AppHandler.get_problem_handler()` on
the :term:`app handler`.
The problem handler can be used to discover and run :term:`problem
checks <problem check>`. In particular see:
* :meth:`get_all_problem_checks()`
* :meth:`filter_problem_checks()`
* :meth:`run_problem_checks()`
"""
def get_all_problem_checks(self):
"""
Return a list of all :term:`problem checks <problem check>`
which are "available" according to config.
See also :meth:`filter_problem_checks()`.
:returns: List of :class:`ProblemCheck` classes.
"""
checks = []
modules = self.config.get_list(f'{self.config.appname}.problems.modules',
default=['wuttjamaican.problems'])
for module_path in modules:
module = importlib.import_module(module_path)
for name in dir(module):
obj = getattr(module, name)
if (isinstance(obj, type) and
issubclass(obj, ProblemCheck) and
obj is not ProblemCheck):
checks.append(obj)
return checks
def filter_problem_checks(self, systems=None, problems=None):
"""
Return a list of all :term:`problem checks <problem check>`
which match the given criteria.
This first calls :meth:`get_all_problem_checks()` and then
filters the result according to params.
:param systems: Optional list of "system keys" which a problem check
must match, in order to be included in return value.
:param problems: Optional list of "problem keys" which a problem check
must match, in order to be included in return value.
:returns: List of :class:`ProblemCheck` classes; may be an
empty list.
"""
all_checks = self.get_all_problem_checks()
if not (systems or problems):
return all_checks
matches = []
for check in all_checks:
if not systems or check.system_key in systems:
if not problems or check.problem_key in problems:
matches.append(check)
return matches
def get_supported_systems(self, checks=None):
"""
Returns list of keys for all systems which are supported by
any of the problem checks.
:param checks: Optional list of :class:`ProblemCheck` classes.
If not specified, calls :meth:`get_all_problem_checks()`.
:returns: List of system keys.
"""
checks = self.get_all_problem_checks()
return sorted(set([check.system_key for check in checks]))
def get_system_title(self, system_key):
"""
Returns the display title for a given system.
The default logic returns the ``system_key`` as-is; subclass
may override as needed.
:param system_key: Key identifying a checked system.
:returns: Display title for the system.
"""
return system_key
def is_enabled(self, check):
"""
Returns boolean indicating if the given problem check is
enabled, per config.
:param check: :class:`ProblemCheck` class or instance.
:returns: ``True`` if enabled; ``False`` if not.
"""
key = f'{check.system_key}.{check.problem_key}'
enabled = self.config.get_bool(f'{self.config.appname}.problems.{key}.enabled')
if enabled is not None:
return enabled
return True
def should_run_for_weekday(self, check, weekday):
"""
Returns boolean indicating if the given problem check is
configured to run for the given weekday.
:param check: :class:`ProblemCheck` class or instance.
:param weekday: Integer corresponding to a particular weekday.
Uses the same conventions as Python itself, i.e. Monday is
represented as 0 and Sunday as 6.
:returns: ``True`` if check should run; ``False`` if not.
"""
key = f'{check.system_key}.{check.problem_key}'
enabled = self.config.get_bool(f'{self.config.appname}.problems.{key}.day{weekday}')
if enabled is not None:
return enabled
return True
def organize_problem_checks(self, checks):
"""
Organize the problem checks by grouping them according to
their :attr:`~ProblemCheck.system_key`.
:param checks: List of :class:`ProblemCheck` classes.
:returns: Dict with "system" keys; each value is a list of
problem checks pertaining to that system.
"""
organized = {}
for check in checks:
system = organized.setdefault(check.system_key, {})
system[check.problem_key] = check
return organized
def run_problem_checks(self, checks, force=False):
"""
Run the given problem checks.
This calls :meth:`run_problem_check()` for each, so config is
consulted to determine if each check should actually run -
unless ``force=True``.
:param checks: List of :class:`ProblemCheck` classes.
:param force: If true, run the checks regardless of whether
each is configured to run.
"""
organized = self.organize_problem_checks(checks)
for system_key in sorted(organized):
system = organized[system_key]
for problem_key in sorted(system):
check = system[problem_key]
self.run_problem_check(check, force=force)
def run_problem_check(self, check, force=False):
"""
Run the given problem check, if it is enabled and configured
to run for the current weekday.
Running a check involves calling :meth:`find_problems()` and
possibly :meth:`send_problem_report()`.
See also :meth:`run_problem_checks()`.
:param check: :class:`ProblemCheck` class.
:param force: If true, run the check regardless of whether it
is configured to run.
"""
key = f'{check.system_key}.{check.problem_key}'
log.info("running problem check: %s", key)
if not self.is_enabled(check):
log.debug("problem check is not enabled: %s", key)
if not force:
return
weekday = datetime.date.today().weekday()
if not self.should_run_for_weekday(check, weekday):
log.debug("problem check is not scheduled for %s: %s",
calendar.day_name[weekday], key)
if not force:
return
check_instance = check(self.config)
problems = self.find_problems(check_instance)
log.info("found %s problems", len(problems))
if problems:
self.send_problem_report(check_instance, problems)
return problems
def find_problems(self, check):
"""
Execute the given check to find relevant problems.
This mostly calls :meth:`ProblemCheck.find_problems()`
although subclass may override if needed.
This should always return a list, although no constraint is
made on what type of elements it contains.
:param check: :class:`ProblemCheck` instance.
:returns: List of problems found.
"""
return check.find_problems() or []
def get_email_key(self, check):
"""
Return the "email key" to be used when sending report email
resulting from the given problem check.
This follows a convention using the check's
:attr:`~ProblemCheck.system_key` and
:attr:`~ProblemCheck.problem_key`.
This is called by :meth:`send_problem_report()`.
:param check: :class:`ProblemCheck` class or instance.
:returns: Config key for problem report email message.
"""
return f'{check.system_key}_problems_{check.problem_key}'
def send_problem_report(self, check, problems):
"""
Send an email with details of the given problem check report.
This calls :meth:`get_email_key()` to determine which key to
use for sending email.
It also calls :meth:`get_global_email_context()` and
:meth:`get_check_email_context()` to build the email template
context.
And it calls :meth:`ProblemCheck.make_email_attachments()` to
allow the check to provide message attachments.
:param check: :class:`ProblemCheck` instance.
:param problems: List of problems found.
"""
context = self.get_global_email_context()
context = self.get_check_email_context(check, problems, **context)
context.update({
'config': self.config,
'app': self.app,
'check': check,
'problems': problems,
})
email_key = self.get_email_key(check)
attachments = check.make_email_attachments(context)
self.app.send_email(email_key, context,
default_subject=check.title,
attachments=attachments)
def get_global_email_context(self, **kwargs):
"""
This can be used to add extra context for all email report
templates, regardless of which problem check is involved.
:returns: Context dict for all email templates.
"""
return kwargs
def get_check_email_context(self, check, problems, **kwargs):
"""
This can be used to add extra context for a specific check's
report email template.
Note that this calls :meth:`ProblemCheck.get_email_context()`
and in many cases that is where customizations should live.
:param check: :class:`ProblemCheck` instance.
:param problems: List of problems found.
:returns: Context dict for email template.
"""
kwargs['system_title'] = self.get_system_title(check.system_key)
kwargs = check.get_email_context(problems, **kwargs)
return kwargs

View file

@ -0,0 +1,53 @@
# -*- coding: utf-8; -*-
from unittest.mock import Mock, patch
from wuttjamaican.testing import ConfigTestCase
from wuttjamaican.cli import problems as mod
from wuttjamaican.problems import ProblemHandler, ProblemCheck
class FakeCheck(ProblemCheck):
system_key = 'wuttatest'
problem_key = 'fake_check'
title = "Fake problem check"
class TestProblems(ConfigTestCase):
def test_basic(self):
ctx = Mock()
ctx.parent.wutta_config = self.config
# nb. avoid printing to console
with patch.object(mod.rich, 'print') as rich_print:
# nb. use fake check
with patch.object(ProblemHandler, 'get_all_problem_checks', return_value=[FakeCheck]):
with patch.object(ProblemHandler, 'run_problem_checks') as run_problem_checks:
# list problem checks
orig_organize = ProblemHandler.organize_problem_checks
def mock_organize(checks):
return orig_organize(None, checks)
with patch.object(ProblemHandler, 'organize_problem_checks', side_effect=mock_organize) as organize_problem_checks:
mod.problems(ctx, list_checks=True)
organize_problem_checks.assert_called_once_with([FakeCheck])
run_problem_checks.assert_not_called()
# warning if unknown system key requested
rich_print.reset_mock()
# nb. just --list for convenience
# note that since we also specify invalid --system, no checks will
# match and hence nothing significant will be printed to stdout
mod.problems(ctx, list_checks=True, systems=['craziness'])
rich_print.assert_called_once()
self.assertEqual(len(rich_print.call_args.args), 1)
self.assertIn("No problem reports exist for system", rich_print.call_args.args[0])
self.assertEqual(len(rich_print.call_args.kwargs), 0)
run_problem_checks.assert_not_called()
# run problem checks
mod.problems(ctx)
run_problem_checks.assert_called_once_with([FakeCheck])

View file

@ -634,6 +634,12 @@ app_title = WuttaTest
people = self.app.get_people_handler()
self.assertIsInstance(people, PeopleHandler)
def test_get_problem_handler(self):
from wuttjamaican.problems import ProblemHandler
handler = self.app.get_problem_handler()
self.assertIsInstance(handler, ProblemHandler)
def test_get_report_handler(self):
from wuttjamaican.reports import ReportHandler

270
tests/test_problems.py Normal file
View file

@ -0,0 +1,270 @@
# -*- coding: utf-8; -*-
import datetime
from unittest.mock import patch
from wuttjamaican import problems as mod
from wuttjamaican.testing import ConfigTestCase
class TestProblemCheck(ConfigTestCase):
def make_check(self):
return mod.ProblemCheck(self.config)
def test_system_key(self):
check = self.make_check()
self.assertRaises(AttributeError, getattr, check, 'system_key')
def test_problem_key(self):
check = self.make_check()
self.assertRaises(AttributeError, getattr, check, 'problem_key')
def test_title(self):
check = self.make_check()
self.assertRaises(AttributeError, getattr, check, 'title')
def test_find_problems(self):
check = self.make_check()
problems = check.find_problems()
self.assertEqual(problems, [])
def test_get_email_context(self):
check = self.make_check()
problems = check.find_problems()
context = check.get_email_context(problems)
self.assertEqual(context, {})
def test_make_email_attachments(self):
check = self.make_check()
problems = check.find_problems()
context = check.get_email_context(problems)
attachments = check.make_email_attachments(context)
self.assertIsNone(attachments)
class FakeProblemCheck(mod.ProblemCheck):
system_key = 'wuttatest'
problem_key = 'fake_check'
title = "Fake problem check"
# def find_problems(self):
# return [{'foo': 'bar'}]
class TestProblemHandler(ConfigTestCase):
def setUp(self):
super().setUp()
self.handler = self.make_handler()
def make_handler(self):
return mod.ProblemHandler(self.config)
def test_get_all_problem_checks(self):
# no checks by default
checks = self.handler.get_all_problem_checks()
self.assertIsInstance(checks, list)
self.assertEqual(len(checks), 0)
# but let's configure our fake check
self.config.setdefault('wutta.problems.modules', 'tests.test_problems')
checks = self.handler.get_all_problem_checks()
self.assertIsInstance(checks, list)
self.assertEqual(len(checks), 1)
def test_filtered_problem_checks(self):
# no checks by default
checks = self.handler.filter_problem_checks()
self.assertIsInstance(checks, list)
self.assertEqual(len(checks), 0)
# but let's configure our fake check
self.config.setdefault('wutta.problems.modules', 'tests.test_problems')
checks = self.handler.filter_problem_checks()
self.assertIsInstance(checks, list)
self.assertEqual(len(checks), 1)
# filter by system_key
checks = self.handler.filter_problem_checks(systems=['wuttatest'])
self.assertEqual(len(checks), 1)
checks = self.handler.filter_problem_checks(systems=['something_else'])
self.assertEqual(len(checks), 0)
# filter by problem_key
checks = self.handler.filter_problem_checks(problems=['fake_check'])
self.assertEqual(len(checks), 1)
checks = self.handler.filter_problem_checks(problems=['something_else'])
self.assertEqual(len(checks), 0)
# filter by both
checks = self.handler.filter_problem_checks(systems=['wuttatest'], problems=['fake_check'])
self.assertEqual(len(checks), 1)
checks = self.handler.filter_problem_checks(systems=['wuttatest'], problems=['bad_check'])
self.assertEqual(len(checks), 0)
def test_get_supported_systems(self):
# no checks by default
systems = self.handler.get_supported_systems()
self.assertIsInstance(systems, list)
self.assertEqual(len(systems), 0)
# but let's configure our fake check
self.config.setdefault('wutta.problems.modules', 'tests.test_problems')
systems = self.handler.get_supported_systems()
self.assertIsInstance(systems, list)
self.assertEqual(systems, ['wuttatest'])
def test_get_system_title(self):
title = self.handler.get_system_title('wutta')
self.assertEqual(title, 'wutta')
def test_is_enabled(self):
check = FakeProblemCheck(self.config)
# enabled by default
self.assertTrue(self.handler.is_enabled(check))
# config can disable
self.config.setdefault('wutta.problems.wuttatest.fake_check.enabled', 'false')
self.assertFalse(self.handler.is_enabled(check))
def test_should_run_for_weekday(self):
check = FakeProblemCheck(self.config)
# should run by default
for weekday in range(7):
self.assertTrue(self.handler.should_run_for_weekday(check, weekday))
# config can disable, e.g. for weekends
self.config.setdefault('wutta.problems.wuttatest.fake_check.day5', 'false')
self.config.setdefault('wutta.problems.wuttatest.fake_check.day6', 'false')
for weekday in range(5):
self.assertTrue(self.handler.should_run_for_weekday(check, weekday))
for weekday in (5, 6):
self.assertFalse(self.handler.should_run_for_weekday(check, weekday))
def test_organize_problem_checks(self):
checks = [FakeProblemCheck]
organized = self.handler.organize_problem_checks(checks)
self.assertIsInstance(organized, dict)
self.assertEqual(list(organized), ['wuttatest'])
self.assertIsInstance(organized['wuttatest'], dict)
self.assertEqual(list(organized['wuttatest']), ['fake_check'])
self.assertIs(organized['wuttatest']['fake_check'], FakeProblemCheck)
def test_find_problems(self):
check = FakeProblemCheck(self.config)
problems = self.handler.find_problems(check)
self.assertEqual(problems, [])
def test_get_email_key(self):
check = FakeProblemCheck(self.config)
key = self.handler.get_email_key(check)
self.assertEqual(key, 'wuttatest_problems_fake_check')
def test_get_global_email_context(self):
context = self.handler.get_global_email_context()
self.assertEqual(context, {})
def test_get_check_email_context(self):
check = FakeProblemCheck(self.config)
problems = []
context = self.handler.get_check_email_context(check, problems)
self.assertEqual(context, {'system_title': 'wuttatest'})
def test_send_problem_report(self):
check = FakeProblemCheck(self.config)
problems = []
with patch.object(self.app, 'send_email') as send_email:
self.handler.send_problem_report(check, problems)
send_email.assert_called_once_with('wuttatest_problems_fake_check', {
'system_title': 'wuttatest',
'config': self.config,
'app': self.app,
'check': check,
'problems': problems,
}, default_subject="Fake problem check", attachments=None)
def test_run_problem_check(self):
with patch.object(FakeProblemCheck, 'find_problems') as find_problems:
with patch.object(self.handler, 'send_problem_report') as send_problem_report:
# check runs by default
find_problems.return_value = [{'foo': 'bar'}]
problems = self.handler.run_problem_check(FakeProblemCheck)
self.assertEqual(problems, [{'foo': 'bar'}])
find_problems.assert_called_once_with()
send_problem_report.assert_called_once()
# does not run if generally disabled
find_problems.reset_mock()
send_problem_report.reset_mock()
with patch.object(self.handler, 'is_enabled', return_value=False):
problems = self.handler.run_problem_check(FakeProblemCheck)
self.assertIsNone(problems)
find_problems.assert_not_called()
send_problem_report.assert_not_called()
# unless caller gives force flag
problems = self.handler.run_problem_check(FakeProblemCheck, force=True)
self.assertEqual(problems, [{'foo': 'bar'}])
find_problems.assert_called_once_with()
send_problem_report.assert_called_once()
# does not run if disabled for weekday
find_problems.reset_mock()
send_problem_report.reset_mock()
weekday = datetime.date.today().weekday()
self.config.setdefault(f'wutta.problems.wuttatest.fake_check.day{weekday}', 'false')
problems = self.handler.run_problem_check(FakeProblemCheck)
self.assertIsNone(problems)
find_problems.assert_not_called()
send_problem_report.assert_not_called()
# unless caller gives force flag
problems = self.handler.run_problem_check(FakeProblemCheck, force=True)
self.assertEqual(problems, [{'foo': 'bar'}])
find_problems.assert_called_once_with()
send_problem_report.assert_called_once()
def test_run_problem_checks(self):
with patch.object(FakeProblemCheck, 'find_problems') as find_problems:
with patch.object(self.handler, 'send_problem_report') as send_problem_report:
# check runs by default
find_problems.return_value = [{'foo': 'bar'}]
self.handler.run_problem_checks([FakeProblemCheck])
find_problems.assert_called_once_with()
send_problem_report.assert_called_once()
# does not run if generally disabled
find_problems.reset_mock()
send_problem_report.reset_mock()
with patch.object(self.handler, 'is_enabled', return_value=False):
self.handler.run_problem_checks([FakeProblemCheck])
find_problems.assert_not_called()
send_problem_report.assert_not_called()
# unless caller gives force flag
self.handler.run_problem_checks([FakeProblemCheck], force=True)
find_problems.assert_called_once_with()
send_problem_report.assert_called_once()
# does not run if disabled for weekday
find_problems.reset_mock()
send_problem_report.reset_mock()
weekday = datetime.date.today().weekday()
self.config.setdefault(f'wutta.problems.wuttatest.fake_check.day{weekday}', 'false')
self.handler.run_problem_checks([FakeProblemCheck])
find_problems.assert_not_called()
send_problem_report.assert_not_called()
# unless caller gives force flag
self.handler.run_problem_checks([FakeProblemCheck], force=True)
find_problems.assert_called_once_with()
send_problem_report.assert_called_once()