3
0
Fork 0

Compare commits

..

No commits in common. "0688ed4dd1522dd3b230f910e227d5023f2b3af0" and "a6bb538ce94cb1f64e7408f3a4bd5c577c5ca403" have entirely different histories.

10 changed files with 152 additions and 145 deletions

View file

@ -1,4 +1,11 @@
# -*- mode: conf; -*- # -*- mode: conf; -*-
[MESSAGES CONTROL] [MESSAGES CONTROL]
disable=fixme disable=
attribute-defined-outside-init,
fixme,
import-outside-toplevel,
too-many-arguments,
too-many-branches,
too-many-locals,
too-many-positional-arguments,

View file

@ -25,12 +25,11 @@ WuttJamaican - app handler
""" """
# pylint: disable=too-many-lines # pylint: disable=too-many-lines
import importlib
import logging import logging
import os import os
import sys import sys
import warnings import warnings
import importlib
from importlib.metadata import version
import humanize import humanize
@ -143,7 +142,7 @@ class AppHandler: # pylint: disable=too-many-public-methods
return self.get_enum() return self.get_enum()
if name == "providers": if name == "providers":
self.__dict__["providers"] = self.get_all_providers() self.providers = self.get_all_providers()
return self.providers return self.providers
for provider in self.providers.values(): for provider in self.providers.values():
@ -283,13 +282,9 @@ class AppHandler: # pylint: disable=too-many-public-methods
pkgname = modpath.split(".")[0] pkgname = modpath.split(".")[0]
try: try:
from importlib.metadata import ( # pylint: disable=import-outside-toplevel from importlib.metadata import packages_distributions
packages_distributions,
)
except ImportError: # python < 3.10 except ImportError: # python < 3.10
from importlib_metadata import ( # pylint: disable=import-outside-toplevel from importlib_metadata import packages_distributions
packages_distributions,
)
pkgmap = packages_distributions() pkgmap = packages_distributions()
if pkgname in pkgmap: if pkgname in pkgmap:
@ -311,6 +306,8 @@ class AppHandler: # pylint: disable=too-many-public-methods
:returns: Version as string. :returns: Version as string.
""" """
from importlib.metadata import version
if not dist: if not dist:
dist = self.get_distribution(obj=obj) dist = self.get_distribution(obj=obj)
if dist: if dist:
@ -340,7 +337,7 @@ class AppHandler: # pylint: disable=too-many-public-methods
usedb=False, usedb=False,
default=self.default_model_spec, default=self.default_model_spec,
) )
self.__dict__["model"] = importlib.import_module(spec) self.model = importlib.import_module(spec)
return self.model return self.model
def get_enum(self): def get_enum(self):
@ -364,7 +361,7 @@ class AppHandler: # pylint: disable=too-many-public-methods
spec = self.config.get( spec = self.config.get(
f"{self.appname}.enum_spec", usedb=False, default=self.default_enum_spec f"{self.appname}.enum_spec", usedb=False, default=self.default_enum_spec
) )
self.__dict__["enum"] = importlib.import_module(spec) self.enum = importlib.import_module(spec)
return self.enum return self.enum
def load_object(self, spec): def load_object(self, spec):
@ -499,7 +496,7 @@ class AppHandler: # pylint: disable=too-many-public-methods
:returns: SQLAlchemy session for the app DB. :returns: SQLAlchemy session for the app DB.
""" """
from .db import Session # pylint: disable=import-outside-toplevel from .db import Session
return Session(**kwargs) return Session(**kwargs)
@ -585,7 +582,7 @@ class AppHandler: # pylint: disable=too-many-public-methods
associated. Simple convenience wrapper around associated. Simple convenience wrapper around
:func:`sqlalchemy:sqlalchemy.orm.object_session()`. :func:`sqlalchemy:sqlalchemy.orm.object_session()`.
""" """
from sqlalchemy import orm # pylint: disable=import-outside-toplevel from sqlalchemy import orm
return orm.object_session(obj) return orm.object_session(obj)
@ -600,7 +597,7 @@ class AppHandler: # pylint: disable=too-many-public-methods
this method will provide a default factory in the form of this method will provide a default factory in the form of
:meth:`make_session`. :meth:`make_session`.
""" """
from .db import short_session # pylint: disable=import-outside-toplevel from .db import short_session
if "factory" not in kwargs and "config" not in kwargs: if "factory" not in kwargs and "config" not in kwargs:
kwargs["factory"] = self.make_session kwargs["factory"] = self.make_session
@ -628,7 +625,7 @@ class AppHandler: # pylint: disable=too-many-public-methods
:returns: Setting value as string, or ``None``. :returns: Setting value as string, or ``None``.
""" """
from .db import get_setting # pylint: disable=import-outside-toplevel from .db import get_setting
return get_setting(session, name) return get_setting(session, name)
@ -1117,6 +1114,7 @@ class AppProvider: # pylint: disable=too-few-public-methods
""" """
def __init__(self, config): def __init__(self, config):
if isinstance(config, AppHandler): if isinstance(config, AppHandler):
warnings.warn( warnings.warn(
"passing app handler to app provider is deprecated; " "passing app handler to app provider is deprecated; "
@ -1154,7 +1152,6 @@ class GenericHandler:
self.config = config self.config = config
self.app = self.config.get_app() self.app = self.config.get_app()
self.modules = {} self.modules = {}
self.classes = {}
@property @property
def appname(self): def appname(self):

View file

@ -108,7 +108,7 @@ class AuthHandler(GenericHandler): # pylint: disable=too-many-public-methods
:returns: :class:`~wuttjamaican.db.model.auth.User` instance, :returns: :class:`~wuttjamaican.db.model.auth.User` instance,
or ``None``. or ``None``.
""" """
from sqlalchemy import orm # pylint: disable=import-outside-toplevel from sqlalchemy import orm
model = self.app.model model = self.app.model
@ -170,6 +170,7 @@ class AuthHandler(GenericHandler): # pylint: disable=too-many-public-methods
return role return role
else: # assuming it is a string else: # assuming it is a string
# try to match on Role.uuid # try to match on Role.uuid
try: try:
role = session.get(model.Role, _uuid.UUID(key)) role = session.get(model.Role, _uuid.UUID(key))
@ -225,6 +226,7 @@ class AuthHandler(GenericHandler): # pylint: disable=too-many-public-methods
# nb. these lookups require a db session # nb. these lookups require a db session
if session: if session:
# or maybe it is a uuid # or maybe it is a uuid
if isinstance(obj, _uuid.UUID): if isinstance(obj, _uuid.UUID):
user = session.get(model.User, obj) user = session.get(model.User, obj)
@ -233,6 +235,7 @@ class AuthHandler(GenericHandler): # pylint: disable=too-many-public-methods
# or maybe it is a string # or maybe it is a string
elif isinstance(obj, str): elif isinstance(obj, str):
# try to match on User.uuid # try to match on User.uuid
try: try:
user = session.get(model.User, _uuid.UUID(obj)) user = session.get(model.User, _uuid.UUID(obj))
@ -512,7 +515,7 @@ class AuthHandler(GenericHandler): # pylint: disable=too-many-public-methods
return cache return cache
def has_permission( # pylint: disable=too-many-arguments,too-many-positional-arguments def has_permission(
self, self,
session, session,
principal, principal,

View file

@ -189,11 +189,10 @@ class WuttaConfig: # pylint: disable=too-many-instance-attributes
See also :ref:`where-config-settings-come-from`. See also :ref:`where-config-settings-come-from`.
""" """
_app = None
default_app_handler_spec = "wuttjamaican.app:AppHandler" default_app_handler_spec = "wuttjamaican.app:AppHandler"
default_engine_maker_spec = "wuttjamaican.db.conf:make_engine_from_config" default_engine_maker_spec = "wuttjamaican.db.conf:make_engine_from_config"
def __init__( # pylint: disable=too-many-arguments,too-many-positional-arguments def __init__(
self, self,
files=None, files=None,
defaults=None, defaults=None,
@ -241,10 +240,7 @@ class WuttaConfig: # pylint: disable=too-many-instance-attributes
# configure main app DB if applicable, or disable usedb flag # configure main app DB if applicable, or disable usedb flag
try: try:
from wuttjamaican.db import ( # pylint: disable=import-outside-toplevel from wuttjamaican.db import Session, get_engines
Session,
get_engines,
)
except ImportError: except ImportError:
if self.usedb: if self.usedb:
log.warning( log.warning(
@ -276,8 +272,20 @@ class WuttaConfig: # pylint: disable=too-many-instance-attributes
raise FileNotFoundError(f"could not read required config file: {path}") raise FileNotFoundError(f"could not read required config file: {path}")
return return
# write config to temp file # load all values into (yet another) temp config
temp_path = self._write_temp_config_file(config) temp_config = configparser.RawConfigParser()
for section in config.sections():
temp_config.add_section(section)
# nb. must interpolate most values but *not* for logging formatters
raw = section.startswith("formatter_")
for option in config.options(section):
temp_config.set(section, option, config.get(section, option, raw=raw))
# re-write as temp file with "final" values
fd, temp_path = tempfile.mkstemp(suffix=".ini")
os.close(fd)
with open(temp_path, "wt", encoding="utf_8") as f:
temp_config.write(f)
# and finally, load that into our main config # and finally, load that into our main config
config = configuration.config_from_ini(temp_path, read_from_file=True) config = configuration.config_from_ini(temp_path, read_from_file=True)
@ -297,24 +305,6 @@ class WuttaConfig: # pylint: disable=too-many-instance-attributes
for p in self.parse_list(includes): for p in self.parse_list(includes):
self._load_ini_configs(p, configs, require=False) self._load_ini_configs(p, configs, require=False)
def _write_temp_config_file(self, config):
# load all values into (yet another) temp config
temp_config = configparser.RawConfigParser()
for section in config.sections():
temp_config.add_section(section)
# nb. must interpolate most values but *not* for logging formatters
raw = section.startswith("formatter_")
for option in config.options(section):
temp_config.set(section, option, config.get(section, option, raw=raw))
# re-write as temp file with "final" values
fd, temp_path = tempfile.mkstemp(suffix=".ini")
os.close(fd)
with open(temp_path, "wt", encoding="utf_8") as f:
temp_config.write(f)
return temp_path
def get_prioritized_files(self): def get_prioritized_files(self):
""" """
Returns list of config files in order of priority. Returns list of config files in order of priority.
@ -343,7 +333,7 @@ class WuttaConfig: # pylint: disable=too-many-instance-attributes
# get current value, sans db # get current value, sans db
return self.get(key, usedb=False) return self.get(key, usedb=False)
def get( # pylint: disable=too-many-arguments,too-many-positional-arguments def get(
self, self,
key, key,
default=UNSPECIFIED, default=UNSPECIFIED,
@ -458,6 +448,7 @@ class WuttaConfig: # pylint: disable=too-many-instance-attributes
# read from defaults + INI files # read from defaults + INI files
value = self.configuration.get(key) value = self.configuration.get(key)
if value is not None: if value is not None:
# nb. if the "value" corresponding to the given key is in # nb. if the "value" corresponding to the given key is in
# fact a subset/dict of more config values, then we must # fact a subset/dict of more config values, then we must
# "ignore" that. so only return the value if it is *not* # "ignore" that. so only return the value if it is *not*
@ -620,6 +611,7 @@ class WuttaConfig: # pylint: disable=too-many-instance-attributes
os.remove(path) os.remove(path)
def _write_logging_config_file(self): def _write_logging_config_file(self):
# load all current values into configparser # load all current values into configparser
parser = configparser.RawConfigParser() parser = configparser.RawConfigParser()
for section, values in self.configuration.items(): for section, values in self.configuration.items():
@ -641,7 +633,7 @@ class WuttaConfig: # pylint: disable=too-many-instance-attributes
See also :doc:`/narr/handlers/app`. See also :doc:`/narr/handlers/app`.
""" """
if not self._app: if not hasattr(self, "_app"):
spec = self.get( spec = self.get(
f"{self.appname}.app.handler", f"{self.appname}.app.handler",
usedb=False, usedb=False,
@ -718,10 +710,7 @@ def generic_default_files(appname):
if sys.platform == "win32": if sys.platform == "win32":
# use pywin32 to fetch official defaults # use pywin32 to fetch official defaults
try: try:
from win32com.shell import ( # pylint: disable=import-outside-toplevel from win32com.shell import shell, shellcon
shell,
shellcon,
)
except ImportError: except ImportError:
return [] return []
@ -760,7 +749,7 @@ def generic_default_files(appname):
] ]
def get_config_paths( # pylint: disable=too-many-arguments,too-many-positional-arguments def get_config_paths(
files=None, files=None,
plus_files=None, plus_files=None,
appname="wutta", appname="wutta",
@ -884,7 +873,28 @@ def get_config_paths( # pylint: disable=too-many-arguments,too-many-positional-
# first identify any "primary" config files # first identify any "primary" config files
if files is None: if files is None:
files = _get_primary_config_files(appname, env, env_files_name, default_files) if not env_files_name:
env_files_name = f"{appname.upper()}_CONFIG_FILES"
files = env.get(env_files_name)
if files is not None:
files = files.split(os.pathsep)
elif default_files:
if callable(default_files):
files = default_files(appname) or []
elif isinstance(default_files, str):
files = [default_files]
else:
files = list(default_files)
files = [path for path in files if os.path.exists(path)]
else:
files = []
for path in generic_default_files(appname):
if os.path.exists(path):
files.append(path)
elif isinstance(files, str): elif isinstance(files, str):
files = [files] files = [files]
else: else:
@ -916,48 +926,19 @@ def get_config_paths( # pylint: disable=too-many-arguments,too-many-positional-
# which config file as part of the actual service definition in # which config file as part of the actual service definition in
# windows, so the service name is used for magic lookup here. # windows, so the service name is used for magic lookup here.
if winsvc: if winsvc:
files = _get_winsvc_config_files(appname, winsvc, files) config = configparser.ConfigParser()
config.read(files)
section = f"{appname}.config"
if config.has_section(section):
option = f"winsvc.{winsvc}"
if config.has_option(section, option):
# replace file paths with whatever config value says
files = parse_list(config.get(section, option))
return files return files
def _get_primary_config_files(appname, env, env_files_name, default_files): def make_config(
if not env_files_name:
env_files_name = f"{appname.upper()}_CONFIG_FILES"
files = env.get(env_files_name)
if files is not None:
return files.split(os.pathsep)
if default_files:
if callable(default_files):
files = default_files(appname) or []
elif isinstance(default_files, str):
files = [default_files]
else:
files = list(default_files)
return [path for path in files if os.path.exists(path)]
files = []
for path in generic_default_files(appname):
if os.path.exists(path):
files.append(path)
return files
def _get_winsvc_config_files(appname, winsvc, files):
config = configparser.ConfigParser()
config.read(files)
section = f"{appname}.config"
if config.has_section(section):
option = f"winsvc.{winsvc}"
if config.has_option(section, option):
# replace file paths with whatever config value says
files = parse_list(config.get(section, option))
return files
def make_config( # pylint: disable=too-many-arguments,too-many-positional-arguments,too-many-locals
files=None, files=None,
plus_files=None, plus_files=None,
appname="wutta", appname="wutta",

View file

@ -172,7 +172,7 @@ class Message: # pylint: disable=too-many-instance-attributes
List of file attachments for the message. List of file attachments for the message.
""" """
def __init__( # pylint: disable=too-many-arguments,too-many-positional-arguments def __init__(
self, self,
key=None, key=None,
sender=None, sender=None,
@ -277,6 +277,7 @@ class EmailHandler(GenericHandler): # pylint: disable=too-many-public-methods
# prefer configured list of template lookup paths, if set # prefer configured list of template lookup paths, if set
templates = self.config.get_list(f"{self.config.appname}.email.templates") templates = self.config.get_list(f"{self.config.appname}.email.templates")
if not templates: if not templates:
# otherwise use all available paths, from app providers # otherwise use all available paths, from app providers
available = [] available = []
for provider in self.app.providers.values(): for provider in self.app.providers.values():
@ -323,8 +324,8 @@ class EmailHandler(GenericHandler): # pylint: disable=too-many-public-methods
This calls :meth:`get_email_modules()` and for each module, it This calls :meth:`get_email_modules()` and for each module, it
discovers all the email settings it contains. discovers all the email settings it contains.
""" """
if "email_settings" not in self.classes: if not hasattr(self, "_email_settings"):
self.classes["email_settings"] = {} self._email_settings = {}
for module in self.get_email_modules(): for module in self.get_email_modules():
for name in dir(module): for name in dir(module):
obj = getattr(module, name) obj = getattr(module, name)
@ -333,9 +334,9 @@ class EmailHandler(GenericHandler): # pylint: disable=too-many-public-methods
and obj is not EmailSetting and obj is not EmailSetting
and issubclass(obj, EmailSetting) and issubclass(obj, EmailSetting)
): ):
self.classes["email_settings"][obj.__name__] = obj self._email_settings[obj.__name__] = obj
return self.classes["email_settings"] return self._email_settings
def get_email_setting(self, key, instance=True): def get_email_setting(self, key, instance=True):
""" """
@ -454,7 +455,7 @@ class EmailHandler(GenericHandler): # pylint: disable=too-many-public-methods
# fall back to global default, if present # fall back to global default, if present
return self.config.get(f"{self.config.appname}.email.default.replyto") return self.config.get(f"{self.config.appname}.email.default.replyto")
def get_auto_subject( # pylint: disable=too-many-arguments,too-many-positional-arguments def get_auto_subject(
self, key, context=None, rendered=True, setting=None, default=None self, key, context=None, rendered=True, setting=None, default=None
): ):
""" """
@ -755,7 +756,7 @@ class EmailHandler(GenericHandler): # pylint: disable=too-many-public-methods
f"{self.config.appname}.mail.send_emails", default=False f"{self.config.appname}.mail.send_emails", default=False
) )
def send_email( # pylint: disable=too-many-arguments,too-many-positional-arguments def send_email(
self, key=None, context=None, message=None, sender=None, recips=None, **kwargs self, key=None, context=None, message=None, sender=None, recips=None, **kwargs
): ):
""" """

View file

@ -85,7 +85,6 @@ class InstallHandler(GenericHandler):
app_title = None app_title = None
pypi_name = None pypi_name = None
egg_name = None egg_name = None
schema_installed = False
def __init__(self, config, **kwargs): def __init__(self, config, **kwargs):
super().__init__(config) super().__init__(config)
@ -101,6 +100,24 @@ class InstallHandler(GenericHandler):
if not self.egg_name: if not self.egg_name:
self.egg_name = self.pypi_name.replace("-", "_") self.egg_name = self.pypi_name.replace("-", "_")
def run(self):
"""
Run the interactive command-line installer.
This does the following:
* check for ``prompt_toolkit`` and maybe ask to install it
* define the template lookup paths, for making config files
* call :meth:`show_welcome()`
* call :meth:`sanity_check()`
* call :meth:`do_install_steps()`
* call :meth:`show_goodbye()`
Although if a problem is encountered then not all calls may
happen.
"""
self.require_prompt_toolkit()
paths = [ paths = [
self.app.resource_path("wuttjamaican:templates/install"), self.app.resource_path("wuttjamaican:templates/install"),
] ]
@ -114,22 +131,6 @@ class InstallHandler(GenericHandler):
self.templates = TemplateLookup(directories=paths) self.templates = TemplateLookup(directories=paths)
def run(self):
"""
Run the interactive command-line installer.
This does the following:
* check for ``prompt_toolkit`` and maybe ask to install it
* call :meth:`show_welcome()`
* call :meth:`sanity_check()`
* call :meth:`do_install_steps()`
* call :meth:`show_goodbye()`
Although if a problem is encountered then not all calls may
happen.
"""
self.require_prompt_toolkit()
self.show_welcome() self.show_welcome()
self.sanity_check() self.sanity_check()
self.schema_installed = False self.schema_installed = False
@ -237,9 +238,9 @@ class InstallHandler(GenericHandler):
def make_db_url( def make_db_url(
self, dbtype, dbhost, dbport, dbname, dbuser, dbpass self, dbtype, dbhost, dbport, dbname, dbuser, dbpass
): # pylint: disable=empty-docstring,too-many-arguments,too-many-positional-arguments ): # pylint: disable=empty-docstring
""" """ """ """
from sqlalchemy.engine import URL # pylint: disable=import-outside-toplevel from sqlalchemy.engine import URL
if dbtype == "mysql": if dbtype == "mysql":
drivername = "mysql+mysqlconnector" drivername = "mysql+mysqlconnector"
@ -257,7 +258,7 @@ class InstallHandler(GenericHandler):
def test_db_connection(self, url): # pylint: disable=empty-docstring def test_db_connection(self, url): # pylint: disable=empty-docstring
""" """ """ """
import sqlalchemy as sa # pylint: disable=import-outside-toplevel import sqlalchemy as sa
engine = sa.create_engine(url) engine = sa.create_engine(url)
@ -441,9 +442,7 @@ class InstallHandler(GenericHandler):
:param db_url: :class:`sqlalchemy:sqlalchemy.engine.URL` :param db_url: :class:`sqlalchemy:sqlalchemy.engine.URL`
instance. instance.
""" """
from alembic.util.messaging import ( # pylint: disable=import-outside-toplevel from alembic.util.messaging import obfuscate_url_pw
obfuscate_url_pw,
)
if not self.prompt_bool("install db schema?", True): if not self.prompt_bool("install db schema?", True):
return False return False
@ -489,7 +488,7 @@ class InstallHandler(GenericHandler):
def require_prompt_toolkit(self, answer=None): # pylint: disable=empty-docstring def require_prompt_toolkit(self, answer=None): # pylint: disable=empty-docstring
""" """ """ """
try: try:
import prompt_toolkit # pylint: disable=unused-import,import-outside-toplevel import prompt_toolkit # pylint: disable=unused-import
except ImportError: except ImportError:
value = answer or input( value = answer or input(
"\nprompt_toolkit is not installed. shall i install it? [Yn] " "\nprompt_toolkit is not installed. shall i install it? [Yn] "
@ -504,7 +503,7 @@ class InstallHandler(GenericHandler):
) )
# nb. this should now succeed # nb. this should now succeed
import prompt_toolkit # pylint: disable=import-outside-toplevel import prompt_toolkit
def rprint(self, *args, **kwargs): def rprint(self, *args, **kwargs):
""" """
@ -514,9 +513,7 @@ class InstallHandler(GenericHandler):
def get_prompt_style(self): # pylint: disable=empty-docstring def get_prompt_style(self): # pylint: disable=empty-docstring
""" """ """ """
from prompt_toolkit.styles import ( # pylint: disable=import-outside-toplevel from prompt_toolkit.styles import Style
Style,
)
# message formatting styles # message formatting styles
return Style.from_dict( return Style.from_dict(
@ -526,7 +523,7 @@ class InstallHandler(GenericHandler):
} }
) )
def prompt_generic( # pylint: disable=too-many-arguments,too-many-positional-arguments def prompt_generic(
self, self,
info, info,
default=None, default=None,
@ -557,7 +554,7 @@ class InstallHandler(GenericHandler):
unless ``is_bool`` was requested in which case ``True`` or unless ``is_bool`` was requested in which case ``True`` or
``False``. ``False``.
""" """
from prompt_toolkit import prompt # pylint: disable=import-outside-toplevel from prompt_toolkit import prompt
# build prompt message # build prompt message
message = [ message = [

View file

@ -157,8 +157,8 @@ class ReportHandler(GenericHandler):
This calls :meth:`get_report_modules()` and for each module, This calls :meth:`get_report_modules()` and for each module,
it discovers all the reports it contains. it discovers all the reports it contains.
""" """
if "reports" not in self.classes: if not hasattr(self, "_reports"):
self.classes["reports"] = {} self._reports = {}
for module in self.get_report_modules(): for module in self.get_report_modules():
for name in dir(module): for name in dir(module):
obj = getattr(module, name) obj = getattr(module, name)
@ -167,9 +167,9 @@ class ReportHandler(GenericHandler):
and obj is not Report and obj is not Report
and issubclass(obj, Report) and issubclass(obj, Report)
): ):
self.classes["reports"][obj.report_key] = obj self._reports[obj.report_key] = obj
return self.classes["reports"] return self._reports
def get_report(self, key, instance=True): def get_report(self, key, instance=True):
""" """

View file

@ -101,9 +101,9 @@ def load_entry_points(group, ignore_errors=False):
try: try:
# nb. this package was added in python 3.8 # nb. this package was added in python 3.8
import importlib.metadata as importlib_metadata # pylint: disable=import-outside-toplevel import importlib.metadata as importlib_metadata
except ImportError: except ImportError:
import importlib_metadata # pylint: disable=import-outside-toplevel import importlib_metadata
eps = importlib_metadata.entry_points() eps = importlib_metadata.entry_points()
if not hasattr(eps, "select"): if not hasattr(eps, "select"):
@ -306,7 +306,9 @@ def progress_loop(func, items, factory, message=None):
progress = factory(message, count) progress = factory(message, count)
for i, item in enumerate(items, 1): for i, item in enumerate(items, 1):
func(item, i) func(item, i)
if progress: if progress:
progress.update(i) progress.update(i)
@ -341,17 +343,12 @@ def resource_path(path):
:returns: Absolute file path to the resource. :returns: Absolute file path to the resource.
""" """
if not os.path.isabs(path) and ":" in path: if not os.path.isabs(path) and ":" in path:
try: try:
# nb. these were added in python 3.9 # nb. these were added in python 3.9
from importlib.resources import ( # pylint: disable=import-outside-toplevel from importlib.resources import files, as_file
files,
as_file,
)
except ImportError: # python < 3.9 except ImportError: # python < 3.9
from importlib_resources import ( # pylint: disable=import-outside-toplevel from importlib_resources import files, as_file
files,
as_file,
)
package, filename = path.split(":") package, filename = path.split(":")
ref = files(package) / filename ref = files(package) / filename

View file

@ -17,6 +17,7 @@ from wuttjamaican.testing import FileTestCase, ConfigTestCase
class TestWuttaConfig(FileTestCase): class TestWuttaConfig(FileTestCase):
def make_config(self, **kwargs): def make_config(self, **kwargs):
return mod.WuttaConfig(**kwargs) return mod.WuttaConfig(**kwargs)
@ -290,6 +291,7 @@ configure_logging = true
) )
with patch.object(conf.WuttaConfig, "_configure_logging") as method: with patch.object(conf.WuttaConfig, "_configure_logging") as method:
# no logging config by default # no logging config by default
config = conf.WuttaConfig() config = conf.WuttaConfig()
method.assert_not_called() method.assert_not_called()
@ -317,6 +319,7 @@ configure_logging = true
) )
with patch("wuttjamaican.conf.logging") as logging: with patch("wuttjamaican.conf.logging") as logging:
# basic constructor attempts logging config # basic constructor attempts logging config
config = conf.WuttaConfig(configure_logging=True) config = conf.WuttaConfig(configure_logging=True)
logging.config.fileConfig.assert_called_once() logging.config.fileConfig.assert_called_once()
@ -335,14 +338,14 @@ configure_logging = true
logging.config.fileConfig.assert_called_once() logging.config.fileConfig.assert_called_once()
def test_config_has_no_app_after_init(self): def test_config_has_no_app_after_init(self):
# initial config should *not* have an app yet, otherwise # initial config should *not* have an app yet, otherwise
# extensions cannot specify a default app handler # extensions cannot specify a default app handler
config = conf.WuttaConfig() config = conf.WuttaConfig()
self.assertIsNone(config._app) self.assertFalse(hasattr(config, "_app"))
# but after that we can get an app okay # but after that we can get an app okay
app = config.get_app() app = config.get_app()
self.assertIsNotNone(app)
self.assertIs(app, config._app) self.assertIs(app, config._app)
def test_setdefault(self): def test_setdefault(self):
@ -625,6 +628,7 @@ configure_logging = true
self.assertEqual(value[2], "baz") self.assertEqual(value[2], "baz")
def test_get_app(self): def test_get_app(self):
# default handler # default handler
config = conf.WuttaConfig() config = conf.WuttaConfig()
self.assertEqual(config.default_app_handler_spec, "wuttjamaican.app:AppHandler") self.assertEqual(config.default_app_handler_spec, "wuttjamaican.app:AppHandler")
@ -682,6 +686,7 @@ def custom_make_engine_from_config():
class TestWuttaConfigExtension(TestCase): class TestWuttaConfigExtension(TestCase):
def test_basic(self): def test_basic(self):
config = conf.WuttaConfig() config = conf.WuttaConfig()
ext = conf.WuttaConfigExtension() ext = conf.WuttaConfigExtension()
@ -690,6 +695,7 @@ class TestWuttaConfigExtension(TestCase):
class TestGenericDefaultFiles(TestCase): class TestGenericDefaultFiles(TestCase):
def test_linux(self): def test_linux(self):
files = conf.generic_default_files("wuttatest") files = conf.generic_default_files("wuttatest")
self.assertIsInstance(files, list) self.assertIsInstance(files, list)
@ -701,6 +707,7 @@ class TestGenericDefaultFiles(TestCase):
win32com.shell.SHGetSpecialFolderPath.return_value = r"C:" + os.sep win32com.shell.SHGetSpecialFolderPath.return_value = r"C:" + os.sep
with patch.dict("sys.modules", **{"win32com.shell": win32com}): with patch.dict("sys.modules", **{"win32com.shell": win32com}):
with patch("wuttjamaican.conf.sys", platform="win32"): with patch("wuttjamaican.conf.sys", platform="win32"):
files = conf.generic_default_files("wuttatest") files = conf.generic_default_files("wuttatest")
self.assertIsInstance(files, list) self.assertIsInstance(files, list)
self.assertTrue(len(files) > 1) self.assertTrue(len(files) > 1)
@ -716,12 +723,14 @@ class TestGenericDefaultFiles(TestCase):
with patch("builtins.__import__", side_effect=mock_import): with patch("builtins.__import__", side_effect=mock_import):
with patch("wuttjamaican.conf.sys", platform="win32"): with patch("wuttjamaican.conf.sys", platform="win32"):
files = conf.generic_default_files("wuttatest") files = conf.generic_default_files("wuttatest")
self.assertIsInstance(files, list) self.assertIsInstance(files, list)
self.assertEqual(len(files), 0) self.assertEqual(len(files), 0)
class TestGetConfigPaths(FileTestCase): class TestGetConfigPaths(FileTestCase):
def test_winsvc(self): def test_winsvc(self):
myconf = self.write_file( myconf = self.write_file(
"my.conf", "my.conf",
@ -744,6 +753,7 @@ winsvc.RattailFileMonitor = /path/to/other/file
class TestMakeConfig(FileTestCase): class TestMakeConfig(FileTestCase):
# nb. we use appname='wuttatest' in this suite to avoid any # nb. we use appname='wuttatest' in this suite to avoid any
# "valid" default config files, env vars etc. which may be present # "valid" default config files, env vars etc. which may be present
# on the dev machine # on the dev machine
@ -753,6 +763,7 @@ class TestMakeConfig(FileTestCase):
with patch("wuttjamaican.conf.generic_default_files") as generic_default_files: with patch("wuttjamaican.conf.generic_default_files") as generic_default_files:
with patch("wuttjamaican.conf.WuttaConfig") as WuttaConfig: with patch("wuttjamaican.conf.WuttaConfig") as WuttaConfig:
# generic files are used if nothing is specified # generic files are used if nothing is specified
generic_default_files.return_value = [generic] generic_default_files.return_value = [generic]
config = conf.make_config(appname="wuttatest") config = conf.make_config(appname="wuttatest")
@ -777,6 +788,7 @@ class TestMakeConfig(FileTestCase):
with patch("wuttjamaican.conf.generic_default_files") as generic_default_files: with patch("wuttjamaican.conf.generic_default_files") as generic_default_files:
with patch("wuttjamaican.conf.WuttaConfig") as WuttaConfig: with patch("wuttjamaican.conf.WuttaConfig") as WuttaConfig:
# generic defaults are used if nothing specified # generic defaults are used if nothing specified
generic_default_files.return_value = [generic] generic_default_files.return_value = [generic]
config = conf.make_config(appname="wuttatest") config = conf.make_config(appname="wuttatest")
@ -820,6 +832,7 @@ class TestMakeConfig(FileTestCase):
with patch("wuttjamaican.conf.generic_default_files") as generic_default_files: with patch("wuttjamaican.conf.generic_default_files") as generic_default_files:
with patch("wuttjamaican.conf.WuttaConfig") as WuttaConfig: with patch("wuttjamaican.conf.WuttaConfig") as WuttaConfig:
generic_default_files.return_value = [generic] generic_default_files.return_value = [generic]
# no plus files by default # no plus files by default
@ -864,6 +877,7 @@ class TestMakeConfig(FileTestCase):
with patch("wuttjamaican.conf.generic_default_files") as generic_default_files: with patch("wuttjamaican.conf.generic_default_files") as generic_default_files:
with patch("wuttjamaican.conf.WuttaConfig") as WuttaConfig: with patch("wuttjamaican.conf.WuttaConfig") as WuttaConfig:
generic_default_files.return_value = [generic] generic_default_files.return_value = [generic]
# generic files by default # generic files by default
@ -908,6 +922,7 @@ class TestMakeConfig(FileTestCase):
with patch.object(mod, "WuttaConfig") as WuttaConfig: with patch.object(mod, "WuttaConfig") as WuttaConfig:
with patch.object(mod, "load_entry_points") as load_entry_points: with patch.object(mod, "load_entry_points") as load_entry_points:
# no entry points loaded if extend=False # no entry points loaded if extend=False
config = mod.make_config(appname="wuttatest", extend=False) config = mod.make_config(appname="wuttatest", extend=False)
WuttaConfig.assert_called_once_with( WuttaConfig.assert_called_once_with(
@ -952,6 +967,7 @@ class TestMakeConfig(FileTestCase):
class TestWuttaConfigProfile(ConfigTestCase): class TestWuttaConfigProfile(ConfigTestCase):
def make_profile(self, key): def make_profile(self, key):
return mod.WuttaConfigProfile(self.config, key) return mod.WuttaConfigProfile(self.config, key)

View file

@ -12,6 +12,7 @@ from wuttjamaican.testing import ConfigTestCase
class TestInstallHandler(ConfigTestCase): class TestInstallHandler(ConfigTestCase):
def make_handler(self, **kwargs): def make_handler(self, **kwargs):
return mod.InstallHandler(self.config, **kwargs) return mod.InstallHandler(self.config, **kwargs)
@ -37,6 +38,7 @@ class TestInstallHandler(ConfigTestCase):
with patch.object(mod, "sys") as sys: with patch.object(mod, "sys") as sys:
with patch.object(handler, "rprint") as rprint: with patch.object(handler, "rprint") as rprint:
with patch.object(handler, "prompt_bool") as prompt_bool: with patch.object(handler, "prompt_bool") as prompt_bool:
# user continues # user continues
prompt_bool.return_value = True prompt_bool.return_value = True
handler.show_welcome() handler.show_welcome()
@ -52,6 +54,7 @@ class TestInstallHandler(ConfigTestCase):
with patch.object(mod, "sys") as sys: with patch.object(mod, "sys") as sys:
with patch.object(mod, "os") as os: with patch.object(mod, "os") as os:
with patch.object(handler, "rprint") as rprint: with patch.object(handler, "rprint") as rprint:
# pretend appdir does not exist # pretend appdir does not exist
os.path.exists.return_value = False os.path.exists.return_value = False
handler.sanity_check() handler.sanity_check()
@ -76,9 +79,10 @@ class TestInstallHandler(ConfigTestCase):
with patch.object(handler, "get_dbinfo", return_value=dbinfo): with patch.object(handler, "get_dbinfo", return_value=dbinfo):
with patch.object(handler, "make_appdir") as make_appdir: with patch.object(handler, "make_appdir") as make_appdir:
with patch.object(handler, "install_db_schema") as install_db_schema: with patch.object(handler, "install_db_schema") as install_db_schema:
# nb. just for sanity/coverage # nb. just for sanity/coverage
install_db_schema.return_value = True install_db_schema.return_value = True
self.assertFalse(handler.schema_installed) self.assertFalse(hasattr(handler, "schema_installed"))
handler.do_install_steps() handler.do_install_steps()
self.assertTrue(make_appdir.called) self.assertTrue(make_appdir.called)
self.assertTrue(handler.schema_installed) self.assertTrue(handler.schema_installed)
@ -105,6 +109,7 @@ class TestInstallHandler(ConfigTestCase):
with patch.object(handler, "prompt_generic", side_effect=prompt_generic): with patch.object(handler, "prompt_generic", side_effect=prompt_generic):
with patch.object(handler, "test_db_connection") as test_db_connection: with patch.object(handler, "test_db_connection") as test_db_connection:
with patch.object(handler, "rprint") as rprint: with patch.object(handler, "rprint") as rprint:
# bad dbinfo # bad dbinfo
test_db_connection.return_value = "bad dbinfo" test_db_connection.return_value = "bad dbinfo"
sys.exit.side_effect = RuntimeError sys.exit.side_effect = RuntimeError
@ -215,6 +220,7 @@ default.url = {db_url}
db_url = sa.create_engine(db_url).url db_url = sa.create_engine(db_url).url
with patch.object(mod, "subprocess") as subprocess: with patch.object(mod, "subprocess") as subprocess:
# user declines offer to install schema # user declines offer to install schema
with patch.object(handler, "prompt_bool", return_value=False): with patch.object(handler, "prompt_bool", return_value=False):
self.assertFalse(handler.install_db_schema(db_url, appdir=self.tempdir)) self.assertFalse(handler.install_db_schema(db_url, appdir=self.tempdir))
@ -315,6 +321,7 @@ default.url = {db_url}
with patch("builtins.__import__", side_effect=mock_import): with patch("builtins.__import__", side_effect=mock_import):
with patch.object(handler, "get_prompt_style", return_value=style): with patch.object(handler, "get_prompt_style", return_value=style):
with patch.object(handler, "rprint") as rprint: with patch.object(handler, "rprint") as rprint:
# no input or default value # no input or default value
mock_prompt.return_value = "" mock_prompt.return_value = ""
result = handler.prompt_generic("foo") result = handler.prompt_generic("foo")
@ -445,6 +452,7 @@ default.url = {db_url}
with patch("builtins.__import__", side_effect=mock_import): with patch("builtins.__import__", side_effect=mock_import):
with patch.object(handler, "rprint") as rprint: with patch.object(handler, "rprint") as rprint:
# no default; true input # no default; true input
mock_prompt.reset_mock() mock_prompt.reset_mock()
mock_prompt.return_value = "Y" mock_prompt.return_value = "Y"